You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/26 15:20:10 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1232 - PublishKafka processor doesn't validate some properties

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 907563b  MINIFICPP-1232 - PublishKafka processor doesn't validate some properties
907563b is described below

commit 907563b542f55d1d87b69a24c03e34bb3e66c46d
Author: Adam Markovics <ad...@aimotive.com>
AuthorDate: Mon May 25 10:40:41 2020 +0200

    MINIFICPP-1232 - PublishKafka processor doesn't validate some properties
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #794
---
 extensions/librdkafka/PublishKafka.cpp | 77 ++++++++++++++++++++++------------
 extensions/librdkafka/PublishKafka.h   | 54 ++++++++++++------------
 2 files changed, 77 insertions(+), 54 deletions(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 74eb375..7681560 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -40,34 +40,34 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property PublishKafka::SeedBrokers(
+const core::Property PublishKafka::SeedBrokers(
     core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
-core::Property PublishKafka::Topic(
+const core::Property PublishKafka::Topic(
     core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
-core::Property PublishKafka::DeliveryGuarantee(
+const core::Property PublishKafka::DeliveryGuarantee(
     core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka. "
                                                                                  "Valid values are 0 (do not wait for acks), "
                                                                                  "-1 or all (block until message is committed by all in sync replicas) "
                                                                                  "or any concrete number of nodes.")
         ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
 
-core::Property PublishKafka::MaxMessageSize(
+const core::Property PublishKafka::MaxMessageSize(
     core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
         ->isRequired(false)->build());
 
-core::Property PublishKafka::RequestTimeOut(
+const core::Property PublishKafka::RequestTimeOut(
     core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request")
         ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("10 sec")->supportsExpressionLanguage(true)->build());
 
-core::Property PublishKafka::MessageTimeOut(
+const core::Property PublishKafka::MessageTimeOut(
     core::PropertyBuilder::createProperty("Message Timeout")->withDescription("The total time sending a message could take")
         ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("30 sec")->supportsExpressionLanguage(true)->build());
 
-core::Property PublishKafka::ClientName(
+const core::Property PublishKafka::ClientName(
     core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
@@ -75,33 +75,56 @@ core::Property PublishKafka::ClientName(
  * These don't appear to need EL support
  */
 
-core::Property PublishKafka::BatchSize(
+const core::Property PublishKafka::BatchSize(
     core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")
         ->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
-core::Property PublishKafka::TargetBatchPayloadSize(
+const core::Property PublishKafka::TargetBatchPayloadSize(
     core::PropertyBuilder::createProperty("Target Batch Payload Size")->withDescription("The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).")
         ->isRequired(false)->withDefaultValue<core::DataSizeValue>("512 KB")->build());
-core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
-core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
-core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
-core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
-core::Property PublishKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
-core::Property PublishKafka::MaxFlowSegSize(
+const core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
+
+const core::Property PublishKafka::QueueBufferMaxTime(
+        core::PropertyBuilder::createProperty("Queue Buffering Max Time")
+        ->isRequired(false)
+        ->withDefaultValue<core::TimePeriodValue>("10 sec")
+        ->withDescription("Delay to wait for messages in the producer queue to accumulate before constructing message batches")
+        ->build());
+const core::Property PublishKafka::QueueBufferMaxSize(
+        core::PropertyBuilder::createProperty("Queue Max Buffer Size")
+        ->isRequired(false)
+        ->withDefaultValue<core::DataSizeValue>("1 MB")
+        ->withDescription("Maximum total message size sum allowed on the producer queue")
+        ->build());
+const core::Property PublishKafka::QueueBufferMaxMessage(
+        core::PropertyBuilder::createProperty("Queue Max Message")
+        ->isRequired(false)
+        ->withDefaultValue<uint64_t>(1000)
+        ->withDescription("Maximum number of messages allowed on the producer queue")
+        ->build());
+const core::Property PublishKafka::CompressCodec(
+        core::PropertyBuilder::createProperty("Compress Codec")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>(COMPRESSION_CODEC_NONE)
+        ->withAllowableValues<std::string>({COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY})
+        ->withDescription("compression codec to use for compressing message sets")
+        ->build());
+
+const core::Property PublishKafka::MaxFlowSegSize(
     core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
         ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
-core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
-core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
-core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
-core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
-core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
-core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
-core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Keberos Principal", "");
-core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
+const core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
+const core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
+const core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
+const core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
+const core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
+const core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
+const core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Keberos Principal", "");
+const core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
                                                 "The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
-core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
+const core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
                                              "Supports Expression Language: true (will be evaluated using flow file attributes)",
                                              "");
-core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
+const core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
 const core::Property PublishKafka::FailEmptyFlowFiles(
     core::PropertyBuilder::createProperty("Fail empty flow files")
@@ -111,8 +134,8 @@ const core::Property PublishKafka::FailEmptyFlowFiles(
         ->withDefaultValue<bool>(true)
         ->build());
 
-core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
-core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
+const core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
+const core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
 
 void PublishKafka::initialize() {
   // Set the supported properties
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 5946741..30836de 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -79,36 +79,36 @@ class PublishKafka : public core::Processor {
   static constexpr char const* ProcessorName = "PublishKafka";
 
   // Supported Properties
-  static core::Property SeedBrokers;
-  static core::Property Topic;
-  static core::Property DeliveryGuarantee;
-  static core::Property MaxMessageSize;
-  static core::Property RequestTimeOut;
-  static core::Property MessageTimeOut;
-  static core::Property ClientName;
-  static core::Property BatchSize;
-  static core::Property TargetBatchPayloadSize;
-  static core::Property AttributeNameRegex;
-  static core::Property QueueBufferMaxTime;
-  static core::Property QueueBufferMaxSize;
-  static core::Property QueueBufferMaxMessage;
-  static core::Property CompressCodec;
-  static core::Property MaxFlowSegSize;
-  static core::Property SecurityProtocol;
-  static core::Property SecurityCA;
-  static core::Property SecurityCert;
-  static core::Property SecurityPrivateKey;
-  static core::Property SecurityPrivateKeyPassWord;
-  static core::Property KerberosServiceName;
-  static core::Property KerberosPrincipal;
-  static core::Property KerberosKeytabPath;
-  static core::Property MessageKeyField;
-  static core::Property DebugContexts;
+  static const core::Property SeedBrokers;
+  static const core::Property Topic;
+  static const core::Property DeliveryGuarantee;
+  static const core::Property MaxMessageSize;
+  static const core::Property RequestTimeOut;
+  static const core::Property MessageTimeOut;
+  static const core::Property ClientName;
+  static const core::Property BatchSize;
+  static const core::Property TargetBatchPayloadSize;
+  static const core::Property AttributeNameRegex;
+  static const core::Property QueueBufferMaxTime;
+  static const core::Property QueueBufferMaxSize;
+  static const core::Property QueueBufferMaxMessage;
+  static const core::Property CompressCodec;
+  static const core::Property MaxFlowSegSize;
+  static const core::Property SecurityProtocol;
+  static const core::Property SecurityCA;
+  static const core::Property SecurityCert;
+  static const core::Property SecurityPrivateKey;
+  static const core::Property SecurityPrivateKeyPassWord;
+  static const core::Property KerberosServiceName;
+  static const core::Property KerberosPrincipal;
+  static const core::Property KerberosKeytabPath;
+  static const core::Property MessageKeyField;
+  static const core::Property DebugContexts;
   static const core::Property FailEmptyFlowFiles;
 
   // Supported Relationships
-  static core::Relationship Failure;
-  static core::Relationship Success;
+  static const core::Relationship Failure;
+  static const core::Relationship Success;
 
   // Message
   enum class MessageStatus : uint8_t {