You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/29 12:56:51 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

lordgamez opened a new pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r681062674



##########
File path: docker/test/integration/minifi/core/SingleNodeDockerCluster.py
##########
@@ -225,21 +224,32 @@ def deploy_kafka_broker(self):
         logging.info('Adding container \'%s\'', zookeeper.name)
         self.containers[zookeeper.name] = zookeeper
 
+    def deploy_kafka_broker(self):
+        logging.info('Creating and running docker containers for kafka broker...')
+        self.deploy_zookeeper()
+
         test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
         broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
         broker = self.client.containers.run(
             broker_image[0],
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092, '29092/tcp': 29092},
+            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093},
             environment=[
                 "KAFKA_BROKER_ID=1",
-                'ALLOW_PLAINTEXT_LISTENER: "yes"',
-                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://0.0.0.0:29092",
-                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL",
-                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://localhost:29092",
-                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"])
+                "ALLOW_PLAINTEXT_LISTENER=yes",
+                "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
+                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092",
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL",
+                "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL",
+                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093",
+                "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
+                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
+                "SSL_CLIENT_AUTH=none"],
+            volumes=self.vols,
+            sysctls={"net.ipv6.conf.all.disable_ipv6": "1"})

Review comment:
       There was a theory that the ipv6 addresses cause networking problem in docker, but that was disproved. I removed it in af11b5fd586fe44f402e4704230aa3d01d29f838




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r681072581



##########
File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp
##########
@@ -586,4 +598,19 @@ TEST_CASE_METHOD(ConsumeKafkaContinuousPublishingTest, "ConsumeKafka can spend n
   //  I tried adding a wait time for more than "session.timeout.ms" inbetween tests, but it was not sufficient
 }
 
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "ConsumeKafka can communicate with the broker via SSL.", "[ConsumeKafka][Kafka][SSL]") {
+  auto run_tests = [&] (const std::vector<std::string>& messages_on_topic) {
+    single_consumer_with_plain_text_test(true, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9093", ConsumeKafka::SECURITY_PROTOCOL_SSL, "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+  };
+  const auto get_current_timestamp = [] {
+    const std::time_t result = std::time(nullptr);
+    std::stringstream time_stream;
+    time_stream << std::asctime(std::localtime(&result));
+    return time_stream.str();
+  };

Review comment:
       I would prefer not to put too much additional effort into these tests, as there is already a [Jira ticket](https://issues.apache.org/jira/browse/MINIFICPP-1518) for removing them in the future, as it is only a manual test suite and the test cases are already implemented (or mostly implemented) in the integration test suite. After this PR is merged the parity between the manual and the automated test suites of the kafka extension should be evaluated and after the gaps are fixed, the manual suites can be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r680863837



##########
File path: docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
##########
@@ -135,12 +135,13 @@ zookeeper.connection.timeout.ms=6000
 # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
 group.initial.rebalance.delay.ms=0
 
-security.inter.broker.protocol=SSL
-listeners=PLAINTEXT://:9092,SSL://:9093
+listeners=SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093
+advertised.listeners=SSL://kafka-broker:9093,SSL_HOST://localhost:29093
+listener.security.protocol.map=SSL:SSL,SSL_HOST:SSL
 
 # SSL
 ssl.protocol = TLS
-ssl.enabled.protocols=TLSv1.2
+ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

Review comment:
       I'd rather test the original, more strict protocol suite. Proving that high security configurations work is more valuable than proving that low security ones work, since you can always go higher security, but lower may not be acceptable to some users.

##########
File path: docker/test/integration/minifi/core/SingleNodeDockerCluster.py
##########
@@ -225,21 +224,32 @@ def deploy_kafka_broker(self):
         logging.info('Adding container \'%s\'', zookeeper.name)
         self.containers[zookeeper.name] = zookeeper
 
+    def deploy_kafka_broker(self):
+        logging.info('Creating and running docker containers for kafka broker...')
+        self.deploy_zookeeper()
+
         test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
         broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
         broker = self.client.containers.run(
             broker_image[0],
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092, '29092/tcp': 29092},
+            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093},
             environment=[
                 "KAFKA_BROKER_ID=1",
-                'ALLOW_PLAINTEXT_LISTENER: "yes"',
-                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://0.0.0.0:29092",
-                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL",
-                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,PLAINTEXT_HOST://localhost:29092",
-                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"])
+                "ALLOW_PLAINTEXT_LISTENER=yes",
+                "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
+                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092",
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL",
+                "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL",
+                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093",
+                "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
+                "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
+                "SSL_CLIENT_AUTH=none"],
+            volumes=self.vols,
+            sysctls={"net.ipv6.conf.all.disable_ipv6": "1"})

Review comment:
       Is there a reason for disabling IPv6 in the container?

##########
File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp
##########
@@ -586,4 +598,19 @@ TEST_CASE_METHOD(ConsumeKafkaContinuousPublishingTest, "ConsumeKafka can spend n
   //  I tried adding a wait time for more than "session.timeout.ms" inbetween tests, but it was not sufficient
 }
 
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "ConsumeKafka can communicate with the broker via SSL.", "[ConsumeKafka][Kafka][SSL]") {
+  auto run_tests = [&] (const std::vector<std::string>& messages_on_topic) {
+    single_consumer_with_plain_text_test(true, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9093", ConsumeKafka::SECURITY_PROTOCOL_SSL, "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+  };
+  const auto get_current_timestamp = [] {
+    const std::time_t result = std::time(nullptr);
+    std::stringstream time_stream;
+    time_stream << std::asctime(std::localtime(&result));
+    return time_stream.str();
+  };

Review comment:
       I'd prefer a C++'y way of doing this, using the chrono, date and date-tz libraries. In addition to the below suggestion, you need to link the date and date-tz targets to the test and include "date/date.h" and "date/tz.h".
   ```suggestion
     const auto get_current_timestamp = [] {
       return date::format("%a %b %e %H:%M:%S %Y", date::make_zoned(date::current_zone(), std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now())));
     };
   ```
   
   The C++20 version would look like this, but currently only MSVC implements the timezone and format support required:
   ```
     const auto get_current_timestamp = [] {
       return std::format("{:%a %b %e %H:%M:%S %Y}", std::chrono::zoned_time{std::chrono::current_zone(), std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now())});
     };
   ```
   
   If you don't want to introduce the dependencies just to save a few lines, that's also fine for me.

##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -185,7 +205,11 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout
+    SessionTimeout,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassword

Review comment:
       NiFi uses SSLContextService to provide these details. We seem to have the same implemented in MiNiFi C++, so I would prefer to have the same approach.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r681429910



##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -185,7 +205,11 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout
+    SessionTimeout,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassword

Review comment:
       Not sure what would be the best here. I'm still leaning towards SSLContextService, but we can also add it later and support both approaches. Let's gather more feedback!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r681066965



##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -185,7 +205,11 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout
+    SessionTimeout,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassword

Review comment:
       I'm on the edge about this. It's true that NiFi uses SSLContextService and that would make sense that we would use it as well as it is already implemented. But in the kafka extension we currently have the `PublishKafka` and the `ConsumeKafka` processors and as `PublishKafka` uses these attributes for SSL configuration instead of the SSLContextService it would also make sense to have the same properties in `ConsumeKafka` to be consistent within the extension.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705419173



##########
File path: docker/test/integration/features/kafka.feature
##########
@@ -44,12 +44,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
       | PublishKafka   | Delivery Guarantee     | 1                                          |
       | PublishKafka   | Request Timeout        | 10 sec                                     |
       | PublishKafka   | Message Timeout        | 12 sec                                     |
-      | PublishKafka   | Security CA            | /tmp/resources/certs/ca-cert               |

Review comment:
       Makes sense, I added that version as well in 0cf6d007765d021d20573b0be61c6b793b60bf80




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r704286483



##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -211,14 +218,22 @@ void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessio
   context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_);
   context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_);
 
+  std::string ssl_service_name;
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+  if (context->getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(ssl_service_name);
+    if (service) {
+      ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      ssl_data_.ca_loc = ssl_service->getCACertificate();
+      ssl_data_.cert_loc = ssl_service->getCertificateFile();
+      ssl_data_.key_loc = ssl_service->getPrivateKeyFile();
+      ssl_data_.key_pw = ssl_service->getPassphrase();
+    }
+  }

Review comment:
       I think a warning log would be helpful, when the SSLContextService property is present, but the name lookup or the conversion didn't yield a valid ssl context service.

##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -709,37 +717,70 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
         auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
         throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
+
+      std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+      if (context->getProperty(SSLContextService.getName(), value) && !value.empty()) {
+        std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
+        if (service) {
+          ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+        }
+      }

Review comment:
       Same here. Maybe it's also worth a log message if SSL protocol is selected, but neither SSLContextService nor the individual properties are present.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r681063867



##########
File path: docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
##########
@@ -135,12 +135,13 @@ zookeeper.connection.timeout.ms=6000
 # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
 group.initial.rebalance.delay.ms=0
 
-security.inter.broker.protocol=SSL
-listeners=PLAINTEXT://:9092,SSL://:9093
+listeners=SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093
+advertised.listeners=SSL://kafka-broker:9093,SSL_HOST://localhost:29093
+listener.security.protocol.map=SSL:SSL,SSL_HOST:SSL
 
 # SSL
 ssl.protocol = TLS
-ssl.enabled.protocols=TLSv1.2
+ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

Review comment:
       I did not consider this, that makes sense. Updated in af11b5fd586fe44f402e4704230aa3d01d29f838




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r686948120



##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -185,7 +205,11 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout
+    SessionTimeout,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassword

Review comment:
       Added `SSLContextService` to `PublishKafka` and `ConsumeKafka` and added comment for deprecating SSL parameters in `PublishKafka` in 0bf54be561ccab7f0f6da29a9b1e6b8268d5e516. I also removed the `ConsumeKafkaTests.cpp` file as all test cases are now moved to the integration test suite, so there is no need to maintain it anymore.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705286235



##########
File path: docker/test/integration/features/kafka.feature
##########
@@ -44,12 +44,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
       | PublishKafka   | Delivery Guarantee     | 1                                          |
       | PublishKafka   | Request Timeout        | 10 sec                                     |
       | PublishKafka   | Message Timeout        | 12 sec                                     |
-      | PublishKafka   | Security CA            | /tmp/resources/certs/ca-cert               |

Review comment:
       although we deprecated these properties in favor of the context service, shouldn't we keep testing the feature until we remove them?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1120: MINIFICPP-1374 Implement security protocol support for ConsumeKafka

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1120:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1120#discussion_r705083561



##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -211,14 +218,22 @@ void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessio
   context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_);
   context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_);
 
+  std::string ssl_service_name;
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+  if (context->getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(ssl_service_name);
+    if (service) {
+      ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      ssl_data_.ca_loc = ssl_service->getCACertificate();
+      ssl_data_.cert_loc = ssl_service->getCertificateFile();
+      ssl_data_.key_loc = ssl_service->getPrivateKeyFile();
+      ssl_data_.key_pw = ssl_service->getPassphrase();
+    }
+  }

Review comment:
       Updated in 0e5f15b290e21b011e99628a6e2198e88cfad106

##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -709,37 +717,70 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
         auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
         throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
+
+      std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+      if (context->getProperty(SSLContextService.getName(), value) && !value.empty()) {
+        std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
+        if (service) {
+          ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+        }
+      }

Review comment:
       Updated in 0e5f15b290e21b011e99628a6e2198e88cfad106




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org