You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/14 07:57:37 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1219 - PublishKafka should release connection when stopped

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new e430612  MINIFICPP-1219 - PublishKafka should release connection when stopped
e430612 is described below

commit e430612c1400b2f4486e4f848dca36ca7b3712e6
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Tue May 12 19:22:33 2020 +0200

    MINIFICPP-1219 - PublishKafka should release connection when stopped
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #779
---
 extensions/librdkafka/KafkaConnection.cpp | 12 +----
 extensions/librdkafka/KafkaConnection.h   | 26 -----------
 extensions/librdkafka/KafkaPool.h         | 77 -------------------------------
 extensions/librdkafka/PublishKafka.cpp    | 38 ++++++---------
 extensions/librdkafka/PublishKafka.h      | 10 ++--
 5 files changed, 22 insertions(+), 141 deletions(-)

diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index 04b327d..449a801 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -27,7 +27,6 @@ KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
     : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
       kafka_connection_(nullptr),
       poll_(false) {
-  lease_ = false;
   initialized_ = false;
   key_ = key;
 }
@@ -42,6 +41,7 @@ void KafkaConnection::remove() {
 }
 
 void KafkaConnection::removeConnection() {
+  logger_->log_trace("KafkaConnection::removeConnection START: Client = %s -- Broker = %s", key_.client_id_, key_.brokers_);
   stopPoll();
   if (kafka_connection_) {
     rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
@@ -52,6 +52,7 @@ void KafkaConnection::removeConnection() {
     kafka_connection_ = nullptr;
   }
   initialized_ = false;
+  logger_->log_trace("KafkaConnection::removeConnection FINISH: Client = %s -- Broker = %s", key_.client_id_, key_.brokers_);
 }
 
 bool KafkaConnection::initialized() const {
@@ -125,15 +126,6 @@ void KafkaConnection::logCallback(const rd_kafka_t* rk, int level, const char* /
   }
 }
 
-bool KafkaConnection::tryUse() {
-  std::lock_guard<std::mutex> lock(lease_mutex_);
-  if (lease_) {
-    return false;
-  }
-  lease_ = true;
-  return true;
-}
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 5daf96d..0c3979e 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -71,18 +71,10 @@ class KafkaConnection {
 
   static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf);
 
-  bool tryUse();
-
-  friend class KafkaLease;
-
  private:
 
   std::shared_ptr<logging::Logger> logger_;
 
-  std::mutex lease_mutex_;
-
-  bool lease_;
-
   bool initialized_;
 
   KafkaConnectionKey key_;
@@ -121,24 +113,6 @@ class KafkaConnection {
   }
 };
 
-class KafkaLease {
-  public:
-    ~KafkaLease() {
-      std::lock_guard<std::mutex> lock(conn_->lease_mutex_);
-      conn_->lease_ = false;
-    }
-    std::shared_ptr<KafkaConnection> getConn() const {
-      return conn_;
-    }
-    friend class KafkaPool;
-  private:
-    KafkaLease(std::shared_ptr<KafkaConnection> conn) // This one should be private, and only KafkaPool can call (friend).
-            : conn_(conn) {
-    }
-
-    std::shared_ptr<KafkaConnection> conn_;
-};
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/librdkafka/KafkaPool.h b/extensions/librdkafka/KafkaPool.h
deleted file mode 100644
index 3fe2d42..0000000
--- a/extensions/librdkafka/KafkaPool.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef NIFI_MINIFI_CPP_KAFKAPOOL_H
-#define NIFI_MINIFI_CPP_KAFKAPOOL_H
-
-#include "KafkaConnection.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-class KafkaPool {
- public:
-
-  explicit KafkaPool(int max)
-      : max_(max) {
-  }
-
-  bool removeConnection(const KafkaConnectionKey &key) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return map_.erase(key) == 1;
-  }
-
-  std::unique_ptr<KafkaLease> getOrCreateConnection(const KafkaConnectionKey &key) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    auto connection = map_.find(key);
-    std::shared_ptr<KafkaConnection> conn;
-    if (connection == map_.end()) {
-      // Not found, create new connection.
-      conn = std::make_shared<KafkaConnection>(key);
-      if (map_.size() == max_) {
-        // Reached pool limit, remove the first one.
-        map_.erase(map_.begin());
-      }
-      map_[key] = conn;
-    } else {
-      conn = connection->second;
-    }
-    std::unique_ptr<KafkaLease> lease;
-    if (conn->tryUse()) {
-      lease = std::unique_ptr<KafkaLease>(new KafkaLease(conn));
-    }
-    return lease;
-  }
-
- private:
-  std::mutex mutex_;
-
-  size_t max_;
-
-  std::map<KafkaConnectionKey, std::shared_ptr<KafkaConnection>> map_;
-};
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif //NIFI_MINIFI_CPP_KAFKAPOOL_H
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index f14dd6c..66a0937 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -27,6 +27,7 @@
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/ScopeGuard.h"
+#include "utils/GeneralUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -180,13 +181,11 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
     logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
   }
 
-  // Future Improvement: Get rid of key since we only need to store one connection with current design.
   key_.brokers_ = brokers;
   key_.client_id_ = client_id;
 
-  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
-  std::shared_ptr<KafkaConnection> conn = lease->getConn();
-  configureNewConnection(conn, context);
+  conn_ = utils::make_unique<KafkaConnection>(key_);
+  configureNewConnection(context);
 
   logger_->log_debug("Successfully configured PublishKafka");
 }
@@ -198,6 +197,7 @@ void PublishKafka::notifyStop() {
   for (auto& messages : messages_set_) {
     messages->interrupt();
   }
+  conn_.reset();
 }
 
 /**
@@ -215,7 +215,7 @@ void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_messag
   delete func;
 }
 
-bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context) {
+bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
   std::string value;
   int64_t valInt;
   std::string valueConf;
@@ -231,7 +231,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     rd_kafka_conf_destroy(conf_);
   });
 
-  auto key = conn->getKey();
+  auto key = conn_->getKey();
 
   if (key->brokers_.empty()) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
@@ -434,12 +434,12 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   // The producer took ownership of the configuration, we must not free it
   confGuard.disable();
 
-  conn->setConnection(producer);
+  conn_->setConnection(producer);
 
   return true;
 }
 
-bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
+bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
   rd_kafka_topic_conf_t* topic_conf_ = rd_kafka_topic_conf_new();
   if (topic_conf_ == nullptr) {
     logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
@@ -508,7 +508,7 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn,
     }
   }
 
-  rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn->getConnection(), topic_name.c_str(), topic_conf_);
+  rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn_->getConnection(), topic_name.c_str(), topic_conf_);
   if (topic_reference == nullptr) {
     rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
     logger_->log_error("PublishKafka: failed to create topic %s, error: %s", topic_name.c_str(), rd_kafka_err2str(resp_err));
@@ -520,7 +520,7 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn,
 
   std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference);
 
-  conn->putTopic(topic_name, kafkaTopicref);
+  conn_->putTopic(topic_name, kafkaTopicref);
 
   return true;
 }
@@ -533,17 +533,9 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     return;
   }
 
+  std::lock_guard<std::mutex> lock_connection(connection_mutex_);
   logger_->log_debug("PublishKafka onTrigger");
 
-  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
-  if (lease == nullptr) {
-    logger_->log_info("This connection is used by another thread.");
-    context->yield();
-    return;
-  }
-
-  std::shared_ptr<KafkaConnection> conn = lease->getConn();
-
   // Collect FlowFiles to process
   uint64_t actual_bytes = 0U;
   std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
@@ -591,8 +583,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     }
 
     // Add topic to the connection if needed
-    if (!conn->hasTopic(topic)) {
-      if (!createNewTopic(conn, context, topic)) {
+    if (!conn_->hasTopic(topic)) {
+      if (!createNewTopic(context, topic)) {
         logger_->log_error("Failed to add topic %s", topic);
         messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
           flow_file_result.flow_file_error = true;
@@ -609,7 +601,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
       kafkaKey = flowFile->getUUIDStr();
     }
 
-    auto thisTopic = conn->getTopic(topic);
+    auto thisTopic = conn_->getTopic(topic);
     if (thisTopic == nullptr) {
       logger_->log_error("Topic %s is invalid", topic);
       messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
@@ -621,7 +613,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     bool failEmptyFlowFiles = true;
     context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
 
-    PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile,
+    PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
                                         attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
     session->read(flowFile, &callback);
 
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index c6ed19e..5946741 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -30,7 +30,7 @@
 #include "core/logging/Logger.h"
 #include "utils/RegexUtils.h"
 #include "rdkafka.h"
-#include "KafkaPool.h"
+#include "KafkaConnection.h"
 #include <atomic>
 #include <map>
 #include <set>
@@ -70,7 +70,6 @@ class PublishKafka : public core::Processor {
    */
   explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
       : core::Processor(std::move(name), uuid),
-        connection_pool_(5),
         logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
         interrupted_(false) {
   }
@@ -344,16 +343,17 @@ class PublishKafka : public core::Processor {
 
  protected:
 
-  bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context);
-  bool createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
+  bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
+  bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
 
  private:
   static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
 
   std::shared_ptr<logging::Logger> logger_;
 
-  KafkaPool connection_pool_;
   KafkaConnectionKey key_;
+  std::unique_ptr<KafkaConnection> conn_;
+  std::mutex connection_mutex_;
 
   uint32_t batch_size_;
   uint64_t target_batch_payload_size_;