You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/14 07:57:37 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1219 - PublishKafka should release connection when stopped
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new e430612 MINIFICPP-1219 - PublishKafka should release connection when stopped
e430612 is described below
commit e430612c1400b2f4486e4f848dca36ca7b3712e6
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Tue May 12 19:22:33 2020 +0200
MINIFICPP-1219 - PublishKafka should release connection when stopped
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #779
---
extensions/librdkafka/KafkaConnection.cpp | 12 +----
extensions/librdkafka/KafkaConnection.h | 26 -----------
extensions/librdkafka/KafkaPool.h | 77 -------------------------------
extensions/librdkafka/PublishKafka.cpp | 38 ++++++---------
extensions/librdkafka/PublishKafka.h | 10 ++--
5 files changed, 22 insertions(+), 141 deletions(-)
diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index 04b327d..449a801 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -27,7 +27,6 @@ KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
: logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
kafka_connection_(nullptr),
poll_(false) {
- lease_ = false;
initialized_ = false;
key_ = key;
}
@@ -42,6 +41,7 @@ void KafkaConnection::remove() {
}
void KafkaConnection::removeConnection() {
+ logger_->log_trace("KafkaConnection::removeConnection START: Client = %s -- Broker = %s", key_.client_id_, key_.brokers_);
stopPoll();
if (kafka_connection_) {
rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
@@ -52,6 +52,7 @@ void KafkaConnection::removeConnection() {
kafka_connection_ = nullptr;
}
initialized_ = false;
+ logger_->log_trace("KafkaConnection::removeConnection FINISH: Client = %s -- Broker = %s", key_.client_id_, key_.brokers_);
}
bool KafkaConnection::initialized() const {
@@ -125,15 +126,6 @@ void KafkaConnection::logCallback(const rd_kafka_t* rk, int level, const char* /
}
}
-bool KafkaConnection::tryUse() {
- std::lock_guard<std::mutex> lock(lease_mutex_);
- if (lease_) {
- return false;
- }
- lease_ = true;
- return true;
-}
-
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 5daf96d..0c3979e 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -71,18 +71,10 @@ class KafkaConnection {
static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf);
- bool tryUse();
-
- friend class KafkaLease;
-
private:
std::shared_ptr<logging::Logger> logger_;
- std::mutex lease_mutex_;
-
- bool lease_;
-
bool initialized_;
KafkaConnectionKey key_;
@@ -121,24 +113,6 @@ class KafkaConnection {
}
};
-class KafkaLease {
- public:
- ~KafkaLease() {
- std::lock_guard<std::mutex> lock(conn_->lease_mutex_);
- conn_->lease_ = false;
- }
- std::shared_ptr<KafkaConnection> getConn() const {
- return conn_;
- }
- friend class KafkaPool;
- private:
- KafkaLease(std::shared_ptr<KafkaConnection> conn) // This one should be private, and only KafkaPool can call (friend).
- : conn_(conn) {
- }
-
- std::shared_ptr<KafkaConnection> conn_;
-};
-
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/librdkafka/KafkaPool.h b/extensions/librdkafka/KafkaPool.h
deleted file mode 100644
index 3fe2d42..0000000
--- a/extensions/librdkafka/KafkaPool.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef NIFI_MINIFI_CPP_KAFKAPOOL_H
-#define NIFI_MINIFI_CPP_KAFKAPOOL_H
-
-#include "KafkaConnection.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-class KafkaPool {
- public:
-
- explicit KafkaPool(int max)
- : max_(max) {
- }
-
- bool removeConnection(const KafkaConnectionKey &key) {
- std::lock_guard<std::mutex> lock(mutex_);
- return map_.erase(key) == 1;
- }
-
- std::unique_ptr<KafkaLease> getOrCreateConnection(const KafkaConnectionKey &key) {
- std::lock_guard<std::mutex> lock(mutex_);
- auto connection = map_.find(key);
- std::shared_ptr<KafkaConnection> conn;
- if (connection == map_.end()) {
- // Not found, create new connection.
- conn = std::make_shared<KafkaConnection>(key);
- if (map_.size() == max_) {
- // Reached pool limit, remove the first one.
- map_.erase(map_.begin());
- }
- map_[key] = conn;
- } else {
- conn = connection->second;
- }
- std::unique_ptr<KafkaLease> lease;
- if (conn->tryUse()) {
- lease = std::unique_ptr<KafkaLease>(new KafkaLease(conn));
- }
- return lease;
- }
-
- private:
- std::mutex mutex_;
-
- size_t max_;
-
- std::map<KafkaConnectionKey, std::shared_ptr<KafkaConnection>> map_;
-};
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif //NIFI_MINIFI_CPP_KAFKAPOOL_H
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index f14dd6c..66a0937 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -27,6 +27,7 @@
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/ScopeGuard.h"
+#include "utils/GeneralUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -180,13 +181,11 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
}
- // Future Improvement: Get rid of key since we only need to store one connection with current design.
key_.brokers_ = brokers;
key_.client_id_ = client_id;
- std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
- std::shared_ptr<KafkaConnection> conn = lease->getConn();
- configureNewConnection(conn, context);
+ conn_ = utils::make_unique<KafkaConnection>(key_);
+ configureNewConnection(context);
logger_->log_debug("Successfully configured PublishKafka");
}
@@ -198,6 +197,7 @@ void PublishKafka::notifyStop() {
for (auto& messages : messages_set_) {
messages->interrupt();
}
+ conn_.reset();
}
/**
@@ -215,7 +215,7 @@ void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_messag
delete func;
}
-bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context) {
+bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
std::string value;
int64_t valInt;
std::string valueConf;
@@ -231,7 +231,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
rd_kafka_conf_destroy(conf_);
});
- auto key = conn->getKey();
+ auto key = conn_->getKey();
if (key->brokers_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
@@ -434,12 +434,12 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
// The producer took ownership of the configuration, we must not free it
confGuard.disable();
- conn->setConnection(producer);
+ conn_->setConnection(producer);
return true;
}
-bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
+bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
rd_kafka_topic_conf_t* topic_conf_ = rd_kafka_topic_conf_new();
if (topic_conf_ == nullptr) {
logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
@@ -508,7 +508,7 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn,
}
}
- rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn->getConnection(), topic_name.c_str(), topic_conf_);
+ rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn_->getConnection(), topic_name.c_str(), topic_conf_);
if (topic_reference == nullptr) {
rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
logger_->log_error("PublishKafka: failed to create topic %s, error: %s", topic_name.c_str(), rd_kafka_err2str(resp_err));
@@ -520,7 +520,7 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn,
std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference);
- conn->putTopic(topic_name, kafkaTopicref);
+ conn_->putTopic(topic_name, kafkaTopicref);
return true;
}
@@ -533,17 +533,9 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
return;
}
+ std::lock_guard<std::mutex> lock_connection(connection_mutex_);
logger_->log_debug("PublishKafka onTrigger");
- std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
- if (lease == nullptr) {
- logger_->log_info("This connection is used by another thread.");
- context->yield();
- return;
- }
-
- std::shared_ptr<KafkaConnection> conn = lease->getConn();
-
// Collect FlowFiles to process
uint64_t actual_bytes = 0U;
std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
@@ -591,8 +583,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
}
// Add topic to the connection if needed
- if (!conn->hasTopic(topic)) {
- if (!createNewTopic(conn, context, topic)) {
+ if (!conn_->hasTopic(topic)) {
+ if (!createNewTopic(context, topic)) {
logger_->log_error("Failed to add topic %s", topic);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
@@ -609,7 +601,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
kafkaKey = flowFile->getUUIDStr();
}
- auto thisTopic = conn->getTopic(topic);
+ auto thisTopic = conn_->getTopic(topic);
if (thisTopic == nullptr) {
logger_->log_error("Topic %s is invalid", topic);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
@@ -621,7 +613,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
bool failEmptyFlowFiles = true;
context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
- PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile,
+ PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
session->read(flowFile, &callback);
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index c6ed19e..5946741 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -30,7 +30,7 @@
#include "core/logging/Logger.h"
#include "utils/RegexUtils.h"
#include "rdkafka.h"
-#include "KafkaPool.h"
+#include "KafkaConnection.h"
#include <atomic>
#include <map>
#include <set>
@@ -70,7 +70,6 @@ class PublishKafka : public core::Processor {
*/
explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(std::move(name), uuid),
- connection_pool_(5),
logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
interrupted_(false) {
}
@@ -344,16 +343,17 @@ class PublishKafka : public core::Processor {
protected:
- bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context);
- bool createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
+ bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
+ bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
private:
static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
std::shared_ptr<logging::Logger> logger_;
- KafkaPool connection_pool_;
KafkaConnectionKey key_;
+ std::unique_ptr<KafkaConnection> conn_;
+ std::mutex connection_mutex_;
uint32_t batch_size_;
uint64_t target_batch_payload_size_;