You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by GitBox <> on 2021/01/06 17:20:42 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #940: MINIFICPP-1373 - Implement ConsumeKafka

szaszm commented on a change in pull request #940:

File path: extensions/librdkafka/ConsumeKafka.h
@@ -0,0 +1,175 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <string>
+#include <utility>
+#include <vector>
+#include <memory>
+#include "core/Processor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "KafkaConnection.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+class ConsumeKafka : public core::Processor {
+ public:
+  static constexpr char const* ProcessorName = "ConsumeKafka";
+  // Supported Properties
+  static core::Property KafkaBrokers;
+  static core::Property SecurityProtocol;
+  static core::Property TopicNames;
+  static core::Property TopicNameFormat;
+  static core::Property HonorTransactions;
+  static core::Property GroupID;
+  static core::Property OffsetReset;
+  static core::Property KeyAttributeEncoding;
+  static core::Property MessageDemarcator;
+  static core::Property MessageHeaderEncoding;
+  static core::Property HeadersToAddAsAttributes;
+  static core::Property DuplicateHeaderHandling;
+  static core::Property MaxPollRecords;
+  static core::Property MaxPollTime;
+  static core::Property SessionTimeout;
+  // Supported Relationships
+  static const core::Relationship Success;
+  // Security Protocol allowable values
+  static constexpr char const* SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SSL = "SSL";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_PLAINTEXT = "SASL_PLAINTEXT";
+  static constexpr char const* SECURITY_PROTOCOL_SASL_SSL = "SASL_SSL";
+  // Topic Name Format allowable values
+  static constexpr char const* TOPIC_FORMAT_NAMES = "Names";
+  static constexpr char const* TOPIC_FORMAT_PATTERNS = "Patterns";
+  // Offset Reset allowable values
+  static constexpr char const* OFFSET_RESET_EARLIEST = "earliest";
+  static constexpr char const* OFFSET_RESET_LATEST = "latest";
+  static constexpr char const* OFFSET_RESET_NONE = "none";
+  // Key Attribute Encoding allowable values
+  static constexpr char const* KEY_ATTR_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* KEY_ATTR_ENCODING_HEX = "Hex";
+  // Message Header Encoding allowable values
+  static constexpr char const* MSG_HEADER_ENCODING_UTF_8 = "UTF-8";
+  static constexpr char const* MSG_HEADER_ENCODING_HEX = "Hex";
+  // Duplicate Header Handling allowable values
+  static constexpr char const* MSG_HEADER_KEEP_FIRST = "Keep First";
+  static constexpr char const* MSG_HEADER_KEEP_LATEST = "Keep Latest";
+  static constexpr char const* MSG_HEADER_COMMA_SEPARATED_MERGE = "Comma-separated Merge";
+  // Flowfile attributes written
+  static constexpr char const* KAFKA_COUNT_ATTR = "kafka.count";  // Always 1 until we start supporting merging from batches
+  static constexpr char const* KAFKA_MESSAGE_KEY_ATTR = "kafka.key";
+  static constexpr char const* KAFKA_OFFSET_ATTR = "kafka.offset";
+  static constexpr char const* KAFKA_PARTITION_ATTR = "kafka.partition";
+  static constexpr char const* KAFKA_TOPIC_ATTR = "kafka.topic";
+  static constexpr const std::size_t DEFAULT_MAX_POLL_RECORDS{ 10000 };
+  static constexpr char const* DEFAULT_MAX_POLL_TIME = "4 seconds";
+  static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 60000 };
+  /**
+   * Creates the processor; the name is taken by value and moved into the Processor base.
+   * @param name processor name
+   * @param uuid processor identifier (defaults to a default-constructed utils::Identifier)
+   */
+  explicit ConsumeKafka(std::string name, utils::Identifier uuid = utils::Identifier()) :
+      Processor(std::move(name), uuid),
+      logger_(logging::LoggerFactory<ConsumeKafka>::getLogger()) {}
+  ~ConsumeKafka() override = default;
+  bool supportsDynamicProperties() override {
+    return true;
+  }
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) override;
+  /**
+   * Execution trigger for the ConsumeKafka Processor.
+   * NOTE(review): the implementation is not part of this hunk; presumably it polls Kafka
+   * (see poll_kafka_messages) and turns the messages into flow files — confirm against the .cpp.
+   * @param context processor context
+   * @param session processor session reference.
+   */
+  void onTrigger(core::ProcessContext* context, core::ProcessSession* session) override;
+  // Registers the supported properties and relationships of ConsumeKafka.
+  void initialize() override;
+ private:
+  void create_topic_partition_list();
+  void extend_config_from_dynamic_properties(const core::ProcessContext* context);
+  void configure_new_connection(const core::ProcessContext* context);
+  std::string extract_message(const rd_kafka_message_t* rkmessage) const;
+  std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> poll_kafka_messages();
+  utils::KafkaEncoding key_attr_encoding_attr_to_enum() const;
+  utils::KafkaEncoding message_header_encoding_attr_to_enum() const;
+  std::string resolve_duplicate_headers(const std::vector<std::string>& matching_headers) const;
+  std::vector<std::string> get_matching_headers(const rd_kafka_message_t* message, const std::string& header_name) const;
+  std::vector<std::pair<std::string, std::string>> get_flowfile_attributes_from_message_header(const rd_kafka_message_t* message) const;
+  void add_kafka_attributes_to_flowfile(std::shared_ptr<FlowFileRecord>& flow_file, const rd_kafka_message_t* message) const;
+  std::vector<std::shared_ptr<FlowFileRecord>> transform_messages_into_flowfiles(
+      const std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>>& messages, core::ProcessSession* session) const;
+ private:
+  std::string kafka_brokers_;
+  std::string security_protocol_;
+  std::vector<std::string> topic_names_;
+  std::string topic_name_format_;
+  bool honor_transactions_;
+  std::string group_id_;
+  std::string offset_reset_;
+  std::string key_attribute_encoding_;
+  std::string message_demarcator_;
+  std::string message_header_encoding_;
+  std::string duplicate_header_handling_;
+  std::vector<std::string> headers_to_add_as_attributes_;
+  std::size_t max_poll_records_;
+  std::chrono::milliseconds max_poll_time_milliseconds_;
+  std::chrono::milliseconds communications_timeout_milliseconds_;
+  std::chrono::milliseconds session_timeout_milliseconds_;

Review comment:
       I think the `_milliseconds` suffixes in the identifiers are unnecessary because the information is already part of the type.

File path: extensions/librdkafka/ConsumeKafka.cpp
@@ -0,0 +1,553 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConsumeKafka.h"
+#include <algorithm>
+#include <limits>
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+  // Inclusive upper bound for "Max Poll Time"; polling blocks, so longer values could trip the Watchdog.
+  static constexpr uint64_t MAX_POLL_TIME_UPPER_LIMIT_MS = 4000;
+
+  explicit ConsumeKafkaMaxPollTimeValidator(const std::string &name)
+      : TimePeriodValidator(name) {
+  }
+  ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+  /**
+   * Accepts a time period string that parses, converts to milliseconds, and lies in (0, 4000] ms.
+   * @param subject name of the validated property
+   * @param input raw property value, e.g. "4 seconds"
+   */
+  ValidationResult validate(const std::string& subject, const std::string& input) const override {
+    uint64_t value{};
+    TimeUnit timeUnit{};
+    uint64_t value_as_ms{};
+    return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+        core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+        org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, timeUnit, value_as_ms) &&
+        0 < value_as_ms && value_as_ms <= MAX_POLL_TIME_UPPER_LIMIT_MS).build();
+  }
+};
+}  // namespace core
+namespace processors {
+// Out-of-line definitions for static constexpr members declared in ConsumeKafka.h
+// (before C++17 these are needed when a member is odr-used, e.g. bound to a reference).
+constexpr const char* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr const char* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+constexpr const char* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka Brokers")
+  ->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>.")
+  ->withDefaultValue("localhost:9092", core::StandardValidators::get().NON_BLANK_VALIDATOR)
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)
+  ->build());
+core::Property ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security Protocol")
+  ->withDescription("This property is currently not supported. Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
+  ->isRequired(true)
+  ->build());
+core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+  ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.")
+  ->supportsExpressionLanguage(true)
+  ->isRequired(true)  // onSchedule reads it with utils::listFromRequiredCommaSeparatedProperty
+  ->build());
+core::Property ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name Format")
+  ->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression.")
+  ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
+  ->withDefaultValue(TOPIC_FORMAT_NAMES)
+  ->build());
+core::Property ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor Transactions")
+  ->withDescription(
+      "Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
+      "read_uncommitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. "
+      "If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer "
+      "must wait for the producer to finish its entire transaction instead of pulling as the messages become available.")
+  ->withDefaultValue<bool>(true)
+  ->isRequired(true)
+  ->build());
+core::Property ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID")
+  ->withDescription("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's '' property.")
+  ->supportsExpressionLanguage(true)
+  ->build());
+core::Property ConsumeKafka::OffsetReset(core::PropertyBuilder::createProperty("Offset Reset")
+  ->withDescription("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that "
+      "data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
+  ->withDefaultValue(OFFSET_RESET_LATEST)
+  ->isRequired(true)
+  ->build());
+core::Property ConsumeKafka::KeyAttributeEncoding(core::PropertyBuilder::createProperty("Key Attribute Encoding")
+  ->withDescription("FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.")
+  ->withAllowableValues<std::string>({KEY_ATTR_ENCODING_UTF_8, KEY_ATTR_ENCODING_HEX})
+  ->withDefaultValue(KEY_ATTR_ENCODING_UTF_8)
+  ->isRequired(true)
+  ->build());
+core::Property ConsumeKafka::MessageDemarcator(core::PropertyBuilder::createProperty("Message Demarcator")
+  ->withDescription("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contain all Kafka messages in a single batch "
+      "for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. "
+      "This is an optional property and if not provided each Kafka message received will result in a single FlowFile each time it is triggered.")
+  ->supportsExpressionLanguage(true)
+  ->build());
+core::Property ConsumeKafka::MessageHeaderEncoding(core::PropertyBuilder::createProperty("Message Header Encoding")
+  ->withDescription("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. This property indicates the Character Encoding "
+      "to use for deserializing the headers.")
+  ->withAllowableValues<std::string>({MSG_HEADER_ENCODING_UTF_8, MSG_HEADER_ENCODING_HEX})
+  ->withDefaultValue(MSG_HEADER_ENCODING_UTF_8)
+  ->build());
+// onSchedule parses this property with utils::listFromCommaSeparatedProperty, so it is a list of header names, not a regex.
+core::Property ConsumeKafka::HeadersToAddAsAttributes(core::PropertyBuilder::createProperty("Headers To Add As Attributes")
+  ->withDescription("A comma separated list to match against all message headers. Any message header whose name matches an item from the list will be added to the FlowFile "
+      "as an Attribute. If not specified, no Header values will be added as FlowFile attributes. The behaviour when multiple headers of the same name are present "
+      "is set using the Duplicate Header Handling property.")
+  ->build());
+core::Property ConsumeKafka::DuplicateHeaderHandling(core::PropertyBuilder::createProperty("Duplicate Header Handling")
+  ->withDescription("For headers to be added as attributes, this option specifies how to handle cases where multiple headers are present with the same key. "
+      "For example in case of receiving these two headers: \"Accept: text/html\" and \"Accept: application/xml\" and we want to attach the value of \"Accept\" "
+      "as a FlowFile attribute:\n"
+      " - \"Keep First\" attaches: \"Accept -> text/html\"\n"
+      " - \"Keep Latest\" attaches: \"Accept -> application/xml\"\n"
+      " - \"Comma-separated Merge\" attaches: \"Accept -> text/html, application/xml\"\n")
+  ->withDefaultValue(MSG_HEADER_KEEP_LATEST)  // Mirroring NiFi behaviour
+  ->build());
+core::Property ConsumeKafka::MaxPollRecords(core::PropertyBuilder::createProperty("Max Poll Records")
+  ->withDescription("Specifies the maximum number of records Kafka should return when polling each time the processor is triggered.")
+  ->withDefaultValue<unsigned int>(DEFAULT_MAX_POLL_RECORDS)
+  ->build());
+core::Property ConsumeKafka::MaxPollTime(core::PropertyBuilder::createProperty("Max Poll Time")
+  ->withDescription("Specifies the maximum amount of time the consumer can use for polling data from the brokers. "
+      "Polling is a blocking operation, so the upper limit of this value is 4 seconds.")
+  ->withDefaultValue(DEFAULT_MAX_POLL_TIME, std::make_shared<core::ConsumeKafkaMaxPollTimeValidator>(std::string("ConsumeKafkaMaxPollTimeValidator")))
+  ->isRequired(true)
+  ->build());
+core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createProperty("Session Timeout")
+  ->withDescription("Client group session and failure detection timeout. The consumer sends periodic heartbeats "
+      "to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within "
+      "the session timeout, the broker will remove the consumer from the group and trigger a rebalance. "
+      "The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.")
+  ->withDefaultValue<core::TimePeriodValue>("60 seconds")
+  ->build());
+const core::Relationship ConsumeKafka::Success("success", "Incoming kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.");
+/**
+ * Registers every ConsumeKafka property and the single "success" relationship with the framework.
+ */
+void ConsumeKafka::initialize() {
+  setSupportedProperties({
+    KafkaBrokers,
+    SecurityProtocol,
+    TopicNames,
+    TopicNameFormat,
+    HonorTransactions,
+    GroupID,
+    OffsetReset,
+    KeyAttributeEncoding,
+    MessageDemarcator,
+    MessageHeaderEncoding,
+    HeadersToAddAsAttributes,
+    DuplicateHeaderHandling,
+    MaxPollRecords,
+    MaxPollTime,
+    SessionTimeout
+  });
+  setSupportedRelationships({
+    Success,
+  });
+}
+/**
+ * Reads and validates the processor configuration, then configures a new Kafka connection.
+ * Required values are read through the utils::...OrThrow helpers (presumably throwing on a missing or
+ * unparsable value); an unsupported security protocol or encoding throws a PROCESS_SCHEDULE_EXCEPTION.
+ * @param context process context the properties are read from
+ */
+void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
+  // Required properties
+  kafka_brokers_                = utils::getRequiredPropertyOrThrow(context, KafkaBrokers.getName());
+  security_protocol_            = utils::getRequiredPropertyOrThrow(context, SecurityProtocol.getName());
+  topic_names_                  = utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName());
+  topic_name_format_            = utils::getRequiredPropertyOrThrow(context, TopicNameFormat.getName());
+  honor_transactions_           = utils::parseBooleanPropertyOrThrow(context, HonorTransactions.getName());
+  group_id_                     = utils::getRequiredPropertyOrThrow(context, GroupID.getName());
+  offset_reset_                 = utils::getRequiredPropertyOrThrow(context, OffsetReset.getName());
+  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(context, KeyAttributeEncoding.getName());
+  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(context, MaxPollTime.getName());
+  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, SessionTimeout.getName());
+  // Optional properties
+  context->getProperty(MessageDemarcator.getName(), message_demarcator_);
+  context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_);
+  context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_);
+  headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(context, HeadersToAddAsAttributes.getName());
+  max_poll_records_ = gsl::narrow<std::size_t>(utils::getOptionalUintProperty(context, MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS));
+  // Only PLAINTEXT is supported for now
+  if (!utils::StringUtils::equalsIgnoreCase(SECURITY_PROTOCOL_PLAINTEXT, security_protocol_)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Security protocols are not supported yet.");
+  }
+  if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported key attribute encoding: " + key_attribute_encoding_);
+  }
+  if (!utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_UTF_8, message_header_encoding_) && !utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_HEX, message_header_encoding_)) {
+    // Report the offending header encoding value itself
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported message header encoding: " + message_header_encoding_);
+  }
+  configure_new_connection(context);
+}
+/**
+ * librdkafka rebalance callback (NOTE(review): registration is not visible in this hunk — presumably
+ * via rd_kafka_conf_set_rebalance_cb in configure_new_connection).
+ * On assignment the given partitions are assigned; on revocation the offsets are committed synchronously
+ * and the assignment is cleared; any other trigger is logged and the assignment is cleared.
+ */
+void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
+  // Cooperative, incremental assignment is not supported in the current librdkafka version
+  std::shared_ptr<logging::Logger> logger{logging::LoggerFactory<ConsumeKafka>::getLogger()};
+  logger->log_debug("Rebalance triggered.");
+  rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR;
+  switch (trigger) {
+    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+      logger->log_debug("assigned");
+      utils::print_topics_list(logger, partitions);
+      assign_error = rd_kafka_assign(rk, partitions);
+      break;
+    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+      logger->log_debug("revoked:");
+      rd_kafka_commit(rk, partitions, /* async = */ 0);  // Sync commit, maybe unnecessary
+      utils::print_topics_list(logger, partitions);
+      assign_error = rd_kafka_assign(rk, nullptr);
+      break;
+    default:
+      logger->log_debug("failed: %s", rd_kafka_err2str(trigger));
+      assign_error = rd_kafka_assign(rk, nullptr);
+      break;
+  }
+  // Only report a failure when (un)assignment actually failed
+  if (assign_error != RD_KAFKA_RESP_ERR_NO_ERROR) {
+    logger->log_error("assign failure: %s", rd_kafka_err2str(assign_error));
+  }
+}
+void ConsumeKafka::create_topic_partition_list() {
+  kf_topic_partition_list_ = { rd_kafka_topic_partition_list_new(topic_names_.size()), utils::rd_kafka_topic_partition_list_deleter() };
+  // On subscriptions any topics prefixed with ^ will be regex matched
+  if (utils::StringUtils::equalsIgnoreCase(TOPIC_FORMAT_PATTERNS, topic_name_format_)) {
+    for (const std::string& topic : topic_names_) {
+      const std::string regex_format = "^" + topic;
+      rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), regex_format.c_str(), RD_KAFKA_PARTITION_UA);

Review comment:
       The librdkafka api docs don't mention regex matching. Where is this documented?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: