Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/01/15 13:16:46 UTC

[GitHub] [nifi-minifi-cpp] nghiaxlee opened a new pull request #710: WIP: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

nghiaxlee opened a new pull request #710: WIP: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710
 
 
   throw in onSchedule
   
   * Finished OPC processors
   * Working on Kafka processor
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369668462
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data());
 
 Review comment:
   I did not expect to have missed this one :). @arpadboda, what do you think about adding a formatting library?


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369645408
 
 

 ##########
 File path: extensions/standard-processors/processors/LogAttribute.cpp
 ##########
 @@ -95,7 +95,7 @@ void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
     // in configuration. In future releases we can add that exception handling there.
     if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
-    flowfiles_to_log_ = flowsToLog.getValue();
+    flowfiles_to_log_ = static_cast<uint64_t>(flowsToLog.getValue());
 
 Review comment:
   There is a `getProperty` template that retrieves a typed value, so I think the following does the job:
   ```
   getProperty(FlowFilesToLog.getName(), flowfiles_to_log_);
   ```
   
   I haven't tried it, though.
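
To illustrate what a typed getter of this kind does, here is a minimal sketch. `PropertyStoreSketch` and its string-backed storage are hypothetical and are not the MiNiFi `ProcessContext` API; the sketch only assumes that properties are stored as strings and converted on request.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>

// Hypothetical, simplified property store (not the MiNiFi API).
class PropertyStoreSketch {
 public:
  void set(const std::string& name, const std::string& value) { values_[name] = value; }

  // Typed getter: returns false and leaves `out` untouched when the property
  // is missing or its text cannot be fully parsed as a T.
  template <typename T>
  bool getProperty(const std::string& name, T& out) const {
    const auto it = values_.find(name);
    if (it == values_.end()) return false;
    std::istringstream stream(it->second);
    T parsed{};
    if (!(stream >> parsed) || !stream.eof()) return false;  // reject trailing garbage
    out = parsed;
    return true;
  }

 private:
  std::map<std::string, std::string> values_;
};
```

With this shape the caller passes the member it wants filled, and the boolean result tells it whether the value was usable.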


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369660940
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data());
 
 Review comment:
   Maybe we could use the formatting library used by spdlog to create these strings? I've mentioned directly depending on the formatting library in [MINIFICPP-1121](https://issues.apache.org/jira/projects/MINIFICPP/issues/MINIFICPP-1121).
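
For comparison, this is roughly what a format-string helper would give the call site. The sketch below does not use the fmt library that spdlog bundles; it swaps in `std::snprintf` from the standard library so that it stays self-contained, and `formatMessage` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical helper: printf-style formatting into a std::string.
// Arguments must match the placeholders in `format`, exactly as with printf.
template <typename... Args>
std::string formatMessage(const char* format, Args... args) {
  // First pass: ask snprintf how many characters the result needs.
  const int needed = std::snprintf(nullptr, 0, format, args...);
  if (needed < 0) return std::string();
  // Second pass: write into a buffer with room for the terminating '\0'.
  std::vector<char> buffer(static_cast<std::size_t>(needed) + 1);
  std::snprintf(buffer.data(), buffer.size(), format, args...);
  return std::string(buffer.data(), static_cast<std::size_t>(needed));
}
```

A dedicated formatting library would add type safety on top of this; the sketch only shows the placeholder substitution that the `%s` in the diff relies on.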


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375834407
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -115,6 +125,64 @@ class Value {
   std::type_index type_id;
 };
 
+class UInt32Value : public Value {
+ public:
+  explicit UInt32Value(uint32_t value)
+      : Value(std::to_string(value)),
+        value(value) {
+    setTypeId<uint32_t>();
+  }
+
+  explicit UInt32Value(const std::string &strvalue)
+      : Value(strvalue),
+        value(std::stoul(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this error occurs.
+     * We should be prudent about breaking backwards compatibility, but since Uint32Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    const auto negative = strvalue.find_first_of('-') != std::string::npos;
 
 Review comment:
   Same here


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376419716
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -115,6 +125,64 @@ class Value {
   std::type_index type_id;
 };
 
+class UInt32Value : public Value {
+ public:
+  explicit UInt32Value(uint32_t value)
+      : Value(std::to_string(value)),
+        value(value) {
+    setTypeId<uint32_t>();
+  }
+
+  explicit UInt32Value(const std::string &strvalue)
+      : Value(strvalue),
+        value(std::stoul(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this error occurs.
+     * We should be prudent about breaking backwards compatibility, but since Uint32Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    const auto negative = strvalue.find_first_of('-') != std::string::npos;
+     if (negative){
+       throw std::out_of_range("negative value detected");
+     }
+    setTypeId<uint32_t>();
+  }
+
+  uint32_t getValue() const {
+    return value;
+  }
+ protected:
+
+  virtual bool getValue(uint32_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(int &ref) {
+    if (value <= (std::numeric_limits<int>::max)()) {
 
 Review comment:
   My bad, I confused `value` with `ref`


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369641957
 
 

 ##########
 File path: extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
 ##########
 @@ -0,0 +1,84 @@
+/**
+ * @file GenerateFlowFile.h
+ * GenerateFlowFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include "../../../libminifi/test/integration/IntegrationBase.h"
+#include "core/logging/Logger.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "../PublishKafka.h"
+
+class PublishKafkaOnScheduleTests : public IntegrationBase {
+public:
+    virtual void runAssertions() {
+      std::string logs = LogTestController::getInstance().log_output.str();
+      size_t pos = 0;
+      size_t last_pos = 0;
+      unsigned int occurances = 0;
+      do {
+        pos = logs.find(" value 1 is outside allowed range 1000..1000000000", pos);
 
 Review comment:
   This logic is not very nice (I admit I'm the one responsible for it :) ), so I would prefer not to copy-paste it. Instead, create a function such as "countMessagePairsInLog" and make it part of the test framework.
   If you have a better idea for the name, I'm happy to hear it.
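
A minimal sketch of what such a helper could look like, assuming it takes the log text plus the two messages to pair up. The signature and return type are hypothetical and were not agreed in the review; the loop follows the one in the test above, except that it advances past each match.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical helper: counts how often `first` is later followed by `second`.
// Returns {position of the last matched `second`, number of pairs found}.
std::pair<std::size_t, std::size_t> countMessagePairsInLog(
    const std::string& logs, const std::string& first, const std::string& second) {
  std::size_t last_pos = 0;
  std::size_t count = 0;
  if (first.empty() || second.empty()) return {last_pos, count};  // avoid a non-advancing search
  std::size_t pos = 0;
  while ((pos = logs.find(first, pos)) != std::string::npos) {
    // Look for the closing message only after the opening one.
    pos = logs.find(second, pos + first.size());
    if (pos == std::string::npos) break;
    last_pos = pos;
    ++count;
    pos += second.size();  // continue searching after this pair
  }
  return {last_pos, count};
}
```

A test could then assert on the returned count and reuse the returned position as the starting point when searching for the success message.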


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374835864
 
 

 ##########
 File path: extensions/standard-processors/processors/LogAttribute.cpp
 ##########
 @@ -88,23 +88,14 @@ void LogAttribute::initialize() {
 }
 
 void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  core::Property flowsToLog = FlowFilesToLog;
-
-  if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
-    // we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done
-    // in configuration. In future releases we can add that exception handling there.
-    if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
-    flowfiles_to_log_ = flowsToLog.getValue();
-  }
 
 Review comment:
   If I write this line as:
   ```
   if (!context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_)) {
         throw ...
   }
   ```
   then GetTCPTest (and only that test) fails because of the exception, so I guess that test's configuration has issues. I will take a look later.
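
The behavior described above can be reproduced with a small stand-in. `LogAttributeSketch`, the map-based configuration, the use of `std::runtime_error`, and the property name string are all assumptions made for illustration; they are not the processor's real interface.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a processor that refuses to schedule
// when a required property is absent from its configuration.
struct LogAttributeSketch {
  std::uint64_t flowfiles_to_log = 1;

  void onSchedule(const std::map<std::string, std::string>& properties) {
    const auto it = properties.find("FlowFiles To Log");  // assumed property name
    if (it == properties.end()) {
      throw std::runtime_error("FlowFiles To Log is not configured");
    }
    flowfiles_to_log = std::stoull(it->second);
  }
};
```

Any flow configuration that omits the property now fails at schedule time, which is consistent with a single test failing while the others pass.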


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376417867
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -115,6 +125,64 @@ class Value {
   std::type_index type_id;
 };
 
+class UInt32Value : public Value {
+ public:
+  explicit UInt32Value(uint32_t value)
+      : Value(std::to_string(value)),
+        value(value) {
+    setTypeId<uint32_t>();
+  }
+
+  explicit UInt32Value(const std::string &strvalue)
+      : Value(strvalue),
+        value(std::stoul(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this error occurs.
+     * We should be prudent about breaking backwards compatibility, but since Uint32Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    const auto negative = strvalue.find_first_of('-') != std::string::npos;
+     if (negative){
+       throw std::out_of_range("negative value detected");
+     }
+    setTypeId<uint32_t>();
+  }
+
+  uint32_t getValue() const {
+    return value;
+  }
+ protected:
+
+  virtual bool getValue(uint32_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(int &ref) {
+    if (value <= (std::numeric_limits<int>::max)()) {
 
 Review comment:
   Could you elaborate?
   This is derived from the previous uint64 implementation, but I think it makes sense (logically; I'm not sure it's actually what we want).
   As I understand it, this checks whether `value` (0..2^32-1) is at most 2^31-1, and if so, we assign it to `ref`.
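
The boundary behavior of that check can be spelled out with a free function. `tryGetAsInt` is a hypothetical name, and the sketch assumes a 32-bit `int`, so the largest accepted value is 2^31-1.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical free function mirroring the check under discussion:
// assign only when the unsigned value is representable as an int.
inline bool tryGetAsInt(std::uint32_t value, int& ref) {
  if (value <= static_cast<std::uint32_t>(std::numeric_limits<int>::max())) {
    ref = static_cast<int>(value);
    return true;
  }
  return false;  // value is above INT_MAX; `ref` is left unchanged
}
```

So 2147483647 is accepted and 2147483648 is rejected, which matches the reading given in the comment.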


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369640254
 
 

 ##########
 File path: extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
 ##########
 @@ -0,0 +1,84 @@
+/**
+ * @file GenerateFlowFile.h
+ * GenerateFlowFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include "../../../libminifi/test/integration/IntegrationBase.h"
+#include "core/logging/Logger.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "../PublishKafka.h"
+
+class PublishKafkaOnScheduleTests : public IntegrationBase {
+public:
+    virtual void runAssertions() {
+      std::string logs = LogTestController::getInstance().log_output.str();
+      size_t pos = 0;
+      size_t last_pos = 0;
+      unsigned int occurances = 0;
+      do {
+        pos = logs.find(" value 1 is outside allowed range 1000..1000000000", pos);
+        if (pos != std::string::npos) {
+          last_pos = pos;
+          pos = logs.find("notifyStop called", pos);
+          if (pos != std::string::npos) {
+            last_pos = pos;
+            occurances++;
+          }
+        }
+      } while (pos != std::string::npos);
+
+      assert(occurances > 1);  // Verify retry of onSchedule and onUnSchedule calls
+
+      // Make sure onSchedule succeeded after property was set
+      assert(logs.find("Successfully configured PublishKafka", last_pos) != std::string::npos);
 
 Review comment:
   I like this test, nice work! (Even though some parts make me feel deja vu :) )


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375895619
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -188,6 +264,18 @@ class BoolValue : public Value {
     }
   }
 
+  virtual bool getValue(uint32_t &ref) {
 
 Review comment:
   I agree, the implementation is also a code smell; I didn't check that.
   
   If the functions must be virtual, we should introduce a templated private helper and call it from the virtual functions. That is still better than duplicating the code four times.
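
A minimal sketch of that structure, assuming four integer overloads. `UInt32Holder` and `convert` are hypothetical names and do not come from `Value.h`.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical class: the virtual overloads stay, but all of them
// delegate to one private template that holds the range check.
class UInt32Holder {
 public:
  explicit UInt32Holder(std::uint32_t value) : value_(value) {}
  virtual ~UInt32Holder() = default;

  virtual bool getValue(std::uint32_t& ref) const { return convert(ref); }
  virtual bool getValue(std::uint64_t& ref) const { return convert(ref); }
  virtual bool getValue(int& ref) const { return convert(ref); }
  virtual bool getValue(std::int64_t& ref) const { return convert(ref); }

 private:
  template <typename T>
  bool convert(T& ref) const {
    // Compare as uint64_t: the maximum of every target type here is
    // non-negative and fits in 64 unsigned bits.
    if (static_cast<std::uint64_t>(value_) >
        static_cast<std::uint64_t>(std::numeric_limits<T>::max())) {
      return false;
    }
    ref = static_cast<T>(value_);
    return true;
  }

  std::uint32_t value_;
};
```

The virtual interface is unchanged for callers and subclasses, while the conversion rule lives in one place.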


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369626184
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data());
 
 Review comment:
   I don't think this will work as you expect. `join_pack` just joins its arguments; it doesn't treat the first one as a format string.
   
   What you mean is more like:
   ```
   auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [", errstr.data(), "]");
   ```
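
The difference is easy to see with a stand-in that only concatenates. `joinAll` below is a hypothetical helper written for this illustration; it is not the project's `join_pack`, only an assumption about how a join-style function behaves.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a join-style helper: it concatenates its
// arguments and never interprets printf-style placeholders.
inline std::string joinAll() { return std::string(); }

template <typename First, typename... Rest>
std::string joinAll(const First& first, const Rest&... rest) {
  return std::string(first) + joinAll(rest...);
}
```

Calling `joinAll("error [%s]", "boom")` yields `error [%s]boom`, with the placeholder left in the text, whereas `joinAll("error [", "boom", "]")` yields `error [boom]`.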


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369682523
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data());
 
 Review comment:
   I'm fine with that, but please do it in a separate commit. If we start using the library directly, we should also add the CMake dependency directly.
   https://github.com/apache/nifi-minifi-cpp/blob/master/ThirdParties.md


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375869797
 
 

 ##########
 File path: libminifi/test/integration/IntegrationBase.h
 ##########
 @@ -42,6 +42,17 @@ class IntegrationBase {
     configureSecurity();
   }
 
+  // Return the last position and number of occurrences.
+  std::pair<size_t, uint32_t> countPatInStr(const std::string &str, const std::string &pattern) {
+    size_t last_pos = 0;
+    unsigned int occurrences = 0;
 
 Review comment:
   Indeed, it should be consistent. This is minor, but I'll write it down anyway.
   I don't think the count type should be `uint32_t`, since there's nothing that mandates 32 bits of storage for our integer.
   The position is fine as `size_t`.
   
   Some candidates:
   - `int`: plain and simple, we return an integer.
   - `unsigned int`: as above, but a count cannot be smaller than zero, and we can signal this with the type.
   - `size_t`: the maximum possible count of matches, in the case of a theoretical string spanning the full addressable memory and a search string matching on every character.
   - `ptrdiff_t`: the maximum possible count of matches is the size of the string, that is, the difference between the last and first element pointers. Note that this is a signed integer.
   - `typename std::iterator_traits<std::string::iterator>::difference_type`: same rationale as for `ptrdiff_t`, but for the last and first iterators. This is what `std::count` returns.
   
   My suggestion would be a simple `int`, `unsigned int`, or a correct `typename std::iterator_traits<std::string::iterator>::difference_type`, in order of preference.
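
The last candidate can be checked against the standard library directly. In this sketch `CountType` and `countChar` are hypothetical names; the only library facts relied on are that `std::count` returns the iterator's `difference_type` and that this type is signed.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <string>
#include <type_traits>

// The type std::count returns for iterators over a const std::string.
using CountType = std::iterator_traits<std::string::const_iterator>::difference_type;

// difference_type is a signed integer type.
static_assert(std::is_signed<CountType>::value, "difference_type must be signed");

// Hypothetical wrapper that keeps the library's own count type.
inline CountType countChar(const std::string& text, char ch) {
  return std::count(text.begin(), text.end(), ch);
}
```

Keeping the library's type avoids a narrowing conversion at the return statement, at the cost of a longer type name in the signature.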


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369656857
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("UNSIGNED_INT_VALIDATOR");
 
 Review comment:
   https://github.com/apache/nifi/blob/master/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java#L292 :sweat:


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375835044
 
 

 ##########
 File path: libminifi/test/integration/IntegrationBase.h
 ##########
 @@ -42,6 +42,17 @@ class IntegrationBase {
     configureSecurity();
   }
 
+  // Return the last position and number of occurrences.
+  std::pair<size_t, uint32_t> countPatInStr(const std::string &str, const std::string &pattern) {
+    size_t last_pos = 0;
+    unsigned int occurrences = 0;
 
 Review comment:
   Please be consistent with the types: the count is returned as `uint32_t`.


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369668522
 
 

 ##########
 File path: extensions/librdkafka/tests/CMakeLists.txt
 ##########
 @@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KAFKA_TESTS  "*.cpp")
+
+SET(KAFKA_TEST_COUNT 0)
+
+FOREACH(testfile ${KAFKA_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_wholearchive_library(${testfilename} minifi-rdkafka-extensions)
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
+    createTests("${testfilename}")
+    MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
+    if (${testfilename} MATCHES "(.)*OnSchedule(.)*")
 
 Review comment:
   Just a workaround for myself: I have some unit tests that require a local Kafka server, but I never committed them. Should I add them, or modify this file for this test only? (I will rewrite it anyway because that line looks weird.)

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374834609
 
 

 ##########
 File path: extensions/standard-processors/processors/LogAttribute.cpp
 ##########
 @@ -88,23 +88,14 @@ void LogAttribute::initialize() {
 }
 
 void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  core::Property flowsToLog = FlowFilesToLog;
-
-  if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
-    // we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done
-    // in configuration. In future releases we can add that exception handling there.
-    if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
-    flowfiles_to_log_ = flowsToLog.getValue();
-  }
 
-  std::string value;
-  if (context->getProperty(HexencodePayload.getName(), value)) {
-    utils::StringUtils::StringToBool(value, hexencode_);
-  }
-  if (context->getProperty(MaxPayloadLineLength.getName(), value)) {
-    core::Property::StringToInt(value, max_line_length_);
-  }
+  context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_);
 
 Review comment:
   If I wrote this line like:
   ```
   if (!context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_)) {
         throw ...
   }
   ```
   then GetTCPTest (and only that test) fails due to the exception, so I guess that test's configuration has issues. I will take a look later.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r386412123
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -218,6 +274,16 @@ class BoolValue : public Value {
   }
 
   bool value;
+
+ private:
+  template<typename T>
+  bool PreventSwearingInFutureRefactor(T &ref) {
 
 Review comment:
   😄 

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375325208
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data());
 
 Review comment:
   Okay, then I will do a follow-up refactoring later.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r370243546
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("UNSIGNED_INT_VALIDATOR");
 
 Review comment:
   Java `int` is 32-bit signed, but the validator filters out all negative values, so it's effectively 31-bit.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on issue #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on issue #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#issuecomment-584309399
 
 
   Rebased and resolved conflicts

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369643436
 
 

 ##########
 File path: extensions/librdkafka/tests/CMakeLists.txt
 ##########
 @@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KAFKA_TESTS  "*.cpp")
+
+SET(KAFKA_TEST_COUNT 0)
+
+FOREACH(testfile ${KAFKA_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_wholearchive_library(${testfilename} minifi-rdkafka-extensions)
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
+    createTests("${testfilename}")
+    MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
+    if (${testfilename} MATCHES "(.)*OnSchedule(.)*")
 
 Review comment:
   What's the point of this? Were you trying to prepare this file to handle different kinds of Kafka tests?

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369642669
 
 

 ##########
 File path: extensions/librdkafka/tests/CMakeLists.txt
 ##########
 @@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KAFKA_TESTS  "*.cpp")
+
+SET(KAFKA_TEST_COUNT 0)
+
+FOREACH(testfile ${KAFKA_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_wholearchive_library(${testfilename} minifi-rdkafka-extensions)
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
 
 Review comment:
   Why do we need standard processors to be included and linked to this test? 

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r370248291
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("UNSIGNED_INT_VALIDATOR");
 
 Review comment:
   Side note: IIRC, Java 8 provides some unsigned methods for the Integer class so that it can work in the range 0..2^32-1, though I am not proficient enough at Java to understand why they don't provide an unsigned keyword.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375295861
 
 

 ##########
 File path: extensions/standard-processors/processors/LogAttribute.cpp
 ##########
 @@ -88,23 +88,14 @@ void LogAttribute::initialize() {
 }
 
 void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  core::Property flowsToLog = FlowFilesToLog;
-
-  if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
-    // we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done
-    // in configuration. In future releases we can add that exception handling there.
-    if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
-    flowfiles_to_log_ = flowsToLog.getValue();
-  }
 
 Review comment:
   You are right. 
   
   `initialize()`, which would set the default value, is not called in the test cases.
   Adding that call to every test case fixes the issue.
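
A minimal sketch of the failure mode described above, using a toy class rather than the MiNiFi API. `ToyProcessor`, its members, and the property name `FlowFiles To Log` are assumptions made for illustration only; `std::runtime_error` stands in for the schedule exception. Defaults are registered only in `initialize()`, so a strict lookup in `onSchedule()` throws whenever a test skips that call.

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Toy model of a processor (hypothetical, not the MiNiFi classes).
class ToyProcessor {
 public:
  // Registers the default value; tests that skip this call have no properties at all.
  void initialize() { properties_["FlowFiles To Log"] = "1"; }

  // Returns false when the property was never registered.
  bool getProperty(const std::string &name, std::string &value) const {
    const auto it = properties_.find(name);
    if (it == properties_.end()) {
      return false;
    }
    value = it->second;
    return true;
  }

  // Strict variant: a missing property is treated as a scheduling error.
  void onSchedule() {
    std::string value;
    if (!getProperty("FlowFiles To Log", value)) {
      throw std::runtime_error("missing property: FlowFiles To Log");
    }
    flowfiles_to_log_ = std::stoul(value);
  }

  unsigned long flowfilesToLog() const { return flowfiles_to_log_; }

 private:
  std::map<std::string, std::string> properties_;
  unsigned long flowfiles_to_log_ = 0;
};
```

In this sketch, calling `initialize()` before `onSchedule()` is what makes the strict check pass, which mirrors the fix of adding the call to each test case.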

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375844201
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -115,6 +125,64 @@ class Value {
   std::type_index type_id;
 };
 
+class UInt32Value : public Value {
+ public:
+  explicit UInt32Value(uint32_t value)
+      : Value(std::to_string(value)),
+        value(value) {
+    setTypeId<uint32_t>();
+  }
+
+  explicit UInt32Value(const std::string &strvalue)
+      : Value(strvalue),
+        value(std::stoul(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this error occurs.
+     * We should be prudent about breaking backwards compatibility, but since Uint32Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    const auto negative = strvalue.find_first_of('-') != std::string::npos;
+     if (negative){
+       throw std::out_of_range("negative value detected");
+     }
+    setTypeId<uint32_t>();
+  }
+
+  uint32_t getValue() const {
+    return value;
+  }
+ protected:
+
+  virtual bool getValue(uint32_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(int &ref) {
+    if (value <= (std::numeric_limits<int>::max)()) {
 
 Review comment:
   This condition is always `true`. Signed integer overflow results in undefined behavior.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710
 
 
   

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375870980
 
 

 ##########
 File path: libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
 ##########
 @@ -29,32 +27,22 @@ class OnScheduleErrorHandlingTests : public IntegrationBase {
  public:
   virtual void runAssertions() {
     std::string logs = LogTestController::getInstance().log_output.str();
-    size_t pos = 0;
-    size_t last_pos = 0;
-    unsigned int occurances = 0;
-    do {
-      pos = logs.find(minifi::processors::KamikazeProcessor::OnScheduleExceptionStr, pos);
-      if (pos != std::string::npos) {
-        last_pos = pos;
-        pos = logs.find(minifi::processors::KamikazeProcessor::OnUnScheduleLogStr, pos);
-        if (pos != std::string::npos) {
-          last_pos = pos;
-          occurances++;
-        }
-      }
-    } while (pos != std::string::npos);
 
-    assert(occurances > 1);  // Verify retry of onSchedule and onUnSchedule calls
+    auto result = countPatInStr(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+    size_t last_pos = result.first;
+    unsigned int occurances = result.second;
 
 Review comment:
   typo: it's spelled occurrence

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376420231
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -87,6 +88,15 @@ class Value {
     type_id = std::type_index(typeid(T));
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    const auto negative = string_value.find_first_of('-') != std::string::npos;
 
 Review comment:
   Will keep the current impl for now

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375827761
 
 

 ##########
 File path: libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
 ##########
 @@ -1,6 +1,4 @@
 /**
- * @file GenerateFlowFile.h
 
 Review comment:
   Thanks :)

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369626667
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -187,81 +239,77 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result [%s]", errstr.data());
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
   if (key->client_id_.empty()) {
-    logger_->log_error("Client id is empty");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
   }
   result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("PublishKafka: configure error result ", errstr.data());
 
 Review comment:
   Nitpicking: a ':' would be nice to indicate where the error string begins.
   
   The same applies to all these cases.
   
   However, since this is used in many places, I think it would be better to prepare the error string once and just concatenate the actual error when throwing the exception, to avoid duplicating this code snippet.
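
One way the suggestion above could look, as a rough sketch only. The names `kConfigErrorPrefix` and `throwConfigError` are hypothetical, and `std::runtime_error` stands in for `Exception(PROCESS_SCHEDULE_EXCEPTION, ...)` so the snippet compiles on its own. The prefix, including the ':' separator, is defined once and the librdkafka error text is appended at the throw site.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical shared prefix; the trailing ": " marks where the library's error text begins.
static const std::string kConfigErrorPrefix = "PublishKafka: configure error result: ";

// Hypothetical helper. std::runtime_error is a stand-in for the project's exception type.
[[noreturn]] void throwConfigError(const std::string &detail) {
  throw std::runtime_error(kConfigErrorPrefix + detail);
}
```

Each `if (result != RD_KAFKA_CONF_OK)` branch would then shrink to a single call such as `throwConfigError(errstr.data());`, assuming `errstr` holds a NUL-terminated message.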

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r376419000
 
 

 ##########
 File path: libminifi/test/integration/IntegrationBase.h
 ##########
 @@ -42,6 +42,17 @@ class IntegrationBase {
     configureSecurity();
   }
 
+  // Return the last position and number of occurrences.
+  std::pair<size_t, uint32_t> countPatInStr(const std::string &str, const std::string &pattern) {
+    size_t last_pos = 0;
+    unsigned int occurrences = 0;
 
 Review comment:
   Done, @szaszm thanks for the detailed explanation. ^^

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369647514
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("UNSIGNED_INT_VALIDATOR");
 
 Review comment:
   To ensure compatibility with MiNiFi Java via C2 (having a matching manifest in the case of validators), please make sure that a UInt32 validator exists in NiFi and that it has the same name. (I can help with this.)

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375878858
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -87,6 +88,15 @@ class Value {
     type_id = std::type_index(typeid(T));
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    const auto negative = string_value.find_first_of('-') != std::string::npos;
 
 Review comment:
   @arpadboda: The name helps readability and since it's `const` it doesn't increase complexity. (by potentially introducing extra states)
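
For context on the quoted check, here is a hedged sketch of parsing a string into `uint32_t`. The minus-sign test matters because `std::stoul` accepts a leading '-' and wraps the result instead of failing. Where `unsigned long` is wider than 32 bits, an explicit upper-bound check is needed as well, otherwise large inputs are truncated on assignment. `parseUInt32` is a hypothetical helper, not part of the PR.

```cpp
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <string>

// Hypothetical helper: returns true and fills `out` only for values in [0, 2^32 - 1].
bool parseUInt32(const std::string &input, uint32_t &out) {
  // Same idea as the reviewed code: reject anything containing a minus sign up front.
  const auto negative = input.find('-') != std::string::npos;
  if (negative) {
    return false;
  }
  try {
    std::size_t consumed = 0;
    const unsigned long long parsed = std::stoull(input, &consumed);
    if (consumed != input.size()) {
      return false;  // trailing characters such as "12abc"
    }
    if (parsed > (std::numeric_limits<uint32_t>::max)()) {
      return false;  // would not fit into 32 bits
    }
    out = static_cast<uint32_t>(parsed);
    return true;
  } catch (const std::exception &) {
    return false;  // std::invalid_argument or std::out_of_range from std::stoull
  }
}
```

Returning `bool` instead of throwing is a design choice of this sketch; the reviewed constructor throws `std::out_of_range` for negative input.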

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375896103
 
 

 ##########
 File path: libminifi/test/integration/IntegrationBase.h
 ##########
 @@ -42,6 +42,17 @@ class IntegrationBase {
     configureSecurity();
   }
 
+  // Return the last position and number of occurrences.
+  std::pair<size_t, uint32_t> countPatInStr(const std::string &str, const std::string &pattern) {
+    size_t last_pos = 0;
+    unsigned int occurrences = 0;
 
 Review comment:
   I'm fine with size_t as well. I don't really mind which one we choose; just make sure it's consistent.


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r370248291
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("UNSIGNED_INT_VALIDATOR");
 
 Review comment:
   Side note: IIRC, Java 8 provides some unsigned methods on the Integer class so that it can work in the range 0..2^32-1.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: WIP: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: WIP: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r368467670
 
 

 ##########
 File path: extensions/opc/src/fetchopc.cpp
 ##########
 @@ -93,14 +93,12 @@ namespace processors {
 
     translatedNodeIDs_.clear();  // Path might has changed during restart
 
-    BaseOPCProcessor::onSchedule(context, factory);
-
-    if(!configOK_) {
-      return;
+    try {
+      BaseOPCProcessor::onSchedule(context, factory);
+    } catch (const Exception &e) {
+      throw e;
     }
 
 Review comment:
   What's the intention behind these lines? We catch `minifi::Exception`s then rethrow them without doing anything.
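
To illustrate why the catch-and-rethrow above is at best redundant, here is a minimal sketch. `BaseError` and `ScheduleError` are hypothetical stand-ins (they are not MiNiFi classes, and whether `minifi::Exception` has derived types is an assumption). It shows that `throw e;` copies the exception using the static type of `e`, so a derived exception would be sliced, while `throw;` or no handler at all lets the original object propagate.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a base exception type such as minifi::Exception.
struct BaseError : std::runtime_error {
  explicit BaseError(const std::string &msg) : std::runtime_error(msg) {}
  virtual std::string kind() const { return "base"; }
};

// Hypothetical derived exception, used only to make slicing visible.
struct ScheduleError : BaseError {
  explicit ScheduleError(const std::string &msg) : BaseError(msg) {}
  std::string kind() const override { return "schedule"; }
};

void failingOnSchedule() { throw ScheduleError("bad config"); }

// `throw e;` creates a new exception object of type BaseError,
// so the ScheduleError part is lost.
std::string rethrowByCopy() {
  try {
    try {
      failingOnSchedule();
    } catch (const BaseError &e) {
      throw e;
    }
  } catch (const BaseError &e) {
    return e.kind();
  }
  return "none";
}

// `throw;` rethrows the original exception object unchanged.
std::string rethrowInPlace() {
  try {
    try {
      failingOnSchedule();
    } catch (const BaseError &) {
      throw;
    }
  } catch (const BaseError &e) {
    return e.kind();
  }
  return "none";
}

// Without an inner handler the exception propagates on its own.
std::string noInnerHandler() {
  try {
    failingOnSchedule();
  } catch (const BaseError &e) {
    return e.kind();
  }
  return "none";
}
```

In this sketch the first variant reports the base type, while the other two report the derived type, so dropping the inner try/catch entirely is the simplest option.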


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374837145
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -141,6 +141,59 @@ void PublishKafka::initialize() {
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   interrupted_ = false;
+
+  // Try to get a KafkaConnection
+  std::string client_id, brokers;
+  if (!context->getProperty(ClientName.getName(), client_id)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid");
+  }
+  if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid");
+  }
+
+  // Get some properties not (only) used directly to set up librdkafka
+  std::string value;
+
+  // Batch Size
+  value = "";
+  if (context->getProperty(BatchSize.getName(), batch_size_)) {
+    logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size_);
+  } else {
+    batch_size_ = 10;
+  }
+
+  // Target Batch Payload Size
+  value = "";
+  if (context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_)) {
+    logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_);
+  } else {
+    target_batch_payload_size_ = 512 * 1024U;
+  }
+
+  // Max Flow Segment Size
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_)) {
+    logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_);
+  } else {
+    max_flow_seg_size_ = 0U;
 
 Review comment:
   Fixed. We already had default values; maybe we were being overly careful.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375902215
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -87,6 +88,15 @@ class Value {
     type_id = std::type_index(typeid(T));
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    const auto negative = string_value.find_first_of('-') != std::string::npos;
 
 Review comment:
   In my opinion the variable just decreases readability here, but this is really minor; I'm fine with the current way as well.
   
   The check itself is simply wrong, though.
   It's going to fail when the '-' char comes after the digits (like "3-"), an input that std::stoull handles.
   Either let it go, or go strict and apply an isdigit check to every character.
   
   I will create a Jira for this; it should be done in the scope of a separate PR.
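
As a rough illustration of the "go strict" option, the sketch below accepts only strings that consist entirely of decimal digits and fit into 32 bits. `parseUint32Strict` is a hypothetical helper name, not something from the MiNiFi code base. For context, `std::stoull` stops parsing at the first non-digit, so `"3-"` yields 3, and it also accepts a leading minus sign and wraps the result, which is why a sign check alone is not enough.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of MiNiFi): strict conversion of a decimal
// string to uint32_t. Returns false and leaves `out` untouched on any error.
bool parseUint32Strict(const std::string &input, uint32_t &out) {
  if (input.empty()) {
    return false;
  }
  // Reject signs, whitespace and trailing garbage: every char must be a digit.
  const bool all_digits = std::all_of(input.begin(), input.end(),
      [](unsigned char c) { return std::isdigit(c) != 0; });
  if (!all_digits) {
    return false;
  }
  try {
    const unsigned long long parsed = std::stoull(input);
    if (parsed > std::numeric_limits<uint32_t>::max()) {
      return false;  // fits in 64 bits but not in 32
    }
    out = static_cast<uint32_t>(parsed);
    return true;
  } catch (const std::out_of_range &) {
    return false;  // does not even fit in unsigned long long
  }
}
```

The design choice here is to validate the whole string before converting, rather than trying to detect individual bad characters such as '-'.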
   
   


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375827609
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("NON_NEGATIVE_INTEGER_VALIDATOR");
 
 Review comment:
   👍 


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375826987
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -188,6 +264,18 @@ class BoolValue : public Value {
     }
   }
 
+  virtual bool getValue(uint32_t &ref) {
 
 Review comment:
   Please, no!
   
   Yes, it was already horrible, but that doesn't justify copy-pasting the same! :)
   
   ```
   template<typename Integral, typename std::enable_if<
       (sizeof(Integral) >= 4) && std::is_integral<Integral>::value
       ,Integral>::type* = nullptr>
   virtual bool getValue(Integral &ref) {
     if(ref != 0 && ref != 1) {
      return false;
     }
    ref = (ref == 1);
    return true;
   }
   ```
   


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r369681377
 
 

 ##########
 File path: libminifi/src/core/PropertyValidation.cpp
 ##########
 @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("UNSIGNED_INT_VALIDATOR");
 
 Review comment:
   Fair, but not sure whether that's 32 or 64 bits. 


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375847455
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -188,6 +264,18 @@ class BoolValue : public Value {
     }
   }
 
+  virtual bool getValue(uint32_t &ref) {
 
 Review comment:
   Isn't this function meant to validate and return `value` instead of just validating and self-assigning `ref`?
   If I'm right:
   ```
   if (value != 0 && value != 1) { return false; }
   ref = (value != 0);
   return true;
   ```
   If the protocol allows it, I'd even go as far as skipping the validation and interpreting everything non-zero as true.
   
   Also: "A member function template shall not be virtual."
   source: C++ standard draft, 13.7.2 [temp.mem] 3), http://open-std.org/JTC1/SC22/WG21/docs/papers/2019/n4835.pdf 
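
One possible way to reconcile both points (avoid copy-pasting per-type overloads, yet keep virtual dispatch) is a non-virtual member template that forwards to a single virtual, non-template function. The sketch below is only an illustration under assumptions: `ValueSketch`, `BoolValueSketch`, and `getValueImpl` are hypothetical names, and the stored value is modelled as a raw integer so that the 0/1 validation is meaningful.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

// Hypothetical, simplified stand-in for the Value class in the diff.
class ValueSketch {
 public:
  virtual ~ValueSketch() = default;

  // Non-virtual template front end: a single definition covers every
  // integral type of at least 4 bytes.
  template<typename Integral, typename std::enable_if<
      std::is_integral<Integral>::value && (sizeof(Integral) >= 4), int>::type = 0>
  bool getValue(Integral &ref) {
    uint64_t wide = 0;
    if (!getValueImpl(wide)) {
      return false;  // leave ref untouched on failure
    }
    ref = static_cast<Integral>(wide);
    return true;
  }

 protected:
  // The only virtual piece; derived classes override this plain function.
  virtual bool getValueImpl(uint64_t &ref) = 0;
};

// Hypothetical stand-in for BoolValue, storing the raw value as an integer.
class BoolValueSketch : public ValueSketch {
 public:
  explicit BoolValueSketch(uint64_t raw) : value_(raw) {}

 protected:
  bool getValueImpl(uint64_t &ref) override {
    if (value_ != 0 && value_ != 1) {
      return false;
    }
    ref = (value_ != 0);  // validate the stored value, then write it to ref
    return true;
  }

 private:
  uint64_t value_;
};
```

With this shape the template itself is never virtual, so it stays within the rule quoted above, and the validation logic lives in exactly one place per class.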


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r372990619
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -141,6 +141,59 @@ void PublishKafka::initialize() {
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   interrupted_ = false;
+
+  // Try to get a KafkaConnection
+  std::string client_id, brokers;
+  if (!context->getProperty(ClientName.getName(), client_id)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid");
+  }
+  if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid");
+  }
+
+  // Get some properties not (only) used directly to set up librdkafka
+  std::string value;
+
+  // Batch Size
+  value = "";
+  if (context->getProperty(BatchSize.getName(), batch_size_)) {
+    logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size_);
+  } else {
+    batch_size_ = 10;
+  }
+
+  // Target Batch Payload Size
+  value = "";
+  if (context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_)) {
+    logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_);
+  } else {
+    target_batch_payload_size_ = 512 * 1024U;
+  }
+
+  // Max Flow Segment Size
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_)) {
+    logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_);
+  } else {
+    max_flow_seg_size_ = 0U;
 
 Review comment:
   Instead of doing this, why don't we simply add default values to these properties, so getProperty always succeeds?
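
The idea can be sketched with a deliberately tiny model. Everything below (`PropertySketch`, `ContextSketch`, and their members) is hypothetical and is not the MiNiFi property API; it only shows how a declared default makes the lookup succeed, so the caller needs no `else` branch that assigns a fallback.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical, heavily simplified model of a processor property.
struct PropertySketch {
  std::string name;
  std::string default_value;  // empty means "no default declared"
};

// Hypothetical stand-in for a process context holding configured values.
class ContextSketch {
 public:
  void declare(const PropertySketch &property) {
    declared_[property.name] = property;
  }

  void set(const std::string &name, const std::string &value) {
    configured_[name] = value;
  }

  // Uses the configured value if present, otherwise the declared default.
  // Fails only when neither exists. Malformed numbers are not handled in
  // this sketch (std::stoull would throw).
  bool getProperty(const std::string &name, uint64_t &out) const {
    std::string text;
    const auto configured = configured_.find(name);
    if (configured != configured_.end()) {
      text = configured->second;
    } else {
      const auto declared = declared_.find(name);
      if (declared == declared_.end() || declared->second.default_value.empty()) {
        return false;
      }
      text = declared->second.default_value;
    }
    out = std::stoull(text);
    return true;
  }

 private:
  std::map<std::string, PropertySketch> declared_;
  std::map<std::string, std::string> configured_;
};
```

With a default of "10" declared for a "Batch Size" property, a lookup on an unconfigured context still succeeds and yields 10, which mirrors the fallback that the diff assigns by hand.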


[GitHub] [nifi-minifi-cpp] nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
nghiaxlee commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r374837546
 
 

 ##########
 File path: extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
 ##########
 @@ -0,0 +1,84 @@
+/**
+ * @file GenerateFlowFile.h
+ * GenerateFlowFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include "../../../libminifi/test/integration/IntegrationBase.h"
+#include "core/logging/Logger.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "../PublishKafka.h"
+
+class PublishKafkaOnScheduleTests : public IntegrationBase {
+public:
+    virtual void runAssertions() {
+      std::string logs = LogTestController::getInstance().log_output.str();
+      size_t pos = 0;
+      size_t last_pos = 0;
+      unsigned int occurances = 0;
+      do {
+        pos = logs.find(" value 1 is outside allowed range 1000..1000000000", pos);
 
 Review comment:
   Wrapped the logic into a new function.
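
For readers following along, a helper of this kind could look roughly like the sketch below. Only the signature `countPatInStr(const std::string &, const std::string &)` and the meaning of the returned pair (last position, number of occurrences) come from the IntegrationBase.h hunk in this thread; the loop body, the free-function form, and the handling of an empty pattern are assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>

// Sketch only: returns the position of the last match and the number of
// non-overlapping matches of `pattern` in `str`.
std::pair<size_t, uint32_t> countPatInStr(const std::string &str, const std::string &pattern) {
  size_t last_pos = 0;
  uint32_t occurrences = 0;
  if (pattern.empty()) {
    return std::make_pair(last_pos, occurrences);  // assumption: nothing to count
  }
  size_t pos = str.find(pattern);
  while (pos != std::string::npos) {
    last_pos = pos;
    ++occurrences;
    pos = str.find(pattern, pos + pattern.size());  // continue after this match
  }
  return std::make_pair(last_pos, occurrences);
}
```

Using `uint32_t` for the counter on both the helper and the caller side keeps the types consistent with the declared return type.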


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375834179
 
 

 ##########
 File path: libminifi/include/core/state/Value.h
 ##########
 @@ -87,6 +88,15 @@ class Value {
     type_id = std::type_index(typeid(T));
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    const auto negative = string_value.find_first_of('-') != std::string::npos;
 
 Review comment:
   You don't really need the variable here; simply put this in the "if".


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #710: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #710: MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
URL: https://github.com/apache/nifi-minifi-cpp/pull/710#discussion_r375835365
 
 

 ##########
 File path: libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
 ##########
 @@ -29,32 +27,22 @@ class OnScheduleErrorHandlingTests : public IntegrationBase {
  public:
   virtual void runAssertions() {
     std::string logs = LogTestController::getInstance().log_output.str();
-    size_t pos = 0;
-    size_t last_pos = 0;
-    unsigned int occurances = 0;
-    do {
-      pos = logs.find(minifi::processors::KamikazeProcessor::OnScheduleExceptionStr, pos);
-      if (pos != std::string::npos) {
-        last_pos = pos;
-        pos = logs.find(minifi::processors::KamikazeProcessor::OnUnScheduleLogStr, pos);
-        if (pos != std::string::npos) {
-          last_pos = pos;
-          occurances++;
-        }
-      }
-    } while (pos != std::string::npos);
 
-    assert(occurances > 1);  // Verify retry of onSchedule and onUnSchedule calls
+    auto result = countPatInStr(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+    size_t last_pos = result.first;
+    unsigned int occurances = result.second;
 
 Review comment:
   Will need to change this to uint32_t as well
