Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/03/03 13:04:40 UTC

[GitHub] [incubator-doris] weizuo93 opened a new pull request #5452: [Audit][Stream Load] Support audit function for stream load

weizuo93 opened a new pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452


   ## Proposed changes
   
   Record finished stream load jobs (both successful and failed) into the audit log so that we can see when each stream load job was executed and check its details.
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [ ] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [ ] Documentation Update (if none of the other choices apply)
   - [x] Code refactor (Modify the code structure, format the code, etc...)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [ ] I have created an issue on (Fix #ISSUE) and described the bug/feature there in detail
   - [x] Compiling and unit tests pass locally with my changes
   - [ ] I have added tests that prove my fix is effective or that my feature works
   - [ ] If these changes need document changes, I have updated the document
   - [x] Any dependent changes have been merged
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601974461



##########
File path: be/src/runtime/stream_load/stream_load_context.cpp
##########
@@ -89,6 +103,91 @@ std::string StreamLoadContext::to_json() const {
     return s.GetString();
 }
 
+void StreamLoadContext::parse_stream_load_record(std::string stream_load_record, TStreamLoadRecord& stream_load_item) {

Review comment:
       > 
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608308183



##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;

Review comment:
       > 
   
   OK. Thank you.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601974829



##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,137 @@
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice_transform.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+const size_t PREFIX_LENGTH = 4;
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    for (auto handle : _handles) {
+        _db->DestroyColumnFamilyHandle(handle);
+        handle = nullptr;
+    }
+    if (_db != nullptr) {
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecord::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family add prefix extractor to improve performance and ensure correctness
+    rocksdb::ColumnFamilyOptions stream_load_column_family;
+    stream_load_column_family.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(PREFIX_LENGTH));
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, stream_load_column_family);
+    rocksdb::Status s = rocksdb::DB::Open(options, db_path, column_families, &_handles, &_db);
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string> &stream_load_records) {

Review comment:
       > Use pointer to indicate the passout parameter, like:
   > `std::map<std::string, std::string>* stream_load_records`
   
   OK.
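A minimal sketch of the pointer out-parameter convention the reviewer asks for. `InMemoryRecordStore` is a hypothetical stand-in backed by `std::map`, not the RocksDB-backed recorder, and treating `start` as an exclusive lower bound (the last key already returned) is an assumption, since the quoted hunk stops before the body of `get_batch`.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical in-memory stand-in for the RocksDB-backed recorder, used only
// to illustrate the pointer out-parameter convention suggested in review.
class InMemoryRecordStore {
public:
    void put(const std::string& key, const std::string& value) { _records[key] = value; }

    // Copies up to `batch_size` records whose keys are strictly greater than
    // `start` into `*stream_load_records` and returns how many were copied.
    // Assumption: `start` is the last key the caller has already consumed.
    int get_batch(const std::string& start, int batch_size,
                  std::map<std::string, std::string>* stream_load_records) const {
        assert(stream_load_records != nullptr);
        int copied = 0;
        for (auto it = _records.upper_bound(start);
             it != _records.end() && copied < batch_size; ++it) {
            stream_load_records->emplace(it->first, it->second);
            ++copied;
        }
        return copied;
    }

private:
    // Keys are kept in bytewise order, as a RocksDB iterator with the default
    // comparator would return them.
    std::map<std::string, std::string> _records;
};
```

At the call site, `store.get_batch(last_key, 100, &records)` makes the write to `records` visible, which a non-const reference parameter would hide.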






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601975517



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;

Review comment:
       > Reorder the imports
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608307702



##########
File path: be/src/runtime/CMakeLists.txt
##########
@@ -91,6 +91,7 @@ set(RUNTIME_FILES
     message_body_sink.cpp
     stream_load/stream_load_context.cpp
     stream_load/stream_load_executor.cpp
+        stream_load/stream_load_recorder.cpp

Review comment:
       > 
   
   OK. Thank you.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601974649



##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,137 @@
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice_transform.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+const size_t PREFIX_LENGTH = 4;
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    for (auto handle : _handles) {
+        _db->DestroyColumnFamilyHandle(handle);
+        handle = nullptr;
+    }
+    if (_db != nullptr) {

Review comment:
       > This logic is strange. If we need to check whether `_db` is null here, then the `_db->DestroyColumnFamilyHandle(handle);` call above also needs the check.
   
   Done.
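The destructor shape that follows from the reviewer's point can be sketched with hypothetical stand-ins `FakeDb` and `FakeHandle` in place of `rocksdb::DB` and `rocksdb::ColumnFamilyHandle`, so it compiles without RocksDB. It also highlights a small detail in the quoted code: `handle = nullptr;` inside `for (auto handle : _handles)` only resets the loop's local copy of the pointer, not the element stored in `_handles`.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical stand-ins for rocksdb::ColumnFamilyHandle and rocksdb::DB,
// so the ownership pattern compiles without RocksDB.
struct FakeHandle {};

int g_destroyed_handles = 0;  // counts handles released through FakeDb

struct FakeDb {
    void DestroyColumnFamilyHandle(FakeHandle* handle) {
        delete handle;
        ++g_destroyed_handles;
    }
};

class Recorder {
public:
    Recorder(FakeDb* db, std::vector<FakeHandle*> handles)
            : _db(db), _handles(std::move(handles)) {}
    Recorder(const Recorder&) = delete;
    Recorder& operator=(const Recorder&) = delete;

    ~Recorder() {
        // A single null check guards every use of _db: the handles are only
        // valid if the database was opened successfully.
        if (_db != nullptr) {
            for (FakeHandle* handle : _handles) {
                _db->DestroyColumnFamilyHandle(handle);
            }
            // Setting the loop variable to nullptr would only reset a local copy;
            // clearing the vector is what drops the now-dangling pointers.
            _handles.clear();
            delete _db;
            _db = nullptr;
        }
    }

private:
    FakeDb* _db;
    std::vector<FakeHandle*> _handles;
};
```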






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608315555



##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecorder::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+
+    if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) {
+        rocksdb::CompactRangeOptions options;
+        s = _db->CompactRange(options, _handles[1], nullptr, nullptr);

Review comment:
       > Expired data is garbage-collected during RocksDB compaction, so there is no need to compact it manually, and `CompactRange` may cost a lot of resources and time. Why do you do that?
   
   We want compaction to run as early as possible so that expired data is deleted promptly; triggering `CompactRange` manually lets us control when expired data is deleted. In addition, `CompactRange` is triggered only after the stream load result has been returned to the client, so it does not affect the stream load process.
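The interval check in `StreamLoadRecorder::put()` can be sketched as a small throttle. `CompactionThrottle` is a hypothetical helper, not part of the PR; it assumes the last-compaction timestamp is reset whenever a compaction is triggered, which the quoted hunk is cut off before showing. Without that reset, every `put()` after the first interval would trigger another `CompactRange`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the check in StreamLoadRecorder::put():
// a manual compaction is allowed at most once per `interval_secs`.
class CompactionThrottle {
public:
    CompactionThrottle(int64_t now_ms, int64_t interval_secs)
            : _last_compaction_ms(now_ms), _interval_secs(interval_secs) {
        assert(interval_secs >= 0);
    }

    // Returns true, and records `now_ms` as the new compaction time, when more
    // than `interval_secs` whole seconds have passed since the last compaction.
    bool should_compact(int64_t now_ms) {
        if ((now_ms - _last_compaction_ms) / 1000 > _interval_secs) {
            _last_compaction_ms = now_ms;
            return true;
        }
        return false;
    }

private:
    int64_t _last_compaction_ms;
    int64_t _interval_secs;
};
```

If `put()` can be called from several HTTP worker threads at once, the check-and-reset would also need to be atomic (for example behind a mutex), otherwise two threads could both start a compaction.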






[GitHub] [incubator-doris] acelyc111 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r607922735



##########
File path: be/src/runtime/CMakeLists.txt
##########
@@ -91,6 +91,7 @@ set(RUNTIME_FILES
     message_body_sink.cpp
     stream_load/stream_load_context.cpp
     stream_load/stream_load_executor.cpp
+        stream_load/stream_load_recorder.cpp

Review comment:
       ```suggestion
       stream_load/stream_load_recorder.cpp
   ```

##########
File path: be/src/http/action/stream_load.cpp
##########
@@ -533,4 +539,17 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa
     return Status::OK();
 }
 
+void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) {
+    auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder();
+    if (stream_load_recorder != nullptr) {
+        std::string key = std::to_string(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label;
+        auto st = stream_load_recorder->put(key, str);
+        if (st.ok()) {

Review comment:
       Also need to log when putting the record into RocksDB fails.
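A sketch of what the reviewer asks for: build the key the same way as the quoted `_sava_stream_load_record()` (finish time in microseconds, `_`, then the label) and log both outcomes. `make_record_key` and `save_record` are hypothetical names, the `put` callback stands in for `StreamLoadRecorder::put`, and `std::clog`/`std::cerr` stand in for Doris's glog `LOG(INFO)`/`LOG(WARNING)`.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Hypothetical key builder matching the quoted code:
// "<start_micros + load_cost_micros>_<label>".
std::string make_record_key(int64_t start_micros, int64_t load_cost_micros,
                            const std::string& label) {
    return std::to_string(start_micros + load_cost_micros) + "_" + label;
}

// Hypothetical save routine; `put` returns true on success.
// Both the success path and the failure path are logged.
bool save_record(const std::function<bool(const std::string&, const std::string&)>& put,
                 int64_t start_micros, int64_t load_cost_micros,
                 const std::string& label, const std::string& record) {
    const std::string key = make_record_key(start_micros, load_cost_micros, label);
    if (put(key, record)) {
        std::clog << "saved stream load record, key: " << key << '\n';
        return true;
    }
    std::cerr << "failed to save stream load record, key: " << key << '\n';
    return false;
}
```

Because the keys are strings, RocksDB orders them lexicographically; that matches chronological order only while all timestamps have the same number of digits (current epoch microseconds have 16), so zero-padding to a fixed width would make the ordering explicit.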

##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecorder::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");

Review comment:
       Same as above.

##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecorder::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+
+    if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) {
+        rocksdb::CompactRangeOptions options;
+        s = _db->CompactRange(options, _handles[1], nullptr, nullptr);

Review comment:
       Expired data is garbage-collected during RocksDB compaction, so there is no need to compact it manually, and `CompactRange` may cost a lot of resources and time. Why do you do that?
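The reviewer's point rests on how `DBWithTTL` expires data. As we understand the RocksDB TTL documentation, each value is stored with its write timestamp, an entry counts as expired once `timestamp + ttl < now`, and expired entries are removed only when compaction processes them, so `Get` and iterators may still return them until then. The hypothetical `simulate_ttl_compaction` below models that rule on a `std::map`; it is an illustration, not RocksDB code.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical model of a TTL-tagged value: DBWithTTL keeps a write timestamp
// alongside each value; this struct makes that explicit.
struct TtlValue {
    std::string value;
    int64_t write_ts_secs;
};

// Models one compaction pass over TTL-tagged entries: anything whose write
// timestamp plus `ttl_secs` is already in the past is dropped. Entries that no
// compaction touches are never dropped, so reads can still see them.
void simulate_ttl_compaction(std::map<std::string, TtlValue>* entries,
                             int64_t now_secs, int64_t ttl_secs) {
    assert(entries != nullptr);
    for (auto it = entries->begin(); it != entries->end();) {
        if (it->second.write_ts_secs + ttl_secs < now_secs) {
            it = entries->erase(it);
        } else {
            ++it;
        }
    }
}
```

This is the trade-off behind the exchange above: relying on background compaction costs nothing extra, while a periodic manual `CompactRange` bounds how long expired records remain visible.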

##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;

Review comment:
       ```suggestion
           _db = nullptr;
   ```

##########
File path: be/src/runtime/stream_load/stream_load_recorder.h
##########
@@ -0,0 +1,57 @@
+
+#include <map>
+#include <string>
+
+#include "rocksdb/utilities/db_ttl.h"
+
+#pragma once
+
+namespace doris {
+
+class Status;
+
+class StreamLoadRecorder {
+public:
+    StreamLoadRecorder(const std::string& root_path);
+
+    virtual ~StreamLoadRecorder();
+
+    Status init();
+
+    Status put(const std::string& key, const std::string& value);
+
+    Status get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string>* stream_load_records);
+
+private:
+    std::string _root_path;
+    rocksdb::DBWithTTL* _db;
+    std::vector<rocksdb::ColumnFamilyHandle*> _handles;
+
+    int64_t _last_compaction_time;
+
+    enum ColumnFamilyIndex {
+        DEFAULT_COLUMN_FAMILY_INDEX = 0,
+        STREAM_LOAD_COLUMN_FAMILY_INDEX
+    };
+
+    const std::string DEFAULT_COLUMN_FAMILY = "default";
+    const std::string STREAM_LOAD_COLUMN_FAMILY = "stream_load";

Review comment:
       Why do you use multiple column families? You can use the default column family directly.

##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");

Review comment:
       You can append s.ToString() to the returned status so that the caller can see the error details; otherwise they have to grep the log to find the root cause.
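A minimal sketch of this suggestion. It assumes simplified stand-ins for `rocksdb::Status` and `doris::Status`: `RocksStatus`, `Status` and `make_open_status` below are hypothetical and are not the real classes. The idea it shows is that the RocksDB reason is carried in the returned status as well as written to the log.

```cpp
#include <cassert>
#include <iostream>
#include <string>

// Hypothetical, minimal stand-in for rocksdb::Status (only what this sketch needs).
struct RocksStatus {
    bool ok_;
    std::string msg_;
    bool ok() const { return ok_; }
    std::string ToString() const { return ok_ ? "OK" : msg_; }
};

// Hypothetical, minimal stand-in for doris::Status.
struct Status {
    bool ok_;
    std::string msg_;
    static Status OK() { return {true, ""}; }
    static Status InternalError(const std::string& msg) { return {false, msg}; }
    bool ok() const { return ok_; }
    const std::string& message() const { return msg_; }
};

// Put the RocksDB error text into the returned Status, not only into the log,
// so callers can report the root cause without reading be.INFO.
Status make_open_status(const RocksStatus& s, const void* db) {
    if (!s.ok() || db == nullptr) {
        const std::string reason = s.ToString();
        std::cerr << "rocks db open failed, reason: " << reason << '\n';  // stands in for LOG(WARNING)
        return Status::InternalError("Stream load record rocksdb open failed, reason: " + reason);
    }
    return Status::OK();
}
```

In the recorder itself this would presumably amount to appending `s.ToString()` inside the existing `Status::InternalError(...)` call in `init()`.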

##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizuo93 commented on pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#issuecomment-807896697


   > I pulled and ran your PR and submitted several stream loads.
   > I can see the log `put stream_load_record rocksdb successfully. label: f0130520-835a-4d05-a38d-95f67f5b8c7b` in be.INFO.
   > And I can see the log `finished to pull stream load records of all backend` in fe.log.
   > 
   > But nothing was received from the BE. I don't know why; maybe you need to check it.
   
   I tested again. There should be nothing wrong.




[GitHub] [incubator-doris] acelyc111 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r600381383



##########
File path: be/src/common/config.h
##########
@@ -348,6 +348,12 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// batch size of stream load record reported to FE
+CONF_mInt32(stream_load_record_batch_size, "50");
+// expire time of stream load record in rocksdb.
+CONF_mInt32(stream_load_record_expire_time_secs, "28800");

Review comment:
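       `stream_load_record_expire_time_secs` is not configurable at runtime: the TTL is only passed to `DBWithTTL::Open()` in `init()`, so changing it later has no effect. It should be declared with `CONF_Int32` rather than `CONF_mInt32`.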

##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecord::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+
+    if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) {
+        rocksdb::CompactRangeOptions options;
+        s = _db->CompactRange(options, _handles[1], nullptr, nullptr);

Review comment:
       CompactRange may take a long time; it is better not to invoke it on the load path.
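One way to follow this suggestion, sketched under assumptions: `put()` only writes, and a separate thread owned by the recorder triggers compaction on a timer. Compaction is still needed because `DBWithTTL` removes expired entries only during compaction. `BackgroundCompactor` is a hypothetical helper. In the recorder, its callback would wrap the existing `_db->CompactRange(...)` call, and the interval could come from `config::clean_stream_load_record_interval_secs`.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper: runs `compact` on its own thread every `interval`, so the
// stream load path (put()) never blocks on a long CompactRange call.
class BackgroundCompactor {
public:
    BackgroundCompactor(std::chrono::milliseconds interval, std::function<void()> compact)
            : _interval(interval), _compact(std::move(compact)) {}

    ~BackgroundCompactor() { stop(); }

    void start() {
        assert(!_worker.joinable());  // start() is expected to be called once
        _worker = std::thread([this] { run(); });
    }

    // Wakes the worker immediately and waits for it to exit.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopped = true;
        }
        _cv.notify_all();
        if (_worker.joinable()) {
            _worker.join();
        }
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(_mutex);
        // wait_for() returns false on timeout (time to compact) and true once stop() was called.
        while (!_cv.wait_for(lock, _interval, [this] { return _stopped; })) {
            lock.unlock();  // do not hold the mutex while compacting
            _compact();     // in the recorder: _db->CompactRange(options, handle, nullptr, nullptr)
            lock.lock();
        }
    }

    std::chrono::milliseconds _interval;
    std::function<void()> _compact;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _stopped = false;
    std::thread _worker;
};
```

Because the wait uses a condition variable instead of `sleep`, `stop()` returns promptly during shutdown. It does not need to wait out the full interval.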






[GitHub] [incubator-doris] morningman commented on pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#issuecomment-803596560


   I pulled and ran your PR and submitted several stream loads.
   I can see the log `put stream_load_record rocksdb successfully. label: f0130520-835a-4d05-a38d-95f67f5b8c7b` in be.INFO.
   And I can see the log `finished to pull stream load records of all backend` in fe.log.
   
   But nothing was received from the BE. I don't know why; maybe you need to check it.




[GitHub] [incubator-doris] morningman commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r598289847



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();
+    }
+
+    private class PullStreamLoadRecordThread implements Runnable {
+        @Override
+        public void run() {
+            ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+
+            while (true) {
+                long start = System.currentTimeMillis();
+                for (Backend backend : backends.values()) {
+                    BackendService.Client client = null;
+                    TNetworkAddress address = null;
+                    boolean ok = false;
+                    try {
+                        address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                        client = ClientPool.backendPool.borrowObject(address);
+                        TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                        Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                        LOG.info("receive stream load audit info from backend: {}. batch size: {}", backend.getHost(), streamLoadRecordBatch.size());
+                        for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                            TStreamLoadRecord streamLoadItem= entry.getValue();
+                            LOG.info("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +
+                                            " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," +
+                                            " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
+                                    backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                                    streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                                    streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), streamLoadItem.getStartTime(),
+                                    streamLoadItem.getFinishTime());
+
+                            AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
+                                    .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl())
+                                    .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus())
+                                    .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows())
+                                    .setLoadedRows( streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows())
+                                    .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes())
+                                    .setStartTime(streamLoadItem.getStartTime()).setFinishTime(streamLoadItem.getFinishTime())
+                                    .build();
+                            Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+                            if (entry.getKey().compareTo(backend.getLastStreamLoadTime()) > 0) {
+                                backend.setLastStreamLoadTime(entry.getKey());
+                            }
+                        }
+                        ok = true;
+                    } catch (Exception e) {
+                        LOG.warn("task exec error. backend[{}]", backend.getId(), e);
+                    } finally {
+                        if (ok) {
+                            ClientPool.backendPool.returnObject(address, client);
+                        } else {
+                            ClientPool.backendPool.invalidateObject(address, client);
+                        }
+                    }
+                }
+                LOG.info("finished to pull stream load records of all backends. cost: {} ms",

Review comment:
       You can add the number of records to this log message.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();

Review comment:
       Better not to start the worker thread in the constructor; use a `start()` method instead.
   Also, shouldn't this thread only be started on the Master FE?
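The pattern is language-neutral. Here is a minimal sketch in C++, assuming a hypothetical `StreamLoadRecordPuller` class and an `is_master` flag supplied by the owner. This is not the actual FE code, which is Java. Constructing the object has no side effects. The owner calls `start()` later, and the thread is launched only on the master. A single pull round stands in for the real polling loop.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical sketch: the constructor only stores state; the worker thread is
// launched by an explicit start() call, and only on the master node.
class StreamLoadRecordPuller {
public:
    explicit StreamLoadRecordPuller(std::function<void()> pull_all_backends)
            : _pull_all_backends(std::move(pull_all_backends)) {}

    ~StreamLoadRecordPuller() {
        if (_worker.joinable()) {
            _worker.join();
        }
    }

    // Returns true if a worker thread was launched by this call.
    bool start(bool is_master) {
        if (!is_master || _started) {
            return false;  // non-master nodes never pull; a second start() is a no-op
        }
        _started = true;
        _worker = std::thread(_pull_all_backends);  // one round here; the real thread loops
        return true;
    }

    bool started() const { return _started; }

private:
    std::function<void()> _pull_all_backends;
    bool _started = false;
    std::thread _worker;
};
```

Keeping the constructor free of threads also makes the class easier to construct in unit tests without contacting any backend.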






[GitHub] [incubator-doris] weizuo93 removed a comment on pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 removed a comment on pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#issuecomment-807896447


   > I pulled and ran your PR and submitted several stream loads.
   > I can see the log `put stream_load_record rocksdb successfully. label: f0130520-835a-4d05-a38d-95f67f5b8c7b` in be.INFO.
   > And I can see the log `finished to pull stream load records of all backend` in fe.log.
   > 
   > But nothing was received from the BE. I don't know why; maybe you need to check it.
   
   I tested again. There should be nothing wrong.




[GitHub] [incubator-doris] morningman commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608676819



##########
File path: be/src/service/backend_service.cpp
##########
@@ -312,4 +314,25 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa
     result_.status = t_status;
 }
 
+void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) {

Review comment:
       Use `TStreamLoadRecordResult* result` as the output parameter.
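The recorder header already follows this convention: `get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string>* stream_load_records)` takes its output by pointer, so a call site like `get_batch(start, n, &records)` makes the mutation visible. Below is a minimal in-memory sketch of such a batch read. It assumes that `InMemoryRecorder` is a hypothetical stand-in for the RocksDB-backed recorder and that "after `start`" means keys sorting strictly after it. The PR's exact semantics are not shown here.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical in-memory stand-in for the RocksDB-backed recorder.
class InMemoryRecorder {
public:
    void put(const std::string& key, const std::string& value) { _records[key] = value; }

    // Copies up to batch_size records whose key sorts strictly after `start`
    // into *out (an output parameter passed by pointer). Returns false if out is null.
    bool get_batch(const std::string& start, int batch_size,
                   std::map<std::string, std::string>* out) const {
        if (out == nullptr) {
            return false;
        }
        out->clear();
        for (auto it = _records.upper_bound(start);
             it != _records.end() && static_cast<int>(out->size()) < batch_size; ++it) {
            out->emplace(it->first, it->second);
        }
        return true;
    }

private:
    // Ordered byte-wise, like RocksDB's default comparator.
    std::map<std::string, std::string> _records;
};
```

If the keys are timestamp strings, they need a fixed width for byte-wise order to match chronological order.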






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r607462809



##########
File path: be/src/runtime/stream_load/stream_load_record.h
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <string>
+
+#include "rocksdb/utilities/db_ttl.h"
+
+#pragma once
+
+namespace doris {
+
+class Status;
+
+class StreamLoadRecord {

Review comment:
       > Better to name it `StreamLoadRecorder`.
   
   OK.






[GitHub] [incubator-doris] morningman merged pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452


   




[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601977468



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();

Review comment:
       > Better not to start the worker thread in the constructor; use a `start()` method instead.
   > Also, shouldn't this thread only be started on the Master FE?
   
   Thanks. I have modified the code. 






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601973743



##########
File path: be/src/http/action/stream_load.cpp
##########
@@ -132,9 +134,19 @@ void StreamLoadAction::handle(HttpRequest* req) {
     str = str + '\n';
     HttpChannel::send_reply(req, str);
 
+    auto stream_load_record = StorageEngine::instance()->get_stream_load_record();

Review comment:
       > Extract a method for this and the same code in `on_header()`?
   
   OK.
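A minimal sketch of the extraction, assuming both `handle()` and `on_header()` only need to serialize the load context and write it under a key. `RecordStore`, `LoadContext` and `save_stream_load_record` are hypothetical stand-ins, not the actual Doris types; using the label as the key is also an assumption. The real code writes through `StorageEngine::instance()->get_stream_load_record()` into RocksDB.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for the RocksDB-backed record store.
struct RecordStore {
    std::map<std::string, std::string> rows;
    bool put(const std::string& key, const std::string& value) {
        rows[key] = value;
        return true;
    }
};

// Hypothetical, trimmed-down load context.
struct LoadContext {
    std::string label;
    std::string status;
    std::string to_json() const {
        return "{\"Label\":\"" + label + "\",\"Status\":\"" + status + "\"}";
    }
};

// One helper shared by both call sites, so the "get store, serialize, put"
// sequence lives in a single place. Recording is best effort: a missing store
// only makes this return false, it does not fail the load.
bool save_stream_load_record(RecordStore* store, const LoadContext& ctx) {
    if (store == nullptr) {
        return false;
    }
    return store->put(ctx.label, ctx.to_json());
}
```

`handle()` (normal finish) and `on_header()` (early failure) would then each make a single call to the helper.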






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608311258



##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecorder::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");

Review comment:
       > Same as above.
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601984474



##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecord::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+
+    if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) {
+        rocksdb::CompactRangeOptions options;
+        s = _db->CompactRange(options, _handles[1], nullptr, nullptr);

Review comment:
       > `CompactRange` may take a long time; better not to invoke it on the load path.
   
   The stream load result has already been returned to the client before `CompactRange` is called, so this operation does not affect the load itself. In addition, `CompactRange` runs periodically rather than every time a load finishes.
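The "runs periodically" argument rests on a time gate like the `(UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs` check in the hunk. A minimal sketch of that gate, assuming an injectable millisecond clock so it can be tested; `CompactionThrottle` is hypothetical. It records the time whenever it grants a compaction, so at most one compaction is granted per interval. If the reviewer's concern stands, the same gate could be polled from a background thread instead of the HTTP handler.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the condition in the hunk:
//   (now_ms - last_ms) / 1000 > interval_secs
// Not thread-safe: concurrent load handlers would need a mutex or an atomic
// compare-exchange on _last_compaction_ms.
class CompactionThrottle {
public:
    CompactionThrottle(int64_t interval_secs, int64_t start_ms)
            : _interval_secs(interval_secs), _last_compaction_ms(start_ms) {}

    // Returns true at most once per interval and restarts the interval.
    bool try_acquire(int64_t now_ms) {
        if ((now_ms - _last_compaction_ms) / 1000 > _interval_secs) {
            _last_compaction_ms = now_ms;
            return true;
        }
        return false;
    }

private:
    int64_t _interval_secs;
    int64_t _last_compaction_ms;
};
```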






[GitHub] [incubator-doris] weizuo93 commented on pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#issuecomment-807896447


   > And I pulled and ran your PR and submitted several stream loads.
   > I can see the log `put stream_load_record rocksdb successfully. label: f0130520-835a-4d05-a38d-95f67f5b8c7b` in be.INFO.
   > And I can see the log `finished to pull stream load records of all backend` in fe.log.
   > 
   > But nothing was fetched from the BE. I don't know why; maybe you need to check it.
   
   I tested it again and found nothing wrong.




[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601975580



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();
+    }
+
+    private class PullStreamLoadRecordThread implements Runnable {
+        @Override
+        public void run() {
+            ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+
+            while (true) {
+                long start = System.currentTimeMillis();
+                for (Backend backend : backends.values()) {
+                    BackendService.Client client = null;
+                    TNetworkAddress address = null;
+                    boolean ok = false;
+                    try {
+                        address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                        client = ClientPool.backendPool.borrowObject(address);
+                        TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                        Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                        LOG.info("receive stream load audit info from backend: {}. batch size: {}", backend.getHost(), streamLoadRecordBatch.size());
+                        for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                            TStreamLoadRecord streamLoadItem= entry.getValue();
+                            LOG.info("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +

Review comment:
       > Use debug level
   
   OK.






[GitHub] [incubator-doris] morningman commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r598279486



##########
File path: be/src/common/config.h
##########
@@ -348,6 +348,12 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// batch size of stream load record reported to FE
+CONF_mInt32(stream_load_record_batch_size, "50");
+// expire time of stream load record in rocksdb. 1000*1000*60*60*8=28800000000(8 hour)
+CONF_mInt64(stream_load_record_expire_time_us, "28800000000");

Review comment:
       Better to use a more readable config; I suggest using hours.
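For reference, the default in the hunk is 1000 × 1000 × 60 × 60 × 8 = 28,800,000,000 µs, i.e. 28,800 s or 8 hours. A minimal sketch of the suggestion, assuming a hypothetical hour-based setting (for example `stream_load_record_expire_hours`) converted once into the `int32_t` seconds value that a TTL setting expects (the revision of `stream_load_recorder.cpp` quoted above uses `config::stream_load_record_expire_time_secs` for this). The function name is illustrative only.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical: converts an hour-based config value into int32_t seconds,
// clamping instead of overflowing for very large inputs.
int32_t expire_hours_to_ttl_secs(int32_t hours) {
    const int64_t secs = static_cast<int64_t>(hours) * 60 * 60;
    if (secs > std::numeric_limits<int32_t>::max()) {
        return std::numeric_limits<int32_t>::max();
    }
    return static_cast<int32_t>(secs);
}
```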

##########
File path: be/src/olap/storage_engine.cpp
##########
@@ -225,6 +226,35 @@ Status StorageEngine::_init_store_map() {
     for (auto store : tmp_stores) {
         _store_map.emplace(store->path(), store);
     }
+
+    auto st = _init_stream_load_record();
+    if (!st.ok()) {
+        LOG(WARNING) << "status=" << st.to_string();

Review comment:
       Do we need to stop the startup if this error happens?
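If the answer is yes, the usual pattern is to return the error from the init routine instead of only logging it, so the caller can refuse to start. A minimal sketch with a hypothetical `Status`-like type and hypothetical function names; the real `doris::Status` and `StorageEngine::_init_store_map()` differ.

```cpp
#include <cassert>
#include <iostream>
#include <string>
#include <utility>

// Hypothetical minimal status type standing in for doris::Status.
struct Status {
    bool ok_flag = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_flag; }
};

// Hypothetical init step that fails when the record DB cannot be opened.
Status init_stream_load_record(bool db_open_ok) {
    if (!db_open_ok) {
        return Status::InternalError("Stream load record rocksdb open failed");
    }
    return Status::OK();
}

// Propagates the failure so that startup stops, instead of logging and continuing.
Status init_store_map(bool db_open_ok) {
    Status st = init_stream_load_record(db_open_ok);
    if (!st.ok()) {
        std::cerr << "status=" << st.msg << '\n';
        return st;
    }
    return Status::OK();
}
```

If audit records are considered optional, keeping the log-and-continue behavior is the alternative; the choice is about whether a BE should run without them.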

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();
+    }
+
+    private class PullStreamLoadRecordThread implements Runnable {
+        @Override
+        public void run() {
+            ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+
+            while (true) {
+                long start = System.currentTimeMillis();
+                for (Backend backend : backends.values()) {
+                    BackendService.Client client = null;
+                    TNetworkAddress address = null;
+                    boolean ok = false;
+                    try {
+                        address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                        client = ClientPool.backendPool.borrowObject(address);
+                        TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                        Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                        LOG.info("receive stream load audit info from backend: {}. batch size: {}", backend.getHost(), streamLoadRecordBatch.size());
+                        for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                            TStreamLoadRecord streamLoadItem= entry.getValue();
+                            LOG.info("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +

Review comment:
       Use debug level

##########
File path: be/src/http/action/stream_load.cpp
##########
@@ -132,9 +134,19 @@ void StreamLoadAction::handle(HttpRequest* req) {
     str = str + '\n';
     HttpChannel::send_reply(req, str);
 
+    auto stream_load_record = StorageEngine::instance()->get_stream_load_record();

Review comment:
       Extract a method for this and the same code in `on_header()`?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;

Review comment:
       Reorder the imports.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
##########
@@ -95,6 +95,8 @@
     // additional backendStatus information for BE, display in JSON format
     private BackendStatus backendStatus = new BackendStatus();
 
+    private String lastStreamLoadTime = "";

Review comment:
       Why not use a timestamp of type Long?
   Also, this field is not persisted, so every time the FE restarts, will this field be reset to
   `1970`?

##########
File path: be/src/runtime/stream_load/stream_load_context.cpp
##########
@@ -89,6 +103,91 @@ std::string StreamLoadContext::to_json() const {
     return s.GetString();
 }
 
+void StreamLoadContext::parse_stream_load_record(std::string stream_load_record, TStreamLoadRecord& stream_load_item) {

Review comment:
       ```suggestion
   void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item) {
   ```

##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice_transform.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+const size_t PREFIX_LENGTH = 4;
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    for (auto handle : _handles) {
+        _db->DestroyColumnFamilyHandle(handle);
+        handle = nullptr;
+    }
+    if (_db != nullptr) {
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecord::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family add prefix extractor to improve performance and ensure correctness
+    rocksdb::ColumnFamilyOptions stream_load_column_family;
+    stream_load_column_family.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(PREFIX_LENGTH));
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, stream_load_column_family);
+    rocksdb::Status s = rocksdb::DB::Open(options, db_path, column_families, &_handles, &_db);
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string> &stream_load_records) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rocksdb::ReadOptions(), handle));
+    if (start == "") {
+        it->SeekToFirst();
+    } else {
+        it->Seek(start);
+        rocksdb::Status status = it->status();
+        if (!status.ok()) {
+            it->SeekToFirst();
+        }
+    }
+    rocksdb::Status status = it->status();
+    if (!status.ok()) {
+        LOG(WARNING) << "rocksdb seek failed. reason:" << status.ToString();
+        return Status::InternalError("Stream load record rocksdb seek failed");
+    }
+    int num = 0;
+    for (it->Next(); it->Valid(); it->Next()) {
+        std::string key = it->key().ToString();
+        std::string value = it->value().ToString();
+        stream_load_records[key] = value;
+        num++;
+        if (num >= batch_size) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::clean_expired_stream_load_record() {

Review comment:
       How about using the TTL feature of RocksDB to do this?
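As a rough model of what the TTL feature provides: RocksDB's `DBWithTTL` stores a write timestamp with each value and drops entries whose timestamp plus TTL is earlier than the current time during compaction, so a read between expiry and the next compaction can still return a stale entry. The in-memory sketch below only emulates that behavior, to show why an explicit `clean_expired_stream_load_record()` scan becomes unnecessary; `TtlStore` is hypothetical and is not the RocksDB API (the revision quoted earlier in this thread opens the database with `rocksdb::DBWithTTL::Open`).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical in-memory emulation of TTL semantics (not the RocksDB API).
class TtlStore {
public:
    explicit TtlStore(int64_t ttl_secs) : _ttl_secs(ttl_secs) {}

    void put(const std::string& key, const std::string& value, int64_t now_secs) {
        _rows[key] = Entry{value, now_secs};
    }

    // Reads do not filter expired entries; only compaction removes them.
    bool get(const std::string& key, std::string* value) const {
        auto it = _rows.find(key);
        if (it == _rows.end()) {
            return false;
        }
        *value = it->second.value;
        return true;
    }

    // Drops entries with write_time + ttl < now, as a TTL compaction would.
    void compact(int64_t now_secs) {
        for (auto it = _rows.begin(); it != _rows.end();) {
            if (it->second.write_secs + _ttl_secs < now_secs) {
                it = _rows.erase(it);
            } else {
                ++it;
            }
        }
    }

    std::size_t size() const { return _rows.size(); }

private:
    struct Entry {
        std::string value;
        int64_t write_secs;
    };
    int64_t _ttl_secs;
    std::map<std::string, Entry> _rows;
};
```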

##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice_transform.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+const size_t PREFIX_LENGTH = 4;
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    for (auto handle : _handles) {
+        _db->DestroyColumnFamilyHandle(handle);
+        handle = nullptr;
+    }
+    if (_db != nullptr) {

Review comment:
       This logic is strange. If we need to check whether `_db` is null here, then the `_db->DestroyColumnFamilyHandle(handle);` call above also needs the check.
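One way to make the teardown consistent, assuming the handles are only populated after a successful open: put both the handle cleanup and the `delete` under the same null check, as the later revision quoted earlier in this thread does. `FakeDB`, `FakeHandle` and `RecordHolder` are hypothetical stand-ins so the sketch compiles without RocksDB. It also clears the vector, since assigning `nullptr` to the loop variable (as in `handle = nullptr;`) only changes a copy.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for rocksdb::ColumnFamilyHandle and rocksdb::DB.
struct FakeHandle {};
struct FakeDB {
    int* destroyed;  // counts DestroyColumnFamilyHandle calls
    void DestroyColumnFamilyHandle(FakeHandle* h) {
        delete h;
        ++*destroyed;
    }
};

class RecordHolder {
public:
    explicit RecordHolder(FakeDB* db) : _db(db) {}
    RecordHolder(const RecordHolder&) = delete;
    RecordHolder& operator=(const RecordHolder&) = delete;

    void add_handle(FakeHandle* h) { _handles.push_back(h); }

    ~RecordHolder() {
        // A single null check covers both steps: handles exist only if the DB opened.
        if (_db != nullptr) {
            for (FakeHandle* handle : _handles) {
                _db->DestroyColumnFamilyHandle(handle);
            }
            _handles.clear();  // clear the container, not a copy of each pointer
            delete _db;
            _db = nullptr;
        }
    }

private:
    FakeDB* _db;
    std::vector<FakeHandle*> _handles;
};
```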

##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice_transform.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+const size_t PREFIX_LENGTH = 4;
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    for (auto handle : _handles) {
+        _db->DestroyColumnFamilyHandle(handle);
+        handle = nullptr;
+    }
+    if (_db != nullptr) {
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecord::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family add prefix extractor to improve performance and ensure correctness
+    rocksdb::ColumnFamilyOptions stream_load_column_family;
+    stream_load_column_family.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(PREFIX_LENGTH));
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, stream_load_column_family);
+    rocksdb::Status s = rocksdb::DB::Open(options, db_path, column_families, &_handles, &_db);
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string> &stream_load_records) {

Review comment:
       Use a pointer to indicate the output parameter, like:
   `std::map<std::string, std::string>* stream_load_records`
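A minimal sketch of the suggested signature. It assumes an in-memory `std::map` stands in for the RocksDB column family (RocksDB's default bytewise comparator also orders keys lexicographically), and it assumes that only records strictly after `start` are returned. The free function and the `RecordStore` alias are hypothetical. Only the pointer-style output parameter mirrors the suggestion.

```cpp
#include <map>
#include <string>

// Hypothetical in-memory stand-in for the stream load column family.
// std::map orders keys lexicographically, like RocksDB's default comparator.
using RecordStore = std::map<std::string, std::string>;

// The output map is passed by pointer, so a call site such as
// `get_batch(store, start, 50, &records)` visibly marks `records` as written.
// Assumption: only keys strictly greater than `start` are returned.
bool get_batch(const RecordStore& store, const std::string& start, const int batch_size,
               std::map<std::string, std::string>* stream_load_records) {
    if (stream_load_records == nullptr || batch_size <= 0) {
        return false;
    }
    stream_load_records->clear();
    for (auto it = store.upper_bound(start);
         it != store.end() && static_cast<int>(stream_load_records->size()) < batch_size;
         ++it) {
        stream_load_records->emplace(it->first, it->second);
    }
    return true;
}
```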

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();
+    }
+
+    private class PullStreamLoadRecordThread implements Runnable {
+        @Override
+        public void run() {
+            ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+
+            while (true) {
+                long start = System.currentTimeMillis();
+                for (Backend backend : backends.values()) {
+                    BackendService.Client client = null;
+                    TNetworkAddress address = null;
+                    boolean ok = false;
+                    try {
+                        address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                        client = ClientPool.backendPool.borrowObject(address);
+                        TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                        Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                        LOG.info("receive stream load audit info from backend: {}. batch size: {}", backend.getHost(), streamLoadRecordBatch.size());
+                        for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                            TStreamLoadRecord streamLoadItem= entry.getValue();
+                            LOG.info("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +
+                                            " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," +
+                                            " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
+                                    backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                                    streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                                    streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), streamLoadItem.getStartTime(),
+                                    streamLoadItem.getFinishTime());
+
+                            AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
+                                    .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl())
+                                    .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus())
+                                    .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows())
+                                    .setLoadedRows( streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows())
+                                    .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes())
+                                    .setStartTime(streamLoadItem.getStartTime()).setFinishTime(streamLoadItem.getFinishTime())
+                                    .build();
+                            Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+                            if (entry.getKey().compareTo(backend.getLastStreamLoadTime()) > 0) {
+                                backend.setLastStreamLoadTime(entry.getKey());

Review comment:
       We can find the max timestamp of this batch and call `setLastStreamLoadTime` once.
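The FE code above is Java; to keep a single language for examples, this is a C++ sketch of the same idea. It assumes, as the diff does, that keys compare lexicographically as strings. The `Backend` struct and its setter are hypothetical stand-ins for the FE `Backend` and `setLastStreamLoadTime`. The batch is an `std::unordered_map` because the received map may carry no ordering guarantee, so every key has to be scanned.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the FE Backend object; only the field that matters here.
struct Backend {
    std::string last_stream_load_time;
    int set_calls = 0;  // counts updates, to show the setter runs at most once per batch
    void set_last_stream_load_time(const std::string& t) {
        last_stream_load_time = t;
        ++set_calls;
    }
};

// Scan the whole batch for its largest key first, then update the backend once,
// instead of calling the setter inside the per-record loop.
void advance_last_stream_load_time(Backend* backend,
                                   const std::unordered_map<std::string, std::string>& batch) {
    std::string max_key = backend->last_stream_load_time;
    for (const auto& entry : batch) {
        // ... the audit event for entry.second would be handled here ...
        if (entry.first > max_key) {
            max_key = entry.first;
        }
    }
    if (max_key != backend->last_stream_load_time) {
        backend->set_last_stream_load_time(max_key);
    }
}
```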




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608320683



##########
File path: be/src/runtime/stream_load/stream_load_recorder.h
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <string>
+
+#include "rocksdb/utilities/db_ttl.h"
+
+#pragma once
+
+namespace doris {
+
+class Status;
+
+class StreamLoadRecorder {
+public:
+    StreamLoadRecorder(const std::string& root_path);
+
+    virtual ~StreamLoadRecorder();
+
+    Status init();
+
+    Status put(const std::string& key, const std::string& value);
+
+    Status get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string>* stream_load_records);
+
+private:
+    std::string _root_path;
+    rocksdb::DBWithTTL* _db;
+    std::vector<rocksdb::ColumnFamilyHandle*> _handles;
+
+    int64_t _last_compaction_time;
+
+    enum ColumnFamilyIndex {
+        DEFAULT_COLUMN_FAMILY_INDEX = 0,
+        STREAM_LOAD_COLUMN_FAMILY_INDEX
+    };
+
+    const std::string DEFAULT_COLUMN_FAMILY = "default";
+    const std::string STREAM_LOAD_COLUMN_FAMILY = "stream_load";

Review comment:
       > Why do you use multiple column families? You can use the default column family directly.
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608311135



##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");

Review comment:
       > You can append s.ToString() to the returned Status so the caller can see the error details; otherwise they have to grep the log to find the root cause.
   
   OK.
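A sketch of carrying the RocksDB reason back to the caller. `Status` and `FakeRocksStatus` here are minimal hypothetical stand-ins for `doris::Status` and `rocksdb::Status`, just enough to show the message concatenation; `check_open` is a hypothetical helper, not the PR's code.

```cpp
#include <string>
#include <utility>

// Minimal hypothetical stand-in for doris::Status.
class Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status InternalError(std::string msg) { return Status(false, std::move(msg)); }
    bool ok() const { return _ok; }
    const std::string& message() const { return _msg; }

private:
    Status(bool ok, std::string msg) : _ok(ok), _msg(std::move(msg)) {}
    bool _ok;
    std::string _msg;
};

// Minimal hypothetical stand-in for rocksdb::Status.
struct FakeRocksStatus {
    bool ok_flag;
    std::string text;
    bool ok() const { return ok_flag; }
    std::string ToString() const { return text; }
};

// Append the RocksDB reason so callers see the root cause without grepping logs.
Status check_open(const FakeRocksStatus& s, const void* db) {
    if (!s.ok() || db == nullptr) {
        return Status::InternalError("Stream load record rocksdb open failed, reason: " +
                                     s.ToString());
    }
    return Status::OK();
}
```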






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601974244



##########
File path: be/src/olap/storage_engine.cpp
##########
@@ -225,6 +226,35 @@ Status StorageEngine::_init_store_map() {
     for (auto store : tmp_stores) {
         _store_map.emplace(store->path(), store);
     }
+
+    auto st = _init_stream_load_record();
+    if (!st.ok()) {
+        LOG(WARNING) << "status=" << st.to_string();

Review comment:
       > Do we need to stop the startup if this error happens?
   
   It is reasonable.
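A sketch of aborting startup instead of only logging, in the spirit of the early-return pattern Doris status handling uses. The `Status` struct, the `RETURN_IF_ERROR_SKETCH` macro, and both init functions are hypothetical; `fail` simply simulates a RocksDB open failure.

```cpp
#include <string>

// Hypothetical minimal status type; the real code would use doris::Status.
struct Status {
    bool ok_flag = true;
    std::string msg;
    bool ok() const { return ok_flag; }
};

// Propagate a failed status to the caller instead of only logging it.
#define RETURN_IF_ERROR_SKETCH(stmt) \
    do {                             \
        Status _st = (stmt);         \
        if (!_st.ok()) return _st;   \
    } while (false)

// Hypothetical initializer; `fail` simulates a RocksDB open failure.
Status init_stream_load_record(bool fail) {
    if (fail) return Status{false, "Stream load record rocksdb open failed"};
    return Status{};
}

// Sketch of the store-map init step: an error now aborts startup
// rather than being logged and ignored.
Status init_store_map(bool fail_recorder) {
    // ... stores are registered here ...
    RETURN_IF_ERROR_SKETCH(init_stream_load_record(fail_recorder));
    return Status{};
}
```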






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601976988



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();
+    }
+
+    private class PullStreamLoadRecordThread implements Runnable {
+        @Override
+        public void run() {
+            ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+
+            while (true) {
+                long start = System.currentTimeMillis();
+                for (Backend backend : backends.values()) {
+                    BackendService.Client client = null;
+                    TNetworkAddress address = null;
+                    boolean ok = false;
+                    try {
+                        address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                        client = ClientPool.backendPool.borrowObject(address);
+                        TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                        Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                        LOG.info("receive stream load audit info from backend: {}. batch size: {}", backend.getHost(), streamLoadRecordBatch.size());
+                        for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                            TStreamLoadRecord streamLoadItem= entry.getValue();
+                            LOG.info("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +
+                                            " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," +
+                                            " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
+                                    backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                                    streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                                    streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), streamLoadItem.getStartTime(),
+                                    streamLoadItem.getFinishTime());
+
+                            AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
+                                    .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl())
+                                    .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus())
+                                    .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows())
+                                    .setLoadedRows( streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows())
+                                    .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes())
+                                    .setStartTime(streamLoadItem.getStartTime()).setFinishTime(streamLoadItem.getFinishTime())
+                                    .build();
+                            Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+                            if (entry.getKey().compareTo(backend.getLastStreamLoadTime()) > 0) {
+                                backend.setLastStreamLoadTime(entry.getKey());
+                            }
+                        }
+                        ok = true;
+                    } catch (Exception e) {
+                        LOG.warn("task exec error. backend[{}]", backend.getId(), e);
+                    } finally {
+                        if (ok) {
+                            ClientPool.backendPool.returnObject(address, client);
+                        } else {
+                            ClientPool.backendPool.invalidateObject(address, client);
+                        }
+                    }
+                }
+                LOG.info("finished to pull stream load records of all backends. cost: {} ms",

Review comment:
       > You can add the number of records to this log.
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601980568



##########
File path: be/src/common/config.h
##########
@@ -348,6 +348,12 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// batch size of stream load record reported to FE
+CONF_mInt32(stream_load_record_batch_size, "50");
+// expire time of stream load record in rocksdb.
+CONF_mInt32(stream_load_record_expire_time_secs, "28800");

Review comment:
       > stream_load_record_expire_time_secs is not configurable at runtime.
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r607462640



##########
File path: be/src/common/config.h
##########
@@ -348,6 +348,14 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// batch size of stream load record reported to FE
+CONF_mInt32(stream_load_record_batch_size, "50");
+// expire time of stream load record in rocksdb.
+CONF_Int32(stream_load_record_expire_time_secs, "28800");
+// time interval to clean expired stream load records
+CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
+// the storage path of stream load record rocksdb
+CONF_String(stream_load_record_path, "");

Review comment:
       > If default `stream_load_record_path` is `""`, then the default rocksdb dir is
   > `/stream_load`, so there will be an error when BE starts:
   > `rocks db open failed, reason:IO error: While mkdir if missing: /stream_load: Permission denied`
   > Maybe `${DORIS_HOME}/` ?
   
   That's reasonable. Thank you.
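A sketch of the suggested fallback: when `stream_load_record_path` is empty, resolve the root from `DORIS_HOME` so the DB lands under the Doris home rather than at `/stream_load`. The helper name is hypothetical, and falling back to the current directory when `DORIS_HOME` is unset is an assumption, not what the PR necessarily does.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical helper: if the configured path is empty, fall back to
// ${DORIS_HOME} instead of "", which would put the DB at "/stream_load".
std::string resolve_stream_load_record_root(const std::string& configured) {
    if (!configured.empty()) {
        return configured;
    }
    const char* home = std::getenv("DORIS_HOME");
    // Assumption: use the current directory when DORIS_HOME is unset or empty.
    return (home != nullptr && *home != '\0') ? std::string(home) : std::string(".");
}
```

The recorder would then open `resolve_stream_load_record_root(...) + "/stream_load"`.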






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601976777



##########
File path: fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
##########
@@ -95,6 +95,8 @@
     // additional backendStatus information for BE, display in JSON format
     private BackendStatus backendStatus = new BackendStatus();
 
+    private String lastStreamLoadTime = "";

Review comment:
       > Why not use a Long timestamp?
   > Also, this field is not persisted, so every time the FE restarts, will this field be reset to
   > `1970`?
   
   I have modified the code and this field will be persisted in bdbje.
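The BE builds record keys as `<finish_micros>_<label>` (see `std::to_string(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label` in `stream_load.cpp`), so a string cursor orders correctly only while every timestamp has the same number of digits. A hedged sketch of recovering a 64-bit numeric cursor from such a key; `key_timestamp` is a hypothetical helper.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: recover the numeric finish time from a record key of the
// form "<finish_micros>_<label>". Returns -1 if the key has no numeric prefix.
// Assumes the prefix has at most 18 digits, so it cannot overflow int64_t.
int64_t key_timestamp(const std::string& key) {
    const std::size_t pos = key.find('_');  // labels may contain '_', so use the first one
    if (pos == 0 || pos == std::string::npos) return -1;
    int64_t value = 0;
    for (std::size_t i = 0; i < pos; ++i) {
        if (key[i] < '0' || key[i] > '9') return -1;
        value = value * 10 + (key[i] - '0');
    }
    return value;
}
```

For example, `"999_a" > "1000_b"` holds as strings, while the numeric prefixes compare the other way round.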






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608307385



##########
File path: be/src/http/action/stream_load.cpp
##########
@@ -533,4 +539,17 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa
     return Status::OK();
 }
 
+void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) {
+    auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder();
+    if (stream_load_recorder != nullptr) {
+        std::string key = std::to_string(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label;
+        auto st = stream_load_recorder->put(key, str);
+        if (st.ok()) {

Review comment:
       > Also need to log when put into rocksdb failed
   
   `StreamLoadRecorder::put()` already logs a warning when putting data into RocksDB fails.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r608308459



##########
File path: be/src/runtime/stream_load/stream_load_recorder.cpp
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_recorder.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+
+StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr),
+          _last_compaction_time(UnixMillis()) {
+}
+
+StreamLoadRecorder::~StreamLoadRecorder() {
+    if (_db != nullptr) {
+        for (auto handle : _handles) {
+            _db->DestroyColumnFamilyHandle(handle);
+            handle = nullptr;
+        }
+        delete _db;
+        _db= nullptr;
+    }
+}
+
+Status StreamLoadRecorder::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs};
+    rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls);
+

Review comment:
       > 
   
   OK.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601984725



##########
File path: be/src/common/config.h
##########
@@ -348,6 +348,12 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// batch size of stream load record reported to FE
+CONF_mInt32(stream_load_record_batch_size, "50");
+// expire time of stream load record in rocksdb. 1000*1000*60*60*8=28800000000(8 hours)
+CONF_mInt64(stream_load_record_expire_time_us, "28800000000");

Review comment:
       > Better to use a more readable unit for this config; I suggest hours.
   
   OK.
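The per-column-family TTL that `rocksdb::DBWithTTL::Open` accepts is an `int32_t` number of seconds. The microsecond default above (28800000000) therefore would not fit even if it were passed straight through. The later revision in this thread settles on `stream_load_record_expire_time_secs = 28800`. Below is a minimal sketch of converting an hour-based setting into that TTL with an overflow check. `ttl_secs_from_hours` is a hypothetical helper, not part of the PR.

```cpp
#include <cstdint>
#include <limits>
#include <stdexcept>

// Hypothetical helper: converts an hour-based expiry setting into the
// int32_t seconds value expected by a per-column-family TTL.
int32_t ttl_secs_from_hours(int64_t hours) {
    constexpr int64_t kSecsPerHour = 60 * 60;
    if (hours <= 0) {
        throw std::invalid_argument("expire time must be positive");
    }
    // Check before multiplying so the product itself cannot overflow.
    if (hours > std::numeric_limits<int32_t>::max() / kSecsPerHour) {
        throw std::out_of_range("expire time does not fit in int32_t seconds");
    }
    return static_cast<int32_t>(hours * kSecsPerHour);
}

// 8 hours in microseconds, as in the original config comment.
constexpr int64_t kEightHoursUs = 1000LL * 1000 * 60 * 60 * 8;
static_assert(kEightHoursUs == 28800000000LL, "matches the original default");
static_assert(kEightHoursUs > std::numeric_limits<int32_t>::max(),
              "a microsecond value is too large for an int32_t TTL");
```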


[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601975417



##########
File path: be/src/runtime/stream_load/stream_load_record.cpp
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/stream_load/stream_load_record.h"
+
+#include "common/config.h"
+#include "common/status.h"
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice_transform.h"
+#include "util/time.h"
+
+
+namespace doris {
+const std::string STREAM_LOAD_POSTFIX = "/stream_load";
+const size_t PREFIX_LENGTH = 4;
+
+StreamLoadRecord::StreamLoadRecord(const std::string& root_path)
+        : _root_path(root_path),
+          _db(nullptr) {
+}
+
+StreamLoadRecord::~StreamLoadRecord() {
+    for (auto handle : _handles) {
+        _db->DestroyColumnFamilyHandle(handle);
+        handle = nullptr;
+    }
+    if (_db != nullptr) {
+        delete _db;
+        _db = nullptr;
+    }
+}
+
+Status StreamLoadRecord::init() {
+    // init db
+    rocksdb::DBOptions options;
+    options.IncreaseParallelism();
+    options.create_if_missing = true;
+    options.create_missing_column_families = true;
+    std::string db_path = _root_path + STREAM_LOAD_POSTFIX;
+    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+    // default column family is required
+    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
+    // stream load column family add prefix extractor to improve performance and ensure correctness
+    rocksdb::ColumnFamilyOptions stream_load_column_family;
+    stream_load_column_family.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(PREFIX_LENGTH));
+    column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, stream_load_column_family);
+    rocksdb::Status s = rocksdb::DB::Open(options, db_path, column_families, &_handles, &_db);
+    if (!s.ok() || _db == nullptr) {
+        LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb open failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::put(const std::string& key, const std::string& value) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    rocksdb::WriteOptions write_options;
+    write_options.sync = false;
+    rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
+    if (!s.ok()) {
+        LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString();
+        return Status::InternalError("Stream load record rocksdb put failed");
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string> &stream_load_records) {
+    rocksdb::ColumnFamilyHandle* handle = _handles[1];
+    std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rocksdb::ReadOptions(), handle));
+    if (start == "") {
+        it->SeekToFirst();
+    } else {
+        it->Seek(start);
+        rocksdb::Status status = it->status();
+        if (!status.ok()) {
+            it->SeekToFirst();
+        }
+    }
+    rocksdb::Status status = it->status();
+    if (!status.ok()) {
+        LOG(WARNING) << "rocksdb seek failed. reason:" << status.ToString();
+        return Status::InternalError("Stream load record rocksdb seek failed");
+    }
+    int num = 0;
+    for (it->Next(); it->Valid(); it->Next()) {
+        std::string key = it->key().ToString();
+        std::string value = it->value().ToString();
+        stream_load_records[key] = value;
+        num++;
+        if (num >= batch_size) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+Status StreamLoadRecord::clean_expired_stream_load_record() {

Review comment:
       > How about using the TTL feature of RocksDB for this?
   
   That makes sense. With TTL enabled, expired records are dropped from RocksDB when compaction runs.
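RocksDB's TTL support (`DBWithTTL`) timestamps each write. It removes an entry only when compaction finds that the entry's timestamp plus the TTL is earlier than the current time. Until then, `Get` and iterators may still return it, so records past their expiry can still be reported to FE between compactions. The hypothetical `TtlStoreModel` below is a small in-memory model of that contract for reasoning about this behaviour. It does not call RocksDB.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical model of TTL-with-compaction semantics, not a RocksDB API:
// expired entries disappear only when compact() runs, never on read.
class TtlStoreModel {
public:
    explicit TtlStoreModel(int64_t ttl_secs) : _ttl_secs(ttl_secs) {}

    void put(const std::string& key, const std::string& value, int64_t now_secs) {
        _entries[key] = Entry{value, now_secs};
    }

    // May still return a value whose TTL has passed if no compaction has run.
    std::optional<std::string> get(const std::string& key) const {
        auto it = _entries.find(key);
        if (it == _entries.end()) {
            return std::nullopt;
        }
        return it->second.value;
    }

    // Drops every entry whose write time + TTL is earlier than now.
    void compact(int64_t now_secs) {
        for (auto it = _entries.begin(); it != _entries.end();) {
            if (it->second.write_time_secs + _ttl_secs < now_secs) {
                it = _entries.erase(it);
            } else {
                ++it;
            }
        }
    }

    std::size_t size() const { return _entries.size(); }

private:
    struct Entry {
        std::string value;
        int64_t write_time_secs;
    };
    int64_t _ttl_secs;
    std::map<std::string, Entry> _entries;
};
```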


[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r601975711



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamLoadRecordMgr {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public StreamLoadRecordMgr() {
+        Thread pullStreamLoadRecordThread = new Thread(new PullStreamLoadRecordThread());
+        pullStreamLoadRecordThread.start();
+    }
+
+    private class PullStreamLoadRecordThread implements Runnable {
+        @Override
+        public void run() {
+            ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+
+            while (true) {
+                long start = System.currentTimeMillis();
+                for (Backend backend : backends.values()) {
+                    BackendService.Client client = null;
+                    TNetworkAddress address = null;
+                    boolean ok = false;
+                    try {
+                        address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                        client = ClientPool.backendPool.borrowObject(address);
+                        TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                        Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                        LOG.info("receive stream load audit info from backend: {}. batch size: {}", backend.getHost(), streamLoadRecordBatch.size());
+                        for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                            TStreamLoadRecord streamLoadItem = entry.getValue();
+                            LOG.info("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +
+                                            " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," +
+                                            " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
+                                    backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                                    streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                                    streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), streamLoadItem.getStartTime(),
+                                    streamLoadItem.getFinishTime());
+
+                            AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
+                                    .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl())
+                                    .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus())
+                                    .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows())
+                                    .setLoadedRows(streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows())
+                                    .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes())
+                                    .setStartTime(streamLoadItem.getStartTime()).setFinishTime(streamLoadItem.getFinishTime())
+                                    .build();
+                            Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+                            if (entry.getKey().compareTo(backend.getLastStreamLoadTime()) > 0) {
+                                backend.setLastStreamLoadTime(entry.getKey());

Review comment:
       > We can find the max timestamp of this batch and call `setLastStreamLoadTime` once.
   
   That makes sense.
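The FE polls each BE with the last key it has seen, and the BE returns the next batch. In the `get_batch` quoted earlier, the loop starts with `it->Next()`. That skips the `start` key as intended, but it also appears to skip the very first record when `start` is empty and the iterator was positioned with `SeekToFirst()`. Below is a minimal sketch of both halves of the exchange. It is written in C++ for consistency with the BE snippets, even though the FE is Java. `std::map::upper_bound` stands in for a RocksDB seek that skips an exact match. The FE computes the batch maximum once, as suggested above. `get_batch_after` and `advance_cursor` are hypothetical names, and keys are assumed to be fixed-width timestamp strings so that string order matches time order.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <unordered_map>

// BE side (hypothetical): up to batch_size records whose key sorts strictly
// after last_key. upper_bound("") starts at the first key, so nothing is lost
// on the very first poll.
std::map<std::string, std::string> get_batch_after(
        const std::map<std::string, std::string>& records,
        const std::string& last_key, std::size_t batch_size) {
    std::map<std::string, std::string> batch;
    for (auto it = records.upper_bound(last_key);
         it != records.end() && batch.size() < batch_size; ++it) {
        batch.emplace(it->first, it->second);
    }
    return batch;
}

// FE side (hypothetical): handle every entry of an unordered batch, then
// advance the cursor once to the largest key seen.
std::string advance_cursor(const std::string& last_key,
                           const std::unordered_map<std::string, std::string>& batch) {
    std::string max_key = last_key;
    for (const auto& entry : batch) {
        // ... build and submit the audit event for entry here ...
        if (entry.first > max_key) {
            max_key = entry.first;
        }
    }
    return max_key;
}
```

Advancing the cursor once per batch also keeps the shared backend state from being updated in the middle of processing a batch.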


[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r607578246



##########
File path: gensrc/thrift/BackendService.thrift
##########
@@ -134,4 +159,6 @@ service BackendService {
     // release the context resource associated with the context_id
     DorisExternalService.TScanCloseResult close_scanner(1: DorisExternalService.TScanCloseParams params);
 
+    TStreamLoadRecordResult get_stream_load_record(1: string params);

Review comment:
       > Rename `params` to a meaningful name, maybe `last_record_time`?
   > But I think it is better to pass this parameter as a timestamp (a long integer) to decouple the RPC interface from the BE implementation.
   
   Thank you. 


[GitHub] [incubator-doris] morningman commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r606805795



##########
File path: be/src/common/config.h
##########
@@ -348,6 +348,14 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// batch size of stream load record reported to FE
+CONF_mInt32(stream_load_record_batch_size, "50");
+// expire time of stream load record in rocksdb.
+CONF_Int32(stream_load_record_expire_time_secs, "28800");
+// time interval to clean expired stream load records
+CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
+// the storage path of stream load record rocksdb
+CONF_String(stream_load_record_path, "");

Review comment:
       If the default `stream_load_record_path` is `""`, the RocksDB directory resolves to
   `/stream_load`, so BE fails to start with:
   `rocks db open failed, reason:IO error: While mkdir if missing: /stream_load: Permission denied`
   Maybe default to `${DORIS_HOME}/`?
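The failure follows from plain string concatenation. `init()` builds the path as `_root_path + STREAM_LOAD_POSTFIX`, so an empty root yields `"" + "/stream_load"`, which is `/stream_load` at the filesystem root. Below is a minimal sketch of the suggested fallback. It assumes the caller reads `DORIS_HOME` from the environment (for example with `std::getenv`) and passes it in. `resolve_stream_load_record_dir` is a hypothetical name, and the `stream_load` suffix mirrors the `STREAM_LOAD_POSTFIX` constant quoted earlier.

```cpp
#include <string>

// Hypothetical helper: picks the RocksDB directory for stream load records.
// An empty config value falls back to DORIS_HOME, giving
// "${DORIS_HOME}/stream_load" instead of "/stream_load".
std::string resolve_stream_load_record_dir(const std::string& configured_path,
                                           const std::string& doris_home) {
    std::string root = configured_path.empty() ? doris_home : configured_path;
    // Strip trailing slashes so the result has no "//".
    while (!root.empty() && root.back() == '/') {
        root.pop_back();
    }
    return root + "/stream_load";
}
```

The caller still has to reject the case where both the config value and `DORIS_HOME` are empty, since that reproduces the original `/stream_load` path.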

##########
File path: be/src/runtime/stream_load/stream_load_record.h
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <string>
+
+#include "rocksdb/utilities/db_ttl.h"
+
+#pragma once
+
+namespace doris {
+
+class Status;
+
+class StreamLoadRecord {

Review comment:
       Better to name it `StreamLoadRecorder`.
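Besides the rename, the quoted `~StreamLoadRecord()` has two problems. It calls `_db->DestroyColumnFamilyHandle(...)` before its `_db != nullptr` check; the `StreamLoadRecorder` destructor at the top of this section moves the loop inside the check. Also, `handle = nullptr` inside `for (auto handle : _handles)` only clears a copy of the pointer. Below is a self-contained sketch of the safer teardown order. Hypothetical `FakeHandle`/`FakeDb` types stand in for the RocksDB classes so the sketch runs without RocksDB, and `RecorderSketch` is not the real class. The order is: return early if the database was never opened, destroy the column family handles, clear the vector, then delete the database.

```cpp
#include <vector>

// Hypothetical stand-ins for rocksdb::ColumnFamilyHandle and rocksdb::DB.
struct FakeHandle {};

struct FakeDb {
    explicit FakeDb(int* destroyed) : destroyed_handles(destroyed) {}
    int* destroyed_handles;
    void DestroyColumnFamilyHandle(FakeHandle* handle) {
        delete handle;
        ++*destroyed_handles;
    }
};

class RecorderSketch {
public:
    RecorderSketch() = default;
    RecorderSketch(const RecorderSketch&) = delete;             // owns raw pointers
    RecorderSketch& operator=(const RecorderSketch&) = delete;

    // Simulates a successful init(): takes ownership of db and its handles.
    void open(FakeDb* db, int num_handles) {
        _db = db;
        for (int i = 0; i < num_handles; ++i) {
            _handles.push_back(new FakeHandle());
        }
    }

    ~RecorderSketch() {
        if (_db == nullptr) {
            return;  // init() never succeeded: nothing to release
        }
        for (FakeHandle* handle : _handles) {
            _db->DestroyColumnFamilyHandle(handle);  // handles before the db
        }
        _handles.clear();  // drop the dangling pointers, not just a loop copy
        delete _db;
        _db = nullptr;
    }

private:
    FakeDb* _db = nullptr;
    std::vector<FakeHandle*> _handles;
};
```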

##########
File path: gensrc/thrift/BackendService.thrift
##########
@@ -134,4 +159,6 @@ service BackendService {
     // release the context resource associated with the context_id
     DorisExternalService.TScanCloseResult close_scanner(1: DorisExternalService.TScanCloseParams params);
 
+    TStreamLoadRecordResult get_stream_load_record(1: string params);

Review comment:
       Rename `params` to a meaningful name, maybe `last_record_time`?
   But I think it is better to pass this parameter as a timestamp (a long integer) to decouple the RPC interface from the BE implementation.
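One concrete reason to prefer an integer is how the FE compares keys. It uses `String.compareTo` (`entry.getKey().compareTo(backend.getLastStreamLoadTime())` in `StreamLoadRecordMgr`), and lexicographic order matches numeric order only when every timestamp has the same number of digits. Millisecond epoch timestamps all have 13 digits for the foreseeable future, so the issue is latent rather than immediate. The small C++ illustration below uses `std::string` comparison, which is also lexicographic. The padding width of 20 is an arbitrary assumption, and `padded_key` is a hypothetical helper.

```cpp
#include <cstdint>
#include <cstdio>
#include <string>

// Lexicographic comparison, as String.compareTo or std::string does.
bool later_as_string(const std::string& a, const std::string& b) { return a > b; }

// Numeric comparison, as an int64_t / long parameter allows.
bool later_as_int(int64_t a, int64_t b) { return a > b; }

// Hypothetical helper: zero-pads a non-negative timestamp to a fixed width so
// that string order matches numeric order (int64 needs at most 19 digits).
std::string padded_key(int64_t millis) {
    char buf[21];
    std::snprintf(buf, sizeof(buf), "%020lld", static_cast<long long>(millis));
    return std::string(buf);
}
```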


[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #5452: [Audit][Stream Load] Support audit function for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #5452:
URL: https://github.com/apache/incubator-doris/pull/5452#discussion_r612081013



##########
File path: be/src/service/backend_service.cpp
##########
@@ -312,4 +314,25 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa
     result_.status = t_status;
 }
 
+void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) {

Review comment:
       > Use `TStreamLoadRecordResult* result` as the output parameter.
   
   The function signature has to stay consistent with the Thrift-generated code.
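For service methods that return a struct, the Thrift C++ generator emits a `void` method that takes the result by reference as its first argument (named `_return` in generated code). The handler therefore has to override that exact shape rather than take a pointer. Below is a self-contained sketch with hypothetical stand-ins for the generated `TStreamLoadRecordResult` and the service interface. The `stream_load_record` field type and the `...Sketch` class names are assumptions for illustration, not the generated Doris code.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the Thrift-generated result struct.
struct TStreamLoadRecordResult {
    std::map<std::string, std::string> stream_load_record;
};

// Shape of a Thrift-generated C++ service interface: a struct result comes
// back through a reference out-parameter, not a pointer or return value.
class BackendServiceIfSketch {
public:
    virtual ~BackendServiceIfSketch() = default;
    virtual void get_stream_load_record(TStreamLoadRecordResult& _return,
                                        const int64_t last_stream_record_time) = 0;
};

class BackendServiceSketch : public BackendServiceIfSketch {
public:
    void get_stream_load_record(TStreamLoadRecordResult& result,
                                const int64_t last_stream_record_time) override {
        // A real handler would read from the recorder; this one fills a single
        // placeholder entry keyed just after the caller's cursor.
        result.stream_load_record[std::to_string(last_stream_record_time + 1)] = "record";
    }
};
```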