You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/03/11 14:25:34 UTC
[nifi-minifi-cpp] 02/02: MINIFICPP-1773 - Crash when using Provenance Repository in properties
This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 2200aadbc1b3d6dd927b5d1687ef72cf5fa60a36
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Wed Mar 9 11:35:12 2022 +0100
MINIFICPP-1773 - Crash when using Provenance Repository in properties
Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
This closes #1282
---
.../features/core_functionality.feature | 5 ++++
docker/test/integration/minifi/core/ImageStore.py | 34 ++++++++++++++++++----
.../integration/minifi/core/MinifiContainer.py | 4 +--
...ner.py => MinifiWithProvenanceRepoContainer.py} | 9 ++----
.../minifi/core/SingleNodeDockerCluster.py | 3 ++
.../minifi/core/TransientMinifiContainer.py | 2 --
.../minifi.properties | 32 ++++++++++++++++++++
docker/test/integration/steps/steps.py | 5 ++++
extensions/rocksdb-repos/ProvenanceRepository.h | 4 +--
.../include/core/repository/VolatileRepository.h | 6 ++--
10 files changed, 82 insertions(+), 22 deletions(-)
diff --git a/docker/test/integration/features/core_functionality.feature b/docker/test/integration/features/core_functionality.feature
index 54e0e76..3f3330e 100644
--- a/docker/test/integration/features/core_functionality.feature
+++ b/docker/test/integration/features/core_functionality.feature
@@ -35,3 +35,8 @@ Feature: Core flow functionalities
Given a LogOnDestructionProcessor processor with the name "logOnDestruction" in the "transient-minifi" flow with engine "transient-minifi"
When the MiNiFi instance starts up
Then the Minifi logs contain the following message: "LogOnDestructionProcessor is being destructed" in less than 100 seconds
+
+ Scenario: Agent does not crash when using provenance repositories
+ Given a GenerateFlowFile processor with the name "generateFlowFile" in the "minifi-cpp-with-provenance-repo" flow with engine "minifi-cpp-with-provenance-repo"
+ When the MiNiFi instance starts up
+ Then the "minifi-cpp-with-provenance-repo" flow has a log line matching "MiNiFi started" in less than 30 seconds
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 8d3baed..f6cf1df 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -45,6 +45,8 @@ class ImageStore:
if container_engine == "minifi-cpp" or container_engine == "transient-minifi":
image = self.__build_minifi_cpp_image()
+ elif container_engine == "minifi-cpp-with-provenance-repo":
+ image = self.__build_minifi_cpp_image_with_provenance_repo()
elif container_engine == "http-proxy":
image = self.__build_http_proxy_image()
elif container_engine == "nifi":
@@ -64,7 +66,8 @@ class ImageStore:
return image
def __build_minifi_cpp_image(self):
- dockerfile = dedent("""FROM {base_image}
+ dockerfile = dedent("""\
+ FROM {base_image}
USER root
RUN apk --update --no-cache add psqlodbc
RUN echo "[PostgreSQL ANSI]" > /odbcinst.ini.template && \
@@ -100,8 +103,26 @@ class ImageStore:
return self.__build_image(dockerfile)
+ def __build_minifi_cpp_image_with_provenance_repo(self):
+ dockerfile = dedent("""\
+ FROM {base_image}
+ USER root
+ COPY minifi.properties {minifi_root}/conf/minifi.properties
+ USER minificpp
+ """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
+ minifi_root=MinifiContainer.MINIFI_ROOT))
+
+ properties_path = self.test_dir + "/resources/minifi_cpp_with_provenance_repo/minifi.properties"
+ properties_context = {'name': 'minifi.properties', 'size': os.path.getsize(properties_path)}
+
+ with open(properties_path, 'rb') as properties_file:
+ properties_context['file_obj'] = properties_file
+ image = self.__build_image(dockerfile, [properties_context])
+ return image
+
def __build_http_proxy_image(self):
- dockerfile = dedent("""FROM {base_image}
+ dockerfile = dedent("""\
+ FROM {base_image}
RUN apt -y update && apt install -y apache2-utils
RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \
@@ -115,7 +136,8 @@ class ImageStore:
return self.__build_image(dockerfile)
def __build_nifi_image(self):
- dockerfile = dedent(r"""FROM {base_image}
+ dockerfile = dedent(r"""\
+ FROM {base_image}
USER root
RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties
@@ -127,7 +149,8 @@ class ImageStore:
return self.__build_image(dockerfile)
def __build_postgresql_server_image(self):
- dockerfile = dedent("""FROM {base_image}
+ dockerfile = dedent("""\
+ FROM {base_image}
RUN mkdir -p /docker-entrypoint-initdb.d
RUN echo "#!/bin/bash" > /docker-entrypoint-initdb.d/init-user-db.sh && \
echo "set -e" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
@@ -147,7 +170,8 @@ class ImageStore:
return self.__build_image_by_path(self.test_dir + "/resources/kafka_broker", 'minifi-kafka')
def __build_mqtt_broker_image(self):
- dockerfile = dedent("""FROM {base_image}
+ dockerfile = dedent("""\
+ FROM {base_image}
RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"]
""".format(base_image='eclipse-mosquitto:2.0.12'))
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index 770bf75..71cb2c9 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -24,10 +24,10 @@ class MinifiContainer(FlowContainer):
MINIFI_VERSION = os.environ['MINIFI_VERSION']
MINIFI_ROOT = '/opt/minifi/nifi-minifi-cpp-' + MINIFI_VERSION
- def __init__(self, config_dir, name, vols, network, image_store, command=None):
+ def __init__(self, config_dir, name, vols, network, image_store, command=None, engine='minifi-cpp'):
if not command:
command = ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml " + MinifiContainer.MINIFI_ROOT + "/conf && /opt/minifi/minifi-current/bin/minifi.sh run"]
- super().__init__(config_dir, name, 'minifi-cpp', vols, network, image_store, command)
+ super().__init__(config_dir, name, engine, vols, network, image_store, command)
def get_startup_finished_log_entry(self):
return "Starting Flow Controller"
diff --git a/docker/test/integration/minifi/core/TransientMinifiContainer.py b/docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
similarity index 76%
copy from docker/test/integration/minifi/core/TransientMinifiContainer.py
copy to docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
index 9c93a08..dd8529b 100644
--- a/docker/test/integration/minifi/core/TransientMinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiWithProvenanceRepoContainer.py
@@ -1,5 +1,3 @@
-# @file TransientMinifiContainer.py
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -18,9 +16,6 @@
from .MinifiContainer import MinifiContainer
-class TransientMinifiContainer(MinifiContainer):
+class MinifiWithProvenanceRepoContainer(MinifiContainer):
def __init__(self, config_dir, name, vols, network, image_store, command=None):
- if not command:
- command = ["/bin/sh", "-c",
- "cp /tmp/minifi_config/config.yml ./conf/ && ./bin/minifi.sh start && sleep 10 && ./bin/minifi.sh stop && sleep 100"]
- super().__init__(config_dir, name, vols, network, image_store, command)
+ super().__init__(config_dir, name, vols, network, image_store, command, engine='minifi-cpp-with-provenance-repo')
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 2cdffd5..a72e972 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -21,6 +21,7 @@ import uuid
from .Cluster import Cluster
from .MinifiContainer import MinifiContainer
from .TransientMinifiContainer import TransientMinifiContainer
+from .MinifiWithProvenanceRepoContainer import MinifiWithProvenanceRepoContainer
from .NifiContainer import NifiContainer
from .ZookeeperContainer import ZookeeperContainer
from .KafkaBrokerContainer import KafkaBrokerContainer
@@ -90,6 +91,8 @@ class SingleNodeDockerCluster(Cluster):
return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'transient-minifi':
return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+ elif engine == 'minifi-cpp-with-provenance-repo':
+ return self.containers.setdefault(name, MinifiWithProvenanceRepoContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
elif engine == 'kafka-broker':
if 'zookeeper' not in self.containers:
self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
diff --git a/docker/test/integration/minifi/core/TransientMinifiContainer.py b/docker/test/integration/minifi/core/TransientMinifiContainer.py
index 9c93a08..eb8fce4 100644
--- a/docker/test/integration/minifi/core/TransientMinifiContainer.py
+++ b/docker/test/integration/minifi/core/TransientMinifiContainer.py
@@ -1,5 +1,3 @@
-# @file TransientMinifiContainer.py
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
diff --git a/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties b/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties
new file mode 100644
index 0000000..9ef59f3
--- /dev/null
+++ b/docker/test/integration/resources/minifi_cpp_with_provenance_repo/minifi.properties
@@ -0,0 +1,32 @@
+nifi.version=0.12.0
+nifi.flow.configuration.file=./conf/config.yml
+nifi.administrative.yield.duration=30 sec
+nifi.bored.yield.duration=10 millis
+nifi.extension.path=../extensions/*
+nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
+nifi.provenance.repository.max.storage.time=1 MIN
+nifi.provenance.repository.max.storage.size=1 MB
+nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
+nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
+nifi.remote.input.secure=false
+nifi.security.need.ClientAuth=false
+nifi.security.client.certificate=/tmp/shared/identity.pem
+nifi.security.client.private.key=/tmp/shared/identity.pem
+nifi.security.client.ca.certificate=/tmp/shared/nifi-cert.pem
+nifi.c2.enable=true
+nifi.c2.agent.class=minifi-cpp-latest
+nifi.c2.agent.listen=false
+nifi.c2.agent.heartbeat.period=30000
+nifi.c2.agent.heartbeat.reporter.classes=RESTReceiver
+nifi.c2.agent.protocol.class=RESTSender
+nifi.c2.full.heartbeat=false
+nifi.c2.rest.url=http://local-cem-efm:10091/efm/api/c2-protocol/heartbeat
+nifi.c2.rest.url.ack=http://local-cem-efm:10091/efm/api/c2-protocol/acknowledge
+nifi.c2.rest.listener.port=8765
+nifi.c2.rest.listener.heartbeat.rooturi=/path
+nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,RepositoryMetrics
+nifi.c2.root.class.definitions.DeviceInfo.name=deviceinfo
+nifi.c2.root.class.definitions.DeviceInfo.classes=DeviceInfoNode
+nifi.flow.metrics.class.definitions=15
+nifi.flow.engine.threads=10
+nifi.remote.input.http.enabled=true
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 870af59..31dbf5f 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -731,6 +731,11 @@ def step_impl(context, log_pattern):
context.test.wait_for_container_logs('mqtt-broker', log_pattern, 30, count=1)
+@then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
+def step_impl(context, minifi_container_name, log_pattern, duration):
+ context.test.wait_for_container_logs(minifi_container_name, log_pattern, timeparse(duration), count=1)
+
+
@then("an MQTT broker is deployed in correspondence with the PublishMQTT")
def step_impl(context):
context.test.acquire_container("mqtt-broker", "mqtt-broker")
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 068d14c..dcfc39f 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -42,7 +42,7 @@ namespace provenance {
constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
-class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
+class ProvenanceRepository : public core::Repository {
public:
ProvenanceRepository(const std::string& name, const utils::Identifier& /*uuid*/)
: ProvenanceRepository(name) {
@@ -72,7 +72,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
if (running_)
return;
running_ = true;
- thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
+ thread_ = std::thread(&ProvenanceRepository::run, this);
logger_->log_debug("%s Repository Monitor Thread Start", name_);
}
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index f5ef08c..67f48bc 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -51,9 +51,7 @@ namespace repository {
* Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
*/
template<typename T>
-class VolatileRepository : public core::Repository, public utils::EnableSharedFromThis<VolatileRepository<T>> {
- using utils::EnableSharedFromThis<VolatileRepository<T>>::sharedFromThis;
-
+class VolatileRepository : public core::Repository {
public:
static const char *volatile_repo_max_count;
static const char *volatile_repo_max_bytes;
@@ -397,7 +395,7 @@ void VolatileRepository<T>::start() {
if (running_)
return;
running_ = true;
- thread_ = std::thread(&VolatileRepository<T>::run, sharedFromThis());
+ thread_ = std::thread(&VolatileRepository<T>::run, this);
logger_->log_debug("%s Repository Monitor Thread Start", name_);
}
#if defined(__clang__)