You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/03/11 14:25:34 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1773 - Crash when using Provenance Repository in properties

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2200aadbc1b3d6dd927b5d1687ef72cf5fa60a36
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Wed Mar 9 11:35:12 2022 +0100

    MINIFICPP-1773 - Crash when using Provenance Repository in properties
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1282
---
 .../features/core_functionality.feature            |  5 ++++
 docker/test/integration/minifi/core/ImageStore.py  | 34 ++++++++++++++++++----
 .../integration/minifi/core/MinifiContainer.py     |  4 +--
 ...ner.py => MinifiWithProvenanceRepoContainer.py} |  9 ++----
 .../minifi/core/SingleNodeDockerCluster.py         |  3 ++
 .../minifi/core/TransientMinifiContainer.py        |  2 --
 .../minifi.properties                              | 32 ++++++++++++++++++++
 docker/test/integration/steps/steps.py             |  5 ++++
 extensions/rocksdb-repos/ProvenanceRepository.h    |  4 +--
 .../include/core/repository/VolatileRepository.h   |  6 ++--
 10 files changed, 82 insertions(+), 22 deletions(-)

diff --git a/docker/test/integration/features/core_functionality.feature b/docker/test/integration/features/core_functionality.feature
index 54e0e76..3f3330e 100644
--- a/docker/test/integration/features/core_functionality.feature
+++ b/docker/test/integration/features/core_functionality.feature
@@ -35,3 +35,8 @@ Feature: Core flow functionalities
     Given a LogOnDestructionProcessor processor with the name "logOnDestruction" in the "transient-minifi" flow with engine "transient-minifi"
     When the MiNiFi instance starts up
     Then the Minifi logs contain the following message: "LogOnDestructionProcessor is being destructed" in less than 100 seconds
+
+  Scenario: Agent does not crash when using provenance repositories
+    Given a GenerateFlowFile processor with the name "generateFlowFile" in the "minifi-cpp-with-provenance-repo" flow with engine "minifi-cpp-with-provenance-repo"
+    When the MiNiFi instance starts up
+    Then the "minifi-cpp-with-provenance-repo" flow has a log line matching "MiNiFi started" in less than 30 seconds
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 8d3baed..f6cf1df 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -45,6 +45,8 @@ class ImageStore:
 
         if container_engine == "minifi-cpp" or container_engine == "transient-minifi":
             image = self.__build_minifi_cpp_image()
+        elif container_engine == "minifi-cpp-with-provenance-repo":
+            image = self.__build_minifi_cpp_image_with_provenance_repo()
         elif container_engine == "http-proxy":
             image = self.__build_http_proxy_image()
         elif container_engine == "nifi":
@@ -64,7 +66,8 @@ class ImageStore:
         return image
 
     def __build_minifi_cpp_image(self):
-        dockerfile = dedent("""FROM {base_image}
+        dockerfile = dedent("""\
+                FROM {base_image}
                 USER root
                 RUN apk --update --no-cache add psqlodbc
                 RUN echo "[PostgreSQL ANSI]" > /odbcinst.ini.template && \
@@ -100,8 +103,26 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
+    def __build_minifi_cpp_image_with_provenance_repo(self):
+        dockerfile = dedent("""\
+                FROM {base_image}
+                USER root
+                COPY minifi.properties {minifi_root}/conf/minifi.properties
+                USER minificpp
+                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
+                           minifi_root=MinifiContainer.MINIFI_ROOT))
+
+        properties_path = self.test_dir + "/resources/minifi_cpp_with_provenance_repo/minifi.properties"
+        properties_context = {'name': 'minifi.properties', 'size': os.path.getsize(properties_path)}
+
+        with open(properties_path, 'rb') as properties_file:
+            properties_context['file_obj'] = properties_file
+            image = self.__build_image(dockerfile, [properties_context])
+        return image
+
     def __build_http_proxy_image(self):
-        dockerfile = dedent("""FROM {base_image}
+        dockerfile = dedent("""\
+                FROM {base_image}
                 RUN apt -y update && apt install -y apache2-utils
                 RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
                 RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users'  > /etc/squid/squid.conf && \
@@ -115,7 +136,8 @@ class ImageStore:
         return self.__build_image(dockerfile)
 
     def __build_nifi_image(self):
-        dockerfile = dedent(r"""FROM {base_image}
+        dockerfile = dedent(r"""\
+                FROM {base_image}
                 USER root
                 RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
                 RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties
@@ -127,7 +149,8 @@ class ImageStore:
         return self.__build_image(dockerfile)
 
     def __build_postgresql_server_image(self):
-        dockerfile = dedent("""FROM {base_image}
+        dockerfile = dedent("""\
+                FROM {base_image}
                 RUN mkdir -p /docker-entrypoint-initdb.d
                 RUN echo "#!/bin/bash" > /docker-entrypoint-initdb.d/init-user-db.sh && \
                     echo "set -e" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
@@ -147,7 +170,8 @@ class ImageStore:
         return self.__build_image_by_path(self.test_dir + "/resources/kafka_broker", 'minifi-kafka')
 
     def __build_mqtt_broker_image(self):
-        dockerfile = dedent("""FROM {base_image}
+        dockerfile = dedent("""\
+            FROM {base_image}
             RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
             CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"]
             """.format(base_image='eclipse-mosquitto:2.0.12'))
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index 770bf75..71cb2c9 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -24,10 +24,10 @@ class MinifiContainer(FlowContainer):
     MINIFI_VERSION = os.environ['MINIFI_VERSION']
     MINIFI_ROOT = '/opt/minifi/nifi-minifi-cpp-' + MINIFI_VERSION
 
-    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+    def __init__(self, config_dir, name, vols, network, image_store, command=None, engine='minifi-cpp'):
         if not command:
             command = ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml " + MinifiContainer.MINIFI_ROOT + "/conf && /opt/minifi/minifi-current/bin/minifi.sh run"]
-        super().__init__(config_dir, name, 'minifi-cpp', vols, network, image_store, command)
+        super().__init__(config_dir, name, engine, vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Starting Flow Controller"
diff --git a/docker/test/integration/minifi/core/TransientMinifiContainer.py b/docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
similarity index 76%
copy from docker/test/integration/minifi/core/TransientMinifiContainer.py
copy to docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
index 9c93a08..dd8529b 100644
--- a/docker/test/integration/minifi/core/TransientMinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
@@ -1,5 +1,3 @@
-# @file TransientMinifiContainer.py
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -18,9 +16,6 @@
 from .MinifiContainer import MinifiContainer
 
 
-class TransientMinifiContainer(MinifiContainer):
+class MinifiWithProvenanceRepoContainer(MinifiContainer):
     def __init__(self, config_dir, name, vols, network, image_store, command=None):
-        if not command:
-            command = ["/bin/sh", "-c",
-                       "cp /tmp/minifi_config/config.yml ./conf/ && ./bin/minifi.sh start && sleep 10 && ./bin/minifi.sh stop && sleep 100"]
-        super().__init__(config_dir, name, vols, network, image_store, command)
+        super().__init__(config_dir, name, vols, network, image_store, command, engine='minifi-cpp-with-provenance-repo')
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 2cdffd5..a72e972 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -21,6 +21,7 @@ import uuid
 from .Cluster import Cluster
 from .MinifiContainer import MinifiContainer
 from .TransientMinifiContainer import TransientMinifiContainer
+from .MinifiWithProvenanceRepoContainer import MinifiWithProvenanceRepoContainer
 from .NifiContainer import NifiContainer
 from .ZookeeperContainer import ZookeeperContainer
 from .KafkaBrokerContainer import KafkaBrokerContainer
@@ -90,6 +91,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'transient-minifi':
             return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+        elif engine == 'minifi-cpp-with-provenance-repo':
+            return self.containers.setdefault(name, MinifiWithProvenanceRepoContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'kafka-broker':
             if 'zookeeper' not in self.containers:
                 self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
diff --git a/docker/test/integration/minifi/core/TransientMinifiContainer.py b/docker/test/integration/minifi/core/TransientMinifiContainer.py
index 9c93a08..eb8fce4 100644
--- a/docker/test/integration/minifi/core/TransientMinifiContainer.py
+++ b/docker/test/integration/minifi/core/TransientMinifiContainer.py
@@ -1,5 +1,3 @@
-# @file TransientMinifiContainer.py
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
diff --git a/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties b/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties
new file mode 100644
index 0000000..9ef59f3
--- /dev/null
+++ b/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties
@@ -0,0 +1,32 @@
+nifi.version=0.12.0
+nifi.flow.configuration.file=./conf/config.yml
+nifi.administrative.yield.duration=30 sec
+nifi.bored.yield.duration=10 millis
+nifi.extension.path=../extensions/*
+nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
+nifi.provenance.repository.max.storage.time=1 MIN
+nifi.provenance.repository.max.storage.size=1 MB
+nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
+nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
+nifi.remote.input.secure=false
+nifi.security.need.ClientAuth=false
+nifi.security.client.certificate=/tmp/shared/identity.pem
+nifi.security.client.private.key=/tmp/shared/identity.pem
+nifi.security.client.ca.certificate=/tmp/shared/nifi-cert.pem
+nifi.c2.enable=true
+nifi.c2.agent.class=minifi-cpp-latest
+nifi.c2.agent.listen=false
+nifi.c2.agent.heartbeat.period=30000
+nifi.c2.agent.heartbeat.reporter.classes=RESTReceiver
+nifi.c2.agent.protocol.class=RESTSender
+nifi.c2.full.heartbeat=false
+nifi.c2.rest.url=http://local-cem-efm:10091/efm/api/c2-protocol/heartbeat
+nifi.c2.rest.url.ack=http://local-cem-efm:10091/efm/api/c2-protocol/acknowledge
+nifi.c2.rest.listener.port=8765
+nifi.c2.rest.listener.heartbeat.rooturi=/path
+nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,RepositoryMetrics
+nifi.c2.root.class.definitions.DeviceInfo.name=deviceinfo
+nifi.c2.root.class.definitions.DeviceInfo.classes=DeviceInfoNode
+nifi.flow.metrics.class.definitions=15
+nifi.flow.engine.threads=10
+nifi.remote.input.http.enabled=true
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 870af59..31dbf5f 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -731,6 +731,11 @@ def step_impl(context, log_pattern):
     context.test.wait_for_container_logs('mqtt-broker', log_pattern, 30, count=1)
 
 
+@then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
+def step_impl(context, minifi_container_name, log_pattern, duration):
+    context.test.wait_for_container_logs(minifi_container_name, log_pattern, timeparse(duration), count=1)
+
+
 @then("an MQTT broker is deployed in correspondence with the PublishMQTT")
 def step_impl(context):
     context.test.acquire_container("mqtt-broker", "mqtt-broker")
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 068d14c..dcfc39f 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -42,7 +42,7 @@ namespace provenance {
 constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
 constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
+class ProvenanceRepository : public core::Repository {
  public:
   ProvenanceRepository(const std::string& name, const utils::Identifier& /*uuid*/)
       : ProvenanceRepository(name) {
@@ -72,7 +72,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
     if (running_)
       return;
     running_ = true;
-    thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
+    thread_ = std::thread(&ProvenanceRepository::run, this);
     logger_->log_debug("%s Repository Monitor Thread Start", name_);
   }
 
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index f5ef08c..67f48bc 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -51,9 +51,7 @@ namespace repository {
  * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
  */
 template<typename T>
-class VolatileRepository : public core::Repository, public utils::EnableSharedFromThis<VolatileRepository<T>> {
-  using utils::EnableSharedFromThis<VolatileRepository<T>>::sharedFromThis;
-
+class VolatileRepository : public core::Repository {
  public:
   static const char *volatile_repo_max_count;
   static const char *volatile_repo_max_bytes;
@@ -397,7 +395,7 @@ void VolatileRepository<T>::start() {
   if (running_)
     return;
   running_ = true;
-  thread_ = std::thread(&VolatileRepository<T>::run, sharedFromThis());
+  thread_ = std::thread(&VolatileRepository<T>::run, this);
   logger_->log_debug("%s Repository Monitor Thread Start", name_);
 }
 #if defined(__clang__)