Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/01 04:49:03 UTC

[incubator-doris] branch branch-0.15 updated (26a81f2 -> 4719771)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git.


    from 26a81f2  [Optimize] optimize some session variable and profile (#6920)
     new 36ae508  Modify Chinese comment (#6951)
     new a946895  [Docs] Fix error KEY url (#6955)
     new b58ae3e  support use char like \x01 in flink-doris-sink column & line delimiter (#6937)
     new 1c8e962  Fix spark connector build error (#6948)
     new 162b9dc  [Bug] Fix failure to stop sync job (#6950)
     new 5bec413  [S3] Support path style endpoint (#6962)
     new 4719771  [Enhance][Load] Reduce the number of segments when loading a large volume data in one batch (#6947)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/config.h                             |  4 +-
 be/src/exec/tablet_sink.cpp                        |  6 +-
 be/src/olap/delta_writer.cpp                       |  4 +-
 be/src/olap/memtable.cpp                           |  5 ++
 be/src/olap/memtable.h                             |  4 ++
 be/src/olap/olap_common.h                          | 19 ++---
 be/src/olap/olap_cond.cpp                          | 38 +++++-----
 be/src/olap/out_stream.cpp                         | 35 +++++----
 be/src/olap/out_stream.h                           | 56 +++++++--------
 be/src/olap/schema_change.cpp                      | 84 +++++++++++-----------
 be/src/runtime/load_channel.cpp                    |  5 +-
 be/src/runtime/tablets_channel.cpp                 | 53 +++++++++++---
 be/src/runtime/tablets_channel.h                   |  4 +-
 be/src/service/internal_service.cpp                |  3 +-
 be/src/util/s3_util.cpp                            | 11 ++-
 be/test/util/s3_storage_backend_test.cpp           | 17 +++--
 .../load-data/s3-load-manual.md                    | 21 +++++-
 docs/en/best-practices/star-schema-benchmark.md    | 12 ++--
 docs/en/downloads/downloads.md                     |  2 +-
 docs/en/extending-doris/flink-doris-connector.md   |  2 +-
 .../load-data/s3-load-manual.md                    | 21 +++++-
 docs/zh-CN/best-practices/star-schema-benchmark.md | 16 +++--
 docs/zh-CN/downloads/downloads.md                  |  2 +-
 .../zh-CN/extending-doris/flink-doris-connector.md |  2 +-
 .../doris/flink/rest/models/RespContent.java       |  4 ++
 .../flink/table/DorisDynamicOutputFormat.java      | 44 ++++++++++--
 .../apache/doris/flink/table/DorisStreamLoad.java  |  3 +-
 extension/spark-doris-connector/pom.xml            |  2 +
 fe/fe-core/pom.xml                                 |  2 +-
 .../org/apache/doris/analysis/OutFileClause.java   |  3 +-
 .../java/org/apache/doris/backup/BlobStorage.java  |  1 -
 .../java/org/apache/doris/backup/Repository.java   | 14 ++--
 .../java/org/apache/doris/backup/S3Storage.java    | 59 ++++++++++-----
 .../org/apache/doris/common/util/BrokerUtil.java   | 15 ++--
 .../java/org/apache/doris/common/util/S3URI.java   | 68 ++++++++++++++----
 .../doris/httpv2/rest/manager/NodeAction.java      | 10 +--
 .../org/apache/doris/load/sync/SyncLifeCycle.java  |  2 +-
 .../org/apache/doris/backup/S3StorageTest.java     | 10 +--
 .../org/apache/doris/common/util/S3URITest.java    | 38 +++++-----
 .../doris/planner/StreamLoadScanNodeTest.java      | 78 ++++++++++++++++++--
 fe/pom.xml                                         |  2 +-
 gensrc/proto/internal_service.proto                |  2 +
 42 files changed, 526 insertions(+), 257 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 06/07: [S3] Support path style endpoint (#6962)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5bec4132ffbec0d038bfc40b47803d07b554107b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Mon Nov 1 10:48:10 2021 +0800

    [S3] Support path style endpoint (#6962)
    
    Add a use_path_style property for S3.
    Upgrade hadoop-common and hadoop-aws to 2.8.0 to support the path-style property.
    Fix some S3 URI bugs.
    Add some logs for tracing the load process.
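
For orientation (an editor's sketch, not part of the patch): the `use_path_style` flag ultimately maps onto Hadoop S3A configuration keys on the FE side. The class and method below are illustrative, but the `fs.s3a.*` keys are the standard S3A properties, and the patch's own comment notes that `fs.s3a.path.style.access` was introduced in hadoop-aws 2.8.0, which is why the Hadoop dependencies are bumped below.

```java
// Sketch only: wire a user-facing "use_path_style" flag into Hadoop S3A settings,
// mirroring the fs.s3a.* settings added in S3Storage.java further down.
import org.apache.hadoop.conf.Configuration;

public class S3AConfSketch {
    public static Configuration build(String endpoint, String ak, String sk,
                                      boolean usePathStyle) {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", endpoint);
        conf.set("fs.s3a.access.key", ak);
        conf.set("fs.s3a.secret.key", sk);
        // Requires hadoop-aws >= 2.8.0, hence the hadoop-common/hadoop-aws
        // version bump from 2.7.3 to 2.8.0 in this commit.
        conf.set("fs.s3a.path.style.access", usePathStyle ? "true" : "false");
        return conf;
    }
}
```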
---
 be/src/exec/tablet_sink.cpp                        |  6 +-
 be/src/olap/delta_writer.cpp                       |  4 +-
 be/src/runtime/load_channel.cpp                    |  3 +-
 be/src/runtime/tablets_channel.cpp                 |  5 +-
 be/src/runtime/tablets_channel.h                   |  2 +-
 be/src/service/internal_service.cpp                |  3 +-
 be/src/util/s3_util.cpp                            | 11 ++-
 be/test/util/s3_storage_backend_test.cpp           | 17 +++--
 .../load-data/s3-load-manual.md                    | 21 +++++-
 .../load-data/s3-load-manual.md                    | 21 +++++-
 fe/fe-core/pom.xml                                 |  2 +-
 .../java/org/apache/doris/backup/BlobStorage.java  |  1 -
 .../java/org/apache/doris/backup/Repository.java   | 14 ++--
 .../java/org/apache/doris/backup/S3Storage.java    | 59 +++++++++++-----
 .../org/apache/doris/common/util/BrokerUtil.java   | 15 +++--
 .../java/org/apache/doris/common/util/S3URI.java   | 68 +++++++++++++++----
 .../doris/httpv2/rest/manager/NodeAction.java      | 10 +--
 .../org/apache/doris/backup/S3StorageTest.java     | 10 +--
 .../org/apache/doris/common/util/S3URITest.java    | 38 ++++++-----
 .../doris/planner/StreamLoadScanNodeTest.java      | 78 ++++++++++++++++++++--
 fe/pom.xml                                         |  2 +-
 gensrc/proto/internal_service.proto                |  2 +
 22 files changed, 288 insertions(+), 104 deletions(-)
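
As background for the diffs below (editor's illustration, not part of the patch): the `use_path_style` property switches between the two ways the same object can be addressed. The example values follow the comments added to S3Storage.java further down.

```java
// Illustration only: the same object addressed in the two S3 styles.
public class S3AddressingStyles {
    public static void main(String[] args) {
        String endpointHost = "s3.us-east-2.amazonaws.com";
        String bucket = "my_bucket";
        String key = "file.txt";

        // Virtual-hosted style: the bucket becomes part of the host name.
        String virtualHosted = "http://" + bucket + "." + endpointHost + "/" + key;
        // Path style: the bucket stays in the URL path.
        String pathStyle = "http://" + endpointHost + "/" + bucket + "/" + key;

        System.out.println(virtualHosted); // http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
        System.out.println(pathStyle);     // http://s3.us-east-2.amazonaws.com/my_bucket/file.txt
    }
}
```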

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index c5c02d2..b818918 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -86,6 +86,7 @@ Status NodeChannel::init(RuntimeState* state) {
     _cur_add_batch_request.set_allocated_id(&_parent->_load_id);
     _cur_add_batch_request.set_index_id(_index_id);
     _cur_add_batch_request.set_sender_id(_parent->_sender_id);
+    _cur_add_batch_request.set_backend_id(_node_id);
     _cur_add_batch_request.set_eos(false);
 
     _rpc_timeout_ms = state->query_options().query_timeout * 1000;
@@ -93,7 +94,7 @@ Status NodeChannel::init(RuntimeState* state) {
 
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
-    _name = "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]";
+    _name = fmt::format("NodeChannel[{}-{}]", _index_id, _node_id);
     return Status::OK();
 }
 
@@ -282,7 +283,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
         SleepFor(MonoDelta::FromMilliseconds(1));
     }
     timer.stop();
-    VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms";
+    VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms"
+                  << ", " << _load_info;
 
     if (_add_batches_finished) {
         {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3868e88..afa014b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -321,7 +321,9 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_token->get_stats();
-    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat;
+    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() 
+                  << ", load id: " << print_id(_req.load_id)
+                  << ", stats: " << stat;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 43f8a88..01e8d66 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -94,7 +94,8 @@ Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
     Status st;
     if (request.has_eos() && request.eos()) {
         bool finished = false;
-        RETURN_IF_ERROR(channel->close(request.sender_id(), &finished, request.partition_ids(),
+        RETURN_IF_ERROR(channel->close(request.sender_id(), request.backend_id(), 
+                                       &finished, request.partition_ids(),
                                        tablet_vec));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 531a3bd..db5e23b 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -126,7 +126,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
     return Status::OK();
 }
 
-Status TabletsChannel::close(int sender_id, bool* finished,
+Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
                              const google::protobuf::RepeatedField<int64_t>& partition_ids,
                              google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
     std::lock_guard<std::mutex> l(_lock);
@@ -138,7 +138,8 @@ Status TabletsChannel::close(int sender_id, bool* finished,
         *finished = (_num_remaining_senders == 0);
         return _close_status;
     }
-    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id;
+    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
+              << ", backend id: " << backend_id;
     for (auto pid : partition_ids) {
         _partition_ids.emplace(pid);
     }
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 0f01512..b0402bd 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -67,7 +67,7 @@ public:
     // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec'
     // to include all tablets written in this channel.
     // no-op when this channel has been closed or cancelled
-    Status close(int sender_id, bool* finished,
+    Status close(int sender_id, int64_t backend_id, bool* finished,
                  const google::protobuf::RepeatedField<int64_t>& partition_ids,
                  google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
 
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 35e94a9..93dc6b1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -123,7 +123,8 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
                              << ", id=" << request->id() << ", index_id=" << request->index_id()
-                             << ", sender_id=" << request->sender_id();
+                             << ", sender_id=" << request->sender_id()
+                             << ", backend id=" << request->backend_id();
             }
             st.to_protobuf(response->mutable_status());
         }
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 364f580..160b5f4 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -30,6 +30,7 @@ const static std::string S3_AK = "AWS_ACCESS_KEY";
 const static std::string S3_SK = "AWS_SECRET_KEY";
 const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
 const static std::string S3_REGION = "AWS_REGION";
+const static std::string USE_PATH_STYLE = "use_path_style";
 
 ClientFactory::ClientFactory() {
     _aws_options = Aws::SDKOptions{};
@@ -67,7 +68,15 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
     Aws::Client::ClientConfiguration aws_config;
     aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
     aws_config.region = properties.find(S3_REGION)->second;
-    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), std::move(aws_config));
+
+    // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
+    bool use_virtual_addressing = true;
+    if (properties.find(USE_PATH_STYLE) != properties.end()) {
+        use_virtual_addressing = properties.find(USE_PATH_STYLE)->second == "true" ? false : true;
+    }
+    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), std::move(aws_config),
+            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+            use_virtual_addressing);
 }
 
 } // end namespace doris
diff --git a/be/test/util/s3_storage_backend_test.cpp b/be/test/util/s3_storage_backend_test.cpp
index a8ea878..3e8cc87 100644
--- a/be/test/util/s3_storage_backend_test.cpp
+++ b/be/test/util/s3_storage_backend_test.cpp
@@ -33,18 +33,20 @@
 #include "util/storage_backend.h"
 
 namespace doris {
-static const std::string AK = "AK";
-static const std::string SK = "SK";
+static const std::string AK = "";
+static const std::string SK = "";
 static const std::string ENDPOINT = "http://s3.bj.bcebos.com";
+static const std::string USE_PATH_STYLE = "false";
 static const std::string REGION = "bj";
-static const std::string BUCKET = "s3://yang-repo/";
+static const std::string BUCKET = "s3://cmy-repo/";
 class S3StorageBackendTest : public testing::Test {
 public:
     S3StorageBackendTest()
             : _aws_properties({{"AWS_ACCESS_KEY", AK},
                                {"AWS_SECRET_KEY", SK},
                                {"AWS_ENDPOINT", ENDPOINT},
-                               {"AWS_REGION", "bj"}}) {
+                               {"USE_PATH_STYLE", USE_PATH_STYLE},
+                               {"AWS_REGION", REGION}}) {
         _s3.reset(new S3StorageBackend(_aws_properties));
         _s3_base_path = BUCKET + "s3/" + gen_uuid();
     }
@@ -189,10 +191,7 @@ TEST_F(S3StorageBackendTest, s3_mkdir) {
 int main(int argc, char** argv) {
     ::testing::InitGoogleTest(&argc, argv);
     int ret = 0;
-    Aws::SDKOptions options;
-    Aws::InitAPI(options);
-    // ak sk is secret
+    // set ak sk before running it.
     // ret = RUN_ALL_TESTS();
-    Aws::ShutdownAPI(options);
     return ret;
-}
\ No newline at end of file
+}
diff --git a/docs/en/administrator-guide/load-data/s3-load-manual.md b/docs/en/administrator-guide/load-data/s3-load-manual.md
index 7e3aa28..b9c2b2a 100644
--- a/docs/en/administrator-guide/load-data/s3-load-manual.md
+++ b/docs/en/administrator-guide/load-data/s3-load-manual.md
@@ -48,12 +48,12 @@ Other cloud storage systems can find relevant information compatible with S3 in
 Like Broker Load just replace `WITH BROKER broker_name ()` with
 ```
     WITH S3
- (
+    (
         "AWS_ENDPOINT" = "AWS_ENDPOINT",
         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
         "AWS_REGION" = "AWS_REGION"
-  )
+    )
 ```
 
 example:
@@ -75,4 +75,19 @@ example:
     (
         "timeout" = "3600"
     );
-```
\ No newline at end of file
+```
+
+## FAQ
+
+The S3 SDK uses virtual-hosted style access by default. However, some object storage systems may not have virtual-hosted style access enabled or may not support it. In that case, you can add the `use_path_style` parameter to force path-style access:
+
+```
+   WITH S3
+   (
+         "AWS_ENDPOINT" = "AWS_ENDPOINT",
+         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
+         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
+         "AWS_REGION" = "AWS_REGION",
+         "use_path_style" = "true"
+   )
+```
diff --git a/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md b/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md
index 4bf52ce..3c9b6c5 100644
--- a/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md
@@ -49,12 +49,12 @@ under the License.
 导入方式和Broker Load 基本相同,只需要将 `WITH BROKER broker_name ()` 语句替换成如下部分
 ```
     WITH S3
- (
+    (
         "AWS_ENDPOINT" = "AWS_ENDPOINT",
         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
         "AWS_REGION" = "AWS_REGION"
-  )
+    )
 ```
 
 完整示例如下
@@ -76,4 +76,19 @@ under the License.
     (
         "timeout" = "3600"
     );
-```
\ No newline at end of file
+```
+
+## 常见问题
+
+S3 SDK 默认使用 virtual-hosted style 方式。但某些对象存储系统可能没开启或没支持 virtual-hosted style 方式的访问,此时我们可以添加 `use_path_style` 参数来强制使用 path style 方式:
+
+```
+  WITH S3
+  (
+        "AWS_ENDPOINT" = "AWS_ENDPOINT",
+        "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
+        "AWS_SECRET_KEY"="AWS_SECRET_KEY",
+        "AWS_REGION" = "AWS_REGION",
+        "use_path_style" = "true"
+  )
+```
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 1ebbba5..e40f29a 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -514,7 +514,7 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
-            <version>2.7.3</version>
+            <version>2.8.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
index 10fb1d4..d5b98fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
@@ -134,5 +134,4 @@ public abstract class BlobStorage implements Writable {
             Text.writeString(out, entry.getValue());
         }
     }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 3a2623d..df8230c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -29,16 +29,16 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.system.Backend;
 
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.json.JSONObject;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.json.JSONObject;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
@@ -315,7 +315,9 @@ public class Repository implements Writable {
     // Check if this repo is available.
     // If failed to connect this repo, set errMsg and return false.
     public boolean ping() {
-        String path = location + "/" + joinPrefix(PREFIX_REPO, name);
+        // for s3 sdk, the headObject() method does not support list "dir",
+        // so we check FILE_REPO_INFO instead.
+        String path = location + "/" + joinPrefix(PREFIX_REPO, name) + "/" + FILE_REPO_INFO;
         try {
             URI checkUri = new URI(path);
             Status st = storage.checkPathExist(checkUri.normalize().toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
index 2032229..9a2f29b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
@@ -71,9 +71,19 @@ public class S3Storage extends BlobStorage {
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
     public static final String S3_REGION = "AWS_REGION";
+    public static final String USE_PATH_STYLE = "use_path_style";
+
     private static final Logger LOG = LogManager.getLogger(S3Storage.class);
     private final CaseInsensitiveMap caseInsensitiveProperties;
     private S3Client client;
+    // false: the s3 client will automatically convert endpoint to virtual-hosted style, eg:
+    //          endpoint:           http://s3.us-east-2.amazonaws.com
+    //          bucket/path:        my_bucket/file.txt
+    //          auto convert:       http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
+    // true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks:
+    //          endpoint:           http://cos.ap-beijing.myqcloud.com
+    //          bucket/path:        my_bucket/file.txt
+    //          convert manually:   See S3URI()
     private boolean forceHostedStyle = false;
 
     public S3Storage(Map<String, String> properties) {
@@ -89,21 +99,31 @@ public class S3Storage extends BlobStorage {
         super.setProperties(properties);
         caseInsensitiveProperties.putAll(properties);
         // Virtual hosted-sytle is recommended in the s3 protocol.
-        // The path-style has been abandoned, but for some unexplainable reasons.
-        // The s3 client will determine whether the endpiont starts with `s3`
+        // The path-style has been abandoned, but for some unexplainable reasons,
+        // the s3 client will determine whether the endpiont starts with `s3`
         // when generating a virtual hosted-sytle request.
         // If not, it will not be converted ( https://github.com/aws/aws-sdk-java-v2/pull/763),
         // but the endpoints of many cloud service providers for object storage do not start with s3,
         // so they cannot be converted to virtual hosted-sytle.
-        // Some of them, such as aliyun's oss, only support virtual hosted-sytle,
-        // so we need to do some additional conversion.
-
+        // Some of them, such as aliyun's oss, only support virtual hosted-sytle, and some of them(ceph) may only support
+        // path-style, so we need to do some additional conversion.
+        //
+        //          use_path_style          |     !use_path_style
+        //   S3     forceHostedStyle=false  |     forceHostedStyle=false
+        //  !S3     forceHostedStyle=false  |     forceHostedStyle=true
+        //
+        // That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use
+        // virtual hosted-sytle.
+        // And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle.
         if (!caseInsensitiveProperties.get(S3_ENDPOINT).toString().toLowerCase().startsWith("s3")) {
-            forceHostedStyle = true;
+            if (caseInsensitiveProperties.getOrDefault(USE_PATH_STYLE, "false").toString().equalsIgnoreCase("true")) {
+                forceHostedStyle = false;
+            } else {
+                forceHostedStyle = true;
+            }
         } else {
             forceHostedStyle = false;
         }
-
     }
 
     public static void checkS3(CaseInsensitiveMap caseInsensitiveProperties) throws UserException {
@@ -168,7 +188,6 @@ public class S3Storage extends BlobStorage {
     @Override
     public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
         long start = System.currentTimeMillis();
-        S3URI uri = new S3URI(remoteFilePath, forceHostedStyle);
         // Write the data to a local file
         File localFile = new File(localFilePath);
         if (localFile.exists()) {
@@ -183,6 +202,7 @@ public class S3Storage extends BlobStorage {
             }
         }
         try {
+            S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle);
             GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject(GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath());
             if (localFile.length() == fileSize) {
                 LOG.info(
@@ -200,7 +220,7 @@ public class S3Storage extends BlobStorage {
                     Status.ErrCode.COMMON_ERROR,
                     "get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage());
         } catch (UserException ue) {
-            LOG.error("connect to s3 failed: ", ue);
+            LOG.warn("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
         } catch (Exception e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.toString());
@@ -209,8 +229,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status directUpload(String content, String remoteFile) {
-        S3URI uri = new S3URI(remoteFile, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remoteFile, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -228,9 +248,9 @@ public class S3Storage extends BlobStorage {
     }
 
     public Status copy(String origFilePath, String destFilePath) {
-        S3URI origUri = new S3URI(origFilePath);
-        S3URI descUri = new S3URI(destFilePath, forceHostedStyle);
         try {
+            S3URI origUri = S3URI.create(origFilePath);
+            S3URI descUri = S3URI.create(destFilePath, forceHostedStyle);
             getClient(descUri.getVirtualBucket())
                     .copyObject(
                             CopyObjectRequest.builder()
@@ -250,8 +270,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status upload(String localPath, String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -280,8 +300,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status delete(String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             DeleteObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .deleteObject(
@@ -319,6 +339,9 @@ public class S3Storage extends BlobStorage {
             conf.set("fs.s3a.endpoint", s3Endpoint);
             conf.set("fs.s3a.impl.disable.cache", "true");
             conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+            // introducing in hadoop aws 2.8.0
+            conf.set("fs.s3a.path.style.access", forceHostedStyle ? "false" : "true");
+            conf.set("fs.s3a.attempts.maximum", "2");
             FileSystem s3AFileSystem = FileSystem.get(new URI(remotePath), conf);
             org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath);
             FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
@@ -344,8 +367,8 @@ public class S3Storage extends BlobStorage {
         if (!remotePath.endsWith("/")) {
             remotePath += "/";
         }
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -364,8 +387,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status checkPathExist(String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             getClient(uri.getVirtualBucket())
                     .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
             return Status.OK;
@@ -373,11 +396,11 @@ public class S3Storage extends BlobStorage {
             if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
             } else {
-                LOG.error("headObject failed:", e);
+                LOG.warn("headObject failed:", e);
                 return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage());
             }
         } catch (UserException ue) {
-            LOG.error("connect to s3 failed: ", ue);
+            LOG.warn("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 5295f96..d823a21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -147,7 +147,7 @@ public class BrokerUtil {
                     fileStatuses.add(tBrokerFileStatus);
                 }
             } catch (TException e) {
-                LOG.warn("Broker list path exception, path={}, address={}, exception={}", path, address, e);
+                LOG.warn("Broker list path exception, path={}, address={}", path, address, e);
                 throw new UserException("Broker list path exception. path=" + path + ", broker=" + address);
             } finally {
                 returnClient(client, address, failed);
@@ -155,10 +155,15 @@ public class BrokerUtil {
         } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
             S3Storage s3 = new S3Storage(brokerDesc.getProperties());
             List<RemoteFile> rfiles = new ArrayList<>();
-            Status st = s3.list(path, rfiles, false);
-            if (!st.ok()) {
-                throw new UserException("S3 list path failed. path=" + path
-                    + ",msg=" + st.getErrMsg());
+            try {
+                Status st = s3.list(path, rfiles, false);
+                if (!st.ok()) {
+                    throw new UserException("S3 list path failed. path=" + path
+                            + ",msg=" + st.getErrMsg());
+                }
+            } catch (Exception e) {
+                LOG.warn("s3 list path exception, path={}", path, e);
+                throw new UserException("s3 list path exception. path=" + path + ", err: " + e.getMessage());
             }
             for (RemoteFile r : rfiles) {
                 if (r.isFile()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
index 0aadfe4..1cf1b80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
@@ -17,11 +17,15 @@
 
 package org.apache.doris.common.util;
 
-import com.google.common.base.Preconditions;
+import org.apache.doris.common.UserException;
+
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.parquet.Strings;
 import org.apache.parquet.glob.GlobExpander;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Set;
 
@@ -44,7 +48,7 @@ public class S3URI {
     private final String virtualBucket;
     private final String bucket;
     private final String key;
-    private boolean forceHosted;
+    private boolean forceVirtualHosted;
 
     /**
      * Creates a new S3URI based on the bucket and key parsed from the location as defined in:
@@ -55,27 +59,60 @@ public class S3URI {
      *
      * @param location fully qualified URI
      */
-    public S3URI(String location) {
-        this(location, false);
+
+    public static S3URI create(String location) throws UserException {
+        return create(location, false);
+    }
+
+    public static S3URI create(String location, boolean forceVirtualHosted) throws UserException {
+        S3URI s3URI = new S3URI(location, forceVirtualHosted);
+        return s3URI;
     }
 
-    public S3URI(String location, boolean forceHosted) {
-        Preconditions.checkNotNull(location, "Location cannot be null.");
-        this.location = location;
-        this.forceHosted = forceHosted;
-        String[] schemeSplit = location.split(SCHEME_DELIM);
-        Preconditions.checkState(schemeSplit.length == 2, "Invalid S3 URI: %s", location);
+    private S3URI(String location, boolean forceVirtualHosted) throws UserException {
+        if (Strings.isNullOrEmpty(location)) {
+            throw new UserException("s3 location can not be null");
+        }
+
+        try {
+            // the location need to be normalized to eliminate double "/", or the hadoop aws api
+            // won't handle it correctly.
+            this.location = new URI(location).normalize().toString();
+        } catch (URISyntaxException e) {
+            throw new UserException("Invalid s3 uri: " + e.getMessage());
+        }
+
+        this.forceVirtualHosted = forceVirtualHosted;
+        String[] schemeSplit = this.location.split(SCHEME_DELIM);
+        if (schemeSplit.length != 2) {
+            throw new UserException("Invalid s3 uri: " + this.location);
+        }
 
         this.scheme = schemeSplit[0];
-        Preconditions.checkState(VALID_SCHEMES.contains(scheme.toLowerCase()), "Invalid scheme: %s", scheme);
+        if (!VALID_SCHEMES.contains(scheme.toLowerCase())) {
+            throw new UserException("Invalid scheme: " + this.location);
+        }
+
         String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
-        Preconditions.checkState(authoritySplit.length == 2, "Invalid S3 URI: %s", location);
-        Preconditions.checkState(!authoritySplit[1].trim().isEmpty(), "Invalid S3 key: %s", location);
+        if (authoritySplit.length != 2) {
+            throw new UserException("Invalid s3 uri: " + this.location);
+        }
+        if (authoritySplit[1].trim().isEmpty()) {
+            throw new UserException("Invalid s3 key: " + this.location);
+        }
+
         // Strip query and fragment if they exist
         String path = authoritySplit[1];
         path = path.split(QUERY_DELIM)[0];
         path = path.split(FRAGMENT_DELIM)[0];
-        if (forceHosted) {
+        if (this.forceVirtualHosted) {
+            // If forceVirtualHosted is true, the s3 client will NOT automatically convert to virtual-hosted style.
+            // So we do some convert manually. Eg:
+            //          endpoint:           http://cos.ap-beijing.myqcloud.com
+            //          bucket/path:        my_bucket/file.txt
+            // `virtualBucket` will be "my_bucket"
+            // `bucket` will be `file.txt`
+            // So that when assembling the real endpoint will be: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
             this.virtualBucket = authoritySplit[0];
             String[] paths = path.split("/", 2);
             this.bucket = paths[0];
@@ -85,6 +122,9 @@ public class S3URI {
                 key = "";
             }
         } else {
+            // If forceVirtualHosted is false, let the s3 client to determine how to covert endpoint, eg:
+            // For s3 endpoint(start with "s3."), it will convert to virtual-hosted style.
+            // For others, keep as it is (maybe path-style, maybe virtual-hosted style.)
             this.virtualBucket = "";
             this.bucket = authoritySplit[0];
             key = path;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index fa05289..bd6ec87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.httpv2.rest.manager;
 
-import lombok.Getter;
-import lombok.Setter;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -48,7 +46,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
-import org.apache.commons.httpclient.HttpException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -70,6 +67,9 @@ import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /*
  * Used to return all node information, configuration information and modify node config.
  */
@@ -540,10 +540,10 @@ public class NodeAction extends RestBaseController {
     }
 
     private void parseFeSetConfigResponse(String response, Pair<String, Integer> hostPort,
-                                          List<Map<String, String>> failedTotal) throws HttpException {
+                                          List<Map<String, String>> failedTotal) throws Exception {
         JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject();
         if (jsonObject.get("code").getAsInt() != HttpUtils.REQUEST_SUCCESS_CODE) {
-            throw new HttpException(jsonObject.get("msg").getAsString());
+            throw new Exception(jsonObject.get("msg").getAsString());
         }
         SetConfigAction.SetConfigEntity setConfigEntity = GsonUtils.GSON.fromJson(jsonObject.get("data").getAsJsonObject(),
                 SetConfigAction.SetConfigEntity.class);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
index f52d3ef..be74035 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
@@ -39,7 +39,7 @@ import java.util.UUID;
 @Ignore
 public class S3StorageTest {
     private static String basePath;
-    private final String bucket = "s3://yang-repo/";
+    private final String bucket = "s3://doris-test/";
     private Map<String, String> properties;
     private S3Storage storage;
     private String testFile;
@@ -56,6 +56,8 @@ public class S3StorageTest {
         properties.put("AWS_ACCESS_KEY", System.getenv().getOrDefault("AWS_AK", ""));
         properties.put("AWS_SECRET_KEY", System.getenv().getOrDefault("AWS_SK", ""));
         properties.put("AWS_ENDPOINT", "http://s3.bj.bcebos.com");
+        properties.put(S3Storage.USE_PATH_STYLE, "false");
+
         properties.put("AWS_REGION", "bj");
         storage = new S3Storage(properties);
         testFile = bucket + basePath + "/Ode_to_the_West_Wind";
@@ -123,7 +125,6 @@ public class S3StorageTest {
         storage.rename(testFile + ".bak", testFile + ".bak1");
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(testFile + ".bak").getErrCode());
         Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + ".bak1"));
-
     }
 
     @Test
@@ -133,17 +134,16 @@ public class S3StorageTest {
         Assert.assertEquals(Status.OK, storage.delete(deleteFile));
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(deleteFile).getErrCode());
         Assert.assertEquals(Status.OK, storage.delete(deleteFile + "xxxx"));
-
     }
 
     @Test
     public void list() {
         List<RemoteFile> result = new ArrayList<>();
-        String listPath =  bucket + basePath + "_list" + "/Ode_to_the_West_Wind";
+        String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind";
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".1"));
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".2"));
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".3"));
-        Assert.assertEquals(Status.OK, storage.list(bucket + basePath  + "_list/*", result));
+        Assert.assertEquals(Status.OK, storage.list(bucket + basePath + "_list/*", result));
         Assert.assertEquals(3, result.size());
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
index f310c5b..52ab836 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
@@ -17,24 +17,26 @@
 
 package org.apache.doris.common.util;
 
-import org.junit.Test;
+import org.apache.doris.common.UserException;
 
 import org.junit.Assert;
+import org.junit.Test;
 
 public class S3URITest {
     @Test
-    public void testLocationParsing() {
+    public void testLocationParsing() throws UserException {
         String p1 = "s3://bucket/path/to/file";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/to/file", uri1.getKey());
         Assert.assertEquals(p1, uri1.toString());
     }
+
     @Test
-    public void testPathLocationParsing() {
+    public void testPathLocationParsing() throws UserException {
         String p1 = "s3://bucket/path/";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/", uri1.getKey());
@@ -42,34 +44,34 @@ public class S3URITest {
     }
 
     @Test
-    public void testEncodedString() {
+    public void testEncodedString() throws UserException {
         String p1 = "s3://bucket/path%20to%20file";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path%20to%20file", uri1.getKey());
         Assert.assertEquals(p1, uri1.toString());
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void missingKey() {
-        new S3URI("https://bucket/");
+    @Test(expected = UserException.class)
+    public void missingKey() throws UserException {
+        S3URI.create("https://bucket/");
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void relativePathing() {
-        new S3URI("/path/to/file");
+    @Test(expected = UserException.class)
+    public void relativePathing() throws UserException {
+        S3URI.create("/path/to/file");
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void invalidScheme() {
-        new S3URI("ftp://bucket/");
+    @Test(expected = UserException.class)
+    public void invalidScheme() throws UserException {
+        S3URI.create("ftp://bucket/");
     }
 
     @Test
-    public void testQueryAndFragment() {
+    public void testQueryAndFragment() throws UserException {
         String p1 = "s3://bucket/path/to/file?query=foo#bar";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/to/file", uri1.getKey());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 3a21275..e5f2fc5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -20,9 +20,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.AggregateType;
@@ -35,13 +33,9 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarFunction;
 import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.Load;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExplainLevel;
@@ -502,6 +496,30 @@ public class StreamLoadScanNodeTest {
             }
         }
 
+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k3");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
@@ -629,6 +647,30 @@ public class StreamLoadScanNodeTest {
             }
         }
 
+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k5 = 1");
@@ -659,6 +701,30 @@ public class StreamLoadScanNodeTest {
             }
         }
 
+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k1 + v2");
diff --git a/fe/pom.xml b/fe/pom.xml
index bf8b049..453a444 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -590,7 +590,7 @@ under the License.
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-common</artifactId>
-                <version>2.7.3</version>
+                <version>2.8.0</version>
                 <scope>provided</scope>
                 <exclusions>
                     <exclusion>
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 5ecbb69..2e07c46 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -94,6 +94,8 @@ message PTabletWriterAddBatchRequest {
     // only valid when eos is true
     // valid partition ids that would write in this writer
     repeated int64 partition_ids = 8;
+    // the backend which send this request
+    optional int64 backend_id = 9 [default = -1];
 };
 
 message PTabletWriterAddBatchResult {
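
To round out the S3URI changes above, a short usage sketch (editor's addition; it assumes the Doris FE classes from this patch are on the classpath). The expected values follow the parsing rules shown in S3URI.java and exercised by S3URITest:

```java
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;

public class S3URIDemo {
    public static void main(String[] args) throws UserException {
        // Default: the S3 client decides later how to address the bucket.
        S3URI plain = S3URI.create("s3://my_bucket/dir/file.txt");
        System.out.println(plain.getBucket()); // my_bucket
        System.out.println(plain.getKey());    // dir/file.txt

        // forceVirtualHosted = true: the first component becomes the "virtual
        // bucket" that S3Storage prepends to the endpoint host, per its comments.
        S3URI hosted = S3URI.create("s3://my_bucket/dir/file.txt", true);
        System.out.println(hosted.getVirtualBucket()); // my_bucket
        System.out.println(hosted.getBucket());        // dir
        System.out.println(hosted.getKey());           // file.txt
    }
}
```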

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/07: [Bug] Fix failure to stop sync job (#6950)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 162b9dc829b6ed4217868d5f6086868d9dc22b32
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Sat Oct 30 18:17:15 2021 +0800

    [Bug] Fix failure to stop sync job (#6950)
---
 fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
index 8d98c14..227f175 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
@@ -55,7 +55,7 @@ public abstract class SyncLifeCycle {
     }
 
     public void stop() {
-        if (isStart()) {
+        if (!isStart()) {
             // Repeated stops are considered successful
             return;
         }
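
For context (editor's sketch, not the actual Doris class): the guard is meant to make stop() a no-op only when the job is already stopped; with the old condition, stop() returned early precisely when the job was running, so a running sync job could never be stopped. A simplified version of the intended behavior, with illustrative field names:

```java
// Simplified sketch of an idempotent stop guard.
public abstract class LifeCycleSketch {
    private volatile boolean running = false;

    public boolean isStart() {
        return running;
    }

    public void start() {
        running = true;
        // ... start the worker thread, etc.
    }

    public void stop() {
        if (!isStart()) {
            // Repeated stops are considered successful.
            return;
        }
        running = false;
        // ... interrupt the worker thread and release resources.
    }
}
```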

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 04/07: Fix spark connector build error (#6948)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1c8e9629e82f78daf95308c241d56632ea4a7fa5
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Fri Oct 29 14:59:05 2021 +0800

    Fix spark connector build error (#6948)
    
    The spark-doris-connector pom.xml was missing a <dependency> opening tag, which broke the build.
---
 extension/spark-doris-connector/pom.xml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/extension/spark-doris-connector/pom.xml b/extension/spark-doris-connector/pom.xml
index d8ebfe7..b76c823 100644
--- a/extension/spark-doris-connector/pom.xml
+++ b/extension/spark-doris-connector/pom.xml
@@ -150,6 +150,8 @@
             <version>${spark.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
             <version>2.10.0</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/07: Modify Chinese comment (#6951)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 36ae508076dee8daa75c0f548821c979a26bcdb6
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu Oct 28 13:56:59 2021 +0800

    Modify Chinese comment (#6951)
    
    Translate Chinese code comments in the BE storage code to English.
---
 be/src/olap/olap_common.h     | 19 +++++-----
 be/src/olap/olap_cond.cpp     | 38 ++++++++++----------
 be/src/olap/out_stream.cpp    | 35 +++++++++---------
 be/src/olap/out_stream.h      | 56 ++++++++++++++---------------
 be/src/olap/schema_change.cpp | 84 +++++++++++++++++++++----------------------
 5 files changed, 117 insertions(+), 115 deletions(-)

diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 0769870..7fe970e 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -146,10 +146,10 @@ enum FieldType {
     OLAP_FIELD_TYPE_STRING = 26
 };
 
-// 定义Field支持的所有聚集方法
-// 注意,实际中并非所有的类型都能使用以下所有的聚集方法
-// 例如对于string类型使用SUM就是毫无意义的(但不会导致程序崩溃)
-// Field类的实现并没有进行这类检查,应该在创建表的时候进行约束
+// Define all aggregation methods supported by Field
+// Note that in practice, not all types can use all the following aggregation methods
+// For example, it is meaningless to use SUM for the string type (but it will not cause the program to crash)
+// The implementation of the Field class does not perform such checks, and should be constrained when creating the table
 enum FieldAggregationMethod {
     OLAP_FIELD_AGGREGATION_NONE = 0,
     OLAP_FIELD_AGGREGATION_SUM = 1,
@@ -163,11 +163,14 @@ enum FieldAggregationMethod {
     OLAP_FIELD_AGGREGATION_REPLACE_IF_NOT_NULL = 8,
 };
 
-// 压缩算法类型
+// Compression algorithm type
 enum OLAPCompressionType {
-    OLAP_COMP_TRANSPORT = 1, // 用于网络传输的压缩算法,压缩率低,cpu开销低
-    OLAP_COMP_STORAGE = 2,   // 用于硬盘数据的压缩算法,压缩率高,cpu开销大
-    OLAP_COMP_LZ4 = 3,       // 用于储存的压缩算法,压缩率低,cpu开销低
+    // Compression algorithm used for network transmission, low compression rate, low cpu overhead
+    OLAP_COMP_TRANSPORT = 1,
+    // Compression algorithm used for hard disk data, with high compression rate and high CPU overhead 
+    OLAP_COMP_STORAGE = 2,  
+    // The compression algorithm used for storage, the compression rate is low, and the cpu overhead is low 
+    OLAP_COMP_LZ4 = 3,       
 };
 
 enum PushType {
diff --git a/be/src/olap/olap_cond.cpp b/be/src/olap/olap_cond.cpp
index c9184c3..4bc6b51 100644
--- a/be/src/olap/olap_cond.cpp
+++ b/be/src/olap/olap_cond.cpp
@@ -35,25 +35,25 @@ using std::vector;
 
 using doris::ColumnStatistics;
 
-//此文件主要用于对用户发送的查询条件和删除条件进行处理,逻辑上二者都可以分为三层
+//This file is mainly used to process query conditions and delete conditions sent by users. Logically, both can be divided into three layers
 //Condition->Condcolumn->Cond
-//Condition表示用户发的单个条件
-//Condcolumn表示一列上所有条件的集合。
-//Conds表示一列上的单个条件.
-//对于查询条件而言,各层级的条件之间都是逻辑与的关系
-//对于delete条件则有不同。Cond和Condcolumn之间是逻辑与的关系,而Condtion之间是逻辑或的关系。
-
-//具体到实现。
-//eval是用来过滤查询条件,包括堆row、block、version的过滤,具体使用哪一层看具体的调用地方。
-//  1. 没有单独过滤行的过滤条件,这部分在查询层进行。
-//  2. 过滤block在SegmentReader里面。
-//  3. 过滤version在Reader里面。调用delta_pruing_filter
+//Condition represents a single condition sent by the user
+//Condcolumn represents the collection of all conditions on a column.
+//Conds represents a single condition on a column.
+//For query conditions, the conditions of each level are logical AND relationships
+//There are different conditions for delete. The relationship between Cond and Condcolumn is logical AND, and the relationship between Condtion is logical OR.
+
+//Specific to the realization.
+//eval is used to filter query conditions, including the filtering of heap row, block, and version. Which layer is used depends on the specific calling place.
+// 1. There is no filter condition to filter rows separately, this part is carried out in the query layer.
+// 2. The filter block is in the SegmentReader.
+// 3. Filter version in Reader. Call delta_pruing_filter
 //
-//del_eval用来过滤删除条件,包括堆block和version的过滤,但是这个过滤比eval多了一个状态,即部分过滤。
-//  1. 对行的过滤在DeleteHandler。
-//     这部分直接调用delete_condition_eval实现,内部调用eval函数,因为对row的过滤不涉及部分过滤这种状态。
-//  2. 过滤block是在SegmentReader里面,直接调用del_eval
-//  3. 过滤version实在Reader里面,调用rowset_pruning_filter
+//del_eval is used to evaluate delete conditions, including the filtering of blocks and versions, but it has one more state than eval, namely partial filtering.
+// 1. Row filtering is done in DeleteHandler.
+// This is implemented by calling delete_condition_eval directly, which internally calls the eval function, because row filtering does not involve the partial filtering state.
+// 2. Block filtering is done in SegmentReader, which calls del_eval directly
+// 3. Version filtering is done in Reader, which calls rowset_pruning_filter
 
 namespace doris {
 
@@ -176,7 +176,7 @@ OLAPStatus Cond::init(const TCondition& tcond, const TabletColumn& column) {
 
 bool Cond::eval(const RowCursorCell& cell) const {
     if (cell.is_null() && op != OP_IS) {
-        //任何非OP_IS operand和NULL的运算都是false
+        //Any operation between NULL and an operator other than OP_IS is false
         return false;
     }
 
@@ -215,7 +215,7 @@ bool Cond::eval(const RowCursorCell& cell) const {
 }
 
 bool Cond::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
-    //通过单列上的单个查询条件对version进行过滤
+    //Filter the version with a single query condition on a single column
     // When we apply column statistic, Field can be NULL when type is Varchar,
     // we just ignore this cond
     if (statistic.first == nullptr || statistic.second == nullptr) {
diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp
index 8882085..b6900c6 100644
--- a/be/src/olap/out_stream.cpp
+++ b/be/src/olap/out_stream.cpp
@@ -186,22 +186,22 @@ OLAPStatus OutStream::_spill() {
         return OLAP_SUCCESS;
     }
 
-    // 如果不压缩,直接读取current,注意output之后 current会被清空并设置为NULL
+    // If it is not compressed, read current directly. Note that current will be cleared and set to NULL after output
     if (_compressor == NULL) {
         _current->flip();
         _output_uncompress();
     } else {
-        // 如果需要压缩,
-        // current移动到head后边的位置,留出head的空间
+        // If compression is required,
+        // move current to the position after the head, leaving space for the head
         _current->set_limit(_current->position());
         _current->set_position(sizeof(StreamHead));
 
-        // 分配compress和overflow,这两个buffer大小其实是一样的
+        // Allocate compress and overflow; the two buffers are actually the same size
         if (OLAP_SUCCESS != (res = _make_sure_output_buffer())) {
             return res;
         }
 
-        // 吧 current解压到compress和overflow
+        // Compress current into compress and overflow
         uint64_t head_pos = _compressed->position();
         _compressed->set_position(head_pos + sizeof(StreamHead));
         bool smaller = false;
@@ -213,8 +213,8 @@ OLAPStatus OutStream::_spill() {
         }
 
         if (smaller) {
-            // 数据都压缩到_output和_overflow里, 重置_current
-            // 注意这种情况下,current并没有被释放,因为实际上输出的compress
+            // The data has been compressed into _output and _overflow, reset _current
+            // Note that in this case, current is not released, because what is actually output is the compressed buffer
             _current->set_position(sizeof(StreamHead));
             _current->set_limit(_current->capacity());
 
@@ -228,15 +228,14 @@ OLAPStatus OutStream::_spill() {
 
             _spilled_bytes += sizeof(StreamHead) + output_bytes;
         } else {
-            // 直接将_current输出
-
-            // 如果之前还有_compress, 先输出m_compress
-            // 注意此时一定没有_overflow
+            // Output _current directly
+            // If _compressed already contains data, output _compressed first
+            // Note that there must be no _overflow at this time
             _compressed->set_position(head_pos);
 
             if (head_pos != 0) {
-                // 之前_compressed里有数据, 这种情况下先输出compressed,
-                // 此时_overflow一定是空的
+                // _compressed already contained data; in this case output it first,
+                // at this point _overflow must be empty
                 _output_compressed();
             }
 
@@ -253,11 +252,11 @@ OLAPStatus OutStream::write(const char* buffer, uint64_t length) {
     uint64_t remain = length;
 
     while (remain > 0) {
-        // 之所以扔进来,是因为在压缩的情况下,_current只会被创建一次
-        // 之后一直在复用,输出的是compress
-        // 而在未压缩的情况下,current会被放进列表,而无法复用,原因是
-        // 如果复用的话,会修改之前的内容,因此需要重新分配。
-        // 只分配一次那么第二块就会挂掉
+        // This is placed here because, in the compressed case, _current is only created once
+        // and then reused; what is output is the compressed buffer.
+        // In the uncompressed case, current is put into the output list and cannot be reused,
+        // because reusing it would modify content that has already been output, so a new buffer must be allocated.
+        // If it were allocated only once, the second block would be corrupted.
         if (NULL == _current) {
             res = _create_new_input_buffer();
             if (OLAP_SUCCESS != res) {
diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h
index 6473091..2376758 100644
--- a/be/src/olap/out_stream.h
+++ b/be/src/olap/out_stream.h
@@ -27,26 +27,26 @@
 namespace doris {
 class FileHandler;
 
-// 与OrcFile不同,我们底层没有HDFS无法保证存储数据的可靠性,所以必须写入
-// 校验值,在读取数据的时候检验这一校验值
-// 采用TLV类型的头部,有足够的扩展性
+// Unlike OrcFile, we have no HDFS underneath to guarantee the reliability of the stored data, so we must write
+// a checksum and verify it when reading the data back.
+// A TLV-style header is used, which leaves enough room for future extension.
 struct StreamHead {
     enum StreamType { UNCOMPRESSED = 0, COMPRESSED = 1 };
-    uint8_t type;         // 256种类型, 应该足够以后的扩展了
-    uint32_t length : 24; // 24位长度
-    uint32_t checksum;    // 32位校验值
+    uint8_t type;         // 256 types, should be enough for future expansion
+    uint32_t length : 24; // 24-bit length
+    uint32_t checksum;    // 32-bit checksum
     StreamHead() : type(COMPRESSED), length(0), checksum(0) {}
 } __attribute__((packed));
 
-// 输出流,使用一组ByteBuffer缓存所有的数据
+// Output stream, use a set of ByteBuffer to buffer all data
 class OutStream {
 public:
-    // 输出流支持压缩或者不压缩两种模式,如果启用压缩,给出压缩函数
+    // The output stream supports two modes: compressed or uncompressed. If compression is enabled, a compression function must be provided
     explicit OutStream(uint32_t buffer_size, Compressor compressor);
 
     ~OutStream();
 
-    // 向流输出一个字节
+    // Output a byte to the stream
     inline OLAPStatus write(char byte) {
         OLAPStatus res = OLAP_SUCCESS;
         if (_current == nullptr) {
@@ -71,26 +71,26 @@ public:
         return _current->put(byte);
     }
 
-    // 向流输出一段数据
+    // Output a piece of data to the stream
     OLAPStatus write(const char* buffer, uint64_t length);
 
-    // 将流的当前位置记录在索引项中
+    // Record the current position of the stream in the index entry
     void get_position(PositionEntryWriter* index_entry) const;
 
-    // 返回流中所有数据的大小
+    // Returns the size of all data in the stream
     uint64_t get_stream_length() const;
 
-    // 返回已经分配的缓冲区大小
+    // Returns the size of the buffer that has been allocated
     uint64_t get_total_buffer_size() const;
 
-    // 将缓存的数据流输出到文件
+    // Output the cached data stream to a file
     OLAPStatus write_to_file(FileHandler* file_handle, uint32_t write_mbytes_per_sec) const;
 
     bool is_suppressed() const { return _is_suppressed; }
     void suppress() { _is_suppressed = true; }
-    // 将数据输出到output_buffers
+    // Output data to output_buffers
     OLAPStatus flush();
-    // 计算输出数据的crc32值
+    // Calculate the crc32 value of the output data
     uint32_t crc32(uint32_t checksum) const;
     const std::vector<StorageByteBuffer*>& output_buffers() { return _output_buffers; }
 
@@ -115,33 +115,33 @@ private:
     void _output_compressed();
     OLAPStatus _make_sure_output_buffer();
 
-    uint32_t _buffer_size;                           // 压缩块大小
-    Compressor _compressor;                          // 压缩函数,如果为NULL表示不压缩
-    std::vector<StorageByteBuffer*> _output_buffers; // 缓冲所有的输出
-    bool _is_suppressed;                             // 流是否被终止
-    StorageByteBuffer* _current;                     // 缓存未压缩的数据
-    StorageByteBuffer* _compressed;                  // 即将输出到output_buffers中的字节
-    StorageByteBuffer* _overflow;                    // _output中放不下的字节
-    uint64_t _spilled_bytes;                         // 已经输出到output的字节数
+    uint32_t _buffer_size;                           // Compressed block size
+    Compressor _compressor;                          // Compression function; NULL means no compression
+    std::vector<StorageByteBuffer*> _output_buffers; // Buffer all output
+    bool _is_suppressed;                             // Whether the stream is terminated
+    StorageByteBuffer* _current;                     // Cache uncompressed data
+    StorageByteBuffer* _compressed;                  // Bytes to be output to output_buffers
+    StorageByteBuffer* _overflow;                    // Bytes that can't fit in _output
+    uint64_t _spilled_bytes;                         // The number of bytes that have been output to output
 
     DISALLOW_COPY_AND_ASSIGN(OutStream);
 };
 
-// 定义输出流的工厂方法
-// 将所有的输出流托管,同时封装了诸如压缩算法,是否启用Index,block大小等信息
+// Factory for creating output streams
+// It owns all created output streams and encapsulates information such as the compression algorithm, whether the index is enabled, the block size, etc.
 class OutStreamFactory {
 public:
     explicit OutStreamFactory(CompressKind compress_kind, uint32_t stream_buffer_size);
 
     ~OutStreamFactory();
 
-    // 创建后的stream的生命期依旧由OutStreamFactory管理
+    // The lifetime of the created stream is still managed by OutStreamFactory
     OutStream* create_stream(uint32_t column_unique_id, StreamInfoMessage::Kind kind);
 
     const std::map<StreamName, OutStream*>& streams() const { return _streams; }
 
 private:
-    std::map<StreamName, OutStream*> _streams; // 所有创建过的流
+    std::map<StreamName, OutStream*> _streams; // All created streams
     CompressKind _compress_kind;
     Compressor _compressor;
     uint32_t _stream_buffer_size;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 49a7f96..8a1eb34 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -466,13 +466,13 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
         return OLAP_ERR_NOT_INITED;
     }
 
-    // a.1 先判断数据是否需要过滤,最终只有标记为1的才是留下需要的
-    //   对于没有filter的来说,相当于全部设置为1后留下
+    // a.1 First determine whether the data needs to be filtered; in the end only the rows marked as 1 are kept
+    // If there is no filter, this is equivalent to marking every row as 1 and keeping them all
     const uint32_t row_num = ref_block->row_block_info().row_num;
-    // (0表示过滤掉不要,1表示要,过程中2表示此row要切后续不需要再比较其他列)
+    // (0 means the row is filtered out (discarded), 1 means it is kept; during processing, 2 means the row is kept and the remaining columns do not need to be compared)
     std::vector<int8_t> is_data_left_vec(row_num, 1);
 
-    // 一行一行地进行比较
+    // Compare each row
     for (size_t row_index = 0; row_index < row_num; ++row_index) {
         ref_block->get_row(row_index, &read_helper);
 
@@ -486,14 +486,14 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
         }
     }
 
-    // a.2 计算留下的row num
+    // a.2 Calculate the number of remaining rows
     uint32_t new_row_num = row_num - *filtered_rows;
 
     const bool need_filter_data = (new_row_num != row_num);
     const bool filter_all = (new_row_num == 0);
 
     MemPool* mem_pool = mutable_block->mem_pool();
-    // b. 根据前面的过滤信息,只对还标记为1的处理
+    // b. Based on the filtering information above, only process rows that are still marked as 1
     for (size_t i = 0, len = mutable_block->tablet_schema().num_columns(); !filter_all && i < len;
          ++i) {
         int32_t ref_column = _schema_mapping[i].ref_column;
@@ -537,15 +537,15 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
             FieldType reftype = ref_block->tablet_schema().column(ref_column).type();
             FieldType newtype = mutable_block->tablet_schema().column(i).type();
             if (newtype == reftype) {
-                // 效率低下,也可以直接计算变长域拷贝,但仍然会破坏封装
+                // Inefficient; the variable-length field copy could also be computed directly, but that would still break encapsulation
                 for (size_t row_index = 0, new_row_index = 0;
                      row_index < ref_block->row_block_info().row_num; ++row_index) {
-                    // 不需要的row,每次处理到这个row时就跳过
+                    // Rows that are not needed are skipped whenever they are encountered
                     if (need_filter_data && is_data_left_vec[row_index] == 0) {
                         continue;
                     }
 
-                    // 指定新的要写入的row index(不同于读的row_index)
+                    // Specify the new row index to be written (different from the read row_index)
                     mutable_block->get_row(new_row_index++, &write_helper);
                     ref_block->get_row(row_index, &read_helper);
 
@@ -597,10 +597,10 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
                         }
                     }
                 }
-                // 从ref_column 写入 i列。
+                // Write column i from ref_column.
             } else {
                 // copy and alter the field
-                // 此处可以暂时不动,新类型暂时不涉及类型转换
+                // This can be left unchanged for now; the new types do not involve type conversion yet
                 switch (reftype) {
                 case OLAP_FIELD_TYPE_TINYINT:
                     CONVERT_FROM_TYPE(int8_t);
@@ -637,10 +637,10 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
                 }
             }
         } else {
-            // 新增列,写入默认值
+            // New column, write default value
             for (size_t row_index = 0, new_row_index = 0;
                  row_index < ref_block->row_block_info().row_num; ++row_index) {
-                // 不需要的row,每次处理到这个row时就跳过
+                // Rows that are not needed are skipped whenever they are encountered
                 if (need_filter_data && is_data_left_vec[row_index] == 0) {
                     continue;
                 }
@@ -658,9 +658,9 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t
         }
     }
 
-    // NOTE 当前mutable_block的内存row_num还是和ref一样多
-    //  (其实在init时就可以重新init成少的,filter留下的new_row_num)
-    // 在split_table时,可能会出现因为过滤导致没有数据
+    // NOTE The in-memory row_num of mutable_block is still the same as that of ref
+    // (it could actually be re-initialized at init time to the smaller new_row_num left after filtering)
+    // In split_table, filtering may leave no data at all
     mutable_block->finalize(new_row_num);
     return OLAP_SUCCESS;
 }
@@ -766,7 +766,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo
         return OLAP_SUCCESS;
     }
 
-    // TODO(lijiao) : 为什么舍弃原有的m_row_block_buffer
+    // TODO(lijiao) : Why abandon the original m_row_block_buffer
     *row_block = new (nothrow) RowBlock(&_tablet_schema);
 
     if (*row_block == nullptr) {
@@ -1041,7 +1041,7 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
         RETURN_NOT_OK(reserve_block(&new_row_block, ref_row_block->row_block_info().row_num,
                                     _row_block_allocator));
 
-        // 将ref改为new。这一步按道理来说确实需要等大的块,但理论上和writer无关。
+        // Convert ref to new. Strictly speaking, this step does require an equally sized block, but in theory it has nothing to do with the writer.
         uint64_t filtered_rows = 0;
         res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
                                                   new_row_block.get(), &filtered_rows);
@@ -1096,10 +1096,10 @@ SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_bloc
           _row_block_changer(row_block_changer),
           _memory_limitation(memory_limitation),
           _row_block_allocator(nullptr) {
-    // 每次SchemaChange做外排的时候,会写一些临时版本(比如999,1000,1001),为避免Cache冲突,临时
-    // 版本进行2个处理:
-    // 1. 随机值作为VersionHash
-    // 2. 版本号取一个BIG NUMBER加上当前正在进行SchemaChange的版本号
+    // Every time SchemaChange performs an external sort, some temporary versions (such as 999, 1000, 1001) are written. To avoid Cache conflicts, the temporary
+    // versions are handled in two ways:
+    // 1. A random value is used as the VersionHash
+    // 2. The version number is a BIG NUMBER plus the version number of the SchemaChange currently in progress
     _temp_delta_versions.first = (1 << 28);
     _temp_delta_versions.second = (1 << 28);
     // TODO(zyh): remove the magic number
@@ -1666,8 +1666,8 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
               << "base_tablet=" << base_tablet->full_name()
               << ", new_tablet=" << new_tablet->full_name();
 
-    // a. 解析Alter请求,转换成内部的表示形式
-    // 不使用DELETE_DATA命令指定的删除条件
+    // a. Parse the Alter request and convert it into an internal representation
+    // Do not use the delete condition specified by the DELETE_DATA command
     RowBlockChanger rb_changer(new_tablet->tablet_schema());
     bool sc_sorting = false;
     bool sc_directly = false;
@@ -1679,9 +1679,9 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
         return res;
     }
 
-    // NOTE split_table如果使用row_block,会导致原block变小
-    // 但由于历史数据在后续base/cumulative后还是会变成正常,故用directly也可以
-    // b. 生成历史数据转换器
+    // NOTE If split_table uses row_block, the original block will become smaller
+    // But since the historical data will become normal again after subsequent base/cumulative compactions, it is also fine to use the "directly" path
+    // b. Generate the historical data converter
     SchemaChange* sc_procedure = nullptr;
     if (sc_sorting) {
         size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
@@ -1702,7 +1702,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
         return OLAP_ERR_MALLOC_ERROR;
     }
 
-    // c. 转换数据
+    // c. Convert data
     DeleteHandler delete_handler;
     std::vector<ColumnId> return_columns;
     size_t num_cols = base_tablet->tablet_schema().num_columns();
@@ -1817,15 +1817,15 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         }
     }
 
-    // change中增加了filter信息,在_parse_request中会设置filter的column信息
-    // 并在每次row block的change时,过滤一些数据
+    // Filter information is added to the changer; the filter column information is set in _parse_request
+    // and some data is filtered out on every row block change
     RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler);
 
     bool sc_sorting = false;
     bool sc_directly = false;
     SchemaChange* sc_procedure = nullptr;
 
-    // a. 解析Alter请求,转换成内部的表示形式
+    // a. Parse the Alter request and convert it into an internal representation
     OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
                                     &sc_sorting, &sc_directly, sc_params.materialized_params_map);
     if (res != OLAP_SUCCESS) {
@@ -1833,7 +1833,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         goto PROCESS_ALTER_EXIT;
     }
 
-    // b. 生成历史数据转换器
+    // b. Generate historical data converter
     if (sc_sorting) {
         size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
         LOG(INFO) << "doing schema change with sorting for base_tablet "
@@ -1857,14 +1857,14 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         goto PROCESS_ALTER_EXIT;
     }
 
-    // c. 转换历史数据
+    // c. Convert historical data
     for (auto& rs_reader : sc_params.ref_rowset_readers) {
         VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first
                    << "-" << rs_reader->version().second;
 
         // set status for monitor
-        // 只要有一个new_table为running,ref table就设置为running
-        // NOTE 如果第一个sub_table先fail,这里会继续按正常走
+        // As long as one new_table is running, the ref table is set to running
+        // NOTE If the first sub_table fails first, execution still continues normally here
         TabletSharedPtr new_tablet = sc_params.new_tablet;
 
         RowsetWriterContext writer_context;
@@ -1906,8 +1906,8 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         }
         new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                    rowset_writer->rowset_id().to_string());
-        // 将新版本的数据加入header
-        // 为了防止死锁的出现,一定要先锁住旧表,再锁住新表
+        // Add the new version of the data to the header
+        // To prevent deadlock, the old table must be locked first, and then the new table
         sc_params.new_tablet->obtain_push_lock();
         RowsetSharedPtr new_rowset = rowset_writer->build();
         if (new_rowset == nullptr) {
@@ -1941,7 +1941,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
                    << " version=" << rs_reader->version().first << "-"
                    << rs_reader->version().second;
     }
-    // XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本
+    // XXX: The SchemaChange state should not be canceled at this time, because new Deltas still need to be converted into versions of both the new and old Schema
 PROCESS_ALTER_EXIT : {
     // save tablet meta here because rowset meta is not saved during add rowset
     WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr());
@@ -1960,7 +1960,7 @@ PROCESS_ALTER_EXIT : {
 }
 
 // @static
-// 分析column的mapping以及filter key的mapping
+// Analyze the column mapping and the filter key mapping
 OLAPStatus SchemaChangeHandler::_parse_request(
         TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer,
         bool* sc_sorting, bool* sc_directly,
@@ -2014,7 +2014,7 @@ OLAPStatus SchemaChangeHandler::_parse_request(
             continue;
         }
 
-        // 新加列走这里
+        // Newly added columns go here
         //if (new_column_schema.is_allow_null || new_column_schema.has_default_value) {
         {
             column_mapping->ref_column = -1;
@@ -2034,7 +2034,7 @@ OLAPStatus SchemaChangeHandler::_parse_request(
             continue;
         }
 
-        // XXX: 只有DROP COLUMN时,遇到新Schema转旧Schema时会进入这里。
+        // XXX: This is reached only for DROP COLUMN, when converting from the new Schema to the old Schema.
         column_mapping->ref_column = -1;
 
         if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, new_column, ""))) {
@@ -2048,7 +2048,7 @@ OLAPStatus SchemaChangeHandler::_parse_request(
 
     // Check if re-aggregation is needed.
     *sc_sorting = false;
-    // 若Key列的引用序列出现乱序,则需要重排序
+    // If the reference sequence of the Key column is out of order, it needs to be reordered
     int num_default_value = 0;
 
     for (int i = 0, new_schema_size = new_tablet->num_key_columns(); i < new_schema_size; ++i) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 07/07: [Enhance][Load] Reduce the number of segments when loading a large volume data in one batch (#6947)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 4719771be8049e5b17fdfc0caaa9b727a1a2a854
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Mon Nov 1 10:51:50 2021 +0800

    [Enhance][Load] Reduce the number of segments when loading a large volume data in one batch (#6947)
    
    ## Case
    
    In the load process, each tablet has a memtable to hold the incoming data,
    and if the data in a memtable grows larger than 100MB, it is flushed to disk as a `segment` file. Then
    a new memtable is created to hold the subsequent data.
    
    Assume the table has N buckets (tablets). The maximum total size of all memtables is then `N * 100MB`.
    If N is large, this costs too much memory.
    
    So, to limit memory usage, when the total size of all memtables reaches a threshold (2GB by default), Doris
    tries to flush all current memtables to disk (even if their sizes have not reached 100MB).
    
    As a result, each memtable is flushed when its size reaches `2GB/N`, which may be much smaller
    than 100MB, resulting in too many small segment files.
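    
    As a rough back-of-the-envelope sketch (illustrative only, not part of the patch; it simply plugs in the
    default 100MB `write_buffer_size` and the 2GB load memory limit mentioned above), the segment size you end
    up with under the old "flush everything" behavior is roughly the smaller of `write_buffer_size` and
    `mem_limit / N`:
    
    ```cpp
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    
    int main() {
        const int64_t write_buffer_size = 100LL << 20; // per-memtable flush threshold (100MB)
        const int64_t mem_limit = 2LL << 30;           // per-load memory limit (2GB)
        for (int64_t num_tablets : {10, 50, 100, 200}) {
            // Size each memtable has grown to when the total hits mem_limit, capped by write_buffer_size.
            int64_t effective = std::min(write_buffer_size, mem_limit / num_tablets);
            std::cout << num_tablets << " tablets -> ~" << (effective >> 20) << "MB per segment\n";
        }
        return 0;
    }
    ```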
    
    ## Solution
    
    When deciding to flush memtables to reduce memory consumption, do NOT flush all of the memtables, but only
    part of them.
    For example, suppose there are 50 tablets (with 50 memtables) and the memory limit is 1GB. When each memtable
    reaches 20MB, the total size reaches 1GB and a flush is triggered.
    
    If only 25 of the 50 memtables are flushed, then the next time the total size reaches 1GB, there will be 25
    memtables of size 10MB and 25 memtables of size 30MB. We can then flush the 30MB memtables, which are larger
    than 20MB.
    
    The main idea is to introduce some jitter during flushing so that the memtable sizes become slightly uneven, ensuring that a flush is only triggered for memtables that are large enough.
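    
    Below is a minimal standalone sketch of the selection heuristic (an illustrative simplification, not the
    actual code; the real implementation works on DeltaWriter objects under the TabletsChannel lock, as shown
    in the diff further down):
    
    ```cpp
    #include <algorithm>
    #include <cstdint>
    #include <vector>
    
    struct WriterInfo {          // illustrative stand-in for a DeltaWriter
        int64_t tablet_id;
        int64_t mem_consumption;
    };
    
    // Pick the largest writers until roughly one third of the memory limit will be released.
    std::vector<WriterInfo*> pick_writers_to_flush(std::vector<WriterInfo*> writers, int64_t mem_limit) {
        std::sort(writers.begin(), writers.end(),
                  [](const WriterInfo* lhs, const WriterInfo* rhs) {
                      return lhs->mem_consumption > rhs->mem_consumption;
                  });
        const int64_t mem_to_flush = mem_limit / 3;
        std::vector<WriterInfo*> picked;
        int64_t sum = 0;
        for (WriterInfo* w : writers) {
            if (w->mem_consumption <= 0) {
                break;                      // remaining writers hold no data
            }
            picked.push_back(w);
            sum += w->mem_consumption;
            if (sum > mem_to_flush) {
                break;                      // enough memory will be released
            }
        }
        return picked;
    }
    ```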
    
    In my test, loading a table with 48 buckets under a 2GB mem limit, the average memtable size was 44MB in the
    previous version; after this modification, the average size is 82MB.
---
 be/src/common/config.h                             |  4 +-
 be/src/olap/memtable.cpp                           |  5 +++
 be/src/olap/memtable.h                             |  4 ++
 be/src/runtime/load_channel.cpp                    |  2 +-
 be/src/runtime/tablets_channel.cpp                 | 48 ++++++++++++++++++----
 be/src/runtime/tablets_channel.h                   |  2 +-
 docs/en/best-practices/star-schema-benchmark.md    | 12 +++---
 docs/zh-CN/best-practices/star-schema-benchmark.md | 16 ++++----
 .../org/apache/doris/analysis/OutFileClause.java   |  3 +-
 9 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 42c43fb..977282c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -175,7 +175,7 @@ CONF_mInt32(doris_max_pushdown_conjuncts_return_rate, "90");
 // (Advanced) Maximum size of per-query receive-side buffer
 CONF_mInt32(exchg_node_buffer_size_bytes, "10485760");
 // push_write_mbytes_per_sec
-CONF_mInt32(push_write_mbytes_per_sec, "10");
+CONF_mInt32(push_write_mbytes_per_sec, "100");
 
 CONF_mInt64(column_dictionary_key_ratio_threshold, "0");
 CONF_mInt64(column_dictionary_key_size_threshold, "0");
@@ -440,7 +440,7 @@ CONF_mInt64(memory_maintenance_sleep_time_s, "10");
 CONF_Int32(memory_max_alignment, "16");
 
 // write buffer size before flush
-CONF_mInt64(write_buffer_size, "104857600");
+CONF_mInt64(write_buffer_size, "209715200");
 
 // following 2 configs limit the memory consumption of load process on a Backend.
 // eg: memory limit to 80% of mem limit config but up to 100GB(default)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2c6069f..3ce5db1 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -61,6 +61,7 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ
 }
 
 void MemTable::insert(const Tuple* tuple) {
+    _rows++;
     bool overwritten = false;
     uint8_t* _tuple_buf = nullptr;
     if (_keys_type == KeysType::DUP_KEYS) {
@@ -120,6 +121,8 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_
 }
 
 OLAPStatus MemTable::flush() {
+    VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
+                  << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
@@ -141,6 +144,8 @@ OLAPStatus MemTable::flush() {
     }
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
+    VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
+                  << ", flushsize: " << _flush_size;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 6d5beeb..02783d7 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -118,6 +118,10 @@ private:
 
     // the data size flushed on disk of this memtable
     int64_t _flush_size = 0;
+    // Number of rows inserted into this memtable.
+    // This is not the number of rows currently in this memtable, because rows may be merged
+    // in the unique or aggregate key model.
+    int64_t _rows = 0;
 
 }; // class MemTable
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 01e8d66..5834777 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -121,7 +121,7 @@ void LoadChannel::handle_mem_exceed_limit(bool force) {
 
     std::shared_ptr<TabletsChannel> channel;
     if (_find_largest_consumption_channel(&channel)) {
-        channel->reduce_mem_usage();
+        channel->reduce_mem_usage(_mem_tracker->limit());
     } else {
         // should not happen, add log to observe
         LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index db5e23b..f74861f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -184,7 +184,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
     return Status::OK();
 }
 
-Status TabletsChannel::reduce_mem_usage() {
+Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         // TabletsChannel is closed without LoadChannel's lock,
@@ -192,18 +192,48 @@ Status TabletsChannel::reduce_mem_usage() {
         return _close_status;
     }
 
-    // Flush all memtables
+    // Sort the DeltaWriters by mem consumption in descending order.
+    std::vector<DeltaWriter*> writers;
     for (auto& it : _tablet_writers) {
-        it.second->flush_memtable_and_wait(false);
+        writers.push_back(it.second);
     }
+    std::sort(writers.begin(), writers.end(),
+              [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
+                  return lhs->mem_consumption() > rhs->mem_consumption();
+              });
 
-    for (auto& it : _tablet_writers) {
-        OLAPStatus st = it.second->wait_flush();
+    // Decide which writers should be flushed to reduce mem consumption.
+    // The main idea is to flush at least one third of the mem_limit.
+    // This is mainly to solve the following scenarios.
+    // Suppose there are N tablets in this TabletsChannel, and the mem limit is M.
+    // If the data is evenly distributed, when each tablet memory accumulates to M/N,
+    // the reduce memory operation will be triggered.
+    // At this time, the value of M/N may be much smaller than the value of `write_buffer_size`.
+    // If we flush all the tablets at this time, each tablet will generate a lot of small files.
+    // So here we only flush some of the tablets, and the next time the reduce memory operation is triggered,
+    // the tablets that were not flushed before will have accumulated more data, thereby reducing the number of flushes.
+    int64_t mem_to_flushed = mem_limit / 3;
+    int counter = 0;
+    int64_t  sum = 0;
+    for (auto writer : writers) {
+        if (writer->mem_consumption() <= 0) {
+            break;
+        }
+        ++counter;
+        sum += writer->mem_consumption();
+        if (sum > mem_to_flushed) {
+            break;
+        }
+    }
+    VLOG_CRITICAL << "flush " << counter << " memtables to reduce memory: " << sum;
+    for (int i = 0; i < counter; i++) {
+        writers[i]->flush_memtable_and_wait(false);
+    }
+
+    for (int i = 0; i < counter; i++) {
+        OLAPStatus st = writers[i]->wait_flush();
         if (st != OLAP_SUCCESS) {
-            // flush failed, return error
-            std::stringstream ss;
-            ss << "failed to reduce mem consumption by flushing memtable. err: " << st;
-            return Status::InternalError(ss.str());
+            return Status::InternalError(fmt::format("failed to reduce mem consumption by flushing memtable. err: {}", st));
         }
     }
     return Status::OK();
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b0402bd..9f87e8a 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -78,7 +78,7 @@ public:
     // eg. flush the largest memtable immediately.
     // return Status::OK if mem is reduced.
     // no-op when this channel has been closed or cancelled
-    Status reduce_mem_usage();
+    Status reduce_mem_usage(int64_t mem_limit);
 
     int64_t mem_consumption() const { return _mem_tracker->consumption(); }
 
diff --git a/docs/en/best-practices/star-schema-benchmark.md b/docs/en/best-practices/star-schema-benchmark.md
index f0ea9e1..fb28fee 100644
--- a/docs/en/best-practices/star-schema-benchmark.md
+++ b/docs/en/best-practices/star-schema-benchmark.md
@@ -60,7 +60,7 @@ Execute the following script to generate the SSB data set:
 sh gen-ssb-data.sh -s 100 -c 100
 ```
 
-> Note 1: `sh gen-ssb-data.sh -h View help`
+> Note 1: Run `sh gen-ssb-data.sh -h` for help.
 >
 > Note 2: The data will be generated under the directory `ssb-data/` with a suffix of `.tbl`. The total file size is about 60GB. The generation time may vary from a few minutes to an hour.
 >
@@ -92,9 +92,11 @@ Under the `-s 100` parameter, the generated data set size is:
 
         Import the lineorder table data with the following command:
 
-        `sh load-fact-data.sh -c 3`
+        `sh load-fact-data.sh -c 5`
 
-        `-c 3` means to start 5 concurrent threads to import (the default is 3). In the case of a single BE node, the import time of lineorder data generated by `sh gen-ssb-data.sh -s 100 -c 100` using `sh load-fact-data.sh -c 3` is about 10 minutes. The memory overhead is about 5-6GB. If you turn on more threads, you can speed up the import speed, but it will increase additional memory overhead.
+        `-c 5` means to start 5 concurrent import threads (the default is 3). In the case of a single BE node, importing the lineorder data generated by `sh gen-ssb-data.sh -s 100 -c 100` with `sh load-fact-data.sh -c 3` takes about 10 minutes. The memory overhead is about 5-6GB. Using more threads speeds up the import, but adds extra memory overhead.
+
+        > Note: For a faster import, you can add `flush_thread_num_per_store=5` in be.conf and restart BE. This configuration is the number of disk write threads per data directory, with a default of 2. A larger value can increase write throughput, but may increase IO Util. (Reference values: with one mechanical disk, IO Util during import is about 12% at the default of 2 and about 26% when set to 5; on an SSD it is almost 0.)
 
 5. Check the imported data
 
@@ -152,7 +154,7 @@ The following test report is based on Doris [branch-0.15](https://github.com/apa
     | q4.2 | 1430 | 670 | 8 | BLOOM_FILTER |
     | q4.2 | 1750 | 1030 | 8 | BLOOM_FILTER |
 
-    > Note 1: "This test set is far from your generation environment, don't be superstitious!"
+    > Note 1: "This test set is far from your production environment, please be skeptical!"
     >
     > Note 2: The test result is the average value of multiple executions (Page Cache will play a certain acceleration role). And the data has undergone sufficient compaction (if you test immediately after importing the data, the query delay may be higher than the test result)
     >
@@ -160,4 +162,4 @@ The following test report is based on Doris [branch-0.15](https://github.com/apa
     >
     > Note 4: Parallelism means query concurrency, which is set by `set parallel_fragment_exec_instance_num=8`.
     >
-    > Note 5: Runtime Filter Mode is the type of Runtime Filter, set by `set runtime_filter_type="BLOOM_FILTER"`. ([Runtime Filter](http://doris.incubator.apache.org/master/en/administrator-guide/runtime-filter.html) function has a significant effect on the SSB test set. Because in this test level, Join is calculated The data in the sub-right table can filter the left table very well. You can try to turn off this function through `set runtime_filter_mode=off` to see the change in query l [...]
+    > Note 5: Runtime Filter Mode is the type of Runtime Filter, set by `set runtime_filter_type="BLOOM_FILTER"`. (The [Runtime Filter](http://doris.incubator.apache.org/master/en/administrator-guide/runtime-filter.html) feature has a significant effect on the SSB test set, because in this test set the data from the right table of the Join can filter the left table very well. You can try turning this feature off with `set runtime_filter_mode=off` to see the change in query latency.)
diff --git a/docs/zh-CN/best-practices/star-schema-benchmark.md b/docs/zh-CN/best-practices/star-schema-benchmark.md
index 30f43aa..ae7d608 100644
--- a/docs/zh-CN/best-practices/star-schema-benchmark.md
+++ b/docs/zh-CN/best-practices/star-schema-benchmark.md
@@ -30,7 +30,7 @@ under the License.
 
 本文档主要介绍如何在 Doris 中通过 SSB 进程初步的性能测试。
 
-> 注1:包括 SSB 在内标准测试集通常和实际业务场景差距较大,并且部分测试会针对测试集进行参数调优。所以标准测试集的测试结果仅能反映数据库在特定场景下的性能表现。建议用户使用实际业务数据进行进一步的测试。
+> 注1:包括 SSB 在内的标准测试集通常和实际业务场景差距较大,并且部分测试会针对测试集进行参数调优。所以标准测试集的测试结果仅能反映数据库在特定场景下的性能表现。建议用户使用实际业务数据进行进一步的测试。
 > 
 > 注2:本文档涉及的操作都在 CentOS 7 环境进行。
 
@@ -60,7 +60,7 @@ sh build-ssb-dbgen.sh
 sh gen-ssb-data.sh -s 100 -c 100
 ```
 
-> 注1:`sh gen-ssb-data.sh -h 查看帮助`
+> 注1:通过 `sh gen-ssb-data.sh -h` 查看脚本帮助。
 > 
 > 注2:数据会以 `.tbl` 为后缀生成在  `ssb-data/` 目录下。文件总大小约60GB。生成时间可能在数分钟到1小时不等。
 > 
@@ -84,7 +84,7 @@ sh gen-ssb-data.sh -s 100 -c 100
 
     1. 导入 4 张维度表数据(customer, part, supplier and date)
     
-        因为这4张维表数据量较小,导入较简答,我们使用以下命令先导入这4表的数据:
+        因为这4张维表数据量较小,导入较简单,我们使用以下命令先导入这4表的数据:
         
         `sh load-dimension-data.sh`
         
@@ -92,9 +92,11 @@ sh gen-ssb-data.sh -s 100 -c 100
 
         通过以下命令导入 lineorder 表数据:
         
-        `sh load-fact-data.sh -c 3`
+        `sh load-fact-data.sh -c 5`
         
-        `-c 3` 表示启动 5 个并发线程导入(默认为3)。在单 BE 节点情况下,由 `sh gen-ssb-data.sh -s 100 -c 100` 生成的 lineorder 数据,使用 `sh load-fact-data.sh -c 3` 的导入时间约为 10min。内存开销约为 5-6GB。如果开启更多线程,可以加快导入速度,但会增加额外的内存开销。
+        `-c 5` 表示启动 5 个并发线程导入(默认为3)。在单 BE 节点情况下,由 `sh gen-ssb-data.sh -s 100 -c 100` 生成的 lineorder 数据,使用 `sh load-fact-data.sh -c 3` 的导入时间约为 10min。内存开销约为 5-6GB。如果开启更多线程,可以加快导入速度,但会增加额外的内存开销。
+
+    > 注:为获得更快的导入速度,你可以在 be.conf 中添加 `flush_thread_num_per_store=5` 后重启BE。该配置表示每个数据目录的写盘线程数,默认为2。较大的数据可以提升写数据吞吐,但可能会增加 IO Util。(参考值:1块机械磁盘,在默认为2的情况下,导入过程中的 IO Util 约为12%,设置为5时,IO Util 约为26%。如果是 SSD 盘,则几乎为 0)。
 
 5. 检查导入数据
 
@@ -152,7 +154,7 @@ SSB 测试集共 4 组 14 个 SQL。查询语句在 [queries/](https://github.co
     | q4.2 | 1430 | 670 | 8 | BLOOM_FILTER |
     | q4.2 | 1750 | 1030 | 8 | BLOOM_FILTER |
 
-    > 注1:“这个测试集和你的生成环境相去甚远,不要迷信他!”
+    > 注1:“这个测试集和你的生产环境相去甚远,请对他保持怀疑态度!”
     > 
     > 注2:测试结果为多次执行取平均值(Page Cache 会起到一定加速作用)。并且数据经过充分的 compaction (如果在刚导入数据后立刻测试,则查询延迟可能高于本测试结果)
     >
@@ -160,5 +162,5 @@ SSB 测试集共 4 组 14 个 SQL。查询语句在 [queries/](https://github.co
     >
     > 注4:Parallelism 表示查询并发度,通过 `set parallel_fragment_exec_instance_num=8` 设置。
     >
-    > 注5:Runtime Filter Mode 是 Runtime Filter 的类型,通过 `set runtime_filter_type="BLOOM_FILTER"` 设置。([Runtime Filter](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/runtime-filter.html) 功能对 SSB 测试集效果显著。因为该测试级中,Join 算子右表的数据可以对左表起到很好的过滤作用。你可以尝试通过 `set runtime_filter_mode=off` 关闭该功能,看看查询延迟的变化。)
+    > 注5:Runtime Filter Mode 是 Runtime Filter 的类型,通过 `set runtime_filter_type="BLOOM_FILTER"` 设置。([Runtime Filter](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/runtime-filter.html) 功能对 SSB 测试集效果显著。因为该测试集中,Join 算子右表的数据可以对左表起到很好的过滤作用。你可以尝试通过 `set runtime_filter_mode=off` 关闭该功能,看看查询延迟的变化。)
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 18bfc2d..b2268ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -308,7 +308,8 @@ public class OutFileClause {
 
         if (filePath.startsWith(LOCAL_FILE_PREFIX)) {
             if (!Config.enable_outfile_to_local) {
-                throw new AnalysisException("Exporting results to local disk is not allowed.");
+                throw new AnalysisException("Exporting results to local disk is not allowed." 
+                    + " To enable this feature, you need to add `enable_outfile_to_local=true` in fe.conf and restart FE");
             }
             isLocalOutput = true;
             filePath = filePath.substring(LOCAL_FILE_PREFIX.length() - 1); // leave last '/'

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/07: [Docs] Fix error KEY url (#6955)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit a9468959cef0d7b62cd8fa94fc9a88c4b14884f3
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Fri Oct 29 12:07:44 2021 +0800

    [Docs] Fix error KEY url (#6955)
---
 docs/en/downloads/downloads.md    | 2 +-
 docs/zh-CN/downloads/downloads.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/en/downloads/downloads.md b/docs/en/downloads/downloads.md
index 6ed6502..57faba5 100644
--- a/docs/en/downloads/downloads.md
+++ b/docs/en/downloads/downloads.md
@@ -37,6 +37,6 @@ You can download source code from following links, then compile and install Dori
 | 0.10.0 | 2019-07-02 | [Source](http://archive.apache.org/dist/incubator/doris/0.10.0-incubating/apache-doris-0.10.0-incubating-src.tar.gz) ([Signature](http://archive.apache.org/dist/incubator/doris/0.10.0-incubating/apache-doris-0.10.0-incubating-src.tar.gz.asc) [SHA512](http://archive.apache.org/dist/incubator/doris/0.10.0-incubating/apache-doris-0.10.0-incubating-src.tar.gz.sha512)) |
 | 0.9.0 | 2019-02-18 | [Source](http://archive.apache.org/dist/incubator/doris/0.9.0-incubating/apache-doris-0.9.0-incubating-src.tar.gz) ([Signature](http://archive.apache.org/dist/incubator/doris/0.9.0-incubating/apache-doris-0.9.0-incubating-src.tar.gz.asc) [SHA512](http://archive.apache.org/dist/incubator/doris/0.9.0-incubating/apache-doris-0.9.0-incubating-src.tar.gz.sha512)) |
  
-To verify the downloaded files, please read [Verify Apache Release](../community/verify-apache-release.html) and using these [KEYS](https://www.apache.org/dist/incubator/doris/KEYS).
+To verify the downloaded files, please read [Verify Apache Release](../community/verify-apache-release.html) and using these [KEYS](https://downloads.apache.org/incubator/doris/KEYS).
 
 After verification, please read [Compilation](../installing/compilation.html) and [Installation and deployment](../installing/install-deploy.html) to compile and install Doris.
diff --git a/docs/zh-CN/downloads/downloads.md b/docs/zh-CN/downloads/downloads.md
index 6f7e141..0bc8291 100644
--- a/docs/zh-CN/downloads/downloads.md
+++ b/docs/zh-CN/downloads/downloads.md
@@ -18,6 +18,6 @@
 | 0.10.0 | 2019-07-02 | [Source](http://archive.apache.org/dist/incubator/doris/0.10.0-incubating/apache-doris-0.10.0-incubating-src.tar.gz) ([Signature](http://archive.apache.org/dist/incubator/doris/0.10.0-incubating/apache-doris-0.10.0-incubating-src.tar.gz.asc) [SHA512](http://archive.apache.org/dist/incubator/doris/0.10.0-incubating/apache-doris-0.10.0-incubating-src.tar.gz.sha512)) |
 | 0.9.0 | 2019-02-18 | [Source](http://archive.apache.org/dist/incubator/doris/0.9.0-incubating/apache-doris-0.9.0-incubating-src.tar.gz) ([Signature](http://archive.apache.org/dist/incubator/doris/0.9.0-incubating/apache-doris-0.9.0-incubating-src.tar.gz.asc) [SHA512](http://archive.apache.org/dist/incubator/doris/0.9.0-incubating/apache-doris-0.9.0-incubating-src.tar.gz.sha512)) |
  
-关于如何校验下载文件,请参阅 [校验下载文件](../community/verify-apache-release.html),并使用这些[KEYS](https://www.apache.org/dist/incubator/doris/KEYS)。
+关于如何校验下载文件,请参阅 [校验下载文件](../community/verify-apache-release.html),并使用这些[KEYS](https://downloads.apache.org/incubator/doris/KEYS)。
 
 校验完成后,可以参阅 [编译文档](../installing/compilation.html) 以及 [安装与部署文档](../installing/install-deploy.html) 进行 Doris 的编译、安装与部署。

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 03/07: support use char like \x01 in flink-doris-sink column & line delimiter (#6937)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b58ae3e02bf7ac0937cbfe502222002a91abb8d8
Author: wunan1210 <wu...@gmail.com>
AuthorDate: Fri Oct 29 13:56:52 2021 +0800

    support use char like \x01 in flink-doris-sink column & line delimiter (#6937)
    
    * support use char like \x01 in flink-doris-sink column & line delimiter
    
    * extend imports
    
    * add docs
---
 docs/en/extending-doris/flink-doris-connector.md   |  2 +-
 .../zh-CN/extending-doris/flink-doris-connector.md |  2 +-
 .../doris/flink/rest/models/RespContent.java       |  4 ++
 .../flink/table/DorisDynamicOutputFormat.java      | 44 +++++++++++++++++++---
 .../apache/doris/flink/table/DorisStreamLoad.java  |  3 +-
 5 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index c42d237..961da90 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -257,7 +257,7 @@ outputFormat.close();
 | sink.batch.size                        | 100            | Maximum number of lines in a single write BE                                             |
 | sink.max-retries                        | 1            | Number of retries after writing BE failed                                              |
 | sink.batch.interval                         | 1s            | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
-| sink.properties.*     | --               | The stream load parameters.eg:sink.properties.column_separator' = ','.<br /> Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'|
+| sink.properties.*     | --               | The stream load parameters. e.g. 'sink.properties.column_separator' = ','. Set 'sink.properties.escape_delimiters' = 'true' if you want to use a control character as a separator; for example '\\x01' will be translated to the binary byte 0x01.<br /> JSON format import is supported; you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'|
 
 
 ## Doris & Flink Column Type Mapping
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index 9ea1eba..4a0a8cb 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -260,7 +260,7 @@ outputFormat.close();
 | sink.batch.size     | 100                | 单次写BE的最大行数        |
 | sink.max-retries     | 1                | 写BE失败之后的重试次数       |
 | sink.batch.interval     | 1s                | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
-| sink.properties.*     | --               | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
+| sink.properties.*     | --               | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。如果需要特殊字符作为分隔符, 可以加上参数'sink.properties.escape_delimiters' = 'true', '\\x01'会被转换为二进制的0x01<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
 
 
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
index b86b3dd..07a356c 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
@@ -93,4 +93,8 @@ public class RespContent {
         }
 
     }
+
+    public String getErrorURL() {
+        return ErrorURL;
+    }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 0fd154a..f4f49bd 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -38,11 +38,14 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
@@ -62,9 +65,11 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
     private static final String FORMAT_KEY = "format";
     private static final String FORMAT_JSON_VALUE = "json";
     private static final String NULL_VALUE = "\\N";
+    private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
+    private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
 
-    private final String fieldDelimiter;
-    private final String lineDelimiter;
+    private String fieldDelimiter;
+    private String lineDelimiter;
     private final String[] fieldNames;
     private final boolean jsonFormat;
     private DorisOptions options;
@@ -88,10 +93,26 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
-        this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
-                FIELD_DELIMITER_DEFAULT);
-        this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
-                LINE_DELIMITER_DEFAULT);
+
+        Properties streamLoadProp=executionOptions.getStreamLoadProp();
+
+        boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
+        if (ifEscape) {
+            this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
+                    FIELD_DELIMITER_DEFAULT));
+            this.lineDelimiter = escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY,
+                    LINE_DELIMITER_DEFAULT));
+
+            if (streamLoadProp.contains(ESCAPE_DELIMITERS_KEY)) {
+                streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
+            }
+        } else {
+            this.fieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
+                    FIELD_DELIMITER_DEFAULT);
+            this.lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
+                    LINE_DELIMITER_DEFAULT);
+        }
+
         this.fieldNames = fieldNames;
         this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -100,6 +121,17 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
         }
     }
 
+    private String escapeString( String s) {
+            Pattern p = Pattern.compile("\\\\x(\\d{2})");
+            Matcher m = p.matcher(s);
+
+            StringBuffer buf = new StringBuffer();
+            while (m.find()) {
+                m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1))));
+            }
+            m.appendTail(buf);
+            return buf.toString();
+    }
 
     @Override
     public void configure(Configuration configuration) {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index c37e640..b897ff2 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -94,7 +94,8 @@ public class DorisStreamLoad implements Serializable {
             try {
                 RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
                 if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                    throw new StreamLoadException("stream load error: " + respContent.getMessage());
+                    String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL());
+                    throw new StreamLoadException(errMsg);
                 }
             } catch (IOException e) {
                 throw new StreamLoadException(e);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org