You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/23 18:34:02 UTC

[nifi-minifi-cpp] 01/03: MINIFICPP-1965 Add CMAKE flags to select malloc implementation

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 78a5c85b8499b055489f9e5c4ab41101b72ed293
Author: Martin Zink <ma...@apache.org>
AuthorDate: Sun Oct 23 21:13:38 2022 +0200

    MINIFICPP-1965 Add CMAKE flags to select malloc implementation
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1456
---
 CMakeLists.txt                                     | 24 ++++++--
 cmake/MiMalloc.cmake                               | 25 +++++++++
 cmake/MiNiFiOptions.cmake                          |  9 +++
 cmake/RpMalloc.cmake                               | 32 +++++++++++
 docker/requirements.txt                            |  2 +-
 .../integration/MiNiFi_integration_test_driver.py  | 14 ++++-
 .../features/core_functionality.feature            | 10 ++++
 .../integration/minifi/core/DockerTestCluster.py   | 29 +++++++++-
 docker/test/integration/minifi/core/utils.py       | 24 ++++++++
 docker/test/integration/steps/steps.py             | 64 ++++++++++++++--------
 minifi_main/CMakeLists.txt                         |  5 +-
 11 files changed, 204 insertions(+), 34 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 31d1a3d7c..b7166f2c8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -200,14 +200,30 @@ set(PASSTHROUGH_CMAKE_ARGS -DANDROID_ABI=${ANDROID_ABI}
     -G${CMAKE_GENERATOR}
     )
 
-# jemalloc
+# Optional replacement of the default allocator. CUSTOM_MALLOC selects the
+# implementation and CUSTOM_MALLOC_LIB receives the matching link target;
+# CUSTOM_MALLOC_LIB is not set here when CUSTOM_MALLOC evaluates to false.
+if(CUSTOM_MALLOC)
+    # Both operands are quoted so that neither side is re-interpreted as a variable name.
+    if ("${CUSTOM_MALLOC}" STREQUAL "jemalloc")
+        include(BundledJemalloc)
+        use_bundled_jemalloc(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
+        set(CUSTOM_MALLOC_LIB JeMalloc::JeMalloc)
+    elseif ("${CUSTOM_MALLOC}" STREQUAL "mimalloc")
+        include(MiMalloc)
+        set(CUSTOM_MALLOC_LIB mimalloc)
+    elseif ("${CUSTOM_MALLOC}" STREQUAL "rpmalloc")
+        include(RpMalloc)
+        set(CUSTOM_MALLOC_LIB rpmalloc)
+    else()
+        message(FATAL_ERROR "Invalid CUSTOM_MALLOC value '${CUSTOM_MALLOC}'; expected jemalloc, mimalloc, rpmalloc or OFF")
+    endif()
+else()
+    message(VERBOSE "No custom malloc implementation")
+endif()
+
 if(NOT WIN32)
     if (ENABLE_JNI)
-        if (NOT DISABLE_JEMALLOC)
-            include(BundledJemalloc)
-            use_bundled_jemalloc(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
-            message("jemalloc found at ${JEMALLOC_LIBRARIES}")
-        endif()
         set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_JNI")
         set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_JNI")
     endif()
diff --git a/cmake/MiMalloc.cmake b/cmake/MiMalloc.cmake
new file mode 100644
index 000000000..ec12daa0d
--- /dev/null
+++ b/cmake/MiMalloc.cmake
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+include(FetchContent)
+FetchContent_Declare(
+        mimalloc
+        URL      https://github.com/microsoft/mimalloc/archive/refs/tags/v2.0.6.tar.gz
+        URL_HASH SHA256=9f05c94cc2b017ed13698834ac2a3567b6339a8bde27640df5a1581d49d05ce5
+)
+FetchContent_MakeAvailable(mimalloc)
diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake
index cfca8b213..b04e3157f 100644
--- a/cmake/MiNiFiOptions.cmake
+++ b/cmake/MiNiFiOptions.cmake
@@ -28,6 +28,15 @@ function(add_minifi_dependent_option OPTION_NAME OPTION_DESCRIPTION OPTION_VALUE
     set(MINIFI_OPTIONS ${MINIFI_OPTIONS} PARENT_SCOPE)
 endfunction()
 
+# Declares the STRING cache variable VARIABLE_NAME with default VARIABLE_VALUE and help text
+# DOCSTRING, then appends its name to MINIFI_OPTIONS in the caller's scope.
+# Value and docstring are quoted so an empty default or a text containing ';' stays one argument.
+function(set_minifi_cache_variable VARIABLE_NAME VARIABLE_VALUE DOCSTRING)
+    set(${VARIABLE_NAME} "${VARIABLE_VALUE}" CACHE STRING "${DOCSTRING}")
+    list(APPEND MINIFI_OPTIONS ${VARIABLE_NAME})
+    set(MINIFI_OPTIONS ${MINIFI_OPTIONS} PARENT_SCOPE)
+endfunction()
+
 add_minifi_option(CI_BUILD "Build is used for CI." OFF)
 add_minifi_option(SKIP_TESTS "Skips building all tests." OFF)
 add_minifi_option(DOCKER_BUILD_ONLY "Disables all targets except docker build scripts. Ideal for systems without an up-to-date compiler." OFF)
@@ -117,6 +126,9 @@ add_minifi_option(ENABLE_TEST_PROCESSORS "Enables test processors" OFF)
 add_minifi_option(ENABLE_PROMETHEUS "Enables Prometheus support." OFF)
 add_minifi_option(DISABLE_JEMALLOC "Disables jemalloc." OFF)
 
+set_minifi_cache_variable(CUSTOM_MALLOC OFF "Overwrite malloc implementation.")
+set_property(CACHE CUSTOM_MALLOC PROPERTY STRINGS "jemalloc" "mimalloc" "rpmalloc" OFF)
+
 if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
     add_minifi_option(ENABLE_PROCFS "Enables the procfs extension." ON)
 endif()
diff --git a/cmake/RpMalloc.cmake b/cmake/RpMalloc.cmake
new file mode 100644
index 000000000..17ab3166e
--- /dev/null
+++ b/cmake/RpMalloc.cmake
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+include(FetchContent)
+FetchContent_Declare(
+        rpmalloc
+        URL      https://github.com/mjansson/rpmalloc/archive/refs/tags/1.4.4.tar.gz
+        URL_HASH SHA256=3859620c03e6473f0b3f16a4e965e7c049594253f70e8370fb9caa0e4118accb
+)
+FetchContent_GetProperties(rpmalloc)
+
+if(NOT rpmalloc_POPULATED)
+    FetchContent_Populate(rpmalloc)
+    add_library(rpmalloc ${rpmalloc_SOURCE_DIR}/rpmalloc/rpmalloc.c)
+    target_include_directories(rpmalloc PUBLIC ${rpmalloc_SOURCE_DIR}/rpmalloc)
+    target_compile_definitions(rpmalloc PRIVATE ENABLE_OVERRIDE=1 ENABLE_PRELOAD=1)
+endif()
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 2dda64d12..8efcc2861 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -1,5 +1,4 @@
 behave==1.2.6
-pytimeparse==1.1.8
 docker==5.0.0
 kafka-python==2.0.2
 confluent-kafka==1.7.0
@@ -9,3 +8,4 @@ watchdog==2.1.2
 pyopenssl==23.0.0
 azure-storage-blob==12.9.0
 prometheus-api-client==0.5.0
+humanfriendly==10.0
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 328a7b60b..f16a29d32 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -33,7 +33,7 @@ from minifi.validators.NoContentCheckFileNumberValidator import NoContentCheckFi
 from minifi.validators.NumFileRangeValidator import NumFileRangeValidator
 from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutputValidator
 
-from minifi.core.utils import decode_escaped_str
+from minifi.core.utils import decode_escaped_str, get_minifi_pid, get_peak_memory_usage
 
 
 class MiNiFi_integration_test:
@@ -311,3 +311,22 @@ class MiNiFi_integration_test:
 
     def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
         assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
+
+    def check_if_peak_memory_usage_exceeded(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> None:
+        """Assert that the cluster sees peak memory usage exceed minimum_peak_memory_usage within timeout_seconds."""
+        assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds)
+
+    def check_if_memory_usage_is_below(self, maximum_memory_usage: int, timeout_seconds: int) -> None:
+        """Assert that the cluster sees memory usage drop below maximum_memory_usage within timeout_seconds."""
+        assert self.cluster.wait_for_memory_usage_to_drop_below(maximum_memory_usage, timeout_seconds)
+
+    def check_memory_usage_compared_to_peak(self, peak_multiplier: float, timeout_seconds: int) -> None:
+        """Assert that memory usage drops below peak_multiplier times the current peak within timeout_seconds.
+
+        peak_multiplier must lie strictly between 0.0 and 1.0.
+        """
+        # Validate the argument first so a bad multiplier is reported before any process lookup is attempted.
+        assert (1.0 > peak_multiplier > 0.0)
+        peak_memory = get_peak_memory_usage(get_minifi_pid())
+        assert (peak_memory is not None)
+        assert self.cluster.wait_for_memory_usage_to_drop_below(peak_memory * peak_multiplier, timeout_seconds)
diff --git a/docker/test/integration/features/core_functionality.feature b/docker/test/integration/features/core_functionality.feature
index 3f3330e99..ce5ce667f 100644
--- a/docker/test/integration/features/core_functionality.feature
+++ b/docker/test/integration/features/core_functionality.feature
@@ -40,3 +40,13 @@ Feature: Core flow functionalities
     Given a GenerateFlowFile processor with the name "generateFlowFile" in the "minifi-cpp-with-provenance-repo" flow with engine "minifi-cpp-with-provenance-repo"
     When the MiNiFi instance starts up
     Then the "minifi-cpp-with-provenance-repo" flow has a log line matching "MiNiFi started" in less than 30 seconds
+
+  Scenario: Memory usage returns after peak usage
+    Given a GenerateFlowFile processor with the "Batch Size" property set to "50000"
+    And the "Data Format" property of the GenerateFlowFile processor is set to "Text"
+    And the scheduling period of the GenerateFlowFile processor is set to "1 hours"
+    And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false"
+    And the "Custom Text" property of the GenerateFlowFile processor is set to "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur tellus quam, sagittis quis ante ac, finibus ornare lectus. Morbi libero mauris, mollis sed mi at."
+    When all instances start up
+    Then the peak memory usage of the agent is more than 130 MB in less than 20 seconds
+    And the memory usage of the agent decreases to 70% peak usage in less than 20 seconds
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index e89895dd0..0d03f8037 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -25,7 +25,7 @@ import tempfile
 from .LogSource import LogSource
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .PrometheusChecker import PrometheusChecker
-from .utils import retry_check
+from .utils import retry_check, get_peak_memory_usage, get_minifi_pid, get_memory_usage
 from azure.storage.blob import BlobServiceClient
 from azure.core.exceptions import ResourceExistsError
 
@@ -318,3 +318,43 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
     def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
         return PrometheusChecker().wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
+
+    def wait_for_peak_memory_usage_to_exceed(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> bool:
+        """Poll once per second until the peak memory usage of the minifi process exceeds minimum_peak_memory_usage.
+
+        Returns True as soon as the threshold is exceeded; returns False when the value
+        cannot be read or when timeout_seconds elapse first.
+        """
+        # Pre-initialised so the timeout message below is well-defined even if the loop body never runs.
+        current_peak_memory_usage = None
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            current_peak_memory_usage = get_peak_memory_usage(get_minifi_pid())
+            if current_peak_memory_usage is None:
+                logging.warning("Failed to determine peak memory usage")
+                return False
+            if current_peak_memory_usage > minimum_peak_memory_usage:
+                return True
+            time.sleep(1)
+        logging.warning(f"Peak memory usage ({current_peak_memory_usage}) didnt exceed minimum asserted peak memory usage {minimum_peak_memory_usage}")
+        return False
+
+    def wait_for_memory_usage_to_drop_below(self, max_memory_usage: int, timeout_seconds: int) -> bool:
+        """Poll once per second until the memory usage of the minifi process is below max_memory_usage.
+
+        Returns True as soon as the usage is below the limit; returns False when the value
+        cannot be read or when timeout_seconds elapse first.
+        """
+        # Pre-initialised so the timeout message below is well-defined even if the loop body never runs.
+        current_memory_usage = None
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            current_memory_usage = get_memory_usage(get_minifi_pid())
+            if current_memory_usage is None:
+                logging.warning("Failed to determine memory usage")
+                return False
+            if current_memory_usage < max_memory_usage:
+                return True
+            time.sleep(1)
+        logging.warning(f"Memory usage ({current_memory_usage}) is more than the maximum asserted memory usage ({max_memory_usage})")
+        return False
diff --git a/docker/test/integration/minifi/core/utils.py b/docker/test/integration/minifi/core/utils.py
index f559a2b18..c0a5d34c1 100644
--- a/docker/test/integration/minifi/core/utils.py
+++ b/docker/test/integration/minifi/core/utils.py
@@ -17,6 +17,8 @@
 import time
 import functools
 import os
+import subprocess
+from typing import Optional
 
 
 def retry_check(max_tries=5, retry_interval=1):
@@ -57,3 +59,39 @@ def decode_escaped_str(str):
 
 def is_temporary_output_file(filepath):
     return filepath.split(os.path.sep)[-1][0] == '.'
+
+
+def get_minifi_pid() -> int:
+    """Return the pid printed by `pidof -s minifi`.
+
+    Raises ValueError when pidof prints no pid (for example because no such process is running).
+    """
+    pidof_output = subprocess.run(["pidof", "-s", "minifi"], capture_output=True).stdout.strip()
+    if not pidof_output:
+        raise ValueError("pidof found no running minifi process")
+    return int(pidof_output)
+
+
+def _read_proc_status_value(pid: int, field: str) -> Optional[int]:
+    """Return the last all-digit token on the `field` line of /proc/<pid>/status, or None if there is no such line."""
+    with open("/proc/" + str(pid) + "/status") as stat_file:
+        for line in stat_file:
+            # Match the field name at the start of the line so other lines cannot match by accident.
+            if line.startswith(field + ":"):
+                return [int(s) for s in line.split() if s.isdigit()].pop()
+    return None
+
+
+def get_peak_memory_usage(pid: int) -> Optional[int]:
+    """Return the peak resident set size (VmHWM) of the process multiplied by 1024, or None if it is not listed."""
+    # NOTE(review): the factor assumes the kernel reports the value in kB, making the result bytes - confirm against proc(5).
+    peak_resident_set_size = _read_proc_status_value(pid, "VmHWM")
+    return None if peak_resident_set_size is None else peak_resident_set_size * 1024
+
+
+def get_memory_usage(pid: int) -> Optional[int]:
+    """Return the resident set size (VmRSS) of the process multiplied by 1024, or None if it is not listed."""
+    # NOTE(review): the factor assumes the kernel reports the value in kB, making the result bytes - confirm against proc(5).
+    resident_set_size = _read_proc_status_value(pid, "VmRSS")
+    return None if resident_set_size is None else resident_set_size * 1024
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 6ae9b5904..52b80bcb0 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -28,12 +28,12 @@ from minifi.controllers.KubernetesControllerService import KubernetesControllerS
 from behave import given, then, when
 from behave.model_describe import ModelDescriptor
 from pydoc import locate
-from pytimeparse.timeparse import timeparse
 
 import logging
 import time
 import uuid
 import binascii
+import humanfriendly
 
 from kafka import KafkaProducer
 from confluent_kafka.admin import AdminClient, NewTopic
@@ -67,8 +67,10 @@ def step_impl(context, processor_type, processor_name, property_name, property_v
     __create_processor(context, processor_type, processor_name, property_name, property_value, minifi_container_name)
 
 
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
-@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
+@given(
+    "a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
+@given(
+    "a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
 def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name, engine_name):
     __create_processor(context, processor_type, processor_name, property_name, property_value, minifi_container_name, engine_name)
 
@@ -195,10 +197,10 @@ def step_impl(context, processor_name, max_concurrent_tasks):
 @given("the \"{property_name}\" property of the {processor_name} processor is set to match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
 def step_impl(context, property_name, processor_name, key_attribute_encoding, message_key):
     encoded_key = ""
-    if(key_attribute_encoding.lower() == "hex"):
+    if (key_attribute_encoding.lower() == "hex"):
         # Hex is presented upper-case to be in sync with NiFi
         encoded_key = binascii.hexlify(message_key.encode("utf-8")).upper()
-    elif(key_attribute_encoding.lower() == "(not set)"):
+    elif (key_attribute_encoding.lower() == "(not set)"):
         encoded_key = message_key.encode("utf-8")
     else:
         encoded_key = message_key.encode(key_attribute_encoding)
@@ -725,19 +727,19 @@ def step_impl(context):
 @then("a flowfile with the content '{content}' is placed in the monitored directory in less than {duration}")
 @then("{number_of_flow_files:d} flowfiles with the content \"{content}\" are placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration, number_of_flow_files=1):
-    context.test.check_for_multiple_files_generated(number_of_flow_files, timeparse(duration), [content])
+    context.test.check_for_multiple_files_generated(number_of_flow_files, humanfriendly.parse_timespan(duration), [content])
 
 
 @then("a flowfile with the JSON content \"{content}\" is placed in the monitored directory in less than {duration}")
 @then("a flowfile with the JSON content '{content}' is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
-    context.test.check_for_single_json_file_with_content_generated(content, timeparse(duration))
+    context.test.check_for_single_json_file_with_content_generated(content, humanfriendly.parse_timespan(duration))
 
 
 @then("at least one flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
 @then("at least one flowfile with the content '{content}' is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
-    context.test.check_for_at_least_one_file_with_content_generated(content, timeparse(duration))
+    context.test.check_for_at_least_one_file_with_content_generated(content, humanfriendly.parse_timespan(duration))
 
 
 @then("{num_flowfiles} flowfiles are placed in the monitored directory in less than {duration}")
@@ -745,49 +747,49 @@ def step_impl(context, num_flowfiles, duration):
     if num_flowfiles == 0:
         context.execute_steps("""no files are placed in the monitored directory in {duration} of running time""".format(duration=duration))
         return
-    context.test.check_for_num_files_generated(int(num_flowfiles), timeparse(duration))
+    context.test.check_for_num_files_generated(int(num_flowfiles), humanfriendly.parse_timespan(duration))
 
 
 @then("at least one flowfile is placed in the monitored directory in less than {duration}")
 def step_impl(context, duration):
-    context.test.check_for_num_file_range_generated(1, float('inf'), timeparse(duration))
+    context.test.check_for_num_file_range_generated(1, float('inf'), humanfriendly.parse_timespan(duration))
 
 
 @then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
-    context.test.check_for_multiple_files_generated(1, timeparse(duration), [content])
+    context.test.check_for_multiple_files_generated(1, humanfriendly.parse_timespan(duration), [content])
 
 
 @then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
 def step_impl(context, content_1, content_2, duration):
-    context.test.check_for_multiple_files_generated(2, timeparse(duration), [content_1, content_2])
+    context.test.check_for_multiple_files_generated(2, humanfriendly.parse_timespan(duration), [content_1, content_2])
 
 
 @then("flowfiles with these contents are placed in the monitored directory in less than {duration}: \"{contents}\"")
 def step_impl(context, duration, contents):
     contents_arr = contents.split(",")
-    context.test.check_for_multiple_files_generated(0, timeparse(duration), contents_arr)
+    context.test.check_for_multiple_files_generated(0, humanfriendly.parse_timespan(duration), contents_arr)
 
 
 @then("after a wait of {duration}, at least {lower_bound:d} and at most {upper_bound:d} flowfiles are produced and placed in the monitored directory")
 def step_impl(context, lower_bound, upper_bound, duration):
-    context.test.check_for_num_file_range_generated(lower_bound, upper_bound, timeparse(duration))
+    context.test.check_for_num_file_range_generated(lower_bound, upper_bound, humanfriendly.parse_timespan(duration))
 
 
 @then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
 @then("{number_of_files:d} flowfile is placed in the monitored directory in {duration}")
 def step_impl(context, number_of_files, duration):
-    context.test.check_for_multiple_files_generated(number_of_files, timeparse(duration))
+    context.test.check_for_multiple_files_generated(number_of_files, humanfriendly.parse_timespan(duration))
 
 
 @then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
 def step_impl(context, duration):
-    context.test.check_for_an_empty_file_generated(timeparse(duration))
+    context.test.check_for_an_empty_file_generated(humanfriendly.parse_timespan(duration))
 
 
 @then("no files are placed in the monitored directory in {duration} of running time")
 def step_impl(context, duration):
-    context.test.check_for_no_files_generated(timeparse(duration))
+    context.test.check_for_no_files_generated(humanfriendly.parse_timespan(duration))
 
 
 @then("no errors were generated on the http-proxy regarding \"{url}\"")
@@ -854,7 +856,7 @@ def step_impl(context, query, number_of_rows, timeout_seconds):
 
 @then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
-    context.test.check_minifi_log_contents(log_message, timeparse(duration))
+    context.test.check_minifi_log_contents(log_message, humanfriendly.parse_timespan(duration))
 
 
 @then("the Minifi logs contain the following message: \"{log_message}\" {count:d} times after {seconds:d} seconds")
@@ -870,12 +872,12 @@ def step_impl(context, log_message, seconds):
 
 @then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
 def step_impl(context, regex, duration):
-    context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
+    context.test.check_minifi_log_matches_regex(regex, humanfriendly.parse_timespan(duration))
 
 
 @then("the OPC UA server logs contain the following message: \"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
-    context.test.check_container_log_contents("opcua-server", log_message, timeparse(duration))
+    context.test.check_container_log_contents("opcua-server", log_message, humanfriendly.parse_timespan(duration))
 
 
 # MQTT
@@ -891,7 +893,7 @@ def step_impl(context, log_count, log_pattern):
 
 @then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
 def step_impl(context, minifi_container_name, log_pattern, duration):
-    context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, timeparse(duration), count=1)
+    context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, humanfriendly.parse_timespan(duration), count=1)
 
 
 @then("an MQTT broker is deployed in correspondence with the PublishMQTT")
@@ -981,14 +983,30 @@ def step_impl(context):
 
 @then("the MiNiFi C2 server logs contain the following message: \"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
-    context.test.check_container_log_contents("minifi-c2-server", log_message, timeparse(duration))
+    context.test.check_container_log_contents("minifi-c2-server", log_message, humanfriendly.parse_timespan(duration))
 
 
 @then("the MiNiFi C2 SSL server logs contain the following message: \"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
-    context.test.check_container_log_contents("minifi-c2-server-ssl", log_message, timeparse(duration))
+    context.test.check_container_log_contents("minifi-c2-server-ssl", log_message, humanfriendly.parse_timespan(duration))
 
 
 @given(u'a MiNiFi C2 server is set up with SSL')
 def step_impl(context):
     context.test.acquire_container("minifi-c2-server", "minifi-c2-server-ssl")
+
+
+# MiNiFi memory usage
+@then(u'the peak memory usage of the agent is more than {size} in less than {duration}')
+def step_impl(context, size: str, duration: str) -> None:
+    context.test.check_if_peak_memory_usage_exceeded(humanfriendly.parse_size(size), humanfriendly.parse_timespan(duration))
+
+
+@then(u'the memory usage of the agent is less than {size} in less than {duration}')
+def step_impl(context, size: str, duration: str) -> None:
+    context.test.check_if_memory_usage_is_below(humanfriendly.parse_size(size), humanfriendly.parse_timespan(duration))
+
+
+@then(u'the memory usage of the agent decreases to {peak_usage_percent}% peak usage in less than {duration}')
+def step_impl(context, peak_usage_percent: str, duration: str) -> None:
+    context.test.check_memory_usage_compared_to_peak(float(peak_usage_percent) * 0.01, humanfriendly.parse_timespan(duration))
diff --git a/minifi_main/CMakeLists.txt b/minifi_main/CMakeLists.txt
index ea7c9e381..beb2be852 100644
--- a/minifi_main/CMakeLists.txt
+++ b/minifi_main/CMakeLists.txt
@@ -52,8 +52,9 @@ endif(NOT USE_SHARED_LIBS)
 target_link_libraries(minifiexe Threads::Threads)
 
 target_link_libraries(minifiexe yaml-cpp)
-if(NOT WIN32 AND ENABLE_JNI AND NOT DISABLE_JEMALLOC)
-    target_link_libraries(minifiexe JeMalloc::JeMalloc)
+if(CUSTOM_MALLOC_LIB)
+    message(VERBOSE "Using custom malloc lib ${CUSTOM_MALLOC_LIB} for minifiexe")
+    target_link_libraries(minifiexe ${CUSTOM_MALLOC_LIB})
 endif()
 
 if (WIN32)