You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/08/13 09:25:36 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #868: MINIFICPP-1326 improve PublishKafka logging

szaszm commented on a change in pull request #868:
URL: https://github.com/apache/nifi-minifi-cpp/pull/868#discussion_r469819190



##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter {
 struct rd_kafka_topic_conf_deleter {
   void operator()(rd_kafka_topic_conf_t* p) const noexcept { rd_kafka_topic_conf_destroy(p); }
 };
+
+// Message
+enum class MessageStatus : uint8_t {
+  InFlight,
+  Error,
+  Success
+};
+
+const char* to_string(const MessageStatus s) {
+  switch (s) {
+    case MessageStatus::InFlight: return "InFlight";
+    case MessageStatus::Error: return "Error";
+    case MessageStatus::Success: return "Success";
+  }
+  throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+  MessageStatus status = MessageStatus::InFlight;
+  rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+  bool flow_file_error = false;
+  std::vector<MessageResult> messages;
+};
+}  // namespace
+
+class PublishKafka::Messages {
+  std::mutex mutex_;
+  std::condition_variable cv_;
+  std::vector<FlowFileResult> flow_files_;
+  bool interrupted_ = false;
+  const std::shared_ptr<logging::Logger> logger_;
+
+  std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+    gsl_Expects(lock.owns_lock());
+    const auto messageresult_ok = [](const MessageResult r) { return r.status == MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+    const auto messageresult_inflight = [](const MessageResult r) { return r.status == MessageStatus::InFlight && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+    std::vector<size_t> flow_files_in_flight;
+    std::ostringstream oss;
+    if (interrupted_) { oss << "interrupted, "; }
+    for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+      const auto& flow_file = flow_files_[ffi];
+      if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_ok)) {

Review comment:
       Not sure what you mean. To me, line 206 reads something like: "if there is no flow file error and all of the flow file's messages are ok".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org