You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/07/11 08:15:30 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request, #1367: MINIFICPP-1822 - Add alert capability

adamdebreceni opened a new pull request, #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367

   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r952488970


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));

Review Comment:
   Yes, making this explicit would eliminate a similar doubt in future readers, so I think it's worth the extra couple of lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r935554229


##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) {}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";
+  std::ofstream(flow_file) << empty_flow;
+
+  std::string agent_id = "test-agent-1";
+  VerifyAlerts harness;
+  AlertHandler handler(agent_id);
+  harness.setUrl("http://localhost:0/api/alerts", &handler);
+  harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier, agent_id);
+  harness.getConfiguration()->setHome(dir.getPath());
+
+  auto log_props = std::make_shared<logging::LoggerProperties>();
+  log_props->set("appender.alert1", "alert");
+  log_props->set("appender.alert1.url", harness.getC2RestUrl());
+  log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+  log_props->set("appender.alert1.rate.limit", "10 s");
+  log_props->set("appender.alert1.flush.period", "1 s");
+  log_props->set("logger.root", "INFO,alert1");
+  logging::LoggerConfiguration::getConfiguration().initialize(log_props);
+
+  auto verifyLogsArrived = [&] (const std::vector<std::string>& expected) {
+    std::vector<std::string> logs;
+    REQUIRE(handler.alerts_.dequeueWaitFor(logs, 1s));
+    REQUIRE(logs.size() == expected.size());
+    for (size_t idx = 0; idx < expected.size(); ++idx) {
+      bool contains = std::search(logs[idx].begin(), logs[idx].end(), expected[idx].begin(), expected[idx].end()) != logs[idx].end();
+      REQUIRE(contains);
+    }
+  };
+
+  harness.verify_ = [&] {
+    auto logger = logging::LoggerFactory<minifi::FlowController>::getLogger();
+    // time = 0
+    logger->log_error("not matched");
+    logger->log_error("<begin>one<end>");
+    logger->log_error("not the same but treated so <begin>one<end>");
+    logger->log_error("<begin>two<end>");
+    clock->advance(2s);
+    // time = 2
+    verifyLogsArrived({
+        "<begin>one<end>", "<begin>two<end>"
+    });
+
+    clock->advance(5s);
+    // time = 7
+    // no new logs over HTTP
+
+    logger->log_error("other <begin>one<end>");
+    logger->log_error("new log <begin>three<end>");
+    clock->advance(2s);
+
+    // time = 9
+    verifyLogsArrived({
+        "new log <begin>three<end>"
+    });
+
+    clock->advance(2s);
+    // time = 11
+    logger->log_error("other <begin>one<end>");
+    logger->log_error("new log <begin>three<end>");
+    clock->advance(2s);
+    // time = 13
+
+    verifyLogsArrived({
+        "other <begin>one<end>"
+    });
+
+    return true;
+  };
+
+  harness.run(flow_file, dir.getPath());

Review Comment:
   ```suggestion
     harness.run(flow_file.string(), dir.getPath());
   ```
   as `std::filesystem::path` doesn't convert implicitly to `std::string` on Windows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r942393823


##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "utils/RegexUtils.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    utils::Regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;

Review Comment:
   Struct (i.e. simple class with public non-static data members) non-static data members are not supposed to have underscores, similarly to the other structs above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r952772655


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));

Review Comment:
   nevermind, shared_ptr constructor frees the ptr before throwing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r941662489


##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    using Hash = size_t;
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<Hash> hashes_to_ignore_;
+    std::deque<std::pair<std::chrono::milliseconds, Hash>> timestamped_hashes_;
+   public:
+    bool tryAdd(std::chrono::milliseconds now, Hash hash);
+    void setLifetime(std::chrono::milliseconds lifetime);
+  };
+
+ public:
+  // must be public for make_shared
+  AlertSink(Config config, std::shared_ptr<Logger> logger);

Review Comment:
   I faced a similar problem recently and this can be made private with a trick like this
   https://github.com/apache/nifi-minifi-cpp/pull/1383/files#diff-4205c4389dbd9a8dee16d4d38da02ada57c19d038893abdcb3f29424c7d1dd86R112-R124



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r951099429


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));

Review Comment:
   my understanding is that the storage will be automatically freed up https://eel.is/c++draft/except.ctor#note-2
   exception-safety of `std::shared_ptr<T>(new T{...})` was only of concern until C++17 before which a call like `f(std::shared_ptr<T>(new T{...}, g())` could indeed leak if `g()` throws between allocating in `new T{...}` and the constructor call of that object, from C++17 arguments are indeterminately sequenced so even this is not a concern anymore 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r949041515


##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "utils/RegexUtils.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    utils::Regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;

Review Comment:
   changed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r949017222


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  // TODO(adebreceni): revisit this after MINIFICPP-1903
+  utils::SMatch match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!utils::regexMatch(std::string{payload}, match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string submatch = match[idx].str();
+    hash = utils::hash_combine(hash, std::hash<std::string>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+  delete services_.exchange(nullptr);
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient", "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool response_success = 200 <= resp_code && resp_code < 300;
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else if (!response_success) {
+    logger_->log_warn("Non-success response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  }
+
+  if (!req_success) {
+    logger_->log_error("Failed to send alert request");
+  }
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::allocate(size_t /*size*/) {
+  return {};
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::commit() {
+  return std::move(*this);
+}
+
+size_t AlertSink::LogBuffer::size() const {
+  return size_;
+}
+
+void AlertSink::LiveLogSet::setLifetime(std::chrono::milliseconds lifetime) {
+  lifetime_ = lifetime;
+}
+
+bool AlertSink::LiveLogSet::tryAdd(std::chrono::milliseconds now, size_t hash) {
+  auto limit = now - lifetime_;
+  while (!timestamped_hashes_.empty() && timestamped_hashes_.front().first < limit) {

Review Comment:
   I don't remember what the purpose of `setLifeTime` was, removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r949021546


##########
libminifi/include/utils/Hash.h:
##########
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+
+namespace org::apache::nifi::minifi::utils {
+
+// boost::hash_combine
+inline size_t hash_combine(size_t seed, size_t h) noexcept {
+  seed ^= h + 0x9e3779b9 + (seed << 6U) + (seed >> 2U);
+  return seed;
+}

Review Comment:
   rewrote it to the second version



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r942175691


##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;

Review Comment:
   changed it to use `utils::Regex`, it has some shortcomings created a followup ticket [MINIFICPP-1903](https://issues.apache.org/jira/browse/MINIFICPP-1903)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r945665823


##########
libminifi/include/utils/TestUtils.h:
##########
@@ -62,14 +63,45 @@ Identifier generateUUID() {
 
 class ManualClock : public timeutils::Clock {
  public:
-  [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
-  void advance(std::chrono::milliseconds elapsed_time) { time_ += elapsed_time; }
+  [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override {
+    std::lock_guard lock(mtx_);
+    return time_;
+  }
+  void advance(std::chrono::milliseconds elapsed_time) {
+    std::lock_guard lock(mtx_);
+    time_ += elapsed_time;
+    for (auto* cv : cvs_) {
+      cv->notify_all();
+    }
+  }
+  std::chrono::milliseconds wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const std::function<bool()>& pred) override {
+    std::chrono::milliseconds now;
+    {
+      std::unique_lock lock(mtx_);
+      now = time_;
+      cvs_.insert(&cv);
+    }
+    if (now < time) {
+      cv.wait_for(lck, time - now, [&] {
+        now = timeSinceEpoch();
+        return now >= time || pred();
+      });
+    }
+    {
+      std::unique_lock lock(mtx_);
+      cvs_.erase(&cv);
+    }
+    return now;
+  }
 
  private:
+  mutable std::mutex mtx_;
+  std::unordered_set<std::condition_variable*> cvs_;
   std::chrono::milliseconds time_{0};
 };
 
 
+

Review Comment:
   removed



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    using Hash = size_t;
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<Hash> hashes_to_ignore_;
+    std::deque<std::pair<std::chrono::milliseconds, Hash>> timestamped_hashes_;
+   public:

Review Comment:
   moved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r945670220


##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    using Hash = size_t;
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<Hash> hashes_to_ignore_;
+    std::deque<std::pair<std::chrono::milliseconds, Hash>> timestamped_hashes_;
+   public:
+    bool tryAdd(std::chrono::milliseconds now, Hash hash);
+    void setLifetime(std::chrono::milliseconds lifetime);
+  };
+
+ public:
+  // must be public for make_shared
+  AlertSink(Config config, std::shared_ptr<Logger> logger);

Review Comment:
   neat trick, but since there is a single `make_shared` that would utilize this, the raw pointer constructor of `std::shared_ptr` feels like more appropriate/simpler in this case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r952487914


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));

Review Comment:
   I think this only applies to delegating constructors, where the delegating expression allocates via new.
   Tested this scenario:  https://godbolt.org/z/zK6Wj6rTd



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r949173765


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  // TODO(adebreceni): revisit this after MINIFICPP-1903
+  utils::SMatch match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!utils::regexMatch(std::string{payload}, match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string submatch = match[idx].str();
+    hash = utils::hash_combine(hash, std::hash<std::string>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+  delete services_.exchange(nullptr);
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient", "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool response_success = 200 <= resp_code && resp_code < 300;
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else if (!response_success) {
+    logger_->log_warn("Non-success response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  }
+
+  if (!req_success) {
+    logger_->log_error("Failed to send alert request");
+  }
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::allocate(size_t /*size*/) {
+  return {};
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::commit() {
+  return std::move(*this);
+}
+
+size_t AlertSink::LogBuffer::size() const {
+  return size_;
+}
+
+void AlertSink::LiveLogSet::setLifetime(std::chrono::milliseconds lifetime) {
+  lifetime_ = lifetime;
+}
+
+bool AlertSink::LiveLogSet::tryAdd(std::chrono::milliseconds now, size_t hash) {
+  auto limit = now - lifetime_;
+  while (!timestamped_hashes_.empty() && timestamped_hashes_.front().first < limit) {

Review Comment:
   Okay. Wouldn't you think a priority queue would better fit the role of `timestamped_hashes_`, so that it is always in a chronological order? Don't do it if it's too complicated, just an idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r934389490


##########
libminifi/include/utils/Id.h:
##########
@@ -141,18 +142,14 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
   size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) const noexcept {
     static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % sizeof(size_t) == 0);
     constexpr int slices = sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);

Review Comment:
   does the `static_assert` at linke 143 not cover these cases?



##########
libminifi/include/utils/Id.h:
##########
@@ -141,18 +142,14 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
   size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) const noexcept {
     static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % sizeof(size_t) == 0);
     constexpr int slices = sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);

Review Comment:
   does the `static_assert` at line 143 not cover these cases?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r950051847


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  // TODO(adebreceni): revisit this after MINIFICPP-1903
+  utils::SMatch match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!utils::regexMatch(std::string{payload}, match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string submatch = match[idx].str();
+    hash = utils::hash_combine(hash, std::hash<std::string>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+  delete services_.exchange(nullptr);
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient", "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool response_success = 200 <= resp_code && resp_code < 300;
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else if (!response_success) {
+    logger_->log_warn("Non-success response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  }
+
+  if (!req_success) {
+    logger_->log_error("Failed to send alert request");
+  }
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::allocate(size_t /*size*/) {
+  return {};
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::commit() {
+  return std::move(*this);
+}
+
+size_t AlertSink::LogBuffer::size() const {
+  return size_;
+}
+
+void AlertSink::LiveLogSet::setLifetime(std::chrono::milliseconds lifetime) {
+  lifetime_ = lifetime;
+}
+
+bool AlertSink::LiveLogSet::tryAdd(std::chrono::milliseconds now, size_t hash) {
+  auto limit = now - lifetime_;
+  while (!timestamped_hashes_.empty() && timestamped_hashes_.front().first < limit) {

Review Comment:
   I think chronological order is already guaranteed as by default we use a steady clock 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r945896725


##########
libminifi/include/utils/Hash.h:
##########
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+
+namespace org::apache::nifi::minifi::utils {
+
+// boost::hash_combine

Review Comment:
   Please clarify that this is coming from the docs, not the sources. Copying the source would require appropriate LICENSE/NOTICE changes, while reimplementing what the docs document is a new work from a copyright perspective.



##########
libminifi/src/utils/TimeUtil.cpp:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TimeUtil.h"
+
+namespace org::apache::nifi::minifi::utils::timeutils {
+
+static std::mutex global_clock_mtx;
+static std::shared_ptr<Clock> global_clock{std::make_shared<SteadyClock>()};
+
+std::shared_ptr<Clock> getClock() {
+  std::lock_guard lock(global_clock_mtx);
+  return global_clock;
+}
+
+// test-only utility to specify what clock to use
+void setClock(std::shared_ptr<Clock> clock) {
+  std::lock_guard lock(global_clock_mtx);
+  global_clock = std::move(clock);
+}

Review Comment:
   These globals can be avoided. Please use dependency injection.



##########
libminifi/include/utils/Hash.h:
##########
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+
+namespace org::apache::nifi::minifi::utils {
+
+// boost::hash_combine
+inline size_t hash_combine(size_t seed, size_t h) noexcept {
+  seed ^= h + 0x9e3779b9 + (seed << 6U) + (seed >> 2U);
+  return seed;
+}

Review Comment:
   If we want to follow the boost interface:
   ```suggestion
   inline void hash_combine(size_t& seed, size_t new_hash) noexcept {
     seed ^= new_hash + 0x9e3779b9 + (seed << 6U) + (seed >> 2U);
   }
   ```
   
   or simplify if not:
   ```suggestion
   inline size_t hash_combine(size_t seed, size_t new_hash) noexcept {
     return seed ^ new_hash + 0x9e3779b9 + (seed << 6U) + (seed >> 2U);
   }
   ```
   
   I changed the second parameter name in both suggestions to be more descriptive.
   
   I would go for the first version, but I'm fine with either one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#issuecomment-1224653894

   pushed a rebase to main
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940173589


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}

Review Comment:
   nice catch



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}

Review Comment:
   nice catch, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940172874


##########
libminifi/src/core/logging/LoggerConfiguration.cpp:
##########
@@ -60,26 +62,28 @@ namespace org::apache::nifi::minifi::core::logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v";
 
-namespace {
-std::optional<spdlog::level::level_enum> parse_log_level(const std::string& level_name) {
-  if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
-    return spdlog::level::trace;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
-    return spdlog::level::debug;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
-    return spdlog::level::info;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
-    return spdlog::level::warn;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
-    return spdlog::level::err;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
-    return spdlog::level::critical;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
-    return spdlog::level::off;
+namespace internal {
+
+bool LoggerNamespace::findSink(std::function<bool(const std::shared_ptr<spdlog::sinks::sink>&)> filter) const {

Review Comment:
   changed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940173244


##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) {}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";
+  std::ofstream(flow_file) << empty_flow;
+
+  std::string agent_id = "test-agent-1";
+  VerifyAlerts harness;
+  AlertHandler handler(agent_id);
+  harness.setUrl("http://localhost:0/api/alerts", &handler);
+  harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier, agent_id);
+  harness.getConfiguration()->setHome(dir.getPath());
+
+  auto log_props = std::make_shared<logging::LoggerProperties>();
+  log_props->set("appender.alert1", "alert");
+  log_props->set("appender.alert1.url", harness.getC2RestUrl());
+  log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+  log_props->set("appender.alert1.rate.limit", "10 s");
+  log_props->set("appender.alert1.flush.period", "1 s");
+  log_props->set("logger.root", "INFO,alert1");
+  logging::LoggerConfiguration::getConfiguration().initialize(log_props);

Review Comment:
   added commented section to `conf/minifi-log.properties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#issuecomment-1225975920

   force pushed the same thing to trigger github actions, because check didn't run for some reason


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r942309625


##########
libminifi/include/utils/TestUtils.h:
##########
@@ -62,14 +63,45 @@ Identifier generateUUID() {
 
 class ManualClock : public timeutils::Clock {
  public:
-  [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
-  void advance(std::chrono::milliseconds elapsed_time) { time_ += elapsed_time; }
+  [[nodiscard]] std::chrono::milliseconds timeSinceEpoch() const override {
+    std::lock_guard lock(mtx_);
+    return time_;
+  }
+  void advance(std::chrono::milliseconds elapsed_time) {
+    std::lock_guard lock(mtx_);
+    time_ += elapsed_time;
+    for (auto* cv : cvs_) {
+      cv->notify_all();
+    }
+  }
+  std::chrono::milliseconds wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const std::function<bool()>& pred) override {
+    std::chrono::milliseconds now;
+    {
+      std::unique_lock lock(mtx_);
+      now = time_;
+      cvs_.insert(&cv);
+    }
+    if (now < time) {
+      cv.wait_for(lck, time - now, [&] {
+        now = timeSinceEpoch();
+        return now >= time || pred();
+      });
+    }
+    {
+      std::unique_lock lock(mtx_);
+      cvs_.erase(&cv);
+    }
+    return now;
+  }
 
  private:
+  mutable std::mutex mtx_;
+  std::unordered_set<std::condition_variable*> cvs_;
   std::chrono::milliseconds time_{0};
 };
 
 
+

Review Comment:
   nitpick: I would remove one newline instead of adding one :)



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+namespace org::apache::nifi::minifi::controllers {
+class SSLContextService;
+}  // namespace org::apache::nifi::minifi::controllers
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    using Hash = size_t;
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<Hash> hashes_to_ignore_;
+    std::deque<std::pair<std::chrono::milliseconds, Hash>> timestamped_hashes_;
+   public:

Review Comment:
   nitpick: Prefer the order of public members before privates ones according to the [cpp guideline](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rl-order)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r951099429


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));

Review Comment:
   my understanding is that the storage will be automatically freed up https://eel.is/c++draft/except.ctor#note-2
   exception-safety of `std::shared_ptr<T>(new T{...})` was only of concern until C++17 before which a call like `f(std::shared_ptr<T>(new T{...}, g())` could indeed leak if `g()` throws between ~allocating in `new T{...}` and the constructor call of that object~ the `new T{...}` and calling the `shared_ptr` constructor, from C++17 arguments are indeterminately sequenced so even this is not a concern anymore 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1367: MINIFICPP-1822 - Add alert capability
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r949021385


##########
libminifi/include/utils/Hash.h:
##########
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+
+namespace org::apache::nifi::minifi::utils {
+
+// boost::hash_combine

Review Comment:
   updated the comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r950277874


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));

Review Comment:
   Reading this line is confusing. Using a raw pointer also makes the class unsafe to copy and move. Any way to use a unique_ptr instead? If not, could you write or disable copy and move?



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));

Review Comment:
   This is not exception-safe: If the allocation succeeds, but the constructor call throws, then the AlertSink object is leaked. An easy fix would be wrapping it in unique_ptr first, since that constructor can't throw.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r952559630


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));

Review Comment:
   disabled copy/move ctor/assignment in [017d5bd](https://github.com/apache/nifi-minifi-cpp/pull/1367/commits/017d5bd474704cba5c2c82ed34915ab6555577db)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940174324


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient", "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);

Review Comment:
   added a new warning



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940177967


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -70,6 +72,16 @@ class Clock {
  public:
   virtual ~Clock() = default;
   virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+  virtual std::chrono::milliseconds wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const std::function<bool()>& pred) {

Review Comment:
   based on the comment above `Clock`, these classes' purpose is to be mocked, as it is not possible to safely wait when using a `Clock` (as it might be an instance of `ManualClock`) I think this new method has relevance to all users of `Clock`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r934396497


##########
libminifi/include/utils/Id.h:
##########
@@ -141,18 +142,14 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
   size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) const noexcept {
     static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % sizeof(size_t) == 0);
     constexpr int slices = sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);

Review Comment:
   ah yes, thanks, I somehow missed that, never mind



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r935235352


##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"

Review Comment:
   This include breaks the Windows build, because `SSLContextService.h` pulls in `wincrypt.h`, which `#define`s `X509_CERT_PAIR` as `(LPCSTR) 53`, but OpenSSL/LibreSSL has its own `X509_CERT_PAIR` struct which conflicts with this.
   
   Specifically, in `ExtendedKeyUsage.cpp`, we first `#include <openssl/x509v3.h>`, then we `#include "core/logging/LoggerConfiguration.h --> AlertSink.h --> SSLContextService.h --> wincrypt.h`, which redefines `X509_CERT_PAIR`.
   
   I think the best solution would be to move `#include "controllers/SSLContextService.h"` to `AlertSink.cpp`, and put a forward declaration in the header instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r947007813


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -70,6 +72,16 @@ class Clock {
  public:
   virtual ~Clock() = default;
   virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+  virtual std::chrono::milliseconds wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const std::function<bool()>& pred) {
+    auto now = timeSinceEpoch();
+    if (now < time) {
+      cv.wait_for(lck, time - now, [&] {
+        now = timeSinceEpoch();
+        return now >= time || pred();
+      });
+    }
+    return timeSinceEpoch();

Review Comment:
   I think it would be better to tell in the return value that a timeout happened or the predicate returned true.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  // TODO(adebreceni): revisit this after MINIFICPP-1903
+  utils::SMatch match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!utils::regexMatch(std::string{payload}, match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string submatch = match[idx].str();
+    hash = utils::hash_combine(hash, std::hash<std::string>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+  delete services_.exchange(nullptr);
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient", "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool response_success = 200 <= resp_code && resp_code < 300;
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else if (!response_success) {
+    logger_->log_warn("Non-success response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  }
+
+  if (!req_success) {
+    logger_->log_error("Failed to send alert request");
+  }
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::allocate(size_t /*size*/) {
+  return {};
+}
+
+AlertSink::LogBuffer AlertSink::LogBuffer::commit() {
+  return std::move(*this);
+}
+
+size_t AlertSink::LogBuffer::size() const {
+  return size_;
+}
+
+void AlertSink::LiveLogSet::setLifetime(std::chrono::milliseconds lifetime) {
+  lifetime_ = lifetime;
+}
+
+bool AlertSink::LiveLogSet::tryAdd(std::chrono::milliseconds now, size_t hash) {
+  auto limit = now - lifetime_;
+  while (!timestamped_hashes_.empty() && timestamped_hashes_.front().first < limit) {

Review Comment:
   This is assuming that `timestamped_hashes_` is having the hashes ordered in a chronological order. I think it would be better to use a priority queue for that. Also, they could become invalid if `setLifeTime` is called between two `tryAdd`s and `lifetime_` is modified. But since I just see one call for it in the ` AlertSink` constructor, can this method be removed and `lifetime_` set from constructor only?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r951101401


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));

Review Comment:
   copy and move are automatically disabled as `AlertSink` has both `std::mutex` and `std::atomic` members, of course we could make this explicit, should we make it explicit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r949028084


##########
libminifi/src/utils/TimeUtil.cpp:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TimeUtil.h"
+
+namespace org::apache::nifi::minifi::utils::timeutils {
+
+static std::mutex global_clock_mtx;
+static std::shared_ptr<Clock> global_clock{std::make_shared<SteadyClock>()};
+
+std::shared_ptr<Clock> getClock() {
+  std::lock_guard lock(global_clock_mtx);
+  return global_clock;
+}
+
+// test-only utility to specify what clock to use
+void setClock(std::shared_ptr<Clock> clock) {
+  std::lock_guard lock(global_clock_mtx);
+  global_clock = std::move(clock);
+}

Review Comment:
   routing the clock through the logging system would be quite the hassle for something that has a test-only relevance, also being able to access this clock from anywhere makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r950087185


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -70,6 +72,16 @@ class Clock {
  public:
   virtual ~Clock() = default;
   virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+  virtual std::chrono::milliseconds wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const std::function<bool()>& pred) {
+    auto now = timeSinceEpoch();
+    if (now < time) {
+      cv.wait_for(lck, time - now, [&] {
+        now = timeSinceEpoch();
+        return now >= time || pred();
+      });
+    }
+    return timeSinceEpoch();

Review Comment:
   yes it makes sense to be inline with the interface of `std::condition_variable::wait_until`, changed it to return a `bool`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r951099429


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+#include "controllers/SSLContextService.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      live_logs_(config_.rate_limit),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = utils::Regex{filter_str.value()};
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::shared_ptr<AlertSink>(new AlertSink(std::move(config), std::move(logger)));

Review Comment:
   my understanding is that the storage will be automatically freed up https://eel.is/c++draft/except.ctor#note-2
   exception-safety of `std::shared_ptr<T>(new T{...})` was only of concern until C++17 before which a call like `f(std::shared_ptr<T>(new T{...}), g())` could indeed leak if `g()` throws between ~allocating in `new T{...}` and the constructor call of that object~ the `new T{...}` and calling the `shared_ptr` constructor, from C++17 arguments are indeterminately sequenced so even this is not a concern anymore 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r933201410


##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) {}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";

Review Comment:
   ```suggestion
     auto flow_config_file = std::filesystem::path(dir.getPath()) / "config.yml";
   ```
   since a "flow file" is a different thing



##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) {}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";
+  std::ofstream(flow_file) << empty_flow;
+
+  std::string agent_id = "test-agent-1";
+  VerifyAlerts harness;
+  AlertHandler handler(agent_id);
+  harness.setUrl("http://localhost:0/api/alerts", &handler);
+  harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier, agent_id);
+  harness.getConfiguration()->setHome(dir.getPath());
+
+  auto log_props = std::make_shared<logging::LoggerProperties>();
+  log_props->set("appender.alert1", "alert");
+  log_props->set("appender.alert1.url", harness.getC2RestUrl());
+  log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+  log_props->set("appender.alert1.rate.limit", "10 s");
+  log_props->set("appender.alert1.flush.period", "1 s");
+  log_props->set("logger.root", "INFO,alert1");
+  logging::LoggerConfiguration::getConfiguration().initialize(log_props);

Review Comment:
   It could be useful to add something like this in a `minifi-log.properties` file under `examples`.



##########
libminifi/src/core/logging/LoggerConfiguration.cpp:
##########
@@ -60,26 +62,28 @@ namespace org::apache::nifi::minifi::core::logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v";
 
-namespace {
-std::optional<spdlog::level::level_enum> parse_log_level(const std::string& level_name) {
-  if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
-    return spdlog::level::trace;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
-    return spdlog::level::debug;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
-    return spdlog::level::info;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
-    return spdlog::level::warn;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
-    return spdlog::level::err;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
-    return spdlog::level::critical;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
-    return spdlog::level::off;
+namespace internal {
+
+bool LoggerNamespace::findSink(std::function<bool(const std::shared_ptr<spdlog::sinks::sink>&)> filter) const {

Review Comment:
   It is kind of interesting that we can apply an operation to all sinks by using a "find" function and giving it a predicate which always returns `false`, but since we never use `findSink` to find a sink, I think it would be more readable, and simpler, to replace it with `void forEachSink(std::function<void(...)> operation)`.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}

Review Comment:
   do we need to delete `services_`, too?



##########
libminifi/include/utils/Id.h:
##########
@@ -141,18 +142,14 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> {
   size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) const noexcept {
     static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % sizeof(size_t) == 0);
     constexpr int slices = sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);

Review Comment:
   I know it's not new code, but this hash function has problems: if `sizeof(Identifier) < sizeof(size_t)`, then it reads uninitialized memory, and if `sizeof(Identifier)` is not a multiple of `sizeof(size_t)`, then the hash doesn't depend on the last few bytes of the `Identifier`.
   
   Of course, in real life the size of `Identifier` is 16, which is either 2 or 4 times the size of `size_t`, so this is not a problem.  Maybe we could add a `static_assert(sizeof(Identifier) >= sizeof(size_t) && sizeof(Identifier) % sizeof(size_t) == 0)` for now?



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -70,6 +72,16 @@ class Clock {
  public:
   virtual ~Clock() = default;
   virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+  virtual std::chrono::milliseconds wait_until(std::condition_variable& cv, std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const std::function<bool()>& pred) {

Review Comment:
   I would prefer to have a parallel `ThreadSafeClock` hierarchy with classes which have a `Clock` (subclass) as a member instead of adding complexity to the `Clock` classes directly.



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<size_t> ignored_;
+    std::deque<std::pair<std::chrono::milliseconds, size_t>> ordered_;

Review Comment:
   I think these could be named better; maybe
   ```suggestion
       std::unordered_set<size_t> hashes_to_ignore_;
       std::deque<std::pair<std::chrono::milliseconds, size_t>> timestamped_hashes_;
   ```
   ?
   
   Also, typedefing `size_t` to something like `Hash` could help with readability.



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;

Review Comment:
   Are you sure we want to use `std::regex`?  Elsewhere we generally use `utils::Regex` because `std::regex` has performance issues with long targets.  Even if long targets are not likely here, we may prefer to use the same kind of regex everywhere.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms

Review Comment:
   Can you add to the comment on which platform it is not supported yet, please?  That way, we'll know when we can remove the workaround.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return !running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient", "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, config_.url);

Review Comment:
   This could be logged at `info` or `warning` level if the response code is not an error, but not a `2xx` success, either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940173993


##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + ".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + ".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet supported on all platforms

Review Comment:
   added compiler version



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<size_t> ignored_;
+    std::deque<std::pair<std::chrono::milliseconds, size_t>> ordered_;

Review Comment:
   renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940174690


##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) {}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";
+  std::ofstream(flow_file) << empty_flow;
+
+  std::string agent_id = "test-agent-1";
+  VerifyAlerts harness;
+  AlertHandler handler(agent_id);
+  harness.setUrl("http://localhost:0/api/alerts", &handler);
+  harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier, agent_id);
+  harness.getConfiguration()->setHome(dir.getPath());
+
+  auto log_props = std::make_shared<logging::LoggerProperties>();
+  log_props->set("appender.alert1", "alert");
+  log_props->set("appender.alert1.url", harness.getC2RestUrl());
+  log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+  log_props->set("appender.alert1.rate.limit", "10 s");
+  log_props->set("appender.alert1.flush.period", "1 s");
+  log_props->set("logger.root", "INFO,alert1");
+  logging::LoggerConfiguration::getConfiguration().initialize(log_props);
+
+  auto verifyLogsArrived = [&] (const std::vector<std::string>& expected) {
+    std::vector<std::string> logs;
+    REQUIRE(handler.alerts_.dequeueWaitFor(logs, 1s));
+    REQUIRE(logs.size() == expected.size());
+    for (size_t idx = 0; idx < expected.size(); ++idx) {
+      bool contains = std::search(logs[idx].begin(), logs[idx].end(), expected[idx].begin(), expected[idx].end()) != logs[idx].end();
+      REQUIRE(contains);
+    }
+  };
+
+  harness.verify_ = [&] {
+    auto logger = logging::LoggerFactory<minifi::FlowController>::getLogger();
+    // time = 0
+    logger->log_error("not matched");
+    logger->log_error("<begin>one<end>");
+    logger->log_error("not the same but treated so <begin>one<end>");
+    logger->log_error("<begin>two<end>");
+    clock->advance(2s);
+    // time = 2
+    verifyLogsArrived({
+        "<begin>one<end>", "<begin>two<end>"
+    });
+
+    clock->advance(5s);
+    // time = 7
+    // no new logs over HTTP
+
+    logger->log_error("other <begin>one<end>");
+    logger->log_error("new log <begin>three<end>");
+    clock->advance(2s);
+
+    // time = 9
+    verifyLogsArrived({
+        "new log <begin>three<end>"
+    });
+
+    clock->advance(2s);
+    // time = 11
+    logger->log_error("other <begin>one<end>");
+    logger->log_error("new log <begin>three<end>");
+    clock->advance(2s);
+    // time = 13
+
+    verifyLogsArrived({
+        "other <begin>one<end>"
+    });
+
+    return true;
+  };
+
+  harness.run(flow_file, dir.getPath());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1367: MINIFICPP-1822 - Add alert capability

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r940172633


##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) {}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";

Review Comment:
   changed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org