You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/06/23 16:55:48 UTC

[nifi-minifi-cpp] branch main updated (9173029 -> 31fd099)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from 9173029  MINIFICPP-1426 MSVC: Use /std:c++latest /permissive- and drop VS 2017
     new a09d158  MINIFICPP-1414 Create in-memory compressed logs
     new 88e7429  MINIFICPP-1576 Allow build of all extensions in docker and cleanup docker files
     new e6ecb2e  MINIFICPP-1580 Fix Tensorflow extension build
     new 89f1713  MINIFICPP-1567 enable linter checks in extensions (part 3)
     new dd42a04  MINIFICPP-1595 Pin pip package versions in requirements.txt
     new 31fd099  MINIFICPP-1494 Allow InvokeHTTP GET requests without incoming flowfile

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .dockerignore                                      |  64 +++++-
 .github/workflows/ci.yml                           |   2 +-
 .gitignore                                         |   7 -
 CMakeLists.txt                                     |  20 +-
 PROCESSORS.md                                      |   6 +-
 cmake/DockerConfig.cmake                           |  18 +-
 cmake/FindTensorFlow.cmake                         |  55 ++++--
 conf/minifi-log.properties                         |   9 +
 docker/DockerBuild.sh                              |  16 +-
 docker/DockerVerify.sh                             |   8 +-
 docker/Dockerfile                                  | 208 +++++++++-----------
 docker/requirements.txt                            |   6 +
 extensions/http-curl/processors/InvokeHTTP.cpp     |   7 +-
 extensions/http-curl/processors/InvokeHTTP.h       |   1 +
 extensions/http-curl/tests/CMakeLists.txt          |   8 +-
 extensions/http-curl/tests/HTTPHandlers.h          |  22 +++
 .../http-curl/tests/HttpGetIntegrationTest.cpp     | 139 -------------
 extensions/http-curl/tests/VerifyInvokeHTTP.h      | 131 +++++++++++++
 .../http-curl/tests/VerifyInvokeHTTPGetTest.cpp    |  63 ++++++
 ...keHTTPTest.cpp => VerifyInvokeHTTPPostTest.cpp} | 121 +-----------
 extensions/libarchive/ArchiveLoader.h              |   8 +-
 extensions/libarchive/ArchiveMetadata.cpp          | 215 ++++++++++-----------
 extensions/libarchive/ArchiveMetadata.h            | 100 +++++-----
 extensions/libarchive/ArchiveTests.h               |   7 +-
 extensions/libarchive/BinFiles.cpp                 |   3 +-
 extensions/libarchive/BinFiles.h                   |  14 +-
 extensions/libarchive/CMakeLists.txt               |   4 +-
 extensions/libarchive/CompressContent.cpp          |   3 -
 extensions/libarchive/CompressContent.h            |  23 +--
 extensions/libarchive/FocusArchiveEntry.cpp        |   6 +-
 extensions/libarchive/FocusArchiveEntry.h          |   8 +-
 extensions/libarchive/ManipulateArchive.cpp        |  11 +-
 extensions/libarchive/ManipulateArchive.h          |  17 +-
 extensions/libarchive/MergeContent.cpp             |   9 +-
 extensions/libarchive/MergeContent.h               |  13 +-
 extensions/libarchive/UnfocusArchiveEntry.cpp      |   6 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |   7 +-
 extensions/mqtt/CMakeLists.txt                     |   4 +-
 extensions/mqtt/MQTTLoader.h                       |   8 +-
 .../controllerservice/MQTTControllerService.cpp    |   2 +-
 .../mqtt/controllerservice/MQTTControllerService.h |  21 +-
 extensions/mqtt/processors/AbstractMQTTProcessor.h |  16 +-
 extensions/mqtt/processors/ConsumeMQTT.h           |   8 +-
 extensions/mqtt/processors/ConvertBase.h           |  11 +-
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |   2 +
 extensions/mqtt/processors/ConvertHeartBeat.h      |  21 +-
 extensions/mqtt/processors/ConvertJSONAck.h        |  17 +-
 extensions/mqtt/processors/ConvertUpdate.cpp       |   6 +
 extensions/mqtt/processors/ConvertUpdate.h         |  20 +-
 extensions/mqtt/processors/PublishMQTT.h           |   9 +-
 extensions/mqtt/protocol/MQTTC2Protocol.cpp        |   3 +-
 extensions/mqtt/protocol/MQTTC2Protocol.h          |   6 +-
 extensions/opc/CMakeLists.txt                      |   1 +
 extensions/opc/include/fetchopc.h                  |  20 +-
 extensions/opc/include/opc.h                       |  19 +-
 extensions/opc/include/opcbase.h                   |  10 +-
 extensions/opc/include/putopc.h                    |  20 +-
 extensions/opc/src/fetchopc.cpp                    |   4 +-
 extensions/opc/src/opc.cpp                         | 148 +++++++-------
 extensions/opencv/CMakeLists.txt                   |   3 +-
 extensions/opencv/CaptureRTSPFrame.cpp             |  20 +-
 extensions/opencv/CaptureRTSPFrame.h               |  16 +-
 extensions/opencv/FrameIO.h                        |  73 +++----
 extensions/opencv/MotionDetector.cpp               |   5 +
 extensions/opencv/MotionDetector.h                 |  26 +--
 extensions/opencv/OpenCVLoader.h                   |  10 +-
 extensions/opencv/tests/CaptureRTSPFrameTest.cpp   |   2 +-
 extensions/openwsman/CMakeLists.txt                |   1 +
 .../SourceInitiatedSubscriptionListener.h          |   5 +-
 extensions/pcap/CMakeLists.txt                     |   4 +-
 extensions/pcap/CapturePacket.cpp                  |  13 +-
 extensions/pcap/CapturePacket.h                    |  15 +-
 extensions/pcap/PcapLoader.h                       |   8 +-
 extensions/tensorflow/CMakeLists.txt               |   3 +-
 extensions/tensorflow/TFApplyGraph.cpp             |  11 +-
 extensions/tensorflow/TFApplyGraph.h               |   2 +-
 extensions/tensorflow/TFConvertImageToTensor.cpp   |   6 +-
 extensions/tensorflow/TFConvertImageToTensor.h     |   2 +-
 extensions/tensorflow/TFExtractTopLabels.cpp       |  11 +-
 extensions/tensorflow/TFExtractTopLabels.h         |   2 +-
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/core/TypedValues.h               |  88 +++------
 .../include/core/logging/LoggerConfiguration.h     |  53 +++--
 .../{WindowsEventLogSink.h => LoggerProperties.h}  |  57 +++---
 .../ActiveCompressor.h}                            |  56 +++---
 .../core/logging/internal/CompressionManager.h     |  84 ++++++++
 .../include/core/logging/internal/LogBuffer.h      |  34 +++-
 .../include/core/logging/internal/LogCompressor.h  |  23 ++-
 .../core/logging/internal/LogCompressorSink.h      |  92 +++++++++
 libminifi/include/io/BufferStream.h                |   8 +
 libminifi/include/io/ZlibStream.h                  |  13 +-
 libminifi/include/io/validation.h                  |   1 +
 libminifi/include/utils/Literals.h                 |  59 ++++++
 libminifi/include/utils/StagingQueue.h             | 178 +++++++++++++++++
 libminifi/include/utils/ValueParser.h              |   4 +
 libminifi/src/core/TypedValues.cpp                 |   9 +
 libminifi/src/core/logging/LoggerConfiguration.cpp |  55 ++++--
 .../core/logging/internal/CompressionManager.cpp   |  74 +++++++
 .../src/core/logging/internal/LogCompressor.cpp    |  24 ++-
 .../core/logging/internal/LogCompressorSink.cpp    |  81 ++++++++
 libminifi/src/io/ZlibStream.cpp                    |  39 +++-
 .../test/archive-tests/ManipulateArchiveTests.cpp  |   4 +-
 libminifi/test/resources/TestHTTPGet.yml           |  20 +-
 libminifi/test/resources/TestHTTPGetSecure.yml     |  19 +-
 .../test/resources/TestHTTPPostChunkedEncoding.yml |  14 +-
 libminifi/test/tensorflow-tests/CMakeLists.txt     |   2 +-
 .../test/tensorflow-tests/TensorFlowTests.cpp      |  10 +-
 libminifi/test/unit/LoggerTests.cpp                |  97 +++++++++-
 libminifi/test/unit/StagingQueueTests.cpp          | 121 ++++++++++++
 thirdparty/openwsman/openwsman.patch               |  39 ++++
 thirdparty/pcap++/Pcap++/src/PcapLiveDevice.cpp    |   9 +-
 111 files changed, 2124 insertions(+), 1224 deletions(-)
 create mode 100644 docker/requirements.txt
 delete mode 100644 extensions/http-curl/tests/HttpGetIntegrationTest.cpp
 create mode 100644 extensions/http-curl/tests/VerifyInvokeHTTP.h
 create mode 100644 extensions/http-curl/tests/VerifyInvokeHTTPGetTest.cpp
 rename extensions/http-curl/tests/{VerifyInvokeHTTPTest.cpp => VerifyInvokeHTTPPostTest.cpp} (52%)
 copy libminifi/include/core/logging/{WindowsEventLogSink.h => LoggerProperties.h} (53%)
 copy libminifi/include/core/logging/{WindowsEventLogSink.h => internal/ActiveCompressor.h} (55%)
 create mode 100644 libminifi/include/core/logging/internal/CompressionManager.h
 copy extensions/rocksdb-repos/database/ColumnHandle.h => libminifi/include/core/logging/internal/LogBuffer.h (62%)
 copy extensions/rocksdb-repos/database/ColumnHandle.h => libminifi/include/core/logging/internal/LogCompressor.h (73%)
 create mode 100644 libminifi/include/core/logging/internal/LogCompressorSink.h
 create mode 100644 libminifi/include/utils/Literals.h
 create mode 100644 libminifi/include/utils/StagingQueue.h
 create mode 100644 libminifi/src/core/logging/internal/CompressionManager.cpp
 copy extensions/rocksdb-repos/database/WriteBatch.cpp => libminifi/src/core/logging/internal/LogCompressor.cpp (64%)
 create mode 100644 libminifi/src/core/logging/internal/LogCompressorSink.cpp
 create mode 100644 libminifi/test/unit/StagingQueueTests.cpp

[nifi-minifi-cpp] 01/06: MINIFICPP-1414 Create in-memory compressed logs

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a09d158f500e1e977d289c5b03fc0d11574f8340
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Jun 23 17:29:32 2021 +0200

    MINIFICPP-1414 Create in-memory compressed logs
    
    Closes #955
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 conf/minifi-log.properties                         |   9 ++
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/core/TypedValues.h               |  88 ++++------
 .../include/core/logging/LoggerConfiguration.h     |  53 +++---
 libminifi/include/core/logging/LoggerProperties.h  |  69 ++++++++
 .../core/logging/internal/ActiveCompressor.h       |  72 +++++++++
 .../core/logging/internal/CompressionManager.h     |  84 ++++++++++
 .../core/logging/internal/LogBuffer.h}             |  46 ++++--
 .../core/logging/internal/LogCompressor.h}         |  35 ++--
 .../core/logging/internal/LogCompressorSink.h      |  92 +++++++++++
 libminifi/include/io/BufferStream.h                |   8 +
 libminifi/include/io/ZlibStream.h                  |  13 +-
 libminifi/include/utils/Literals.h                 |  59 +++++++
 libminifi/include/utils/StagingQueue.h             | 178 +++++++++++++++++++++
 libminifi/include/utils/ValueParser.h              |   4 +
 libminifi/src/core/TypedValues.cpp                 |   9 ++
 libminifi/src/core/logging/LoggerConfiguration.cpp |  55 +++++--
 .../core/logging/internal/CompressionManager.cpp   |  74 +++++++++
 .../internal/LogCompressor.cpp}                    |  31 ++--
 .../core/logging/internal/LogCompressorSink.cpp    |  81 ++++++++++
 libminifi/src/io/ZlibStream.cpp                    |  39 +++--
 libminifi/test/unit/LoggerTests.cpp                |  97 ++++++++++-
 libminifi/test/unit/StagingQueueTests.cpp          | 121 ++++++++++++++
 23 files changed, 1176 insertions(+), 143 deletions(-)

diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties
index 250cb46..aeca6dd 100644
--- a/conf/minifi-log.properties
+++ b/conf/minifi-log.properties
@@ -45,3 +45,12 @@ logger.org::apache::nifi::minifi=INFO,rolling
 
 #Logging configurable by class fully qualified name
 #logger.org::apache::nifi::minifi::core::logging::LoggerConfiguration=DEBUG
+
+# Log compression #
+## Enables the agent to keep a limited amount of the application
+## logs in memory in compressed format. Note that because this
+## cache is compressed, it may hold more log content than the
+## log files themselves contain.
+## Setting any of these to 0 disables the in-memory log compression.
+#compression.cached.log.max.size=8 MB
+#compression.compressed.log.max.size=8 MB
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index a9cce79..6550e7f 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
 	set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/pro [...]
+file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "src/agent/agent_version.cpp")
 
diff --git a/libminifi/include/core/TypedValues.h b/libminifi/include/core/TypedValues.h
index b3808ba..09050f1 100644
--- a/libminifi/include/core/TypedValues.h
+++ b/libminifi/include/core/TypedValues.h
@@ -21,12 +21,15 @@
 #include <algorithm>
 #include <string>
 #include <typeindex>
+#include <map>
+#include <memory>
 
 #include "state/Value.h"
 #include "utils/StringUtils.h"
 #include "utils/ValueParser.h"
 #include "utils/PropertyErrors.h"
 #include "utils/OptionalUtils.h"
+#include "utils/Literals.h"
 
 namespace org {
 namespace apache {
@@ -114,6 +117,8 @@ class TimePeriodValue : public TransformableValue, public state::response::UInt6
  * format <numeric> <byte size>.
  */
 class DataSizeValue : public TransformableValue, public state::response::UInt64Value {
+  static std::shared_ptr<logging::Logger>& getLogger();
+
  public:
   static const std::type_index type_id;
 
@@ -128,72 +133,41 @@ class DataSizeValue : public TransformableValue, public state::response::UInt64V
   }
 
 
-// Convert String to Integer
+  // Convert String to Integer
   template<typename T, typename std::enable_if<
       std::is_integral<T>::value>::type* = nullptr>
   static bool StringToInt(const std::string &input, T &output) {
-    if (input.size() == 0) {
+    // TODO(adebreceni): this mapping is to preserve backwards compatibility,
+    //  we should entertain the idea of moving to standardized units in
+    //  the configuration (i.e. K = 1000, Ki = 1024)
+    static std::map<std::string, int64_t> unit_map{
+      {"B", 1},
+      {"K", 1_KB}, {"M", 1_MB}, {"G", 1_GB}, {"T", 1_TB}, {"P", 1_PB},
+      {"KB", 1_KiB}, {"MB", 1_MiB}, {"GB", 1_GiB}, {"TB", 1_TiB}, {"PB", 1_PiB},
+    };
+
+    int64_t value;
+    std::string unit_str;
+    try {
+      unit_str = utils::StringUtils::trim(utils::internal::ValueParser(input).parse(value).rest());
+    } catch (const utils::internal::ParseException&) {
       return false;
     }
 
-    const char *cvalue = input.c_str();
-    char *pEnd;
-    auto ival = std::strtoll(cvalue, &pEnd, 0);
-
-    if (pEnd[0] == '\0') {
-      output = gsl::narrow<T>(ival);
-      return true;
-    }
-
-    while (*pEnd == ' ') {
-      // Skip the space
-      pEnd++;
-    }
-
-    char end0 = toupper(pEnd[0]);
-    if (end0 == 'B') {
-      output = gsl::narrow<T>(ival);
-      return true;
-    } else if ((end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T') || (end0 == 'P')) {
-      if (pEnd[1] == '\0') {
-        unsigned long int multiplier = 1000; // NOLINT
-
-        if ((end0 != 'K')) {
-          multiplier *= 1000;
-          if (end0 != 'M') {
-            multiplier *= 1000;
-            if (end0 != 'G') {
-              multiplier *= 1000;
-              if (end0 != 'T') {
-                multiplier *= 1000;
-              }
-            }
-          }
-        }
-        output = gsl::narrow<T>(ival * multiplier);
-        return true;
-
-      } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) {
-        unsigned long int multiplier = 1024; // NOLINT
-
-        if ((end0 != 'K')) {
-          multiplier *= 1024;
-          if (end0 != 'M') {
-            multiplier *= 1024;
-            if (end0 != 'G') {
-              multiplier *= 1024;
-              if (end0 != 'T') {
-                multiplier *= 1024;
-              }
-            }
-          }
-        }
-        output = gsl::narrow<T>(ival * multiplier);
-        return true;
+    if (!unit_str.empty()) {
+      std::transform(unit_str.begin(), unit_str.end(), unit_str.begin(), ::toupper);
+      auto multiplierIt = unit_map.find(unit_str);
+      if (multiplierIt == unit_map.end()) {
+        getLogger()->log_warn("Unrecognized data unit: '%s', in the future this will constitute as an error", unit_str);
+        // backwards compatibility
+        // return false;
+      } else {
+        value *= multiplierIt->second;
       }
     }
 
-    return false;
+    output = gsl::narrow<T>(value);
+    return true;
   }
 };
 
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h
index 7e93ef0..80e2f1e 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -36,7 +36,10 @@
 
 #include "core/Core.h"
 #include "core/logging/Logger.h"
-#include "properties/Properties.h"
+#include "LoggerProperties.h"
+#include "internal/CompressionManager.h"
+
+class LoggerTestAccessor;
 
 namespace org {
 namespace apache {
@@ -50,6 +53,8 @@ struct LoggerNamespace {
   spdlog::level::level_enum level;
   bool has_level;
   std::vector<std::shared_ptr<spdlog::sinks::sink>> sinks;
+  // sinks made available to all descendants
+  std::vector<std::shared_ptr<spdlog::sinks::sink>> exported_sinks;
   std::map<std::string, std::shared_ptr<LoggerNamespace>> children;
 
   LoggerNamespace()
@@ -61,37 +66,9 @@ struct LoggerNamespace {
 };
 }  // namespace internal
 
-class LoggerProperties : public Properties {
- public:
-  LoggerProperties()
-      : Properties("Logger properties") {
-  }
-  /**
-   * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator.
-   *
-   * Ex: with type argument "appender"
-   * you would get back a property of "appender.rolling" but not "appender.rolling.file_name"
-   */
-  std::vector<std::string> get_keys_of_type(const std::string &type);
-
-  /**
-   * Registers a sink witht the given name. This allows for programmatic definition of sinks.
-   */
-  void add_sink(const std::string &name, std::shared_ptr<spdlog::sinks::sink> sink) {
-    sinks_[name] = sink;
-  }
-  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> initial_sinks() {
-    return sinks_;
-  }
-
-  static const char* appender_prefix;
-  static const char* logger_prefix;
-
- private:
-  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_;
-};
-
 class LoggerConfiguration {
+  friend class ::LoggerTestAccessor;
+
  public:
   /**
    * Gets the current log configuration
@@ -121,6 +98,15 @@ class LoggerConfiguration {
    */
   void initialize(const std::shared_ptr<LoggerProperties> &logger_properties);
 
+  static std::unique_ptr<io::InputStream> getCompressedLog(bool flush = false) {
+    return getCompressedLog(std::chrono::milliseconds{0}, flush);
+  }
+
+  template<class Rep, class Period>
+  static std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
+    return getConfiguration().compression_manager_.getCompressedLog(time, flush);
+  }
+
   /**
    * Can be used to get arbitrarily named Logger, LoggerFactory should be preferred within a class.
    */
@@ -134,6 +120,10 @@ class LoggerConfiguration {
                                                     std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false);
 
  private:
+  std::shared_ptr<Logger> getLogger(const std::string& name, const std::lock_guard<std::mutex>& lock);
+
+  void initializeCompression(const std::lock_guard<std::mutex>& lock, const std::shared_ptr<LoggerProperties>& properties);
+
   static spdlog::sink_ptr create_syslog_sink();
   static spdlog::sink_ptr create_fallback_sink();
 
@@ -154,6 +144,7 @@ class LoggerConfiguration {
   };
 
   LoggerConfiguration();
+  internal::CompressionManager compression_manager_;
   std::shared_ptr<internal::LoggerNamespace> root_namespace_;
   std::vector<std::shared_ptr<LoggerImpl>> loggers;
   std::shared_ptr<spdlog::formatter> formatter_;
diff --git a/libminifi/include/core/logging/LoggerProperties.h b/libminifi/include/core/logging/LoggerProperties.h
new file mode 100644
index 0000000..1341a7f
--- /dev/null
+++ b/libminifi/include/core/logging/LoggerProperties.h
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <map>
+#include <vector>
+
+#include "spdlog/sinks/sink.h"
+
+#include "properties/Properties.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+class LoggerProperties : public Properties {
+ public:
+  LoggerProperties()
+      : Properties("Logger properties") {
+  }
+  /**
+   * Gets all keys that start with the given prefix and do not have a "." after the prefix and "." separator.
+   *
+   * Ex: with type argument "appender"
+   * you would get back a property of "appender.rolling" but not "appender.rolling.file_name"
+   */
+  std::vector<std::string> get_keys_of_type(const std::string &type);
+
+  /**
+   * Registers a sink with the given name. This allows for programmatic definition of sinks.
+   */
+  void add_sink(const std::string &name, std::shared_ptr<spdlog::sinks::sink> sink) {
+    sinks_[name] = sink;
+  }
+  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> initial_sinks() {
+    return sinks_;
+  }
+
+ private:
+  std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sinks_;
+};
+
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/logging/internal/ActiveCompressor.h b/libminifi/include/core/logging/internal/ActiveCompressor.h
new file mode 100644
index 0000000..045d3e0
--- /dev/null
+++ b/libminifi/include/core/logging/internal/ActiveCompressor.h
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include "LogBuffer.h"
+#include "LogCompressor.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class ActiveCompressor {
+ public:
+  class Allocator {
+   public:
+    explicit Allocator(std::shared_ptr<logging::Logger> logger) : logger_{std::move(logger)} {}
+
+    ActiveCompressor operator()(size_t max_size) const {
+      ActiveCompressor instance;
+      instance.output_.reset(new io::BufferStream());
+      instance.output_->extend(max_size);
+      instance.compressor_.reset(new LogCompressor(gsl::make_not_null(instance.output_.get()), logger_));
+      return instance;
+    }
+
+   private:
+    std::shared_ptr<logging::Logger> logger_;
+  };
+
+  LogBuffer commit() {
+    compressor_->close();
+    return LogBuffer{std::move(output_)};
+  }
+
+  size_t size() const {
+    return output_->size();
+  }
+
+  std::unique_ptr<io::BufferStream> output_;
+  std::unique_ptr<LogCompressor> compressor_;
+};
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/logging/internal/CompressionManager.h b/libminifi/include/core/logging/internal/CompressionManager.h
new file mode 100644
index 0000000..b1ea9fa
--- /dev/null
+++ b/libminifi/include/core/logging/internal/CompressionManager.h
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <atomic>
+#include <functional>
+#include <utility>
+#include <string>
+
+#include "core/logging/Logger.h"
+#include "LogCompressorSink.h"
+#include "core/logging/LoggerProperties.h"
+#include "io/InputStream.h"
+#include "utils/Literals.h"
+
+class LoggerTestAccessor;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+class CompressionManager {
+  friend class ::LoggerTestAccessor;
+
+  using LoggerFactory = std::function<std::shared_ptr<Logger>(const std::string&)>;
+
+ public:
+  std::shared_ptr<LogCompressorSink> initialize(const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory);
+
+  template<class Rep, class Period>
+  std::unique_ptr<io::InputStream> getCompressedLog(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
+    std::shared_ptr<internal::LogCompressorSink> sink = getSink();
+    if (sink) {
+      return sink->getContent(time, flush);
+    }
+    return nullptr;
+  }
+
+  static constexpr const char* compression_cached_log_max_size_ = "compression.cached.log.max.size";
+  static constexpr const char* compression_compressed_log_max_size_ = "compression.compressed.log.max.size";
+
+ private:
+  std::shared_ptr<internal::LogCompressorSink> getSink() const {
+    // gcc4.8 bug => cannot use std::atomic_load
+    std::lock_guard<std::mutex> lock(mtx_);
+    return sink_;
+  }
+
+  std::atomic<size_t> cache_segment_size{1_MiB};
+  std::atomic<size_t> compressed_segment_size{1_MiB};
+
+  mutable std::mutex mtx_;
+  std::shared_ptr<LogCompressorSink> sink_;
+};
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/include/core/logging/internal/LogBuffer.h
similarity index 53%
copy from libminifi/src/core/TypedValues.cpp
copy to libminifi/include/core/logging/internal/LogBuffer.h
index 93a43b8..968be80 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/include/core/logging/internal/LogBuffer.h
@@ -16,19 +16,47 @@
  * limitations under the License.
  */
 
-#include "core/Property.h"
-#include "core/TypedValues.h"
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/BufferStream.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace logging {
+namespace internal {
+
+class LogBuffer {
+ public:
+  LogBuffer() = default;
+  explicit LogBuffer(std::unique_ptr<io::BufferStream> buffer): buffer_{std::move(buffer)} {}
+
+  static LogBuffer allocate(size_t max_size) {
+    LogBuffer instance{utils::make_unique<io::BufferStream>()};
+    instance.buffer_->extend(max_size);
+    return instance;
+  }
+
+  LogBuffer commit() {
+    return LogBuffer{std::move(buffer_)};
+  }
+
+  size_t size() const {
+    return buffer_->size();
+  }
 
-const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
-const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
+  std::unique_ptr<io::BufferStream> buffer_;
+};
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/include/core/logging/internal/LogCompressor.h
similarity index 62%
copy from libminifi/src/core/TypedValues.cpp
copy to libminifi/include/core/logging/internal/LogCompressor.h
index 93a43b8..9bc722e 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/include/core/logging/internal/LogCompressor.h
@@ -16,19 +16,36 @@
  * limitations under the License.
  */
 
-#include "core/Property.h"
-#include "core/TypedValues.h"
+#pragma once
+
+#include <memory>
+#include "io/ZlibStream.h"
+#include "io/OutputStream.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace logging {
+namespace internal {
+
+class LogCompressor : public io::ZlibCompressStream {
+ public:
+  LogCompressor(gsl::not_null<OutputStream *> output, std::shared_ptr<logging::Logger> logger);
+
+  enum class FlushResult {
+    Success,
+    Error
+  };
 
-const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
-const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
+  FlushResult flush();
+};
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h
new file mode 100644
index 0000000..bbb7c24
--- /dev/null
+++ b/libminifi/include/core/logging/internal/LogCompressorSink.h
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <atomic>
+#include <utility>
+
+#include "spdlog/common.h"
+#include "spdlog/details/log_msg.h"
+#include "spdlog/details/null_mutex.h"
+#include "spdlog/sinks/base_sink.h"
+#include "ActiveCompressor.h"
+#include "LogBuffer.h"
+#include "utils/StagingQueue.h"
+
+class LoggerTestAccessor;
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+struct LogQueueSize {
+  size_t max_total_size;
+  size_t max_segment_size;
+};
+
+class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_mutex> {
+  friend class ::LoggerTestAccessor;
+
+ private:
+  void sink_it_(const spdlog::details::log_msg& msg) override;
+  void flush_() override;
+
+ public:
+  explicit LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger);
+  ~LogCompressorSink() override;
+
+  template<class Rep, class Period>
+  std::unique_ptr<io::InputStream> getContent(const std::chrono::duration<Rep, Period>& time, bool flush = false) {
+    if (flush) {
+      cached_logs_.commit();
+      compress(true);
+    }
+    LogBuffer compressed;
+    compressed_logs_.tryDequeue(compressed, time);
+    return std::move(compressed.buffer_);
+  }
+
+ private:
+  enum class CompressionResult {
+    Success,
+    NothingToCompress
+  };
+
+  CompressionResult compress(bool force_rotation = false);
+  void run();
+
+  std::atomic<bool> running_{true};
+  std::thread compression_thread_;
+
+  utils::StagingQueue<LogBuffer> cached_logs_;
+  utils::StagingQueue<ActiveCompressor, ActiveCompressor::Allocator> compressed_logs_;
+};
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 2290a3f..c16e15c 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -43,6 +43,14 @@ class BufferStream : public BaseStream {
     write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
   }
 
+  /*
+   * prepares the stream to accept an additional byte_count bytes
+   * @param byte_count number of bytes we expect to write
+   */
+  void extend(size_t byte_count) {
+    buffer_.reserve(buffer_.size() + byte_count);
+  }
+
   using BaseStream::read;
   using BaseStream::write;
 
diff --git a/libminifi/include/io/ZlibStream.h b/libminifi/include/io/ZlibStream.h
index b146413..f8efff8 100644
--- a/libminifi/include/io/ZlibStream.h
+++ b/libminifi/include/io/ZlibStream.h
@@ -26,7 +26,7 @@
 #include <vector>
 
 #include "BaseStream.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/Logger.h"
 #include "utils/gsl.h"
 
 namespace org {
@@ -80,8 +80,13 @@ class ZlibCompressStream : public ZlibBaseStream {
 
   void close() override;
 
- private:
-  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibCompressStream>::getLogger()};
+ protected:
+  ZlibCompressStream(gsl::not_null<OutputStream*> ouput, ZlibCompressionFormat format, int level, std::shared_ptr<core::logging::Logger> logger);
+
+  using FlushMode = int;
+  size_t write(const uint8_t* value, size_t size, FlushMode mode);
+
+  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 class ZlibDecompressStream : public ZlibBaseStream {
@@ -98,7 +103,7 @@ class ZlibDecompressStream : public ZlibBaseStream {
   size_t write(const uint8_t *value, size_t size) override;
 
  private:
-  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()};
+  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 }  // namespace io
diff --git a/libminifi/include/utils/Literals.h b/libminifi/include/utils/Literals.h
new file mode 100644
index 0000000..9b47b8a
--- /dev/null
+++ b/libminifi/include/utils/Literals.h
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+constexpr unsigned long long operator "" _KiB(unsigned long long n) {  // NOLINT
+  return 1024 * n;
+}
+
+constexpr unsigned long long operator "" _MiB(unsigned long long n) {  // NOLINT
+  return 1024_KiB * n;
+}
+
+constexpr unsigned long long operator "" _GiB(unsigned long long n) {  // NOLINT
+  return 1024_MiB * n;
+}
+
+constexpr unsigned long long operator "" _TiB(unsigned long long n) {  // NOLINT
+  return 1024_GiB * n;
+}
+
+constexpr unsigned long long operator "" _PiB(unsigned long long n) {  // NOLINT
+  return 1024_TiB * n;
+}
+
+constexpr unsigned long long operator "" _KB(unsigned long long n) {  // NOLINT
+  return 1000 * n;
+}
+
+constexpr unsigned long long operator "" _MB(unsigned long long n) {  // NOLINT
+  return 1000_KB * n;
+}
+
+constexpr unsigned long long operator "" _GB(unsigned long long n) {  // NOLINT
+  return 1000_MB * n;
+}
+
+constexpr unsigned long long operator "" _TB(unsigned long long n) {  // NOLINT
+  return 1000_GB * n;
+}
+
+constexpr unsigned long long operator "" _PB(unsigned long long n) {  // NOLINT
+  return 1000_TB * n;
+}
diff --git a/libminifi/include/utils/StagingQueue.h b/libminifi/include/utils/StagingQueue.h
new file mode 100644
index 0000000..2714a3a
--- /dev/null
+++ b/libminifi/include/utils/StagingQueue.h
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <atomic>
+#include <utility>
+#include "MinifiConcurrentQueue.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+namespace internal {
+template<typename T>
+struct default_allocator {
+  T operator()(size_t max_size) const {
+    return T::allocate(max_size);
+  }
+};
+}  // namespace internal
+
+/**
+ * Purpose: A FIFO container that allows chunked processing while trying to enforce
+ * soft limits like max chunk size and max total size. The "head" chunk might be
+ * modified in a thread-safe manner (usually appending to it) before committing it
+ * thus making it available for dequeuing.
+ */
+template<typename ActiveItem, typename Allocator = internal::default_allocator<ActiveItem>>
+class StagingQueue {
+  using Item = typename std::decay<decltype(std::declval<ActiveItem&>().commit())>::type;
+
+  static_assert(std::is_same<decltype(std::declval<const Allocator&>()(std::declval<size_t>())), ActiveItem>::value,
+      "Allocator::operator(size_t) must return an ActiveItem");
+  static_assert(std::is_same<decltype(std::declval<const Item&>().size()), size_t>::value,
+      "Item::size must return size_t");
+  static_assert(std::is_same<decltype(std::declval<const ActiveItem&>().size()), size_t>::value,
+      "ActiveItem::size must return size_t");
+
+  template<typename Functor, typename Arg, typename = void>
+  struct FunctorCallHelper;
+
+  template<typename Functor, typename Arg>
+  struct FunctorCallHelper<Functor, Arg, typename std::enable_if<std::is_same<decltype(std::declval<Functor>()(std::declval<Arg>())), bool>::value>::type> {
+    static bool call(Functor&& fn, Arg&& arg) {
+      return std::forward<Functor>(fn)(std::forward<Arg>(arg));
+    }
+  };
+
+  template<typename Functor, typename Arg>
+  struct FunctorCallHelper<Functor, Arg, typename std::enable_if<std::is_same<decltype(std::declval<Functor>()(std::declval<Arg>())), void>::value>::type> {
+    static bool call(Functor&& fn, Arg&& arg) {
+      std::forward<Functor>(fn)(std::forward<Arg>(arg));
+      return false;
+    }
+  };
+
+  static ActiveItem allocateActiveItem(const Allocator& allocator, size_t max_item_size) {
+    // max_size is a soft limit, i.e. reaching max_size is an indicator
+    // that that item should be committed, we cannot guarantee that only
+    // max_size content is in the item, since max_size is the "trigger limit",
+    // presumably each item would contain (at the trigger point) a little
+    // more than max_size content, that is the reasoning behind "* 3 / 2"
+    return allocator(max_item_size * 3 / 2);
+  }
+
+ public:
+  StagingQueue(size_t max_size, size_t max_item_size, Allocator allocator = {})
+    : max_size_(max_size),
+      max_item_size_(max_item_size),
+      active_item_(allocateActiveItem(allocator, max_item_size)),
+      allocator_(allocator) {}
+
+  void commit() {
+    std::unique_lock<std::mutex> lock{active_item_mutex_};
+    if (active_item_.size() == 0) {
+      // nothing to commit
+      return;
+    }
+    commit(lock);
+  }
+
+  /**
+   * Allows thread-safe modification of the "live" instance.
+   * @tparam Functor
+   * @param fn callable which can modify the instance, should return true
+   * if it would like to force a commit
+   */
+  template<typename Functor>
+  void modify(Functor&& fn) {
+    std::unique_lock<std::mutex> lock{active_item_mutex_};
+    size_t original_size = active_item_.size();
+    bool should_commit = FunctorCallHelper<Functor, ActiveItem&>::call(std::forward<Functor>(fn), active_item_);
+    size_t new_size = active_item_.size();
+    if (new_size >= original_size) {
+      total_size_ += new_size - original_size;
+    } else {
+      total_size_ -= original_size - new_size;
+    }
+    if (should_commit || new_size > max_item_size_) {
+      commit(lock);
+    }
+  }
+
+  template<class Rep, class Period>
+  bool tryDequeue(Item& out, const std::chrono::duration<Rep, Period>& time) {
+    if (time == std::chrono::duration<Rep, Period>{0}) {
+      return tryDequeue(out);
+    }
+    if (queue_.dequeueWaitFor(out, time)) {
+      total_size_ -= out.size();
+      return true;
+    }
+    return false;
+  }
+
+  bool tryDequeue(Item& out) {
+    if (queue_.tryDequeue(out)) {
+      total_size_ -= out.size();
+      return true;
+    }
+    return false;
+  }
+
+  void discardOverflow() {
+    while (total_size_ > max_size_) {
+      Item item;
+      if (!queue_.tryDequeue(item)) {
+        break;
+      }
+      total_size_ -= item.size();
+    }
+  }
+
+  size_t size() const {
+    return total_size_;
+  }
+
+ private:
+  void commit(std::unique_lock<std::mutex>& /*lock*/) {
+    queue_.enqueue(active_item_.commit());
+    active_item_ = allocateActiveItem(allocator_, max_item_size_);
+  }
+
+  const size_t max_size_;
+  const size_t max_item_size_;
+  std::atomic<size_t> total_size_{0};
+
+  std::mutex active_item_mutex_;
+  ActiveItem active_item_;
+
+  const Allocator allocator_;
+
+  ConditionConcurrentQueue<Item> queue_;
+};
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/ValueParser.h b/libminifi/include/utils/ValueParser.h
index 6639165..8910096 100644
--- a/libminifi/include/utils/ValueParser.h
+++ b/libminifi/include/utils/ValueParser.h
@@ -137,6 +137,10 @@ class ValueParser {
     }
   }
 
+  std::string rest() const noexcept {
+    return str.substr(offset);
+  }
+
  private:
   /**
    *
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/src/core/TypedValues.cpp
index 93a43b8..8306dec 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/src/core/TypedValues.cpp
@@ -16,8 +16,12 @@
  * limitations under the License.
  */
 
+#include <memory>
+
 #include "core/Property.h"
 #include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -27,6 +31,11 @@ namespace core {
 const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
 const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
 
+std::shared_ptr<logging::Logger>& DataSizeValue::getLogger() {
+  static std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<DataSizeValue>::getLogger();
+  return logger;
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 2b89547..dfff263 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -27,12 +27,16 @@
 #include <memory>
 #include <map>
 #include <string>
+#include <atomic>
 
 #include "core/Core.h"
 #include "utils/StringUtils.h"
 #include "utils/ClassUtils.h"
 #include "utils/file/FileUtils.h"
 #include "utils/Environment.h"
+#include "core/logging/internal/LogCompressorSink.h"
+#include "utils/Literals.h"
+#include "core/TypedValues.h"
 
 #include "spdlog/spdlog.h"
 #include "spdlog/sinks/stdout_sinks.h"
@@ -60,6 +64,25 @@ namespace logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v";
 
+utils::optional<spdlog::level::level_enum> parse_log_level(const std::string& level_name) {
+  if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
+    return spdlog::level::trace;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
+    return spdlog::level::debug;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
+    return spdlog::level::info;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
+    return spdlog::level::warn;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
+    return spdlog::level::err;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
+    return spdlog::level::critical;
+  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
+    return spdlog::level::off;
+  }
+  return {};
+}
+
 std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) {
   std::vector<std::string> appenders;
   std::string prefix = type + ".";
@@ -85,6 +108,7 @@ LoggerConfiguration::LoggerConfiguration()
 void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
+  initializeCompression(lock, logger_properties);
   std::string spdlog_pattern;
   if (!logger_properties->getString("spdlog.pattern", spdlog_pattern)) {
     spdlog_pattern = spdlog_default_pattern;
@@ -116,6 +140,10 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo
 
 std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) {
   std::lock_guard<std::mutex> lock(mutex);
+  return getLogger(name, lock);
+}
+
+std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name, const std::lock_guard<std::mutex>& /*lock*/) {
   std::string adjusted_name = name;
   const std::string clazz = "class ";
   auto haz_clazz = name.find(clazz);
@@ -207,19 +235,9 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names
       std::string level_name = utils::StringUtils::trim(segment);
       if (first) {
         first = false;
-        std::transform(level_name.begin(), level_name.end(), level_name.begin(), ::tolower);
-        if ("trace" == level_name) {
-          level = spdlog::level::trace;
-        } else if ("debug" == level_name) {
-          level = spdlog::level::debug;
-        } else if ("warn" == level_name) {
-          level = spdlog::level::warn;
-        } else if ("critical" == level_name) {
-          level = spdlog::level::critical;
-        } else if ("error" == level_name) {
-          level = spdlog::level::err;
-        } else if ("off" == level_name) {
-          level = spdlog::level::off;
+        auto opt_level = parse_log_level(level_name);
+        if (opt_level) {
+          level = *opt_level;
         }
       } else {
         sinks.push_back(sink_map[level_name]);
@@ -258,6 +276,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
   }
   std::shared_ptr<internal::LoggerNamespace> current_namespace = root_namespace;
   std::vector<std::shared_ptr<spdlog::sinks::sink>> sinks = root_namespace->sinks;
+  std::vector<std::shared_ptr<spdlog::sinks::sink>> inherited_sinks;
   spdlog::level::level_enum level = root_namespace->level;
   std::string current_namespace_str = "";
   std::string sink_namespace_str = "root";
@@ -268,6 +287,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
     if (child_pair == current_namespace->children.end()) {
       break;
     }
+    std::copy(current_namespace->exported_sinks.begin(), current_namespace->exported_sinks.end(), std::back_inserter(inherited_sinks));
     current_namespace = child_pair->second;
     if (current_namespace->sinks.size() > 0) {
       sinks = current_namespace->sinks;
@@ -283,6 +303,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<
     const auto levelView(spdlog::level::to_string_view(level));
     logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, std::string(levelView.begin(), levelView.end()), level_namespace_str);
   }
+  std::copy(inherited_sinks.begin(), inherited_sinks.end(), std::back_inserter(sinks));
   spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks));
   spdlogger->set_level(level);
   spdlogger->set_formatter(formatter -> clone());
@@ -318,6 +339,14 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::create_default_r
   return result;
 }
 
+void LoggerConfiguration::initializeCompression(const std::lock_guard<std::mutex>& lock, const std::shared_ptr<LoggerProperties>& properties) {
+  auto compression_sink = compression_manager_.initialize(properties, logger_, [&] (const std::string& name) {return getLogger(name, lock);});
+  if (compression_sink) {
+    root_namespace_->sinks.push_back(compression_sink);
+    root_namespace_->exported_sinks.push_back(compression_sink);
+  }
+}
+
 } /* namespace logging */
 } /* namespace core */
 } /* namespace minifi */
diff --git a/libminifi/src/core/logging/internal/CompressionManager.cpp b/libminifi/src/core/logging/internal/CompressionManager.cpp
new file mode 100644
index 0000000..6347988
--- /dev/null
+++ b/libminifi/src/core/logging/internal/CompressionManager.cpp
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <mutex>
+
+#include "core/logging/internal/CompressionManager.h"
+#include "core/logging/internal/LogCompressorSink.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerProperties.h"
+#include "core/TypedValues.h"
+#include "core/Core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+std::shared_ptr<LogCompressorSink> CompressionManager::initialize(
+    const std::shared_ptr<LoggerProperties>& properties, const std::shared_ptr<Logger>& error_logger, const LoggerFactory& logger_factory) {
+  auto get_size = [&] (const char* const property_name) -> utils::optional<size_t> {
+    auto size_str = properties->getString(property_name);
+    if (!size_str) return {};
+    size_t value;
+    if (DataSizeValue::StringToInt(*size_str, value)) {
+      return value;
+    }
+    if (error_logger) {
+      error_logger->log_error("Invalid format for %s", property_name);
+    }
+    return {};
+  };
+  auto cached_log_max_size = get_size(compression_cached_log_max_size_).value_or(8_MiB);
+  auto compressed_log_max_size = get_size(compression_compressed_log_max_size_).value_or(8_MiB);
+  std::shared_ptr<internal::LogCompressorSink> sink;
+  if (cached_log_max_size != 0 && compressed_log_max_size != 0) {
+    sink = std::make_shared<internal::LogCompressorSink>(
+        LogQueueSize{cached_log_max_size, cache_segment_size},
+        LogQueueSize{compressed_log_max_size, compressed_segment_size},
+        logger_factory(getClassName<LogCompressorSink>()));
+  }
+  {
+    // gcc4.8 bug => cannot use std::atomic_store
+    std::lock_guard<std::mutex> lock(mtx_);
+    sink_ = sink;
+  }
+  return sink;
+}
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/TypedValues.cpp b/libminifi/src/core/logging/internal/LogCompressor.cpp
similarity index 56%
copy from libminifi/src/core/TypedValues.cpp
copy to libminifi/src/core/logging/internal/LogCompressor.cpp
index 93a43b8..4dd23c0 100644
--- a/libminifi/src/core/TypedValues.cpp
+++ b/libminifi/src/core/logging/internal/LogCompressor.cpp
@@ -16,19 +16,32 @@
  * limitations under the License.
  */
 
-#include "core/Property.h"
-#include "core/TypedValues.h"
+#include "core/logging/internal/LogCompressor.h"
+#include "core/logging/LoggerConfiguration.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace logging {
+namespace internal {
+
+LogCompressor::LogCompressor(gsl::not_null<OutputStream *> output, std::shared_ptr<logging::Logger> logger)
+    : ZlibCompressStream(output, io::ZlibCompressionFormat::GZIP, Z_DEFAULT_COMPRESSION, std::move(logger)) {}
+
+LogCompressor::FlushResult LogCompressor::flush() {
+  if (write(nullptr, 0, Z_SYNC_FLUSH) == 0) {
+    return FlushResult::Success;
+  }
+  return FlushResult::Error;
+}
 
-const  std::type_index DataSizeValue::type_id = typeid(uint64_t);
-const  std::type_index TimePeriodValue::type_id = typeid(uint64_t);
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/core/logging/internal/LogCompressorSink.cpp b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
new file mode 100644
index 0000000..112e6ae
--- /dev/null
+++ b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/internal/LogCompressorSink.h"
+#include "spdlog/details/log_msg.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+namespace internal {
+
+LogCompressorSink::LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger)
+  : cached_logs_(cache_size.max_total_size, cache_size.max_segment_size),
+    compressed_logs_(compressed_size.max_total_size, compressed_size.max_segment_size, ActiveCompressor::Allocator{std::move(logger)}) {
+  compression_thread_ = std::thread{&LogCompressorSink::run, this};
+}
+
+LogCompressorSink::~LogCompressorSink() {
+  running_ = false;
+  compression_thread_.join();
+}
+
+void LogCompressorSink::sink_it_(const spdlog::details::log_msg &msg) {
+  cached_logs_.modify([&] (LogBuffer& active) {
+    active.buffer_->write(reinterpret_cast<const uint8_t*>(msg.payload.data()), msg.payload.size());
+  });
+}
+
+void LogCompressorSink::run() {
+  while (running_) {
+    cached_logs_.discardOverflow();
+    compressed_logs_.discardOverflow();
+    if (compress() == CompressionResult::NothingToCompress) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    }
+  }
+}
+
+LogCompressorSink::CompressionResult LogCompressorSink::compress(bool force_rotation) {
+  LogBuffer log_cache;
+  if (!cached_logs_.tryDequeue(log_cache)) {
+    if (force_rotation) {
+      compressed_logs_.commit();
+    }
+    return CompressionResult::NothingToCompress;
+  }
+  compressed_logs_.modify([&] (ActiveCompressor& compressor) {
+    compressor.compressor_->write(log_cache.buffer_->getBuffer(), log_cache.buffer_->size());
+    compressor.compressor_->flush();
+    return force_rotation;
+  });
+  return CompressionResult::Success;
+}
+
+void LogCompressorSink::flush_() {}
+
+}  // namespace internal
+}  // namespace logging
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/io/ZlibStream.cpp b/libminifi/src/io/ZlibStream.cpp
index 711a31d..9a9aa2f 100644
--- a/libminifi/src/io/ZlibStream.cpp
+++ b/libminifi/src/io/ZlibStream.cpp
@@ -19,6 +19,7 @@
 #include "io/ZlibStream.h"
 #include "Exception.h"
 #include "utils/gsl.h"
+#include "core/logging/LoggerConfiguration.h"
 
 namespace org {
 namespace apache {
@@ -39,7 +40,11 @@ bool ZlibBaseStream::isFinished() const {
 }
 
 ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level)
-  : ZlibBaseStream(output) {
+  : ZlibCompressStream(output, format, level, logging::LoggerFactory<ZlibCompressStream>::getLogger()) {}
+
+ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format, int level, std::shared_ptr<logging::Logger> logger)
+  : ZlibBaseStream(output),
+    logger_{std::move(logger)} {
   int ret = deflateInit2(
       &strm_,
       level,
@@ -57,11 +62,22 @@ ZlibCompressStream::ZlibCompressStream(gsl::not_null<OutputStream*> output, Zlib
 
 ZlibCompressStream::~ZlibCompressStream() {
   if (state_ != ZlibStreamState::UNINITIALIZED) {
-    deflateEnd(&strm_);
+    int result = deflateEnd(&strm_);
+    if (result == Z_DATA_ERROR) {
+      logger_->log_debug("Stream was freed prematurely");
+    } else if (result == Z_STREAM_ERROR) {
+      logger_->log_debug("Stream state was inconsistent");
+    } else if (result != Z_OK) {
+      logger_->log_debug("Unknown error while finishing compression %d", result);
+    }
   }
 }
 
-size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
+size_t ZlibCompressStream::write(const uint8_t *value, size_t size) {
+  return write(value, size, Z_NO_FLUSH);
+}
+
+size_t ZlibCompressStream::write(const uint8_t* value, size_t size, FlushMode mode) {
   if (state_ != ZlibStreamState::INITIALIZED) {
     logger_->log_error("writeData called in invalid ZlibCompressStream state, state is %hhu", state_);
     return STREAM_ERROR;
@@ -84,10 +100,9 @@ size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
     strm_.next_out = outputBuffer_.data();
     strm_.avail_out = gsl::narrow<uInt>(outputBuffer_.size());
 
-    int flush = value == nullptr ? Z_FINISH : Z_NO_FLUSH;
-    logger_->log_trace("calling deflate with flush %d", flush);
+    logger_->log_trace("calling deflate with flush %d", mode);
 
-    int ret = deflate(&strm_, flush);
+    int ret = deflate(&strm_, mode);
     if (ret == Z_STREAM_ERROR) {
       logger_->log_error("deflate failed, error code: %d", ret);
       state_ = ZlibStreamState::ERRORED;
@@ -107,14 +122,15 @@ size_t ZlibCompressStream::write(const uint8_t* value, size_t size) {
 
 void ZlibCompressStream::close() {
   if (state_ == ZlibStreamState::INITIALIZED) {
-    if (write(nullptr, 0U) == 0) {
+    if (write(nullptr, 0U, Z_FINISH) == 0) {
       state_ = ZlibStreamState::FINISHED;
     }
   }
 }
 
 ZlibDecompressStream::ZlibDecompressStream(gsl::not_null<OutputStream*> output, ZlibCompressionFormat format)
-    : ZlibBaseStream(output) {
+    : ZlibBaseStream(output),
+      logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()} {
   int ret = inflateInit2(&strm_, 15 + (format == ZlibCompressionFormat::GZIP ? 16 : 0) /* windowBits */);
   if (ret != Z_OK) {
     logger_->log_error("Failed to initialize z_stream with inflateInit2, error code: %d", ret);
@@ -126,7 +142,12 @@ ZlibDecompressStream::ZlibDecompressStream(gsl::not_null<OutputStream*> output,
 
 ZlibDecompressStream::~ZlibDecompressStream() {
   if (state_ != ZlibStreamState::UNINITIALIZED) {
-    inflateEnd(&strm_);
+    int result = inflateEnd(&strm_);
+    if (result == Z_STREAM_ERROR) {
+      logger_->log_error("Stream state was inconsistent");
+    } else if (result != Z_OK) {
+      logger_->log_error("Unknown error while finishing decompression %d", result);
+    }
   }
 }
 
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index e9db9a7..8750df7 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -22,6 +22,9 @@
 #include <ctime>
 #include "../TestBase.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "io/ZlibStream.h"
+#include "StreamPipe.h"
+#include "utils/IntegrationTestUtils.h"
 
 TEST_CASE("Test log Levels", "[ttl1]") {
   LogTestController::getInstance().setTrace<logging::Logger>();
@@ -76,7 +79,7 @@ TEST_CASE("Test log Levels change", "[ttl5]") {
 namespace single {
 class TestClass {
 };
-}
+}  // namespace single
 
 class TestClass2 {
 };
@@ -107,3 +110,95 @@ TEST_CASE("Test ShortenNames", "[ttl6]") {
   LogTestController::getInstance(props)->reset();
   LogTestController::getInstance().reset();
 }
+
+using namespace minifi::io;
+
+std::string decompress(const std::shared_ptr<InputStream>& input) {  // test helper: runs `input` through a ZlibDecompressStream and returns the bytes collected in a BufferStream
+  auto output = utils::make_unique<BufferStream>();  // sink the decompressor writes into
+  auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get()));  // receives a raw, non-owning pointer; `output` stays owned by this function
+  minifi::internal::pipe(input, decompressor);  // NOTE(review): presumably copies everything from `input` into `decompressor` - confirm against StreamPipe.h
+  decompressor->close();
+  return std::string{reinterpret_cast<const char*>(output->getBuffer()), output->size()};  // copy size() bytes out of the sink's buffer
+}
+
+TEST_CASE("Test Compression", "[ttl7]") {  // a message logged at error level must come back unchanged from the compressed log
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  std::string className;  // set by whichever SECTION runs
+  SECTION("Using root logger") {
+    className = "CompressionTestClassUsingRoot";
+    // by default the root logger is OFF
+    properties->set("logger.root", "INFO");
+  }
+  SECTION("Inherit compression sink") {
+    className = "CompressionTestClassInheriting";
+    properties->set("appender.null", "null");
+    properties->set("logger." + className, "INFO,null");  // class-specific logger: level INFO, appender named "null"
+  }
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger(className);
+  logger->log_error("Hi there");
+  std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};  // NOTE(review): meaning of the `true` argument is not visible here (presumably "flush") - confirm
+  REQUIRE(compressed_log);
+  auto logs = decompress(compressed_log);
+  REQUIRE(logs == "Hi there");  // exact match: the decompressed content is only the message text
+}
+
+class LoggerTestAccessor {  // test-only helper that reads/writes members of LoggerConfiguration::compression_man_ger's owner; NOTE(review): presumably befriended by the logging classes - confirm
+ public:
+  static void setCompressionCacheSegmentSize(logging::LoggerConfiguration& log_config, size_t value) {
+    log_config.compression_manager_.cache_segment_size = value;  // overwrites the manager's cache_segment_size directly
+  }
+  static void setCompressionCompressedSegmentSize(logging::LoggerConfiguration& log_config, size_t value) {
+    log_config.compression_manager_.compressed_segment_size = value;  // overwrites the manager's compressed_segment_size directly
+  }
+  static size_t getUncompressedSize(logging::LoggerConfiguration& log_config) {
+    return log_config.compression_manager_.getSink()->cached_logs_.size();  // size() of the sink's cached_logs_
+  }
+  static size_t getCompressedSize(logging::LoggerConfiguration& log_config) {
+    return log_config.compression_manager_.getSink()->compressed_logs_.size();  // size() of the sink's compressed_logs_
+  }
+};
+
+TEST_CASE("Test Compression cache overflow is discarded intermittently", "[ttl8]") {  // after heavy logging the uncompressed cache must end up within its configured limit
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "10 KB");  // limit that is checked as 10_KiB below
+  LoggerTestAccessor::setCompressionCacheSegmentSize(log_config, 1_KiB);
+  std::string className = "CompressionTestCacheCleaned";
+  // by default the root logger is OFF
+  properties->set("logger.root", "INFO");
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger(className);
+  for (size_t idx = 0; idx < 10000; ++idx) {  // 10000 x "Hi there" (8 chars) = 80000 chars of message text, far above the 10 KB limit
+    logger->log_error("Hi there");
+  }
+  bool cache_shrunk = utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, [&] {  // NOTE(review): presumably re-evaluates the predicate for up to 1 second - confirm
+    return LoggerTestAccessor::getUncompressedSize(log_config) <= 10_KiB;
+  });
+  REQUIRE(cache_shrunk);
+}
+
+TEST_CASE("Setting either properties to 0 disables in-memory compressed logs", "[ttl9]") {
+  auto& log_config = logging::LoggerConfiguration::getConfiguration();
+  auto properties = std::make_shared<logging::LoggerProperties>();
+  bool is_nullptr = false;  // expectation for this run: should getCompressedLog() yield nullptr?
+  SECTION("Cached log size is set to 0") {
+    is_nullptr = true;
+    properties->set(logging::internal::CompressionManager::compression_cached_log_max_size_, "0");
+  }
+  SECTION("Compressed log size is set to 0") {
+    is_nullptr = true;
+    properties->set(logging::internal::CompressionManager::compression_compressed_log_max_size_, "0");
+  }
+  SECTION("Sanity check") {
+    is_nullptr = false;  // neither size property is touched, so a compressed log is expected
+    // pass
+  }
+  // by default the root logger is OFF
+  properties->set("logger.root", "INFO");
+  log_config.initialize(properties);
+  auto logger = log_config.getLogger("DisableCompressionTestLogger");
+  logger->log_error("Hi there");
+  REQUIRE((logging::LoggerConfiguration::getCompressedLog(true) == nullptr) == is_nullptr);  // the log is absent exactly in the sections that set a size to "0"
+}
diff --git a/libminifi/test/unit/StagingQueueTests.cpp b/libminifi/test/unit/StagingQueueTests.cpp
new file mode 100644
index 0000000..9391458
--- /dev/null
+++ b/libminifi/test/unit/StagingQueueTests.cpp
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "utils/StringUtils.h"
+#include "../TestBase.h"
+#include "utils/StagingQueue.h"
+
+using org::apache::nifi::minifi::utils::StagingQueue;
+
+class MockItem {  // minimal item type used to instantiate StagingQueue<MockItem>: a string payload plus allocate/commit/size
+ public:
+  static MockItem allocate(size_t max_size) {
+    MockItem instance;
+    instance.data_.reserve(max_size * 3 / 2);  // reserves 1.5x max_size characters up front
+    return instance;
+  }
+
+  MockItem commit() {
+    return std::move(*this);  // moves out of *this (not a local); this object is left in a moved-from state
+  }
+
+  size_t size() const {
+    return data_.size();  // item size is the payload length
+  }
+
+  std::string data_;  // payload; public so the tests can edit it directly inside modify() callbacks
+};
+
+TEST_CASE("Construct queue", "[TestStagingQueue1]") {
+  StagingQueue<MockItem> queue(30, 10);  // NOTE(review): arguments look like (max total size, max item size), judging by the other tests in this file - confirm
+  REQUIRE(queue.size() == 0);  // a freshly constructed queue reports size 0
+}
+
+TEST_CASE("Modify no commit", "[TestStagingQueue2]") {
+  StagingQueue<MockItem> queue(30, 10);
+  queue.modify([] (MockItem& item) {
+    item.data_ += "12345";
+  });
+  REQUIRE(queue.size() == 5);  // the uncommitted item already counts toward size()
+  SECTION("Decrease size") {
+    queue.modify([] (MockItem& item) {
+      REQUIRE(item.data_ == "12345");  // the second modify() sees the same item as the first
+      item.data_ = "";
+    });
+    REQUIRE(queue.size() == 0);  // size() follows the item shrinking as well
+  }
+  MockItem out;
+  REQUIRE(!queue.tryDequeue(out));  // nothing was committed, so nothing can be dequeued
+}
+
+TEST_CASE("Modify and commit", "[TestStagingQueue3]") {
+  StagingQueue<MockItem> queue(30, 10);
+  queue.modify([] (MockItem& item) {
+    item.data_ += "12345";
+  });
+  queue.commit();
+  SECTION("Commit is idempotent if there is no modification between") {
+    queue.commit();  // second commit without an intervening modify(); the checks below must still hold
+  }
+  REQUIRE(queue.size() == 5);  // committing does not change the reported size
+  MockItem out;
+  REQUIRE(queue.tryDequeue(out));
+  REQUIRE(out.data_ == "12345");
+  REQUIRE(queue.size() == 0);  // dequeuing the only item empties the queue
+}
+
+TEST_CASE("Modify and overflow triggered automatic commit", "[TestStagingQueue4]") {
+  StagingQueue<MockItem> queue(30, 10);
+  queue.modify([] (MockItem& item) {
+    item.data_ += "123456789ab";  // 11 chars, one more than the 10 passed as the second constructor argument
+  });
+  SECTION("Explicit commit makes no difference") {
+    queue.commit();
+  }
+  queue.modify([] (MockItem& item) {
+    // a new item has been allocated
+    REQUIRE(item.data_ == "");
+  });
+  REQUIRE(queue.size() == 11);
+  MockItem out;
+  REQUIRE(queue.tryDequeue(out));  // succeeds even when the explicit-commit SECTION did not run
+  REQUIRE(out.data_ == "123456789ab");
+  REQUIRE(queue.size() == 0);
+}
+
+TEST_CASE("Discard overflow", "[TestStagingQueue5]") {
+  StagingQueue<MockItem> queue(30, 10);
+  for (size_t idx = 0; idx < 5; ++idx) {
+    queue.modify([&] (MockItem& item) {
+      item.data_ = utils::StringUtils::repeat(std::to_string(idx), 10);  // NOTE(review): presumably the digit idx repeated 10 times (10 chars) - consistent with the size check below
+    });
+    queue.commit();
+  }
+  REQUIRE(queue.size() == 50);  // 5 committed items of 10 chars each; nothing is dropped until discardOverflow()
+  queue.discardOverflow();
+  REQUIRE(queue.size() == 30);  // trimmed down to the 30 passed as the first constructor argument
+  MockItem out;
+  // idx 0 and 1 have been discarded
+  for (size_t idx = 2; idx < 5; ++idx) {
+    REQUIRE(queue.tryDequeue(out));
+    REQUIRE(out.data_ == utils::StringUtils::repeat(std::to_string(idx), 10));  // the surviving items come out in insertion order
+  }
+  REQUIRE(queue.size() == 0);
+}

[nifi-minifi-cpp] 03/06: MINIFICPP-1580 Fix Tensorflow extension build

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e6ecb2e182008e3d6f7bdebc591409fafd2cd6c9
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Jun 23 18:14:24 2021 +0200

    MINIFICPP-1580 Fix Tensorflow extension build
    
    Closes #1096
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 cmake/FindTensorFlow.cmake                         | 55 ++++++++++++++++------
 extensions/tensorflow/CMakeLists.txt               |  3 +-
 extensions/tensorflow/TFApplyGraph.cpp             | 11 +++--
 extensions/tensorflow/TFApplyGraph.h               |  2 +-
 extensions/tensorflow/TFConvertImageToTensor.cpp   |  6 +--
 extensions/tensorflow/TFConvertImageToTensor.h     |  2 +-
 extensions/tensorflow/TFExtractTopLabels.cpp       | 11 +++--
 extensions/tensorflow/TFExtractTopLabels.h         |  2 +-
 libminifi/test/tensorflow-tests/CMakeLists.txt     |  2 +-
 .../test/tensorflow-tests/TensorFlowTests.cpp      | 10 ++--
 10 files changed, 67 insertions(+), 37 deletions(-)

diff --git a/cmake/FindTensorFlow.cmake b/cmake/FindTensorFlow.cmake
index 732e3ae..2996048 100644
--- a/cmake/FindTensorFlow.cmake
+++ b/cmake/FindTensorFlow.cmake
@@ -18,8 +18,20 @@
 include(FindPackageHandleStandardArgs)
 unset(TENSORFLOW_FOUND)
 
-if (TENSORFLOW_PATH)
-  message("-- Checking for TensorFlow in provided TENSORFLOW_PATH: ${TENSORFLOW_PATH}")
+if (TENSORFLOW_INCLUDE_PATH)
+  message("-- Checking for TensorFlow includes in provided TENSORFLOW_INCLUDE_PATH: ${TENSORFLOW_INCLUDE_PATH}")
+endif()
+
+if (TENSORFLOW_LIB_PATH)
+  message("-- Checking for TensorFlow libs in provided TENSORFLOW_LIB_PATH: ${TENSORFLOW_LIB_PATH}")
+endif()
+
+if (GOOGLE_PROTOBUF_INCLUDE_PATH)
+  message("-- Checking for Google Protobuf includes in provided GOOGLE_PROTOBUF_INCLUDE_PATH: ${GOOGLE_PROTOBUF_INCLUDE_PATH}")
+endif()
+
+if (GOOGLE_PROTOBUF_LIB_PATH)
+  message("-- Checking for Google Protobuf libs in provided GOOGLE_PROTOBUF_LIB_PATH: ${GOOGLE_PROTOBUF_LIB_PATH}")
 endif()
 
 find_path(TENSORFLOW_INCLUDE_DIR
@@ -28,34 +40,46 @@ find_path(TENSORFLOW_INCLUDE_DIR
           tensorflow/cc
           third_party
           HINTS
-          ${TENSORFLOW_PATH}
+          ${TENSORFLOW_INCLUDE_PATH}
           /usr/include/tensorflow
           /usr/local/include/google/tensorflow
           /usr/local/include/tensorflow
+          /usr/local/include/tensorflow/bazel-bin/tensorflow/include
           /usr/include/google/tensorflow)
 
 find_library(TENSORFLOW_CC_LIBRARY NAMES tensorflow_cc
              HINTS
-             ${TENSORFLOW_PATH}
-             ${TENSORFLOW_PATH}/bazel-bin/tensorflow
+             ${TENSORFLOW_LIB_PATH}
+             ${TENSORFLOW_INCLUDE_PATH}/bazel-bin/tensorflow
              /usr/lib
              /usr/local/lib
              /usr/local/lib/tensorflow_cc)
 
-find_library(TENSORFLOW_FRAMEWORK_LIBRARY NAMES tensorflow_framework
+find_path(GOOGLE_PROTOBUF_INCLUDE NAMES google/protobuf
+          HINTS
+          ${GOOGLE_PROTOBUF_INCLUDE_PATH}
+          ${TENSORFLOW_INCLUDE_PATH}/src
+          /usr/include/tensorflow/src
+          /usr/local/include/google/tensorflow/src
+          /usr/local/include/tensorflow/src
+          /usr/local/include/tensorflow/bazel-bin/tensorflow/include/src
+          /usr/include/google/tensorflow/src)
+
+find_library(GOOGLE_PROTOBUF_LIBRARY NAMES protobuf
              HINTS
-             ${TENSORFLOW_PATH}
-             ${TENSORFLOW_PATH}/bazel-bin/tensorflow
+             ${GOOGLE_PROTOBUF_LIB_PATH}
              /usr/lib
              /usr/local/lib
-             /usr/local/lib/tensorflow_cc)
+             /usr/lib/x86_64-linux-gnu)
 
-find_package_handle_standard_args(TENSORFLOW DEFAULT_MSG TENSORFLOW_INCLUDE_DIR TENSORFLOW_CC_LIBRARY TENSORFLOW_FRAMEWORK_LIBRARY)
+find_package_handle_standard_args(TENSORFLOW DEFAULT_MSG TENSORFLOW_INCLUDE_DIR TENSORFLOW_CC_LIBRARY GOOGLE_PROTOBUF_INCLUDE GOOGLE_PROTOBUF_LIBRARY)
 
 if(TENSORFLOW_FOUND)
     message("-- Found TensorFlow includes: ${TENSORFLOW_INCLUDE_DIR}")
-    message("-- Found TensorFlow libraries: ${TENSORFLOW_CC_LIBRARY} ${TENSORFLOW_FRAMEWORK_LIBRARY}")
-    set(TENSORFLOW_LIBRARIES ${TENSORFLOW_CC_LIBRARY} ${TENSORFLOW_FRAMEWORK_LIBRARY})
+    message("-- Found TensorFlow libs: ${TENSORFLOW_CC_LIBRARY}")
+    message("-- Found Google Protobuf includes: ${GOOGLE_PROTOBUF_INCLUDE}")
+    message("-- Found Google Protobuf libs: ${GOOGLE_PROTOBUF_LIBRARY}")
+    set(TENSORFLOW_LIBRARIES ${TENSORFLOW_CC_LIBRARY} ${GOOGLE_PROTOBUF_LIBRARY})
     set(TENSORFLOW_INCLUDE_DIRS
         ${TENSORFLOW_INCLUDE_DIR}
         ${TENSORFLOW_INCLUDE_DIR}/bazel-genfiles
@@ -63,9 +87,10 @@ if(TENSORFLOW_FOUND)
         ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads/eigen
         ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads/gemmlowp
         ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads/nsync/public
-        ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/gen/protobuf-host/include)
+        ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/gen/protobuf-host/include
+        ${GOOGLE_PROTOBUF_INCLUDE})
 else()
-  message(FATAL_ERROR "TensorFlow was not found. Check or set TENSORFLOW_PATH to TensorFlow build, Install libtensorflow_cc.so and headers into the system, or disable the TensorFlow extension.")
+  message(FATAL_ERROR "TensorFlow or Google Protobuf dependency was not found. Check or set TENSORFLOW_INCLUDE_PATH, TENSORFLOW_LIB_PATH, GOOGLE_PROTOBUF_INCLUDE, GOOGLE_PROTOBUF_LIBRARY to TensorFlow build, Install libtensorflow_cc.so, libprotobuf.so and headers into the system, or disable the TensorFlow extension.")
 endif()
 
-mark_as_advanced(TENSORFLOW_INCLUDE_DIR TENSORFLOW_CC_LIBRARY TENSORFLOW_FRAMEWORK_LIBRARY)
+mark_as_advanced(TENSORFLOW_INCLUDE_DIR TENSORFLOW_CC_LIBRARY GOOGLE_PROTOBUF_INCLUDE GOOGLE_PROTOBUF_LIBRARY)
diff --git a/extensions/tensorflow/CMakeLists.txt b/extensions/tensorflow/CMakeLists.txt
index 3c792ee..b6f74c2 100644
--- a/extensions/tensorflow/CMakeLists.txt
+++ b/extensions/tensorflow/CMakeLists.txt
@@ -26,13 +26,12 @@ find_package(TensorFlow REQUIRED)
 
 message("-- Found TensorFlow: ${TENSORFLOW_INCLUDE_DIRS}")
 
-include_directories(${TENSORFLOW_INCLUDE_DIRS})
-
 file(GLOB SOURCES  "*.cpp")
 
 add_library(minifi-tensorflow-extensions STATIC ${SOURCES})
 set_property(TARGET minifi-tensorflow-extensions PROPERTY POSITION_INDEPENDENT_CODE ON)
 
+target_include_directories(minifi-tensorflow-extensions SYSTEM PRIVATE ${TENSORFLOW_INCLUDE_DIRS})
 target_link_libraries(minifi-tensorflow-extensions ${LIBMINIFI} Threads::Threads)
 target_link_libraries(minifi-tensorflow-extensions ${TENSORFLOW_LIBRARIES})
 
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index 14d349e..b14c2e3 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -65,7 +65,7 @@ void TFApplyGraph::initialize() {
   setSupportedRelationships(std::move(relationships));
 }
 
-void TFApplyGraph::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void TFApplyGraph::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) {
   context->getProperty(InputNode.getName(), input_node_);
 
   if (input_node_.empty()) {
@@ -79,8 +79,8 @@ void TFApplyGraph::onSchedule(core::ProcessContext *context, core::ProcessSessio
   }
 }
 
-void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                             const std::shared_ptr<core::ProcessSession> &session) {
+void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/,
+                             const std::shared_ptr<core::ProcessSession>& session) {
   auto flow_file = session->get();
 
   if (!flow_file) {
@@ -152,7 +152,10 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     TensorReadCallback tensor_cb(input_tensor_proto);
     session->read(flow_file, &tensor_cb);
     tensorflow::Tensor input;
-    input.FromProto(*input_tensor_proto);
+    if (!input.FromProto(*input_tensor_proto)) {
+      // failure deliberately ignored at this time
+      // added to avoid warn_unused_result build errors
+    }
     std::vector<tensorflow::Tensor> outputs;
     auto status = ctx->tf_session->Run({{input_node_, input}}, {output_node_}, {}, &outputs);
 
diff --git a/extensions/tensorflow/TFApplyGraph.h b/extensions/tensorflow/TFApplyGraph.h
index 3f26872..e00d528 100644
--- a/extensions/tensorflow/TFApplyGraph.h
+++ b/extensions/tensorflow/TFApplyGraph.h
@@ -47,7 +47,7 @@ class TFApplyGraph : public core::Processor {
 
   void initialize() override;
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
     logger_->log_error("onTrigger invocation with raw pointers is not implemented");
   }
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index aea09f8..efa4716 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -115,7 +115,7 @@ void TFConvertImageToTensor::initialize() {
   setSupportedRelationships(std::move(relationships));
 }
 
-void TFConvertImageToTensor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void TFConvertImageToTensor::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) {
   context->getProperty(ImageFormat.getName(), input_format_);
 
   if (input_format_.empty()) {
@@ -189,8 +189,8 @@ void TFConvertImageToTensor::onSchedule(core::ProcessContext *context, core::Pro
   }
 }
 
-void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                                       const std::shared_ptr<core::ProcessSession> &session) {
+void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/,
+                                       const std::shared_ptr<core::ProcessSession>& session) {
   auto flow_file = session->get();
 
   if (!flow_file) {
diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h
index da3a11c..93c5b38 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.h
+++ b/extensions/tensorflow/TFConvertImageToTensor.h
@@ -54,7 +54,7 @@ class TFConvertImageToTensor : public core::Processor {
 
   void initialize() override;
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
     logger_->log_error("onTrigger invocation with raw pointers is not implemented");
   }
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index 2f44f84..cfdaa9d 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -48,10 +48,10 @@ void TFExtractTopLabels::initialize() {
   setSupportedRelationships(std::move(relationships));
 }
 
-void TFExtractTopLabels::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void TFExtractTopLabels::onSchedule(core::ProcessContext* /*context*/, core::ProcessSessionFactory* /*sessionFactory*/) {
 }
 
-void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/,
                                    const std::shared_ptr<core::ProcessSession> &session) {
   auto flow_file = session->get();
 
@@ -88,7 +88,10 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
     session->read(flow_file, &tensor_cb);
 
     tensorflow::Tensor input;
-    input.FromProto(*input_tensor_proto);
+    if (!input.FromProto(*input_tensor_proto)) {
+      // failure deliberately ignored at this time
+      // added to avoid warn_unused_result build errors
+    }
     auto input_flat = input.flat<float>();
 
     std::vector<std::pair<uint64_t, float>> scores;
@@ -102,7 +105,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
       return a.second > b.second;
     });
 
-    for (int i = 0; i < 5 && i < scores.size(); i++) {
+    for (std::size_t i = 0; i < 5 && i < scores.size(); i++) {
       if (!labels || scores[i].first > labels->size()) {
         logger_->log_error("Label index is out of range (are the correct labels loaded?); routing to retry...");
         session->transfer(flow_file, Retry);
diff --git a/extensions/tensorflow/TFExtractTopLabels.h b/extensions/tensorflow/TFExtractTopLabels.h
index a1b2211..03ca8ca 100644
--- a/extensions/tensorflow/TFExtractTopLabels.h
+++ b/extensions/tensorflow/TFExtractTopLabels.h
@@ -44,7 +44,7 @@ class TFExtractTopLabels : public core::Processor {
 
   void initialize() override;
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
     logger_->log_error("onTrigger invocation with raw pointers is not implemented");
   }
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
diff --git a/libminifi/test/tensorflow-tests/CMakeLists.txt b/libminifi/test/tensorflow-tests/CMakeLists.txt
index c7e68eb..97b45ec 100644
--- a/libminifi/test/tensorflow-tests/CMakeLists.txt
+++ b/libminifi/test/tensorflow-tests/CMakeLists.txt
@@ -27,7 +27,7 @@ FOREACH(testfile ${TENSORFLOW_INTEGRATION_TESTS})
 	add_executable("${testfilename}" "${testfile}" "${TEST_DIR}/TestBase.cpp")
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/tensorflow")
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
-	target_include_directories(${testfilename} PRIVATE BEFORE ${TENSORFLOW_INCLUDE_DIRS})
+	target_include_directories(${testfilename} SYSTEM PRIVATE BEFORE ${TENSORFLOW_INCLUDE_DIRS})
 	createTests("${testfilename}")
 	target_wholearchive_library(${testfilename} minifi-tensorflow-extensions)
 	target_wholearchive_library(${testfilename} minifi-standard-processors)
diff --git a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
index d4dc27c..6959cfb 100644
--- a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
+++ b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp
@@ -120,13 +120,13 @@ TEST_CASE("TensorFlow: Apply Graph", "[tfApplyGraph]") { // NOLINT
     tensorflow::GraphDef graph;
 
     // Write test TensorFlow graph
-    root.ToGraphDef(&graph);
+    REQUIRE(root.ToGraphDef(&graph).ok());
     std::ofstream in_file_stream(in_graph_file);
     graph.SerializeToOstream(&in_file_stream);
   }
 
   // Read test TensorFlow graph into TFApplyGraph
-  plan->runNextProcessor([&get_file, &in_graph_file, &plan](const std::shared_ptr<core::ProcessContext> context,
+  plan->runNextProcessor([&in_graph_file](const std::shared_ptr<core::ProcessContext> /*context*/,
                                                             const std::shared_ptr<core::ProcessSession> session) {
     // Intercept the call so that we can add an attr (won't be required when we have UpdateAttribute processor)
     auto flow_file = session->create();
@@ -163,7 +163,7 @@ TEST_CASE("TensorFlow: Apply Graph", "[tfApplyGraph]") { // NOLINT
     tensorflow::TensorProto tensor_proto;
     tensor_proto.ParseFromIstream(&out_file_stream);
     tensorflow::Tensor tensor;
-    tensor.FromProto(tensor_proto);
+    REQUIRE(tensor.FromProto(tensor_proto));
 
     // Verify output tensor
     float tensor_val = tensor.flat<float>().data()[0];
@@ -286,7 +286,7 @@ TEST_CASE("TensorFlow: ConvertImageToTensor", "[tfConvertImageToTensor]") { // N
     tensorflow::TensorProto tensor_proto;
     tensor_proto.ParseFromIstream(&out_file_stream);
     tensorflow::Tensor tensor;
-    tensor.FromProto(tensor_proto);
+    REQUIRE(tensor.FromProto(tensor_proto));
 
     // Verify output tensor
     auto shape = tensor.shape();
@@ -358,7 +358,7 @@ TEST_CASE("TensorFlow: Extract Top Labels", "[tfExtractTopLabels]") { // NOLINT
   }
 
   // Read labels
-  plan->runNextProcessor([&get_file, &in_labels_file, &plan](const std::shared_ptr<core::ProcessContext> context,
+  plan->runNextProcessor([&in_labels_file](const std::shared_ptr<core::ProcessContext> /*context*/,
                                                              const std::shared_ptr<core::ProcessSession> session) {
     // Intercept the call so that we can add an attr (won't be required when we have UpdateAttribute processor)
     auto flow_file = session->create();

[nifi-minifi-cpp] 02/06: MINIFICPP-1576 Allow build of all extensions in docker and cleanup docker files

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 88e742975a418347371b9ca7dd27c6cff5d305ce
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Jun 23 17:55:18 2021 +0200

    MINIFICPP-1576 Allow build of all extensions in docker and cleanup docker files
    
    - Add and cleanup dependencies in docker image required by extensions
    - Fix extension builds without glibc (for alpine base image)
    - Cleanup .dockerignore file
    - Avoid extension rebuild in CPack preinstall phase while building docker image
    - Merge docker image and minimal docker image build phases
    - Fix dockerfile linter issues
    - Add DISABLE_PYTHON_SCRIPTING check
    
    Closes #1094
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .dockerignore                                   |  64 +++++++-
 .github/workflows/ci.yml                        |   2 +-
 .gitignore                                      |   7 -
 CMakeLists.txt                                  |  20 +--
 cmake/DockerConfig.cmake                        |  18 +-
 docker/DockerBuild.sh                           |  16 +-
 docker/Dockerfile                               | 208 ++++++++++--------------
 thirdparty/openwsman/openwsman.patch            |  39 +++++
 thirdparty/pcap++/Pcap++/src/PcapLiveDevice.cpp |   9 +-
 9 files changed, 218 insertions(+), 165 deletions(-)

diff --git a/.dockerignore b/.dockerignore
index 66dab34..247ebd5 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,14 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+.git
+.vscode
+.cache
+.cproject
+.settings
+.DS_Store
+.idea
+.iml
+.project
+nifi-minifi-cpp.kdev4
+.kdev4
+.device_id
+.ccls-cache
+.vs
+bt_state
 bin/minifi
 *build*
+*flowfile_checkpoint
 *_repository*
 logs
-cmake-build-*
+**/cmake-build-*
 docker
-.git
-.vscode
+target
+*.pyc
+*.swp
+__pycache__/
+docs/generated
+corecomponentstate
+flowfile_repository
+content_repository
+provenance_repository
+logs
+compile_commands.json
+
 extensions/expression-language/Parser.cpp
 extensions/expression-language/Parser.hpp
 extensions/expression-language/Scanner.cpp
 extensions/expression-language/location.hh
 extensions/expression-language/position.hh
 extensions/expression-language/stack.h
+thirdparty/uuid/tst_uuid*
+thirdparty/apache-rat/apache-rat*
+thirdparty/**/*.o
+thirdparty/**/*.a
+libminifi/test/**/*.a
+libminifi/src/agent/agent_version.cpp
+
+**/assemblies
+**/CMakeCache.txt
+**/CMakeFiles
+**/CMakeScripts
+**/cmake_install.cmake
+**/install_manifest.txt
+**/CTestTestfile.cmake
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3b64198..d380294 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -378,7 +378,7 @@ jobs:
       - id: checkout
         uses: actions/checkout@v2
       - id: build
-        run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON .. && make docker
+        run: mkdir build && cd build && cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DDISABLE_SCRIPTING=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON .. && make docker
       - id: install_deps
         run: |
           sudo apt update
diff --git a/.gitignore b/.gitignore
index 2398a5c..8754b95 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,13 +66,6 @@ __pycache__/
 /provenance_repository
 /logs
 
-# Ignore source files that have been placed in the docker directory during build
-docker/minificppsource
-docker/bionic/minificppsource
-docker/focal/minificppsource
-docker/debian/minificppsource
-docker/centos/minificppsource
-docker/fedora/minificppsource
 .vs/**
 *.swp
 .cache
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 63548f3..370d80d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -577,8 +577,11 @@ add_subdirectory(main)
 add_subdirectory(nanofi)
 add_dependencies(nanofi minifiexe)
 
-add_subdirectory(encrypt-config)
-add_dependencies(encrypt-config minifi)
+option(ENABLE_ENCRYPT_CONFIG "Enables build of encrypt-config binary." ON)
+if (ENABLE_ENCRYPT_CONFIG)
+	add_subdirectory(encrypt-config)
+	add_dependencies(encrypt-config minifi)
+endif()
 
 if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER)
 	add_subdirectory(controller)
@@ -833,18 +836,7 @@ registerTest("encrypt-config/tests")
 
 include(BuildDocs)
 
-if (ENABLE_PYTHON OR NOT DISABLE_SCRIPTING)
-	message(STATUS "Python and scripting extensions will disabled for the docker build as they produce many error messages on MiNiFi startup.")
-	set(ENABLE_PYTHON_CACHE ${ENABLE_PYTHON})
-	set(DISABLE_SCRIPTING_CACHE ${DISABLE_SCRIPTING})
-	set(ENABLE_PYTHON OFF)
-	set(DISABLE_SCRIPTING ON) # Implies DISABLE_PYTHON_SCRIPTING as well
-	include(DockerConfig)
-	set(ENABLE_PYTHON ${ENABLE_PYTHON_CACHE})
-	set(DISABLE_SCRIPTING ${DISABLE_SCRIPTING_CACHE})
-else()
-	include(DockerConfig)
-endif()
+include(DockerConfig)
 
 # Create a custom build target that will run the linter.
 get_property(extensions GLOBAL PROPERTY EXTENSION-LINTERS)
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 5bb4107..1947f65 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -22,7 +22,6 @@ add_custom_target(
         -u 1000
         -g 1000
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i release
         -c ENABLE_ALL=${ENABLE_ALL}
         -c ENABLE_PYTHON=${ENABLE_PYTHON}
         -c ENABLE_OPS=${ENABLE_OPS}
@@ -31,7 +30,6 @@ add_custom_target(
         -c ENABLE_OPC=${ENABLE_OPC}
         -c ENABLE_GPS=${ENABLE_GPS}
         -c ENABLE_COAP=${ENABLE_COAP}
-        -c ENABLE_WEL=${ENABLE_WEL}
         -c ENABLE_SQL=${ENABLE_SQL}
         -c ENABLE_MQTT=${ENABLE_MQTT}
         -c ENABLE_PCAP=${ENABLE_PCAP}
@@ -44,6 +42,7 @@ add_custom_target(
         -c ENABLE_SFTP=${ENABLE_SFTP}
         -c ENABLE_OPENWSMAN=${ENABLE_OPENWSMAN}
         -c ENABLE_AZURE=${ENABLE_AZURE}
+        -c ENABLE_ENCRYPT_CONFIG=${ENABLE_ENCRYPT_CONFIG}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
         -c DISABLE_CIVET=${DISABLE_CIVET}
@@ -65,8 +64,16 @@ add_custom_target(
     COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerBuild.sh
         -u 1000
         -g 1000
+        -t minimal
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i minimal
+        -c ENABLE_PYTHON=OFF
+        -c ENABLE_LIBRDKAFKA=ON
+        -c ENABLE_AWS=ON
+        -c DISABLE_CONTROLLER=ON
+        -c DISABLE_SCRIPTING=ON
+        -c DISABLE_PYTHON_SCRIPTING=ON
+        -c ENABLE_ENCRYPT_CONFIG=OFF
+        -c AWS_ENABLE_UNITY_BUILD=OFF
         -c DOCKER_BASE_IMAGE=${DOCKER_BASE_IMAGE}
         -c BUILD_NUMBER=${BUILD_NUMBER}
     WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/docker/)
@@ -77,7 +84,6 @@ add_custom_target(
         -u 1000
         -g 1000
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i release
         -c ENABLE_JNI=${ENABLE_JNI}
         -l ${CMAKE_BINARY_DIR}
         -d centos
@@ -90,7 +96,6 @@ add_custom_target(
         -u 1000
         -g 1000
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i release
         -c ENABLE_JNI=${ENABLE_JNI}
         -l ${CMAKE_BINARY_DIR}
         -d debian
@@ -103,7 +108,6 @@ add_custom_target(
         -u 1000
         -g 1000
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i release
         -c ENABLE_JNI=${ENABLE_JNI}
         -l ${CMAKE_BINARY_DIR}
         -d fedora
@@ -116,7 +120,6 @@ add_custom_target(
         -u 1000
         -g 1000
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i release
         -c ENABLE_JNI=${ENABLE_JNI}
         -l ${CMAKE_BINARY_DIR}
         -d bionic
@@ -129,7 +132,6 @@ add_custom_target(
         -u 1000
         -g 1000
         -v ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}
-        -i release
         -c ENABLE_JNI=${ENABLE_JNI}
         -l ${CMAKE_BINARY_DIR}
         -d focal
diff --git a/docker/DockerBuild.sh b/docker/DockerBuild.sh
index 4641072..397fa8c 100755
--- a/docker/DockerBuild.sh
+++ b/docker/DockerBuild.sh
@@ -24,7 +24,7 @@ set -euo pipefail
 UID_ARG=1000
 GID_ARG=1000
 MINIFI_VERSION=
-IMAGE_TYPE=release
+IMAGE_TAG=
 DUMP_LOCATION=
 DISTRO_NAME=
 BUILD_NUMBER=
@@ -33,7 +33,7 @@ function usage {
   echo "Usage: ./DockerBuild.sh -v <MINIFI_VERSION> [additional options]"
   echo "Options:"
   echo "-v, --minifi-version  Minifi version number to be used (required)"
-  echo "-i, --image-type      Can be release or minimal (default: release)"
+  echo "-t, --tag             Additional prefix added to the image tag"
   echo "-u, --uid             User id to be used in the Docker image (default: 1000)"
   echo "-g, --gid             Group id to be used in the Docker image (default: 1000)"
   echo "-d, --distro-name     Linux distribution build to be used for alternative builds (bionic|focal|fedora|debian|centos)"
@@ -62,8 +62,8 @@ while [[ $# -gt 0 ]]; do
     shift
     shift
     ;;
-    -i|--image-type)
-      IMAGE_TYPE="$2"
+    -t|--tag)
+      IMAGE_TAG="$2"
       shift
       shift
       ;;
@@ -114,8 +114,8 @@ else
 fi
 
 TAG=""
-if [ "${IMAGE_TYPE}" != "release" ]; then
-  TAG="${IMAGE_TYPE}-"
+if [ -n "${IMAGE_TAG}" ]; then
+  TAG="${IMAGE_TAG}-"
 fi
 if [ -n "${DISTRO_NAME}" ]; then
   TAG="${TAG}${DISTRO_NAME}-"
@@ -137,17 +137,15 @@ DOCKER_COMMAND="docker build "
 BUILD_ARGS="--build-arg UID=${UID_ARG} \
             --build-arg GID=${GID_ARG} \
             --build-arg MINIFI_VERSION=${MINIFI_VERSION} \
-            --build-arg IMAGE_TYPE=${IMAGE_TYPE} \
             --build-arg DUMP_LOCATION=${DUMP_LOCATION} \
             --build-arg DISTRO_NAME=${DISTRO_NAME} ${BUILD_ARGS}"
 
 DOCKER_COMMAND="${DOCKER_COMMAND} ${BUILD_ARGS} \
-                --target ${IMAGE_TYPE} \
                 -f ${DOCKERFILE} \
                 -t \
                 apacheminificpp:${TAG} .."
 
-echo "Docker Command: '$DOCKER_COMMAND'"
+echo "Docker Command: 'DOCKER_BUILDKIT=1 ${DOCKER_COMMAND}'"
 DOCKER_BUILDKIT=1 ${DOCKER_COMMAND}
 
 if [ -n "${DUMP_LOCATION}" ]; then
diff --git a/docker/Dockerfile b/docker/Dockerfile
index a1ab8b2..9aef963 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -16,45 +16,76 @@
 # under the License.
 #
 
-# First stage: the common build environment dependencies
-ARG BASE_ALPINE_IMAGE="alpine:3.12"
-FROM ${BASE_ALPINE_IMAGE} AS build_deps
+ARG BASE_ALPINE_IMAGE="alpine:3.13"
+
+# Build image
+FROM ${BASE_ALPINE_IMAGE} AS build
 LABEL maintainer="Apache NiFi <de...@nifi.apache.org>"
 
 ARG MINIFI_VERSION
 ARG UID=1000
 ARG GID=1000
 
+# PDH and WEL extensions are not listed as they are Windows specific
+# SYSTEMD extension is turned OFF explicitly as it has no use in an alpine container
+ARG ENABLE_ALL=OFF
+ARG ENABLE_PYTHON=OFF
+ARG ENABLE_OPS=ON
+ARG ENABLE_JNI=OFF
+ARG ENABLE_OPENCV=OFF
+ARG ENABLE_OPC=OFF
+ARG ENABLE_GPS=OFF
+ARG ENABLE_COAP=OFF
+ARG ENABLE_SQL=OFF
+ARG ENABLE_MQTT=OFF
+ARG ENABLE_PCAP=OFF
+ARG ENABLE_LIBRDKAFKA=OFF
+ARG ENABLE_SENSORS=OFF
+ARG ENABLE_USB_CAMERA=OFF
+ARG ENABLE_TENSORFLOW=OFF
+ARG ENABLE_AWS=OFF
+ARG ENABLE_BUSTACHE=OFF
+ARG ENABLE_SFTP=OFF
+ARG ENABLE_OPENWSMAN=OFF
+ARG ENABLE_AZURE=OFF
+ARG ENABLE_ENCRYPT_CONFIG=ON
+ARG DISABLE_CURL=OFF
+ARG DISABLE_JEMALLOC=ON
+ARG DISABLE_CIVET=OFF
+ARG DISABLE_EXPRESSION_LANGUAGE=OFF
+ARG DISABLE_ROCKSDB=OFF
+ARG DISABLE_LIBARCHIVE=OFF
+ARG DISABLE_LZMA=OFF
+ARG DISABLE_BZIP2=OFF
+ARG DISABLE_SCRIPTING=OFF
+ARG DISABLE_PYTHON_SCRIPTING=
+ARG DISABLE_CONTROLLER=OFF
+ARG CMAKE_BUILD_TYPE=Release
+
 # Install the system dependencies needed for a build
-RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \
+RUN apk --no-cache add gcc \
   g++ \
   make \
   bison \
   flex \
   flex-dev \
+  linux-headers \
   maven \
   openjdk8-jre-base \
   openjdk8 \
   autoconf \
   automake \
   libtool \
-  wget \
-  gdb \
-  musl-dev \
-  vim \
-  util-linux-dev \
   curl-dev \
   cmake \
   git \
-  nss \
-  nss-dev \
-  unzip \
+  patch \
+  libpcap-dev \
+  libpng-dev \
+  libusb-dev \
   gpsd-dev \
-  libressl-dev \
-  zlib-dev \
-  bzip2-dev \
   python3-dev \
-  patch \
+  boost-dev \
   doxygen
 
 ENV USER minificpp
@@ -65,82 +96,46 @@ ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-${MINIFI_VERSION}
 ENV MINIFI_VERSION ${MINIFI_VERSION}
 
 # Setup minificpp user
-RUN addgroup -g ${GID} ${USER} && adduser -u ${UID} -D -G ${USER} -g "" ${USER}
-
-RUN install -d -o ${USER} -g ${USER} ${MINIFI_BASE_DIR}
+RUN addgroup -g ${GID} ${USER} && adduser -u ${UID} -D -G ${USER} -g "" ${USER} && \
+    install -d -o ${USER} -g ${USER} ${MINIFI_BASE_DIR}
 COPY --chown=${USER}:${USER} . ${MINIFI_BASE_DIR}
 
 USER ${USER}
 
+RUN mkdir ${MINIFI_BASE_DIR}/build
+WORKDIR ${MINIFI_BASE_DIR}/build
+RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABLE_PYTHON="${ENABLE_PYTHON}" -DENABLE_OPS="${ENABLE_OPS}" \
+    -DENABLE_JNI="${ENABLE_JNI}" -DENABLE_OPENCV="${ENABLE_OPENCV}" -DENABLE_OPC="${ENABLE_OPC}" -DENABLE_GPS="${ENABLE_GPS}" \
+    -DENABLE_COAP="${ENABLE_COAP}" -DENABLE_SQL="${ENABLE_SQL}" -DENABLE_MQTT="${ENABLE_MQTT}" -DENABLE_PCAP="${ENABLE_PCAP}" \
+    -DENABLE_LIBRDKAFKA="${ENABLE_LIBRDKAFKA}" -DENABLE_SENSORS="${ENABLE_SENSORS}" -DENABLE_USB_CAMERA="${ENABLE_USB_CAMERA}" \
+    -DENABLE_TENSORFLOW="${ENABLE_TENSORFLOW}" -DENABLE_AWS="${ENABLE_AWS}" -DENABLE_BUSTACHE="${ENABLE_BUSTACHE}" \
+    -DENABLE_SFTP="${ENABLE_SFTP}" -DENABLE_OPENWSMAN="${ENABLE_OPENWSMAN}" -DENABLE_AZURE="${ENABLE_AZURE}" -DENABLE_SYSTEMD=OFF \
+    -DDISABLE_CURL="${DISABLE_CURL}" -DDISABLE_JEMALLOC="${DISABLE_JEMALLOC}" -DDISABLE_CIVET="${DISABLE_CIVET}" \
+    -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
+    -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
+    -DDISABLE_SCRIPTING="${DISABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" \
+    -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
+  && make -j "$(nproc)" package \
+  && tar -xzvf "${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz" -C "${MINIFI_BASE_DIR}"
+
+
+# Release image
+FROM ${BASE_ALPINE_IMAGE} AS release
+LABEL maintainer="Apache NiFi <de...@nifi.apache.org>"
 
-# Build stage of the minimal image
-FROM build_deps AS build_minimal
-RUN cd ${MINIFI_BASE_DIR} \
-  && mkdir build \
-  && cd build \
-  && cmake -DENABLE_PYTHON=OFF -DDISABLE_SCRIPTING=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AWS=ON -DSKIP_TESTS=true -DCMAKE_BUILD_TYPE=MinSizeRel .. \
-  && make -j$(nproc) package \
-  && tar -xzvf ${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz -C ${MINIFI_BASE_DIR}
-
+ARG UID=1000
+ARG GID=1000
+ARG MINIFI_VERSION
 
-# Build stage of normal image
-FROM build_deps AS build_release
-ARG ENABLE_ALL=OFF
-ARG ENABLE_PYTHON=OFF
-ARG ENABLE_OPS=ON
-ARG ENABLE_JNI=OFF
-ARG ENABLE_OPENCV=OFF
-ARG ENABLE_OPC=OFF
 ARG ENABLE_GPS=OFF
-ARG ENABLE_COAP=OFF
-ARG ENABLE_WEL=OFF
-ARG ENABLE_SQL=OFF
-ARG ENABLE_MQTT=OFF
+ARG ENABLE_JNI=OFF
 ARG ENABLE_PCAP=OFF
-ARG ENABLE_LIBRDKAFKA=OFF
-ARG ENABLE_SENSORS=OFF
 ARG ENABLE_USB_CAMERA=OFF
-ARG ENABLE_TENSORFLOW=OFF
-ARG ENABLE_AWS=OFF
+ARG ENABLE_OPENCV=OFF
+ARG ENABLE_PYTHON=OFF
 ARG ENABLE_BUSTACHE=OFF
-ARG ENABLE_SFTP=OFF
-ARG ENABLE_OPENWSMAN=OFF
-ARG ENABLE_AZURE=OFF
-ARG DISABLE_CURL=OFF
-ARG DISABLE_JEMALLOC=ON
-ARG DISABLE_CIVET=OFF
-ARG DISABLE_EXPRESSION_LANGUAGE=OFF
-ARG DISABLE_ROCKSDB=OFF
-ARG DISABLE_LIBARCHIVE=OFF
-ARG DISABLE_LZMA=OFF
-ARG DISABLE_BZIP2=OFF
 ARG DISABLE_SCRIPTING=OFF
-ARG DISABLE_PYTHON_SCRIPTING=OFF
-ARG DISABLE_CONTROLLER=OFF
-RUN cd ${MINIFI_BASE_DIR} \
-  && mkdir build \
-  && cd build \
-  && cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL=${ENABLE_ALL} -DENABLE_PYTHON=${ENABLE_PYTHON} -DENABLE_OPS=${ENABLE_OPS} \
-    -DENABLE_JNI=${ENABLE_JNI} -DENABLE_OPENCV=${ENABLE_OPENCV} -DENABLE_OPC=${ENABLE_OPC} -DENABLE_GPS=${ENABLE_GPS} -DENABLE_COAP=${ENABLE_COAP} \
-    -DENABLE_WEL=${ENABLE_WEL} -DENABLE_SQL=${ENABLE_SQL} -DENABLE_MQTT=${ENABLE_MQTT} -DENABLE_PCAP=${ENABLE_PCAP} \
-    -DENABLE_LIBRDKAFKA=${ENABLE_LIBRDKAFKA} -DENABLE_SENSORS=${ENABLE_SENSORS} \
-    -DENABLE_USB_CAMERA=${ENABLE_USB_CAMERA} -DENABLE_TENSORFLOW=${ENABLE_TENSORFLOW} -DENABLE_AWS=${ENABLE_AWS} \
-    -DENABLE_BUSTACHE=${ENABLE_BUSTACHE} -DENABLE_SFTP=${ENABLE_SFTP} -DENABLE_OPENWSMAN=${ENABLE_OPENWSMAN} -DENABLE_AZURE=${ENABLE_AZURE} \
-    -DDISABLE_CURL=${DISABLE_CURL} -DDISABLE_JEMALLOC=${DISABLE_JEMALLOC} -DDISABLE_CIVET=${DISABLE_CIVET} \
-    -DDISABLE_EXPRESSION_LANGUAGE=${DISABLE_EXPRESSION_LANGUAGE} -DDISABLE_ROCKSDB=${DISABLE_ROCKSDB} \
-    -DDISABLE_LIBARCHIVE=${DISABLE_LIBARCHIVE} -DDISABLE_LZMA=${DISABLE_LZMA} -DDISABLE_BZIP2=${DISABLE_BZIP2} \
-    -DDISABLE_SCRIPTING=${DISABLE_SCRIPTING} -DDISABLE_PYTHON_SCRIPTING=${DDISABLE_PYTHON_SCRIPTING} -DDISABLE_CONTROLLER=${DISABLE_CONTROLLER} -DCMAKE_BUILD_TYPE=Release .. \
-  && make -j$(nproc) package \
-  && tar -xzvf ${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz -C ${MINIFI_BASE_DIR}
-
-
-# Common runtime image dependencies
-# Edge required for rocksdb
-FROM ${BASE_ALPINE_IMAGE} AS common_runtime_deps
-
-ARG UID=1000
-ARG GID=1000
-ARG MINIFI_VERSION
+ARG DISABLE_PYTHON_SCRIPTING=
 
 # Add testing repo for rocksdb
 RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories
@@ -152,50 +147,21 @@ ENV MINIFI_VERSIONED_HOME ${MINIFI_BASE_DIR}/nifi-minifi-cpp-${MINIFI_VERSION}
 ENV JAVA_HOME /usr/lib/jvm/default-jvm
 ENV PATH ${PATH}:/usr/lib/jvm/default-jvm/bin
 
-RUN addgroup -g ${GID} ${USER} && adduser -u ${UID} -D -G ${USER} -g "" ${USER}
-RUN install -d -o ${USER} -g ${USER} ${MINIFI_BASE_DIR} \
-  && ln -s ${MINIFI_VERSIONED_HOME} ${MINIFI_HOME}
-
-
-# Final stage of the minimal image
-FROM common_runtime_deps AS minimal
-
-RUN apk --update --no-cache upgrade && apk add --update --no-cache libstdc++
-RUN install -d -o ${USER} -g ${USER} ${MINIFI_VERSIONED_HOME}/bin \
-  && install -d -o ${USER} -g ${USER} ${MINIFI_VERSIONED_HOME}/conf && chown ${USER}:${USER} ${MINIFI_HOME}
-
-# Copy built minifi distribution from builder
-COPY --from=build_minimal --chown=${USER}:${USER} ${MINIFI_VERSIONED_HOME}/bin/minifi ${MINIFI_HOME}/bin/minifi
-COPY --from=build_minimal --chown=${USER}:${USER} ${MINIFI_VERSIONED_HOME}/bin/minifi.sh ${MINIFI_HOME}/bin/minifi.sh
-COPY --from=build_minimal --chown=${USER}:${USER} ${MINIFI_VERSIONED_HOME}/conf ${MINIFI_HOME}/conf
-
-USER ${USER}
-WORKDIR ${MINIFI_HOME}
-
-# Start MiNiFi CPP in the foreground
-CMD ./bin/minifi.sh run
-
-
-# Final stage of release image
-FROM common_runtime_deps AS release
-RUN apk --update --no-cache upgrade && apk add --update --no-cache \
-  util-linux \
-  curl \
-  unzip \
-  gpsd \
-  openjdk8-jre-base \
-  openjdk8 \
-  nss \
-  nss-dev \
-  libressl \
-  python3 \
-  zlib
+RUN addgroup -g ${GID} ${USER} && adduser -u ${UID} -D -G ${USER} -g "" ${USER} && \
+    install -d -o ${USER} -g ${USER} ${MINIFI_BASE_DIR} && ln -s ${MINIFI_VERSIONED_HOME} ${MINIFI_HOME} && \
+    apk add --no-cache libstdc++ && \
+    if [ "$ENABLE_GPS" = "ON" ]; then apk add --no-cache gpsd; fi && \
+    if [ "$ENABLE_JNI" = "ON" ]; then apk add --no-cache openjdk8-jre-base; fi && \
+    if [ "$ENABLE_PCAP" = "ON" ]; then apk add --no-cache libpcap; fi && \
+    if [ "$ENABLE_USB_CAMERA" = "ON" ]; then apk add --no-cache libpng libusb; fi && \
+    if [ "$ENABLE_OPENCV" = "ON" ] || [ "$ENABLE_BUSTACHE" = "ON" ]; then apk add --no-cache boost; fi && \
+    if { [ "$ENABLE_PYTHON" = "ON" ] || [ "$DISABLE_SCRIPTING" = "OFF" ]; } && [ -z "$DISABLE_PYTHON_SCRIPTING" ]; then apk add --no-cache python3-dev; fi
 
 # Copy built minifi distribution from builder
-COPY --from=build_release --chown=${USER}:${USER} ${MINIFI_VERSIONED_HOME} ${MINIFI_HOME}
+COPY --from=build --chown=${USER}:${USER} ${MINIFI_VERSIONED_HOME} ${MINIFI_HOME}
 
 USER ${USER}
 WORKDIR ${MINIFI_HOME}
 
 # Start MiNiFi CPP in the foreground
-CMD ./bin/minifi.sh run
+CMD ["./bin/minifi.sh", "run"]
diff --git a/thirdparty/openwsman/openwsman.patch b/thirdparty/openwsman/openwsman.patch
index 5fff4b7..2065972 100644
--- a/thirdparty/openwsman/openwsman.patch
+++ b/thirdparty/openwsman/openwsman.patch
@@ -1,3 +1,6 @@
+PTHREAD_MUTEX_RECURSIVE_NP is a non-portable glibc mutex type, and our docker base distro, alpine, does not have glibc; only musl is available.
+On these systems only the PTHREAD_MUTEX_RECURSIVE mutex type is available, which is why we redefine the non-portable symbol.
+
 diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
 --- orig/CMakeLists.txt	2019-09-17 11:38:38.000000000 +0200
 +++ patched/CMakeLists.txt	2020-04-16 23:43:22.000000000 +0200
@@ -79,3 +82,39 @@ diff -rupN orig/src/lib/wsman-soap.c patched/src/lib/wsman-soap.c
                          max_connections_per_thread = (* fptr)();
                  }
                  else{
+
+diff -rupN orig/src/lib/u/lock.c patched/src/lib/u/lock.c
+--- orig/src/lib/u/lock.c	2021-05-31 13:44:43.992941115 +0200
++++ patched/src/lib/u/lock.c	2021-05-31 12:00:21.972733061 +0200
+@@ -50,7 +50,7 @@
+ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
+ #endif
+ 
+-#if defined (__SVR4) && defined (__sun)
++#if (defined (__SVR4) && defined (__sun)) || !defined(__GLIBC__)
+ #define PTHREAD_MUTEX_RECURSIVE_NP PTHREAD_MUTEX_RECURSIVE
+ #endif
+ 
+@@ -94,7 +94,7 @@ void u_destroy_lock(void* data)
+ void u_unlock(void* data)
+ {
+     if ( data )
+-    {	
++    {
+         pthread_mutex_unlock((pthread_mutex_t*)data);
+     }
+ }
+
+diff -rupN orig/include/u/lock.h patched/include/u/lock.h
+--- orig/include/u/lock.h	2021-05-31 13:44:43.992941115 +0200
++++ patched/include/u/lock.h	2021-05-31 12:00:30.792726402 +0200
+@@ -2,7 +2,7 @@
+ #ifndef LOCKING_H
+ #define LOCKING_H
+ 
+-#if defined (__FreeBSD__)  || defined (__OpenBSD__) || defined (__NetBSD__) || defined (__APPLE__)
++#if defined (__FreeBSD__)  || defined (__OpenBSD__) || defined (__NetBSD__) || defined (__APPLE__) || !defined(__GLIBC__)
+ /* Provide the Linux initializers for MacOS X */
+ #define PTHREAD_MUTEX_RECURSIVE_NP                                      PTHREAD_MUTEX_RECURSIVE
+ #define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP           { 0x4d555458, \
+
diff --git a/thirdparty/pcap++/Pcap++/src/PcapLiveDevice.cpp b/thirdparty/pcap++/Pcap++/src/PcapLiveDevice.cpp
index ceefab0..7d63a1e 100644
--- a/thirdparty/pcap++/Pcap++/src/PcapLiveDevice.cpp
+++ b/thirdparty/pcap++/Pcap++/src/PcapLiveDevice.cpp
@@ -22,13 +22,18 @@
 #else
 #include <arpa/inet.h>
 #include <sys/ioctl.h>
-#include <sys/sysctl.h>
 #include <net/if.h>
 #endif
 #ifdef MAC_OS_X
 #include <net/if_dl.h>
 #endif
 
+#if defined(__linux__) && !defined(__GLIBC__)
+#include <linux/sysctl.h>
+#elif !defined(WIN32) && !defined(WINx64)
+#include <sys/sysctl.h>
+#endif
+
 // On Mac OS X timeout of -1 causes pcap_open_live to fail so value of 1ms is set here.
 // On Linux and Windows this is not the case so we keep the -1 value
 #ifdef MAC_OS_X
@@ -292,7 +297,7 @@ void PcapLiveDevice::close()
 	pcap_close(m_PcapDescriptor);
 	LOG_DEBUG("Receive pcap descriptor closed");
 	if (!sameDescriptor)
-	{ 
+	{
 		pcap_close(m_PcapSendDescriptor);
 		LOG_DEBUG("Send pcap descriptor closed");
 	}

[nifi-minifi-cpp] 05/06: MINIFICPP-1595 Pin pip package versions in requirements.txt

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit dd42a049f18f7d854c2c810a0b786539fa250a83
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Jun 23 18:42:36 2021 +0200

    MINIFICPP-1595 Pin pip package versions in requirements.txt
    
    Closes #1115
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 docker/DockerVerify.sh  | 8 +-------
 docker/requirements.txt | 6 ++++++
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index 7361a2e..cbe2b82 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -57,13 +57,7 @@ if ! command swig -version &> /dev/null; then
   exit 1
 fi
 
-pip install --upgrade \
-            behave \
-            pytimeparse \
-            docker \
-            PyYAML \
-            m2crypto \
-            watchdog
+pip install -r "${docker_dir}/requirements.txt"
 JAVA_HOME="/usr/lib/jvm/default-jvm"
 export JAVA_HOME
 PATH="$PATH:/usr/lib/jvm/default-jvm/bin"
diff --git a/docker/requirements.txt b/docker/requirements.txt
new file mode 100644
index 0000000..4abac40
--- /dev/null
+++ b/docker/requirements.txt
@@ -0,0 +1,6 @@
+behave==1.2.6
+pytimeparse==1.1.8
+docker==5.0.0
+PyYAML==5.4.1
+m2crypto==0.37.1
+watchdog==2.1.2

[nifi-minifi-cpp] 06/06: MINIFICPP-1494 Allow InvokeHTTP GET requests without incoming flowfile

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 31fd0999ac656f28600a952d0b5aad66365d816b
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Jun 23 18:45:06 2021 +0200

    MINIFICPP-1494 Allow InvokeHTTP GET requests without incoming flowfile
    
    - Refactor HTTPGetIntegrationTest
    - Fix flaky HTTP test
    - Fix InvokeHTTP relationship documentation
    - Add documentation of relationships of InvokeHTTP
    
    Closes #1082
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |   6 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |   7 +-
 extensions/http-curl/processors/InvokeHTTP.h       |   1 +
 extensions/http-curl/tests/CMakeLists.txt          |   8 +-
 extensions/http-curl/tests/HTTPHandlers.h          |  22 ++++
 .../http-curl/tests/HttpGetIntegrationTest.cpp     | 139 ---------------------
 extensions/http-curl/tests/VerifyInvokeHTTP.h      | 131 +++++++++++++++++++
 .../http-curl/tests/VerifyInvokeHTTPGetTest.cpp    |  63 ++++++++++
 ...keHTTPTest.cpp => VerifyInvokeHTTPPostTest.cpp} | 121 ++----------------
 libminifi/test/resources/TestHTTPGet.yml           |  20 ++-
 libminifi/test/resources/TestHTTPGetSecure.yml     |  19 ++-
 .../test/resources/TestHTTPPostChunkedEncoding.yml |  14 +--
 12 files changed, 273 insertions(+), 278 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 3cbf541..4fe1f06 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -728,7 +728,11 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 | Name | Description |
 | - | - |
-|success|All files are routed to success|
+|success|The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the success of the request.|
+|response|A Response FlowFile will be routed upon success (2xx status codes). If the 'Always Output Response' property is true then the response will be sent to this relationship regardless of the status code received.|
+|retry|The original FlowFile will be routed on any status code that can be retried (5xx status codes). It will have new attributes detailing the request.|
+|no retry|The original FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.|
+|failure|The original FlowFile will be routed on any type of connection failure, timeout or general exception. It will have new attributes detailing the request.|
 
 
 ## ListSFTP
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 9ac3c07..34b4db1 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -123,9 +123,12 @@ const char* InvokeHTTP::REMOTE_DN = "invokehttp.remote.dn";
 const char* InvokeHTTP::EXCEPTION_CLASS = "invokehttp.java.exception.class";
 const char* InvokeHTTP::EXCEPTION_MESSAGE = "invokehttp.java.exception.message";
 
-core::Relationship InvokeHTTP::Success("success", "All files are routed to success");
+core::Relationship InvokeHTTP::Success("success", "The original FlowFile will be routed upon success (2xx status codes). "
+                                       "It will have new attributes detailing the success of the request.");
 
-core::Relationship InvokeHTTP::RelResponse("response", "Represents a response flowfile");
+core::Relationship InvokeHTTP::RelResponse("response", "A Response FlowFile will be routed upon success (2xx status codes). "
+                                           "If the 'Always Output Response' property is true then the response will be sent "
+                                           "to this relationship regardless of the status code received.");
 
 core::Relationship InvokeHTTP::RelRetry("retry", "The original FlowFile will be routed on any status code that can be retried "
                                         "(5xx status codes). It will have new attributes detailing the request.");
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index 99f54a0..766729f 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -47,6 +47,7 @@ class InvokeHTTP : public core::Processor {
    */
   explicit InvokeHTTP(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid) {
+    setTriggerWhenEmpty(true);
   }
   // Destructor
   virtual ~InvokeHTTP();
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 025392d..d31e400 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -66,7 +66,7 @@ ENDFOREACH()
 
 message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")
 
-add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME VerifyInvokeHTTPGetTest COMMAND VerifyInvokeHTTPGetTest "${TEST_RESOURCES}/TestHTTPGet.yml")
 add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2FetchFlowIfMissingTest COMMAND C2FetchFlowIfMissingTest "${TEST_RESOURCES}/TestEmpty.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2ConfigEncryption COMMAND C2ConfigEncryption "${TEST_RESOURCES}/decrypted.config.yml"  "${TEST_RESOURCES}/")
@@ -77,9 +77,9 @@ add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/T
 add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2RequestClassTest COMMAND C2RequestClassTest)
 if (NOT OPENSSL_OFF)
-	add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
+	add_test(NAME VerifyInvokeHTTPGetTestSecure COMMAND VerifyInvokeHTTPGetTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
 	add_test(NAME C2VerifyHeartbeatAndStopSecure COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStopSecure.yml" "${TEST_RESOURCES}/")
-	add_test(NAME VerifyInvokeHTTPTestSecure COMMAND VerifyInvokeHTTPTest "${TEST_RESOURCES}/TestInvokeHTTPPostSecure.yml" "${TEST_RESOURCES}/")
+	add_test(NAME VerifyInvokeHTTPPostTestSecure COMMAND VerifyInvokeHTTPPostTest "${TEST_RESOURCES}/TestInvokeHTTPPostSecure.yml" "${TEST_RESOURCES}/")
 endif()
 add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
 if (NOT APPLE)
@@ -92,7 +92,7 @@ add_test(NAME HTTPSiteToSiteTests COMMAND HTTPSiteToSiteTests "${TEST_RESOURCES}
 add_test(NAME TimeoutHTTPSiteToSiteTests COMMAND TimeoutHTTPSiteToSiteTests "${TEST_RESOURCES}/TestTimeoutHTTPSiteToSite.yml" "${TEST_RESOURCES}/" "http://localhost:8098/nifi-api")
 add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
 add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
-add_test(NAME VerifyInvokeHTTPTest COMMAND VerifyInvokeHTTPTest "${TEST_RESOURCES}/TestInvokeHTTPPost.yml")
+add_test(NAME VerifyInvokeHTTPPostTest COMMAND VerifyInvokeHTTPPostTest "${TEST_RESOURCES}/TestInvokeHTTPPost.yml")
 add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest)
 add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2PauseResumeTest.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index e797b37..8cdc2d3 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -624,3 +624,25 @@ class TimeoutingHTTPHandler : public ServerAwareHandler {
   }
   std::vector<std::chrono::milliseconds> wait_times_;
 };
+
+class HttpGetResponder : public ServerAwareHandler {
+ public:
+  bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override {
+    puts("handle get");
+    static const std::string site2site_rest_resp = "hi this is a get test";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              site2site_rest_resp.length());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+};
+
+class RetryHttpGetResponder : public ServerAwareHandler {
+ public:
+  bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override {
+    puts("handle get with retry");
+    mg_printf(conn, "HTTP/1.1 501 Not Implemented\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+};
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
deleted file mode 100644
index 1cd4b5b..0000000
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <vector>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "TestServer.h"
-#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "processors/LogAttribute.h"
-#include "integration/IntegrationBase.h"
-#include "utils/IntegrationTestUtils.h"
-
-int log_message(const struct mg_connection* /*conn*/, const char *message) {
-  puts(message);
-  return 1;
-}
-
-int ssl_enable(void* /*ssl_context*/, void* /*user_data*/) {
-  puts("Enable ssl");
-  return 0;
-}
-
-class HttpResponder : public CivetHandler {
- private:
- public:
-  bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override {
-    puts("handle get");
-    static const std::string site2site_rest_resp = "hi this is a get test";
-    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-              site2site_rest_resp.length());
-    mg_printf(conn, "%s", site2site_rest_resp.c_str());
-    return true;
-  }
-};
-
-int main(int argc, char **argv) {
-  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-  const cmd_args args = parse_cmdline_args(argc, argv);
-
-  LogTestController::getInstance().setDebug<core::Processor>();
-  LogTestController::getInstance().setDebug<core::ProcessSession>();
-  LogTestController::getInstance().setDebug<utils::HTTPClient>();
-  LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>();
-  LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>();
-  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
-
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
-
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, args.test_file);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-
-  content_repo->initialize(configuration);
-
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
-      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
-
-  std::shared_ptr<core::Processor> proc = yaml_config.getRoot()->findProcessorByName("invoke");
-  assert(proc != nullptr);
-
-  const auto inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-  assert(inv != nullptr);
-
-  std::string url;
-  inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-  HttpResponder h_ex;
-  std::string port, scheme, path;
-  std::unique_ptr<TestServer> server;
-  parse_http_components(url, port, scheme, path);
-  CivetCallbacks callback{};
-  if (scheme == "https") {
-    std::string cert;
-    cert = args.key_dir + "nifi-cert.pem";
-    callback.init_ssl = ssl_enable;
-    std::string https_port = port + "s";
-    callback.log_message = log_message;
-    server = utils::make_unique<TestServer>(https_port, path, &h_ex, &callback, cert, cert);
-  } else {
-    server = utils::make_unique<TestServer>(port, path, &h_ex);
-  }
-  controller->load();
-  controller->start();
-
-  assert(verifyLogLinePresenceInPollTime(
-      std::chrono::seconds(10),
-      "key:invokehttp.request.url value:" + url,
-      "key:invokehttp.status.code value:200",
-      "key:flow.id"));
-
-  controller->waitUnload(60000);
-  if (url.find("localhost") == std::string::npos) {
-    server.reset();
-    exit(1);
-  }
-
-  LogTestController::getInstance().reset();
-  return 0;
-}
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h b/extensions/http-curl/tests/VerifyInvokeHTTP.h
new file mode 100644
index 0000000..6546173
--- /dev/null
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#undef NDEBUG
+
+#include <memory>
+#include <utility>
+#include <string>
+
+#include "TestBase.h"
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/LogAttribute.h"
+#include "core/state/ProcessorController.h"
+#include "HTTPIntegrationBase.h"
+#include "utils/GeneralUtils.h"
+
+class VerifyInvokeHTTP : public HTTPIntegrationBase {  // Shared harness for the InvokeHTTP integration tests; run() calls runAssertions(), which the concrete tests define.
+ public:
+  VerifyInvokeHTTP()
+      : HTTPIntegrationBase(6000) {  // NOTE(review): 6000 is presumably a wait time in milliseconds - confirm against HTTPIntegrationBase
+  }
+
+  void testSetup() override {  // Raises the log levels of the classes whose log output the tests inspect.
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<LogTestController>();
+    LogTestController::getInstance().setTrace<minifi::processors::InvokeHTTP>();
+    LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<core::ProcessSession>();
+  }
+
+  void setUrl(const std::string &url, ServerAwareHandler *handler) override {  // Remembers the path of url, then delegates to the base class.
+    if (path_) {  // a second call on the same harness is rejected
+      throw std::logic_error("Url is already set");
+    }
+    std::string port, scheme, path;  // only the parsed path is kept; the local port and scheme are discarded
+    parse_http_components(url, port, scheme, path);
+    path_ = path;
+    HTTPIntegrationBase::setUrl(url, handler);
+  }
+
+  void setProperties(const std::shared_ptr<core::Processor>& proc) {  // Sets proc's InvokeHTTP URL property to a localhost URL built from getWebPort() and the stored path.
+    std::string url = scheme + "://localhost:" + getWebPort() + *path_;  // dereferences path_, so setUrl() must have been called first
+    proc->setProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+  }
+
+  void setProperty(const std::string& property, const std::string& value) {  // Sets a property on the processor behind the first "InvokeHTTP" component.
+    const auto components = flowController_->getComponents("InvokeHTTP");
+    assert(!components.empty());
+
+    const auto stateController = components[0];
+    assert(stateController);
+    const auto processorController = std::dynamic_pointer_cast<minifi::state::ProcessorController>(stateController);
+    assert(processorController);  // the component is expected to be a ProcessorController
+    auto proc = processorController->getProcessor();
+    proc->setProperty(property, value);
+  }
+
+  virtual void setupFlow(const utils::optional<std::string>& flow_yml_path) {  // Builds and loads the FlowController, then overrides the InvokeHTTP URL property.
+    testSetup();
+
+    std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+    std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+    if (flow_yml_path) {  // the flow configuration file is only set when a path was given
+      configuration->set(minifi::Configure::nifi_flow_configuration_file, *flow_yml_path);
+    }
+    configuration->set("c2.agent.heartbeat.period", "200");
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(configuration);
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+    std::unique_ptr<core::FlowConfiguration> yaml_ptr =
+      minifi::utils::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, flow_yml_path);
+
+    flowController_ = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+    flowController_->load();
+
+    std::string url = scheme + "://localhost:" + getWebPort() + *path_;  // dereferences path_, so setUrl() must have been called first
+    setProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+  }
+
+  void run(const utils::optional<std::string>& flow_yml_path = {}, const utils::optional<std::string>& = {}) override {  // One test cycle; the second argument is ignored.
+    setupFlow(flow_yml_path);
+    startFlowController();
+
+    runAssertions();
+
+    shutdownBeforeFlowController();
+    stopFlowController();
+  }
+
+  void run(const std::string& url,  // Convenience overload: sets the key directory and URL, then runs the given flow file.
+           const std::string& test_file_location,
+           const std::string& key_dir,
+           ServerAwareHandler* handler) {
+    setKeyDir(key_dir);
+    setUrl(url, handler);
+    run(test_file_location);
+  }
+
+  void startFlowController() {
+    flowController_->start();
+  }
+
+  void stopFlowController() {  // Unloads the flow, stops C2 and then calls cleanup().
+    flowController_->unload();
+    flowController_->stopC2();
+
+    cleanup();
+  }
+
+ private:
+  utils::optional<std::string> path_;  // path component of the URL passed to setUrl(); unset until then
+};
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPGetTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPGetTest.cpp
new file mode 100644
index 0000000..3af0c06
--- /dev/null
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPGetTest.cpp
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "VerifyInvokeHTTP.h"
+
+#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+
+class VerifyHTTPGet : public VerifyInvokeHTTP {  // Expects a 200 status code attribute and a flow.id attribute to be logged.
+ public:
+  void runAssertions() override {  // override (not bare virtual) so a signature mismatch with the base fails to compile
+    assert(org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime(
+        std::chrono::seconds(10),
+        "key:invokehttp.status.code value:200",
+        "key:flow.id"));
+  }
+};
+
+class VerifyRetryHTTPGet : public VerifyInvokeHTTP {  // Expects a 501 response to be logged and the flow file to be routed to "retry".
+ public:
+  void runAssertions() override {
+    assert(org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime(
+        std::chrono::seconds(10),
+        "isSuccess: 0, response code 501"));
+    assert(org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime(  // each check polls the log with its own 10-second limit
+        std::chrono::seconds(10),
+        "from InvokeHTTP to relationship retry"));
+  }
+};
+
+int main(int argc, char **argv) {  // Runs the two GET scenarios one after the other, each with a fresh handler and harness.
+  const cmd_args args = parse_cmdline_args(argc, argv);
+
+  {  // scenario 1: the handler answers 200 with a body
+    HttpGetResponder http_handler;
+    VerifyHTTPGet harness;
+    harness.run(args.url, args.test_file, args.key_dir, &http_handler);
+  }
+
+  {  // scenario 2: the handler answers 501, and the harness checks for routing to "retry"
+    RetryHttpGetResponder http_handler;
+    VerifyRetryHTTPGet harness;
+    harness.run(args.url, args.test_file, args.key_dir, &http_handler);
+  }
+
+  return 0;
+}
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPPostTest.cpp
similarity index 52%
rename from extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
rename to extensions/http-curl/tests/VerifyInvokeHTTPPostTest.cpp
index 6fa4713..7090346 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPPostTest.cpp
@@ -15,107 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "VerifyInvokeHTTP.h"
 
-#undef NDEBUG
-#include "TestBase.h"
 #include "HTTPHandlers.h"
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "processors/LogAttribute.h"
-#include "core/state/ProcessorController.h"
-
-#include "CivetServer.h"
-#include "HTTPIntegrationBase.h"
 #include "utils/IntegrationTestUtils.h"
 
-class VerifyInvokeHTTP : public HTTPIntegrationBase {
- public:
-  VerifyInvokeHTTP()
-      : HTTPIntegrationBase(6000) {
-  }
-
-  void testSetup() override {
-    LogTestController::getInstance().setDebug<utils::HTTPClient>();
-    LogTestController::getInstance().setDebug<LogTestController>();
-    LogTestController::getInstance().setTrace<minifi::processors::InvokeHTTP>();
-    LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
-  }
-
-  void setUrl(const std::string &url, ServerAwareHandler *handler) override {
-    if (path_) {
-      throw std::logic_error("Url is already set");
-    }
-    std::string port, scheme, path;
-    parse_http_components(url, port, scheme, path);
-    path_ = path;
-    HTTPIntegrationBase::setUrl(url, handler);
-  }
-
-  void setProperties(std::shared_ptr<core::Processor> proc) {
-    std::string url = scheme + "://localhost:" + getWebPort() + *path_;
-    proc->setProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-  }
-
-  void setProperty(const std::string& property, const std::string& value) {
-    const auto components = flowController_->getComponents("InvokeHTTP");
-    assert(!components.empty());
-
-    const auto stateController = components.at(0);
-    assert(stateController);
-    const auto processorController = std::dynamic_pointer_cast<minifi::state::ProcessorController>(stateController);
-    assert(processorController);
-    auto proc = processorController->getProcessor();
-    proc->setProperty(property, value);
-  }
-
-  virtual void setupFlow(const utils::optional<std::string>& flow_yml_path) {
-    testSetup();
-
-    std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-    std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
-    if (flow_yml_path) {
-      configuration->set(minifi::Configure::nifi_flow_configuration_file, *flow_yml_path);
-    }
-    configuration->set("c2.agent.heartbeat.period", "200");
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(configuration);
-    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-    std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-        new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, flow_yml_path));
-
-    flowController_ = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
-    flowController_->load();
-
-    std::string url = scheme + "://localhost:" + getWebPort() + *path_;
-    setProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-  }
-
-  void run(const utils::optional<std::string>& flow_yml_path = {}, const utils::optional<std::string>& = {}) override {
-    setupFlow(flow_yml_path);
-    startFlowController();
-
-    runAssertions();
-
-    shutdownBeforeFlowController();
-    stopFlowController();
-  }
-
-  void startFlowController() {
-    flowController_->start();
-  }
-
-  void stopFlowController() {
-    flowController_->unload();
-    flowController_->stopC2();
-
-    cleanup();
-  }
-
- private:
-  utils::optional<std::string> path_;
-};
-
 class VerifyInvokeHTTPOKResponse : public VerifyInvokeHTTP {
  public:
   void runAssertions() override {
@@ -189,17 +93,6 @@ class VerifyRWTimeoutInvokeHTTP : public VerifyInvokeHTTP {
   }
 };
 
-void run(VerifyInvokeHTTP& harness,
-    const std::string& url,
-    const std::string& test_file_location,
-    const std::string& key_dir,
-    ServerAwareHandler * handler) {
-
-  harness.setKeyDir(key_dir);
-  harness.setUrl(url, handler);
-  harness.run(test_file_location);
-}
-
 int main(int argc, char ** argv) {
   const cmd_args args = parse_cmdline_args(argc, argv);
 
@@ -220,37 +113,37 @@ int main(int argc, char ** argv) {
   {
     InvokeHTTPResponseOKHandler handler;
     VerifyInvokeHTTPOKResponse harness;
-    run(harness, args.url, args.test_file, args.key_dir, &handler);
+    harness.run(args.url, args.test_file, args.key_dir, &handler);
   }
 
   {
     InvokeHTTPRedirectHandler handler;
     VerifyInvokeHTTPOK200Response harness;
-    run(harness, args.url, args.test_file, args.key_dir, &handler);
+    harness.run(args.url, args.test_file, args.key_dir, &handler);
   }
 
   {
     InvokeHTTPRedirectHandler handler;
     VerifyInvokeHTTPRedirectResponse harness;
-    run(harness, args.url, args.test_file, args.key_dir, &handler);
+    harness.run(args.url, args.test_file, args.key_dir, &handler);
   }
 
   {
     InvokeHTTPResponse404Handler handler;
     VerifyNoRetryInvokeHTTP harness;
-    run(harness, args.url, args.test_file, args.key_dir, &handler);
+    harness.run(args.url, args.test_file, args.key_dir, &handler);
   }
 
   {
     InvokeHTTPResponse501Handler handler;
     VerifyRetryInvokeHTTP harness;
-    run(harness, args.url, args.test_file, args.key_dir, &handler);
+    harness.run(args.url, args.test_file, args.key_dir, &handler);
   }
 
   {
     TimeoutingHTTPHandler handler({std::chrono::seconds(2)});
     VerifyRWTimeoutInvokeHTTP harness;
-    run(harness, args.url, args.test_file, args.key_dir, &handler);
+    harness.run(args.url, args.test_file, args.key_dir, &handler);
   }
 
   return 0;
diff --git a/libminifi/test/resources/TestHTTPGet.yml b/libminifi/test/resources/TestHTTPGet.yml
index fc4a07f..bfddd3c 100644
--- a/libminifi/test/resources/TestHTTPGet.yml
+++ b/libminifi/test/resources/TestHTTPGet.yml
@@ -20,7 +20,7 @@ Flow Controller:
     name: MiNiFi Flow
     id: 2438e3c8-015a-1000-79ca-83af40ec1990
 Processors:
-    - name: invoke
+    - name: InvokeHTTP
       id: 2438e3c8-015a-1000-79ca-83af40ec1991
       class: org.apache.nifi.processors.standard.InvokeHTTP
       max concurrent tasks: 1
@@ -30,7 +30,6 @@ Processors:
       yield period: 1 sec
       run duration nanos: 0
       auto-terminated relationships list:
-          - retry
           - no retry
           - response
           - failure
@@ -54,7 +53,7 @@ Processors:
 Connections:
     - name: TransferFilesToRPG
       id: 2438e3c8-015a-1000-79ca-83af40ec1997
-      source name: invoke
+      source name: InvokeHTTP
       source id: 2438e3c8-015a-1000-79ca-83af40ec1991
       source relationship name: success
       destination name: LogAttribute
@@ -67,11 +66,22 @@ Connections:
       source name: LogAttribute
       source id: 2438e3c8-015a-1000-79ca-83af40ec1992
       destination name: LogAttribute
-      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
       source relationship name: success
       max work queue size: 0
       max work queue data size: 1 MB
       flowfile expiration: 60 sec
+    - name: RetryInvokeHTTP
+      id: c8a96f1d-48cd-4ee3-8a2f-081c3e3c7bcd
+      source name: InvokeHTTP
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship names: retry
+      destination name: InvokeHTTP
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 seconds
+
 
 Remote Processing Groups:
-    
+
diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml
index d90770d..194c4ff 100644
--- a/libminifi/test/resources/TestHTTPGetSecure.yml
+++ b/libminifi/test/resources/TestHTTPGetSecure.yml
@@ -20,7 +20,7 @@ Flow Controller:
     name: MiNiFi Flow
     id: 2438e3c8-015a-1000-79ca-83af40ec1990
 Processors:
-    - name: invoke
+    - name: InvokeHTTP
       id: 2438e3c8-015a-1000-79ca-83af40ec1991
       class: org.apache.nifi.processors.standard.InvokeHTTP
       max concurrent tasks: 1
@@ -31,7 +31,6 @@ Processors:
       run duration nanos: 0
       auto-terminated relationships list:
           - failure
-          - retry
           - no retry
           - response
       Properties:
@@ -56,7 +55,7 @@ Processors:
 Connections:
     - name: TransferFilesToRPG
       id: 2438e3c8-015a-1000-79ca-83af40ec1997
-      source name: invoke
+      source name: InvokeHTTP
       source id: 2438e3c8-015a-1000-79ca-83af40ec1991
       source relationship name: success
       destination name: LogAttribute
@@ -69,11 +68,21 @@ Connections:
       source name: LogAttribute
       source id: 2438e3c8-015a-1000-79ca-83af40ec1992
       destination name: LogAttribute
-      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
       source relationship name: success
       max work queue size: 0
       max work queue data size: 1 MB
       flowfile expiration: 60 sec
+    - name: RetryInvokeHTTP
+      id: c8a96f1d-48cd-4ee3-8a2f-081c3e3c7bcd
+      source name: InvokeHTTP
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship names: retry
+      destination name: InvokeHTTP
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 seconds
 
 Controller Services:
     - name: SSLContextService
@@ -90,4 +99,4 @@ Controller Services:
             - value: nifi-cert.pem
 
 Remote Processing Groups:
-    
+
diff --git a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
index 993a289..532771e 100644
--- a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
+++ b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
@@ -48,12 +48,11 @@ Processors:
       id: 2438e3c8-015a-1000-79ca-83af40ec1992
       class: org.apache.nifi.processors.standard.InvokeHTTP
       max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling strategy: EVENT_DRIVEN
       penalization period: 30 sec
       yield period: 1 sec
       run duration nanos: 0
-      auto-terminated relationships list: 
+      auto-terminated relationships list:
           - success
           - retry
           - failure
@@ -68,17 +67,16 @@ Processors:
       id: 2438e3c8-015a-1000-79ca-83af40ec1993
       class: org.apache.nifi.processors.standard.LogAttribute
       max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling strategy: EVENT_DRIVEN
       penalization period: 30 sec
       yield period: 1 sec
       run duration nanos: 0
-      auto-terminated relationships list: 
+      auto-terminated relationships list:
           - success
       Properties:
           LogLevel: debug
 
-Connections:    
+Connections:
     - name: GenerateFlowFile/Invoke
       id: 2438e3c8-015a-1000-79ca-83af40ec1997
       source name: invoke
@@ -98,4 +96,4 @@ Connections:
       max work queue data size: 1 MB
       flowfile expiration: 60 sec
 Remote Processing Groups:
-    
+

[nifi-minifi-cpp] 04/06: MINIFICPP-1567 enable linter checks in extensions (part 3)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 89f1713f7440a71bd2e4804e1b27b7f6291f6c62
Author: Martin Zink <ma...@protonmail.com>
AuthorDate: Wed Jun 23 18:16:37 2021 +0200

    MINIFICPP-1567 enable linter checks in extensions (part 3)
    
    extensions: libarchive, mqtt, opc, opencv, openwsman, pcap
    
    Closes #1103
    
    Signed-off-by: Marton Szasz <sz...@apache.org>
    Co-authored-by: Ferenc Gerlits <fg...@users.noreply.github.com>
---
 extensions/libarchive/ArchiveLoader.h              |   8 +-
 extensions/libarchive/ArchiveMetadata.cpp          | 215 ++++++++++-----------
 extensions/libarchive/ArchiveMetadata.h            | 100 +++++-----
 extensions/libarchive/ArchiveTests.h               |   7 +-
 extensions/libarchive/BinFiles.cpp                 |   3 +-
 extensions/libarchive/BinFiles.h                   |  14 +-
 extensions/libarchive/CMakeLists.txt               |   4 +-
 extensions/libarchive/CompressContent.cpp          |   3 -
 extensions/libarchive/CompressContent.h            |  23 +--
 extensions/libarchive/FocusArchiveEntry.cpp        |   6 +-
 extensions/libarchive/FocusArchiveEntry.h          |   8 +-
 extensions/libarchive/ManipulateArchive.cpp        |  11 +-
 extensions/libarchive/ManipulateArchive.h          |  17 +-
 extensions/libarchive/MergeContent.cpp             |   9 +-
 extensions/libarchive/MergeContent.h               |  13 +-
 extensions/libarchive/UnfocusArchiveEntry.cpp      |   6 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |   7 +-
 extensions/mqtt/CMakeLists.txt                     |   4 +-
 extensions/mqtt/MQTTLoader.h                       |   8 +-
 .../controllerservice/MQTTControllerService.cpp    |   2 +-
 .../mqtt/controllerservice/MQTTControllerService.h |  21 +-
 extensions/mqtt/processors/AbstractMQTTProcessor.h |  16 +-
 extensions/mqtt/processors/ConsumeMQTT.h           |   8 +-
 extensions/mqtt/processors/ConvertBase.h           |  11 +-
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |   2 +
 extensions/mqtt/processors/ConvertHeartBeat.h      |  21 +-
 extensions/mqtt/processors/ConvertJSONAck.h        |  17 +-
 extensions/mqtt/processors/ConvertUpdate.cpp       |   6 +
 extensions/mqtt/processors/ConvertUpdate.h         |  20 +-
 extensions/mqtt/processors/PublishMQTT.h           |   9 +-
 extensions/mqtt/protocol/MQTTC2Protocol.cpp        |   3 +-
 extensions/mqtt/protocol/MQTTC2Protocol.h          |   6 +-
 extensions/opc/CMakeLists.txt                      |   1 +
 extensions/opc/include/fetchopc.h                  |  20 +-
 extensions/opc/include/opc.h                       |  19 +-
 extensions/opc/include/opcbase.h                   |  10 +-
 extensions/opc/include/putopc.h                    |  20 +-
 extensions/opc/src/fetchopc.cpp                    |   4 +-
 extensions/opc/src/opc.cpp                         | 148 +++++++-------
 extensions/opencv/CMakeLists.txt                   |   3 +-
 extensions/opencv/CaptureRTSPFrame.cpp             |  20 +-
 extensions/opencv/CaptureRTSPFrame.h               |  16 +-
 extensions/opencv/FrameIO.h                        |  73 +++----
 extensions/opencv/MotionDetector.cpp               |   5 +
 extensions/opencv/MotionDetector.h                 |  26 +--
 extensions/opencv/OpenCVLoader.h                   |  10 +-
 extensions/opencv/tests/CaptureRTSPFrameTest.cpp   |   2 +-
 extensions/openwsman/CMakeLists.txt                |   1 +
 .../SourceInitiatedSubscriptionListener.h          |   5 +-
 extensions/pcap/CMakeLists.txt                     |   4 +-
 extensions/pcap/CapturePacket.cpp                  |  13 +-
 extensions/pcap/CapturePacket.h                    |  15 +-
 extensions/pcap/PcapLoader.h                       |   8 +-
 libminifi/include/io/validation.h                  |   1 +
 .../test/archive-tests/ManipulateArchiveTests.cpp  |   4 +-
 55 files changed, 495 insertions(+), 541 deletions(-)

diff --git a/extensions/libarchive/ArchiveLoader.h b/extensions/libarchive/ArchiveLoader.h
index 79eed4e..2a71aeb 100644
--- a/extensions/libarchive/ArchiveLoader.h
+++ b/extensions/libarchive/ArchiveLoader.h
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSION_ARCHIVELOADER_H
-#define EXTENSION_ARCHIVELOADER_H
+#pragma once
+
+#include <vector>
+#include <string>
+#include <memory>
 
 #include "MergeContent.h"
 #include "CompressContent.h"
@@ -76,4 +79,3 @@ class ArchiveFactory : public core::ObjectFactory {
 extern "C" {
 void *createArchiveFactory(void);
 }
-#endif /* EXTENSION_ARCHIVELOADER_H */
diff --git a/extensions/libarchive/ArchiveMetadata.cpp b/extensions/libarchive/ArchiveMetadata.cpp
index 69af90a..7b7a527 100644
--- a/extensions/libarchive/ArchiveMetadata.cpp
+++ b/extensions/libarchive/ArchiveMetadata.cpp
@@ -20,10 +20,8 @@
 
 #include "ArchiveMetadata.h"
 
-#include <archive.h>
 #include <archive_entry.h>
 
-#include <list>
 #include <string>
 #include <algorithm>
 #include <iostream>
@@ -32,179 +30,180 @@
 #include "utils/gsl.h"
 #include "Exception.h"
 
+#include "rapidjson/writer.h"
+#include "rapidjson/error/en.h"
+
 using org::apache::nifi::minifi::Exception;
 using org::apache::nifi::minifi::ExceptionType;
 
 rapidjson::Value ArchiveEntryMetadata::toJson(rapidjson::Document::AllocatorType &alloc) const {
-    rapidjson::Value entryVal(rapidjson::kObjectType);
+  rapidjson::Value entryVal(rapidjson::kObjectType);
 
-    rapidjson::Value entryNameVal;
-    entryNameVal.SetString(entryName.c_str(), gsl::narrow<rapidjson::SizeType>(entryName.length()));
-    entryVal.AddMember("entry_name", entryNameVal, alloc);
+  rapidjson::Value entryNameVal;
+  entryNameVal.SetString(entryName.c_str(), gsl::narrow<rapidjson::SizeType>(entryName.length()));
+  entryVal.AddMember("entry_name", entryNameVal, alloc);
 
-    entryVal.AddMember("entry_type", entryType, alloc);
-    entryVal.AddMember("entry_perm", entryPerm, alloc);
-    entryVal.AddMember("entry_size", entrySize, alloc);
-    entryVal.AddMember("entry_uid", entryUID, alloc);
-    entryVal.AddMember("entry_gid", entryGID, alloc);
-    entryVal.AddMember("entry_mtime", entryMTime, alloc);
-    entryVal.AddMember("entry_mtime_nsec", entryMTimeNsec, alloc);
+  entryVal.AddMember("entry_type", entryType, alloc);
+  entryVal.AddMember("entry_perm", entryPerm, alloc);
+  entryVal.AddMember("entry_size", entrySize, alloc);
+  entryVal.AddMember("entry_uid", entryUID, alloc);
+  entryVal.AddMember("entry_gid", entryGID, alloc);
+  entryVal.AddMember("entry_mtime", entryMTime, alloc);
+  entryVal.AddMember("entry_mtime_nsec", entryMTimeNsec, alloc);
 
-    if (entryType == AE_IFREG) {
-        rapidjson::Value stashKeyVal;
-        stashKeyVal.SetString(stashKey.c_str(), gsl::narrow<rapidjson::SizeType>(stashKey.length()));
-        entryVal.AddMember("stash_key", stashKeyVal, alloc);
-    }
+  if (entryType == AE_IFREG) {
+    rapidjson::Value stashKeyVal;
+    stashKeyVal.SetString(stashKey.c_str(), gsl::narrow<rapidjson::SizeType>(stashKey.length()));
+    entryVal.AddMember("stash_key", stashKeyVal, alloc);
+  }
 
-    return entryVal;
+  return entryVal;
 }
 
 void ArchiveEntryMetadata::loadJson(const rapidjson::Value& entryVal) {
-    entryName.assign(entryVal["entry_name"].GetString());
-    entryType = gsl::narrow<mode_t>(entryVal["entry_type"].GetUint64());
-    entryPerm = gsl::narrow<mode_t>(entryVal["entry_perm"].GetUint64());
-    entrySize = entryVal["entry_size"].GetUint64();
-    entryUID = gsl::narrow<uid_t>(entryVal["entry_uid"].GetUint64());
-    entryGID = entryVal["entry_gid"].GetUint64();
-    entryMTime = entryVal["entry_mtime"].GetUint64();
-    entryMTimeNsec = entryVal["entry_mtime_nsec"].GetInt64();
+  entryName.assign(entryVal["entry_name"].GetString());
+  entryType = gsl::narrow<mode_t>(entryVal["entry_type"].GetUint64());
+  entryPerm = gsl::narrow<mode_t>(entryVal["entry_perm"].GetUint64());
+  entrySize = entryVal["entry_size"].GetUint64();
+  entryUID = gsl::narrow<uid_t>(entryVal["entry_uid"].GetUint64());
+  entryGID = entryVal["entry_gid"].GetUint64();
+  entryMTime = entryVal["entry_mtime"].GetUint64();
+  entryMTimeNsec = entryVal["entry_mtime_nsec"].GetInt64();
 
-    if (entryType == AE_IFREG)
-        stashKey.assign(entryVal["stash_key"].GetString());
+  if (entryType == AE_IFREG)
+    stashKey.assign(entryVal["stash_key"].GetString());
 }
 
 ArchiveEntryMetadata ArchiveEntryMetadata::fromJson(const rapidjson::Value& entryVal) {
-    ArchiveEntryMetadata aem;
-    aem.loadJson(entryVal);
-    return aem;
+  ArchiveEntryMetadata aem;
+  aem.loadJson(entryVal);
+  return aem;
 }
 
 ArchiveEntryIterator ArchiveMetadata::find(const std::string& name) {
-    auto targetTest = [&](const ArchiveEntryMetadata& entry) -> bool {
-        return entry.entryName == name;
-    };
+  auto targetTest = [&](const ArchiveEntryMetadata& entry) -> bool {
+    return entry.entryName == name;
+  };
 
-    return std::find_if(entryMetadata.begin(),
-                        entryMetadata.end(),
-                        targetTest);
+  return std::find_if(entryMetadata.begin(), entryMetadata.end(), targetTest);
 }
 
 ArchiveEntryIterator ArchiveMetadata::eraseEntry(ArchiveEntryIterator position) {
-    return entryMetadata.erase(position);
+  return entryMetadata.erase(position);
 }
 
 ArchiveEntryIterator ArchiveMetadata::insertEntry(
-    ArchiveEntryIterator position, const ArchiveEntryMetadata& entry) {
-    return entryMetadata.insert(position, entry);
+  ArchiveEntryIterator position, const ArchiveEntryMetadata& entry) {
+  return entryMetadata.insert(position, entry);
 }
 
 rapidjson::Value ArchiveMetadata::toJson(rapidjson::Document::AllocatorType &alloc) const {
-    rapidjson::Value structVal(rapidjson::kArrayType);
+  rapidjson::Value structVal(rapidjson::kArrayType);
 
-    for (const auto &entry : entryMetadata) {
-        structVal.PushBack(entry.toJson(alloc), alloc);
-    }
+  for (const auto &entry : entryMetadata) {
+      structVal.PushBack(entry.toJson(alloc), alloc);
+  }
 
-    rapidjson::Value lensVal(rapidjson::kObjectType);
+  rapidjson::Value lensVal(rapidjson::kObjectType);
 
-    rapidjson::Value archiveFormatNameVal;
-    archiveFormatNameVal.SetString(archiveFormatName.c_str(), gsl::narrow<rapidjson::SizeType>(archiveFormatName.length()));
-    lensVal.AddMember("archive_format_name", archiveFormatNameVal, alloc);
+  rapidjson::Value archiveFormatNameVal;
+  archiveFormatNameVal.SetString(archiveFormatName.c_str(), gsl::narrow<rapidjson::SizeType>(archiveFormatName.length()));
+  lensVal.AddMember("archive_format_name", archiveFormatNameVal, alloc);
 
-    lensVal.AddMember("archive_format", archiveFormat, alloc);
-    lensVal.AddMember("archive_structure", structVal, alloc);
+  lensVal.AddMember("archive_format", archiveFormat, alloc);
+  lensVal.AddMember("archive_structure", structVal, alloc);
 
-    if (!archiveName.empty()) {
-        rapidjson::Value archiveNameVal;
-        archiveNameVal.SetString(archiveName.c_str(), gsl::narrow<rapidjson::SizeType>(archiveName.length()));
-        lensVal.AddMember("archive_name", archiveNameVal, alloc);
-    }
+  if (!archiveName.empty()) {
+    rapidjson::Value archiveNameVal;
+    archiveNameVal.SetString(archiveName.c_str(), gsl::narrow<rapidjson::SizeType>(archiveName.length()));
+    lensVal.AddMember("archive_name", archiveNameVal, alloc);
+  }
 
-   rapidjson::Value focusedEntryVal;
-    focusedEntryVal.SetString(focusedEntry.c_str(), gsl::narrow<rapidjson::SizeType>(focusedEntry.length()));
-    lensVal.AddMember("focused_entry", focusedEntryVal, alloc);
+  rapidjson::Value focusedEntryVal;
+  focusedEntryVal.SetString(focusedEntry.c_str(), gsl::narrow<rapidjson::SizeType>(focusedEntry.length()));
+  lensVal.AddMember("focused_entry", focusedEntryVal, alloc);
 
-    return lensVal;
+  return lensVal;
 }
 
 ArchiveMetadata ArchiveMetadata::fromJson(const rapidjson::Value& metadataDoc) {
-    ArchiveMetadata am;
-    am.loadJson(metadataDoc);
-    return am;
+  ArchiveMetadata am;
+  am.loadJson(metadataDoc);
+  return am;
 }
 
 void ArchiveMetadata::loadJson(const rapidjson::Value& metadataDoc) {
-    rapidjson::Value::ConstMemberIterator itr = metadataDoc.FindMember("archive_name");
-    if (itr != metadataDoc.MemberEnd())
-        archiveName.assign(itr->value.GetString());
+  rapidjson::Value::ConstMemberIterator itr = metadataDoc.FindMember("archive_name");
+  if (itr != metadataDoc.MemberEnd())
+    archiveName.assign(itr->value.GetString());
+
+  archiveFormatName.assign(metadataDoc["archive_format_name"].GetString());
+  archiveFormat = gsl::narrow<int>(metadataDoc["archive_format"].GetUint64());
 
-    archiveFormatName.assign(metadataDoc["archive_format_name"].GetString());
-    archiveFormat = gsl::narrow<int>(metadataDoc["archive_format"].GetUint64());
+  focusedEntry = metadataDoc["focused_entry"].GetString();
 
-    focusedEntry = metadataDoc["focused_entry"].GetString();
-  
-    for (const auto &entryVal : metadataDoc["archive_structure"].GetArray()) {
-        entryMetadata.push_back(ArchiveEntryMetadata::fromJson(entryVal));
-    }
+  for (const auto &entryVal : metadataDoc["archive_structure"].GetArray()) {
+    entryMetadata.push_back(ArchiveEntryMetadata::fromJson(entryVal));
+  }
 }
 
 void ArchiveMetadata::seedTempPaths(fileutils::FileManager *file_man, bool keep = false) {
-    for (auto& entry : entryMetadata)
-        entry.tmpFileName.assign(file_man->unique_file(keep));
+  for (auto& entry : entryMetadata)
+    entry.tmpFileName.assign(file_man->unique_file(keep));
 }
 
 ArchiveStack ArchiveStack::fromJson(const rapidjson::Value& input) {
-    ArchiveStack as;
-    as.loadJson(input);
-    return as;
+  ArchiveStack as;
+  as.loadJson(input);
+  return as;
 }
 
 ArchiveStack ArchiveStack::fromJsonString(const std::string& input) {
-    ArchiveStack as;
-    as.loadJsonString(input);
-    return as;
+  ArchiveStack as;
+  as.loadJsonString(input);
+  return as;
 }
 
 void ArchiveStack::loadJson(const rapidjson::Value& lensStack) {
-    for (const auto& metadata : lensStack.GetArray()) {
-        stack_.push_back(ArchiveMetadata::fromJson(metadata));
-    }
+  for (const auto& metadata : lensStack.GetArray()) {
+    stack_.push_back(ArchiveMetadata::fromJson(metadata));
+  }
 }
 
 void ArchiveStack::loadJsonString(const std::string& input) {
-    rapidjson::Document lensStack;
-    rapidjson::ParseResult ok = lensStack.Parse(input.c_str());
+  rapidjson::Document lensStack;
+  rapidjson::ParseResult ok = lensStack.Parse(input.c_str());
 
-    if (!ok) {
-        std::stringstream ss;
-        ss << "Failed to parse archive lens stack from JSON string with reason: "
-           << rapidjson::GetParseError_En(ok.Code())
-           << " at offset " << ok.Offset();
+  if (!ok) {
+    std::stringstream ss;
+    ss << "Failed to parse archive lens stack from JSON string with reason: "
+        << rapidjson::GetParseError_En(ok.Code())
+        << " at offset " << ok.Offset();
 
-        throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
-    }
+    throw Exception(ExceptionType::GENERAL_EXCEPTION, ss.str());
+  }
 
-    loadJson(lensStack);
+  loadJson(lensStack);
 }
 
 rapidjson::Document ArchiveStack::toJson() const {
-    rapidjson::Document lensStack(rapidjson::kArrayType);
-    rapidjson::Document::AllocatorType &alloc = lensStack.GetAllocator();
+  rapidjson::Document lensStack(rapidjson::kArrayType);
+  rapidjson::Document::AllocatorType &alloc = lensStack.GetAllocator();
 
-    for (const auto& metadata : stack_) {
-        lensStack.PushBack(metadata.toJson(alloc), alloc);
-    }
+  for (const auto& metadata : stack_) {
+    lensStack.PushBack(metadata.toJson(alloc), alloc);
+  }
 
-    return lensStack;
+  return lensStack;
 }
 
 std::string ArchiveStack::toJsonString() const {
-    rapidjson::Document d = toJson();
-
-    rapidjson::StringBuffer buffer;
-    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
-    d.Accept(writer);
-    
-    std::string jsonString = buffer.GetString();
-    return jsonString;
+  rapidjson::Document d = toJson();
+
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  d.Accept(writer);
+
+  std::string jsonString = buffer.GetString();
+  return jsonString;
 }
diff --git a/extensions/libarchive/ArchiveMetadata.h b/extensions/libarchive/ArchiveMetadata.h
index 6896c1d..081a9b6 100644
--- a/extensions/libarchive/ArchiveMetadata.h
+++ b/extensions/libarchive/ArchiveMetadata.h
@@ -20,79 +20,75 @@
 #ifndef EXTENSIONS_LIBARCHIVE_ARCHIVEMETADATA_H_
 #define EXTENSIONS_LIBARCHIVE_ARCHIVEMETADATA_H_
 
-#include "rapidjson/document.h"
-#include "rapidjson/writer.h"
-#include "rapidjson/stringbuffer.h"
-#include "rapidjson/error/en.h"
-
 #include <list>
 #include <vector>
 #include <string>
-#include <algorithm>
+
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
 
 #include "ArchiveCommon.h"
-#include "core/Core.h"
 #include "utils/file/FileManager.h"
 
 class ArchiveEntryMetadata {
-public:
-    std::string entryName;
-    mode_t entryType;
-    mode_t entryPerm;
-    uid_t entryUID;
-    gid_t entryGID;
-    uint64_t entryMTime;
-    uint64_t entryMTimeNsec;
-    uint64_t entrySize;
-
-    std::string tmpFileName;
-    std::string stashKey;
-
-    inline rapidjson::Value toJson(rapidjson::Document::AllocatorType &alloc) const;
-    static inline ArchiveEntryMetadata fromJson(const rapidjson::Value&);
-
-private:
-    inline void loadJson(const rapidjson::Value&);
+ public:
+  std::string entryName;
+  mode_t entryType;
+  mode_t entryPerm;
+  uid_t entryUID;
+  gid_t entryGID;
+  uint64_t entryMTime;
+  uint64_t entryMTimeNsec;
+  uint64_t entrySize;
+
+  std::string tmpFileName;
+  std::string stashKey;
+
+  inline rapidjson::Value toJson(rapidjson::Document::AllocatorType &alloc) const;
+  static inline ArchiveEntryMetadata fromJson(const rapidjson::Value&);
+
+ private:
+  inline void loadJson(const rapidjson::Value&);
 };
 
 using ArchiveEntryIterator = typename std::list<ArchiveEntryMetadata>::iterator;
 
 class ArchiveMetadata {
-public:
-    std::string archiveName;
-    std::string archiveFormatName;
-    int archiveFormat;
-    std::list<ArchiveEntryMetadata> entryMetadata;
+ public:
+  std::string archiveName;
+  std::string archiveFormatName;
+  int archiveFormat;
+  std::list<ArchiveEntryMetadata> entryMetadata;
 
-    std::string focusedEntry;
+  std::string focusedEntry;
 
-    ArchiveEntryIterator find(const std::string& name);
-    ArchiveEntryIterator eraseEntry(ArchiveEntryIterator position);
-    ArchiveEntryIterator insertEntry(ArchiveEntryIterator it, const ArchiveEntryMetadata& entry);
+  ArchiveEntryIterator find(const std::string& name);
+  ArchiveEntryIterator eraseEntry(ArchiveEntryIterator position);
+  ArchiveEntryIterator insertEntry(ArchiveEntryIterator it, const ArchiveEntryMetadata& entry);
 
-    void seedTempPaths(fileutils::FileManager* file_man, bool keep);
+  void seedTempPaths(fileutils::FileManager* file_man, bool keep);
 
-    rapidjson::Value toJson(rapidjson::Document::AllocatorType &alloc) const;
-    static ArchiveMetadata fromJson(const rapidjson::Value&);
+  rapidjson::Value toJson(rapidjson::Document::AllocatorType &alloc) const;
+  static ArchiveMetadata fromJson(const rapidjson::Value&);
 
-private:
-    void loadJson(const rapidjson::Value&);
+ private:
+  void loadJson(const rapidjson::Value&);
 };
 
 class ArchiveStack {
-public:
-    static ArchiveStack fromJsonString(const std::string& input);
-    static ArchiveStack fromJson(const rapidjson::Value& input);
-    void push(const ArchiveMetadata& metadata) { stack_.push_back(metadata); }
-    ArchiveMetadata pop() { auto x = top(); stack_.pop_back(); return x; }
-    ArchiveMetadata top() const { return stack_.back(); }
-    void loadJson(const rapidjson::Value& input);
-    void loadJsonString(const std::string& input);
-    std::string toJsonString() const;
-    rapidjson::Document toJson() const;
-
-private:
-    std::vector<ArchiveMetadata> stack_;
+ public:
+  static ArchiveStack fromJsonString(const std::string& input);
+  static ArchiveStack fromJson(const rapidjson::Value& input);
+  void push(const ArchiveMetadata& metadata) { stack_.push_back(metadata); }
+  ArchiveMetadata pop() { auto x = top(); stack_.pop_back(); return x; }
+  ArchiveMetadata top() const { return stack_.back(); }
+  void loadJson(const rapidjson::Value& input);
+  void loadJsonString(const std::string& input);
+  std::string toJsonString() const;
+  rapidjson::Document toJson() const;
+
+ private:
+  std::vector<ArchiveMetadata> stack_;
 };
 
 #endif  // EXTENSIONS_LIBARCHIVE_ARCHIVEMETADATA_H_
diff --git a/extensions/libarchive/ArchiveTests.h b/extensions/libarchive/ArchiveTests.h
index 333abf4..a8d60cd 100644
--- a/extensions/libarchive/ArchiveTests.h
+++ b/extensions/libarchive/ArchiveTests.h
@@ -18,15 +18,13 @@
  * limitations under the License.
  */
 
-#ifndef ARCHIVE_TESTS_H
-#define ARCHIVE_TESTS_H
+#pragma once
 
 #include <map>
 #include <vector>
 #include <string>
 
-#include <archive.h>
-#include <archive_entry.h>
+#include "archive_entry.h"
 
 #include "ArchiveCommon.h"
 
@@ -63,4 +61,3 @@ void build_test_archive(std::string, OrderedTestArchive);
 bool check_archive_contents(std::string, TAE_MAP_T entries, bool check_attributes = true, FN_VEC_T order = FN_VEC_T());
 bool check_archive_contents(std::string, OrderedTestArchive, bool check_attributes = true);
 
-#endif
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index c3c92e2..3b12026 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -23,11 +23,10 @@
 #include <string>
 #include <vector>
 #include <set>
-#include <queue>
+#include <unordered_set>
 #include <map>
 #include <deque>
 #include <utility>
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index bc245a8..40b6bb7 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -17,17 +17,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __BIN_FILES_H__
-#define __BIN_FILES_H__
+#pragma once
 
 #include <cinttypes>
 #include <limits>
 #include <deque>
+#include <memory>
+#include <unordered_set>
+#include <string>
+#include <set>
 #include <map>
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
-#include "core/Core.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/gsl.h"
@@ -124,8 +126,6 @@ class Bin {
     return groupId_;
   }
 
- protected:
-
  private:
   uint64_t minSize_;
   uint64_t maxSize_;
@@ -184,8 +184,6 @@ class BinManager {
   // get ready bin from binManager
   void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins);
 
- protected:
-
  private:
   std::mutex mutex_;
   uint64_t minSize_{0};
@@ -205,6 +203,7 @@ class BinManager {
 class BinFiles : public core::Processor {
  protected:
   static core::Relationship Self;
+
  public:
   using core::Processor::Processor;
   // Destructor
@@ -302,4 +301,3 @@ REGISTER_RESOURCE(BinFiles, "Bins flow files into buckets based on the number of
 } /* namespace apache */
 } /* namespace org */
 
-#endif
diff --git a/extensions/libarchive/CMakeLists.txt b/extensions/libarchive/CMakeLists.txt
index cc55acd..8cb361e 100644
--- a/extensions/libarchive/CMakeLists.txt
+++ b/extensions/libarchive/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
 file(GLOB SOURCES  "*.cpp")
 
@@ -30,4 +30,4 @@ target_link_libraries(minifi-archive-extensions LibArchive::LibArchive)
 SET (ARCHIVE-EXTENSIONS minifi-archive-extensions PARENT_SCOPE)
 
 register_extension(minifi-archive-extensions)
-
+register_extension_linter(minifi-archive-extensions-linter)
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 821c2bd..82c2f80 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -19,13 +19,10 @@
  */
 #include "CompressContent.h"
 #include <stdio.h>
-#include <algorithm>
 #include <memory>
 #include <string>
 #include <map>
 #include <set>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 8865dfe..8d8a048 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -17,11 +17,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __COMPRESS_CONTENT_H__
-#define __COMPRESS_CONTENT_H__
+#pragma once
 
 #include <cinttypes>
+#include <vector>
 #include <utility>
+#include <memory>
+#include <map>
+#include <string>
 
 #include "archive_entry.h"
 #include "archive.h"
@@ -45,7 +48,7 @@ namespace processors {
 
 // CompressContent Class
 class CompressContent : public core::Processor {
-public:
+ public:
   // Constructor
   /*!
    * Create a new processor
@@ -88,10 +91,10 @@ public:
     (USE_MIME_TYPE, "use mime.type attribute")
   )
 
-public:
+ public:
   // Nest Callback Class for read stream from flow for compress
   class ReadCallbackCompress: public InputStreamCallback {
-  public:
+   public:
     ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
         flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
     }
@@ -155,7 +158,7 @@ public:
   };
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
-  public:
+   public:
     WriteCallback(CompressionMode compress_mode, int compress_level, CompressionFormat compress_format,
         const std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) :
         compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format),
@@ -187,7 +190,7 @@ public:
     }
 
     static la_ssize_t archive_read(struct archive* archive, void *context, const void **buff) {
-      auto *callback = (WriteCallback *) context;
+      auto *callback = reinterpret_cast<WriteCallback *>(context);
       callback->session_->read(callback->flow_, &callback->readDecompressCb_);
       *buff = callback->readDecompressCb_.buffer;
       if (io::isError(callback->readDecompressCb_.stream_read_result)) {
@@ -414,7 +417,7 @@ public:
     }
   };
 
-public:
+ public:
   /**
    * Function that's executed when the processor is scheduled.
    * @param context process context.
@@ -430,7 +433,7 @@ public:
   // Initialize, over write by NiFi CompressContent
   void initialize() override;
 
-private:
+ private:
   static std::string toMimeType(CompressionFormat format);
 
   void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session);
@@ -457,5 +460,3 @@ REGISTER_RESOURCE(CompressContent, "Compresses or decompresses the contents of F
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 04bdb2c..3a4e91c 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -27,8 +27,6 @@
 #include <string>
 #include <set>
 
-#include <iostream>
-#include <fstream>
 #include <memory>
 
 #include "core/ProcessContext.h"
@@ -123,9 +121,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
 
     archiveStack.push(archiveMetadata);
 
-    std::string stackStr = archiveStack.toJsonString();
-  
-    flowFile->setAttribute("lens.archive.stack", stackStr);
+    flowFile->setAttribute("lens.archive.stack", archiveStack.toJsonString());
   }
 
   // Update filename attribute to that of focused entry
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 71f1223..bee6dcc 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -17,14 +17,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_PROCESSORS_FOCUSARCHIVEENTRY_H_
-#define LIBMINIFI_INCLUDE_PROCESSORS_FOCUSARCHIVEENTRY_H_
+#pragma once
 
-#include <list>
 #include <memory>
 #include <string>
 
-#include <archive.h>
+#include "archive.h"
 
 #include "ArchiveMetadata.h"
 #include "FlowFileRecord.h"
@@ -98,5 +96,3 @@ REGISTER_RESOURCE(FocusArchiveEntry, "Allows manipulation of entries within an a
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // LIBMINIFI_INCLUDE_PROCESSORS_FOCUSARCHIVEENTRY_H_
diff --git a/extensions/libarchive/ManipulateArchive.cpp b/extensions/libarchive/ManipulateArchive.cpp
index 2a21577..f553103 100644
--- a/extensions/libarchive/ManipulateArchive.cpp
+++ b/extensions/libarchive/ManipulateArchive.cpp
@@ -17,16 +17,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <string.h>
 #include <iostream>
-#include <fstream>
 #include <memory>
 #include <string>
 #include <set>
 #include <list>
+#include <algorithm>
 
-#include <archive.h>
-#include <archive_entry.h>
+#include "archive.h"
+#include "archive_entry.h"
 
 #include "ManipulateArchive.h"
 #include "Exception.h"
@@ -34,6 +33,8 @@
 #include "core/ProcessSession.h"
 #include "core/FlowFile.h"
 #include "utils/file/FileManager.h"
+#include "FocusArchiveEntry.h"
+#include "UnfocusArchiveEntry.h"
 
 namespace org {
 namespace apache {
@@ -107,7 +108,7 @@ void ManipulateArchive::onSchedule(core::ProcessContext *context, core::ProcessS
     if (before_.size() && after_.size()) {
         logger_->log_error("ManipulateArchive: cannot specify both before and after.");
         invalid = true;
-    }   
+    }
 
     if (invalid) {
         throw Exception(GENERAL_EXCEPTION, "Invalid ManipulateArchive configuration");
diff --git a/extensions/libarchive/ManipulateArchive.h b/extensions/libarchive/ManipulateArchive.h
index b630a69..d3927ca 100644
--- a/extensions/libarchive/ManipulateArchive.h
+++ b/extensions/libarchive/ManipulateArchive.h
@@ -17,19 +17,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_PROCESSORS_MANIPULATEARCHIVE_H_
-#define LIBMINIFI_INCLUDE_PROCESSORS_MANIPULATEARCHIVE_H_
+#pragma once
 
-#include <list>
 #include <string>
+#include <memory>
 
 #include "FlowFileRecord.h"
 #include "ArchiveMetadata.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 
-#include "FocusArchiveEntry.h"
-#include "UnfocusArchiveEntry.h"
 
 namespace org {
 namespace apache {
@@ -40,8 +37,8 @@ namespace processors {
 using logging::Logger;
 
 class ManipulateArchive : public core::Processor {
-public:
-  ManipulateArchive(const std::string& name, const utils::Identifier& uuid = {})
+ public:
+  explicit ManipulateArchive(const std::string& name, const utils::Identifier& uuid = {})
   : core::Processor(name, uuid),
     logger_(logging::LoggerFactory<ManipulateArchive>::getLogger()) {
   }
@@ -70,9 +67,7 @@ public:
   // Initialize, over write by NiFi ManipulateArchive
   void initialize(void);
 
-protected:
-
-private:
+ private:
   // Logger
   std::shared_ptr<Logger> logger_;
   std::string before_, after_, operation_, destination_, targetEntry_;
@@ -85,5 +80,3 @@ REGISTER_RESOURCE(ManipulateArchive, "Performs an operation which manipulates an
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // LIBMINIFI_INCLUDE_PROCESSORS_MANIPULATEARCHIVE_H_
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index 9923fe0..fb4fb82 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -21,16 +21,13 @@
 #include <stdio.h>
 #include <memory>
 #include <string>
-#include <vector>
 #include <set>
-#include <queue>
 #include <map>
 #include <deque>
 #include <utility>
 #include <algorithm>
 #include <numeric>
 #include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "utils/GeneralUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -289,11 +286,11 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
   }
 
   std::shared_ptr<core::FlowFile> merge_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
-  if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)
+  if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) {
     KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow);
-  else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE)
+  } else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) {
     KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow);
-  else {
+  } else {
     logger_->log_error("Attribute strategy not supported %s", attributeStrategy_);
     return false;
   }
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index d14b924..7fd852a 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -17,8 +17,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __MERGE_CONTENT_H__
-#define __MERGE_CONTENT_H__
+#pragma once
+
+#include <deque>
+#include <map>
+#include <vector>
+#include <memory>
+#include <string>
 
 #include "ArchiveCommon.h"
 #include "BinFiles.h"
@@ -172,7 +177,7 @@ class ArchiveMerge {
     FlowFileSerializer& serializer_;
 
     static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) {
-      WriteCallback *callback = (WriteCallback *) context;
+      WriteCallback *callback = reinterpret_cast<WriteCallback *>(context);
       uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff));
       la_ssize_t totalWrote = 0;
       size_t remaining = size;
@@ -376,5 +381,3 @@ REGISTER_RESOURCE(MergeContent, "Merges a Group of FlowFiles together based on a
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index de54bc2..cd33233 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -26,8 +26,8 @@
 #include <string>
 #include <set>
 
-#include <archive.h>
-#include <archive_entry.h>
+#include "archive.h"
+#include "archive_entry.h"
 
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -192,7 +192,7 @@ int64_t UnfocusArchiveEntry::WriteCallback::process(const std::shared_ptr<io::Ba
     archive_entry_set_size(entry, entryMetadata.entrySize);
     archive_entry_set_uid(entry, entryMetadata.entryUID);
     archive_entry_set_gid(entry, entryMetadata.entryGID);
-    archive_entry_set_mtime(entry, entryMetadata.entryMTime, gsl::narrow<long>(entryMetadata.entryMTimeNsec));
+    archive_entry_set_mtime(entry, entryMetadata.entryMTime, gsl::narrow<long>(entryMetadata.entryMTimeNsec));  // NOLINT long comes from libarchive API
 
     logger_->log_info("Writing %s with type %d, perms %d, size %d, uid %d, gid %d, mtime %d,%d", entryMetadata.entryName, entryMetadata.entryType, entryMetadata.entryPerm,
                       entryMetadata.entrySize, entryMetadata.entryUID, entryMetadata.entryGID, entryMetadata.entryMTime, entryMetadata.entryMTimeNsec);
diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h
index a4a38bb..e74cad9 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.h
+++ b/extensions/libarchive/UnfocusArchiveEntry.h
@@ -17,13 +17,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_PROCESSORS_UNFOCUSARCHIVEENTRY_H_
-#define LIBMINIFI_INCLUDE_PROCESSORS_UNFOCUSARCHIVEENTRY_H_
+#pragma once
 
 #include <memory>
 #include <string>
 
-#include <archive.h>
+#include "archive.h"
 
 #include "FocusArchiveEntry.h"
 #include "FlowFileRecord.h"
@@ -91,5 +90,3 @@ REGISTER_RESOURCE(UnfocusArchiveEntry, "Restores a FlowFile which has had an arc
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // LIBMINIFI_INCLUDE_PROCESSORS_UNFOCUSARCHIVEENTRY_H_
diff --git a/extensions/mqtt/CMakeLists.txt b/extensions/mqtt/CMakeLists.txt
index 1b7cacd..decd009 100644
--- a/extensions/mqtt/CMakeLists.txt
+++ b/extensions/mqtt/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 include_directories(./controllerservice ./processors ./protocol ../../libminifi/include  ../../libminifi/include/core)
 
 file(GLOB SOURCES "*.cpp" "protocol/*.cpp" "processors/*.cpp" "controllerservice/*.cpp")
@@ -34,4 +34,4 @@ target_link_libraries(minifi-mqtt-extensions paho.mqtt.c)
 SET (MQTT-EXTENSIONS minifi-mqtt-extensions PARENT_SCOPE)
 
 register_extension(minifi-mqtt-extensions)
-
+register_extension_linter(minifi-mqtt-extensions-linter)
diff --git a/extensions/mqtt/MQTTLoader.h b/extensions/mqtt/MQTTLoader.h
index 125b042..95475c2 100644
--- a/extensions/mqtt/MQTTLoader.h
+++ b/extensions/mqtt/MQTTLoader.h
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSION_MQTTLOADER_H
-#define EXTENSION_MQTTLOADER_H
+#pragma once
+
+#include <vector>
+#include <string>
+#include <memory>
 
 #include "controllerservice/MQTTControllerService.h"
 #include "processors/PublishMQTT.h"
@@ -83,4 +86,3 @@ class MQTTFactory : public core::ObjectFactory {
 extern "C" {
 DLL_EXPORT void *createMQTTFactory(void);
 }
-#endif /* EXTENSION_MQTTLOADER_H */
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.cpp b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
index d12541b..7595e74 100644
--- a/extensions/mqtt/controllerservice/MQTTControllerService.cpp
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
@@ -70,7 +70,7 @@ void MQTTControllerService::onEnable() {
     }
 
     if (client_) {
-      MQTTClient_setCallbacks(client_, (void *) this, reconnectCallback, receiveCallback, deliveryCallback);
+      MQTTClient_setCallbacks(client_, this, reconnectCallback, receiveCallback, deliveryCallback);
       // call reconnect to bootstrap
       this->reconnect();
     }
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h b/extensions/mqtt/controllerservice/MQTTControllerService.h
index 10d5f1f..3c99e52 100644
--- a/extensions/mqtt/controllerservice/MQTTControllerService.h
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.h
@@ -15,16 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_
+#pragma once
 
 #include <openssl/err.h>
 #include <openssl/ssl.h>
 #include <iostream>
 #include <memory>
+#include <vector>
+#include <utility>
+#include <map>
+#include <string>
+
 #include "core/Resource.h"
 #include "utils/StringUtils.h"
-#include "io/validation.h"
 #include "core/controller/ControllerService.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "controllers/SSLContextService.h"
@@ -47,7 +50,7 @@ class Message {
   Message() = default;
   explicit Message(const std::string &topic, void *data, size_t dataLen)
       : topic_(topic),
-        data_((uint8_t*) data, ((uint8_t*)data + dataLen)) {
+        data_(reinterpret_cast<uint8_t*>(data), (reinterpret_cast<uint8_t*>(data) + dataLen)) {
   }
 
   Message(const Message &other) = default;
@@ -223,7 +226,6 @@ class MQTTControllerService : public core::controller::ControllerService {
   }
 
  protected:
-
   void acknowledgeDelivery(MQTTClient_deliveryToken token) {
     std::lock_guard<std::mutex> lock(delivery_mutex_);
     // locked the mutex
@@ -242,12 +244,12 @@ class MQTTControllerService : public core::controller::ControllerService {
   }
 
   static void deliveryCallback(void *context, MQTTClient_deliveryToken dt) {
-    MQTTControllerService *service = (MQTTControllerService *) context;
+    MQTTControllerService *service = reinterpret_cast<MQTTControllerService *>(context);
     service->acknowledgeDelivery(dt);
   }
 
   static int receiveCallback(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
-    MQTTControllerService *service = (MQTTControllerService *) context;
+    MQTTControllerService *service = reinterpret_cast<MQTTControllerService *>(context);
     std::string topic(topicName, topicLen == 0 ? strlen(topicName) : topicLen);
     Message queueMessage(topic, message->payload, message->payloadlen);
     service->enqueue(topic, std::move(queueMessage));
@@ -256,7 +258,7 @@ class MQTTControllerService : public core::controller::ControllerService {
     return 1;
   }
   static void reconnectCallback(void *context, char* /*cause*/) {
-    MQTTControllerService *service = (MQTTControllerService *) context;
+    MQTTControllerService *service = reinterpret_cast<MQTTControllerService *>(context);
     service->reconnect();
   }
 
@@ -302,7 +304,6 @@ class MQTTControllerService : public core::controller::ControllerService {
   std::string passWord_;
 
  private:
-
   std::map<int, bool> delivered_;
   std::map<std::string, moodycamel::ConcurrentQueue<Message> > topics_;
 
@@ -321,5 +322,3 @@ class MQTTControllerService : public core::controller::ControllerService {
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_ */
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 5b1a28a..fbe8dac 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -17,14 +17,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __ABSTRACTMQTT_H__
-#define __ABSTRACTMQTT_H__
+#pragma once
+
+#include <set>
+#include <string>
+#include <memory>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
-#include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "MQTTClient.h"
 
@@ -98,11 +100,11 @@ class AbstractMQTTProcessor : public core::Processor {
 
   // MQTT async callbacks
   static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
-    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    AbstractMQTTProcessor *processor = reinterpret_cast<AbstractMQTTProcessor *>(context);
     processor->delivered_token_ = dt;
   }
   static int msgReceived(void *context, char *topicName, int /*topicLen*/, MQTTClient_message *message) {
-    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    AbstractMQTTProcessor *processor = reinterpret_cast<AbstractMQTTProcessor *>(context);
     if (processor->isSubscriber_) {
       if (!processor->enqueueReceiveMQTTMsg(message))
         MQTTClient_freeMessage(&message);
@@ -113,7 +115,7 @@ class AbstractMQTTProcessor : public core::Processor {
     return 1;
   }
   static void connectionLost(void *context, char* /*cause*/) {
-    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    AbstractMQTTProcessor *processor = reinterpret_cast<AbstractMQTTProcessor *>(context);
     processor->reconnect();
   }
   bool reconnect();
@@ -154,5 +156,3 @@ class AbstractMQTTProcessor : public core::Processor {
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index 10208c6..d279fc0 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -17,10 +17,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __CONSUME_MQTT_H__
-#define __CONSUME_MQTT_H__
+#pragma once
 
 #include <limits>
+#include <string>
+#include <memory>
+
 #include <deque>
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -135,5 +137,3 @@ REGISTER_RESOURCE(ConsumeMQTT, "This Processor gets the contents of a FlowFile f
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/mqtt/processors/ConvertBase.h b/extensions/mqtt/processors/ConvertBase.h
index 96fd7dc..4b1bbbd 100644
--- a/extensions/mqtt/processors/ConvertBase.h
+++ b/extensions/mqtt/processors/ConvertBase.h
@@ -15,8 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_
-#define EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_
+#pragma once
+
+#include <string>
+#include <memory>
 
 #include "MQTTControllerService.h"
 #include "FlowFileRecord.h"
@@ -57,7 +59,6 @@ class ConvertBase : public core::Processor, public minifi::c2::RESTProtocol {
   static core::Relationship Success;
 
  public:
-
   /**
    * Initialization of the processor
    */
@@ -69,8 +70,8 @@ class ConvertBase : public core::Processor, public minifi::c2::RESTProtocol {
    * ProcessSession objects.
    */
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
- protected:
 
+ protected:
   /**
    * MQTT controller service.
    */
@@ -84,5 +85,3 @@ class ConvertBase : public core::Processor, public minifi::c2::RESTProtocol {
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif /* EXTENSIONS_MQTT_PROTOCOL_CONVERTBASE_H_ */
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp
index 78c4e10..5fcb95b 100644
--- a/extensions/mqtt/processors/ConvertHeartBeat.cpp
+++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp
@@ -20,7 +20,9 @@
 #include <memory>
 #include <string>
 #include <map>
+#include <vector>
 #include <set>
+
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.h b/extensions/mqtt/processors/ConvertHeartBeat.h
index ee8161e..9f0292e 100644
--- a/extensions/mqtt/processors/ConvertHeartBeat.h
+++ b/extensions/mqtt/processors/ConvertHeartBeat.h
@@ -15,19 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __CONVERT_HEARTBEAT_H__
-#define __CONVERT_HEARTBEAT_H__
+#pragma once
+
+#include <string>
+#include <memory>
 
-#include "MQTTControllerService.h"
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
-#include "core/Resource.h"
-#include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
-#include "c2/protocols/RESTProtocol.h"
 #include "ConvertBase.h"
 
 namespace org {
@@ -40,7 +35,7 @@ namespace processors {
  * Purpose: ConvertHeartBeat converts heatbeats into MQTT messages.
  */
 class ConvertHeartBeat: public ConvertBase{
-public:
+ public:
   // Constructor
   /*!
    * Create a new processor
@@ -53,7 +48,7 @@ public:
   // Processor Name
   static constexpr char const* ProcessorName = "ConvertHeartBeat";
 
-public:
+ public:
   /**
    * Function that's executed when the processor is triggered.
    * @param context process context.
@@ -63,7 +58,7 @@ public:
 
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-private:
+ private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
@@ -73,5 +68,3 @@ private:
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 6c9d3cd..8ec804e 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -17,22 +17,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __CONVERT_ACKNOWLEDGEMENT_H__
-#define __CONVERT_ACKNOWLEDGEMENT_H__
+#pragma once
+
+#include <vector>
+#include <string>
+#include <memory>
 
-#include "MQTTControllerService.h"
 #include "FlowFileRecord.h"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/Resource.h"
-#include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
-#include "c2/protocols/RESTProtocol.h"
 #include "ConvertBase.h"
 #include "utils/gsl.h"
 
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -70,7 +69,6 @@ class ConvertJSONAck : public ConvertBase {
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
  protected:
-
   class ReadCallback : public InputStreamCallback {
    public:
     ReadCallback() = default;
@@ -90,6 +88,7 @@ class ConvertJSONAck : public ConvertBase {
    * @param json json representation defined by the restful protocol
    */
   std::string parseTopicName(const std::string &json);
+
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
@@ -99,5 +98,3 @@ class ConvertJSONAck : public ConvertBase {
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
index 9f3b456..b759eef 100644
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -15,6 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <string>
+#include <memory>
+#include <vector>
+#include <algorithm>
+#include <set>
+
 #include "ConvertUpdate.h"
 #include "utils/HTTPClient.h"
 #include "io/BaseStream.h"
diff --git a/extensions/mqtt/processors/ConvertUpdate.h b/extensions/mqtt/processors/ConvertUpdate.h
index d24e5d4..376232a 100644
--- a/extensions/mqtt/processors/ConvertUpdate.h
+++ b/extensions/mqtt/processors/ConvertUpdate.h
@@ -15,20 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_
-#define EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_
+#pragma once
 
+#include <string>
+#include <memory>
 
 #include "MQTTControllerService.h"
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
-#include "core/Resource.h"
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
-#include "c2/protocols/RESTProtocol.h"
 #include "ConvertBase.h"
 
 namespace org {
@@ -60,8 +56,7 @@ class ConvertUpdate : public ConvertBase {
   // Processor Name
   static constexpr char const* ProcessorName = "ConvertUpdate";
 
-public:
-
+ public:
   /**
      * Initialization of the processor
      */
@@ -75,9 +70,10 @@ public:
 
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-protected:
+ protected:
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-private:
+
+ private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
@@ -86,5 +82,3 @@ private:
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif /* EXTENSIONS_MQTT_PROTOCOL_CONVERTUPDATE_H_ */
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 04368c7..710d597 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -17,8 +17,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __PUBLISH_MQTT_H__
-#define __PUBLISH_MQTT_H__
+#pragma once
+
+#include <vector>
+#include <string>
+#include <memory>
 
 #include <limits>
 
@@ -151,5 +154,3 @@ REGISTER_RESOURCE(PublishMQTT, "PublishMQTT serializes FlowFile content as an MQ
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
index d50f8cb..7b0da5b 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -34,8 +34,9 @@ void MQTTC2Protocol::initialize(core::controller::ControllerServiceProvider* con
   if (configure->get("nifi.c2.mqtt.connector.service", controller_service_name_)) {
     auto service = controller->getControllerService(controller_service_name_);
     mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service);
-  } else
+  } else {
     mqtt_service_ = nullptr;
+  }
 
   agent_identifier_ = configure->getAgentIdentifier();
 
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h b/extensions/mqtt/protocol/MQTTC2Protocol.h
index 6b94ca6..0c9ff77 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_
-#define EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_
+#pragma once
 
 #include <algorithm>
 #include <iostream>
@@ -67,7 +66,6 @@ class MQTTC2Protocol : public C2Protocol {
   void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
 
  protected:
-
   C2Payload serialize(const C2Payload &payload);
 
   std::mutex input_mutex_;
@@ -85,11 +83,9 @@ class MQTTC2Protocol : public C2Protocol {
   std::shared_ptr<logging::Logger> logger_;
   // mqtt controller serviec name.
   std::string controller_service_name_;
-
 };
 } /* namespace c2 */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-#endif /* EXTENSIONS_MQTT_PROTOCOL_MQTTC2PROTOCOL_H_ */
diff --git a/extensions/opc/CMakeLists.txt b/extensions/opc/CMakeLists.txt
index 4a51adf..a029143 100644
--- a/extensions/opc/CMakeLists.txt
+++ b/extensions/opc/CMakeLists.txt
@@ -35,3 +35,4 @@ target_link_libraries(minifi-opc-extensions ${CMAKE_DL_LIBS} spdlog open62541::o
 SET (OPC-EXTENSIONS minifi-opc-extensions PARENT_SCOPE)
 
 register_extension(minifi-opc-extensions)
+register_extension_linter(minifi-opc-extensions-linter)
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index f4102c2..8e210cb 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -16,23 +16,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef NIFI_MINIFI_CPP_FetchOPCProcessor_H
-#define NIFI_MINIFI_CPP_FetchOPCProcessor_H
+#pragma once
 
 #include <memory>
 #include <string>
-#include <list>
 #include <unordered_map>
 #include <mutex>
-#include <thread>
+#include <vector>
 
 #include "opc.h"
 #include "opcbase.h"
-#include "utils/ByteArrayCallback.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
-#include "core/Core.h"
 #include "core/Property.h"
 #include "core/Resource.h"
 #include "controllers/SSLContextService.h"
@@ -47,7 +43,7 @@ namespace minifi {
 namespace processors {
 
 class FetchOPCProcessor : public BaseOPCProcessor {
-public:
+ public:
   static constexpr char const* ProcessorName = "FetchOPC";
   // Supported Properties
   static core::Property NodeIDType;
@@ -60,7 +56,7 @@ public:
   static core::Relationship Success;
   static core::Relationship Failure;
 
-  FetchOPCProcessor(const std::string& name, const utils::Identifier& uuid = {})
+  explicit FetchOPCProcessor(const std::string& name, const utils::Identifier& uuid = {})
   : BaseOPCProcessor(name, uuid), nameSpaceIdx_(0), nodesFound_(0), variablesFound_(0), maxDepth_(0) {
     logger_ = logging::LoggerFactory<FetchOPCProcessor>::getLogger();
   }
@@ -71,7 +67,7 @@ public:
 
   void initialize(void) override;
 
-protected:
+ protected:
   bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path,
                          const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
 
@@ -80,7 +76,7 @@ protected:
   class WriteCallback : public OutputStreamCallback {
     std::string data_;
    public:
-    WriteCallback(std::string&& data)
+    explicit WriteCallback(std::string&& data)
       : data_(data) {
     }
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
@@ -96,7 +92,7 @@ protected:
   uint64_t maxDepth_;
   bool lazy_mode_;
 
-private:
+ private:
   std::mutex onTriggerMutex_;
   std::vector<UA_NodeId> translatedNodeIDs_;  // Only used when user provides path, path->nodeid translation is only done once
   std::unordered_map<std::string, std::string> node_timestamp_;  // Key = Full path, Value = Timestamp
@@ -109,5 +105,3 @@ REGISTER_RESOURCE(FetchOPCProcessor, "Fetches OPC-UA node");
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_FetchOPCProcessor_H
diff --git a/extensions/opc/include/opc.h b/extensions/opc/include/opc.h
index 2cffe5a..9646f3d 100644
--- a/extensions/opc/include/opc.h
+++ b/extensions/opc/include/opc.h
@@ -16,18 +16,17 @@
  */
 
 
-#ifndef NIFI_MINIFI_CPP_OPC_H
-#define NIFI_MINIFI_CPP_OPC_H
-
-#include "open62541/client.h"
-#include "open62541/client_highlevel.h"
-#include "open62541/client_config_default.h"
-#include "logging/Logger.h"
-#include "Exception.h"
+#pragma once
 
 #include <string>
 #include <functional>
 #include <map>
+#include <vector>
+#include <memory>
+
+#include "open62541/client.h"
+#include "logging/Logger.h"
+#include "Exception.h"
 
 namespace org {
 namespace apache {
@@ -108,7 +107,7 @@ struct NodeData {
  private:
   UA_Variant* var_;
 
-  NodeData(UA_Variant * var = nullptr) {
+  explicit NodeData(UA_Variant * var = nullptr) {
     var_ = var;
   }
   void addVariant(UA_Variant * var) {
@@ -140,5 +139,3 @@ void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const ch
 } /* namespace apache */
 } /* namespace org */
 
-
-#endif  // NIFI_MINIFI_CPP_OPC_H
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
index 54fc7f2..ea4dc46 100644
--- a/extensions/opc/include/opcbase.h
+++ b/extensions/opc/include/opcbase.h
@@ -17,10 +17,12 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_OPCBASE_H
-#define NIFI_MINIFI_CPP_OPCBASE_H
+#pragma once
 
 #include <string>
+#include <memory>
+#include <vector>
+#include <set>
 
 #include "opc.h"
 #include "core/Processor.h"
@@ -45,7 +47,7 @@ class BaseOPCProcessor : public core::Processor {
   static core::Property KeyPath;
   static core::Property TrustedPath;
 
-  BaseOPCProcessor(const std::string& name, const utils::Identifier& uuid = {})
+  explicit BaseOPCProcessor(const std::string& name, const utils::Identifier& uuid = {})
   : Processor(name, uuid) {
   }
 
@@ -79,5 +81,3 @@ class BaseOPCProcessor : public core::Processor {
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_OPCBASE_H
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index c1d402a..7443361 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -17,23 +17,18 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_PUTOPC_H
-#define NIFI_MINIFI_CPP_PUTOPC_H
+#pragma once
 
 #include <memory>
 #include <string>
-#include <list>
-#include <map>
+#include <vector>
 #include <mutex>
-#include <thread>
 
 #include "opc.h"
 #include "opcbase.h"
-#include "utils/ByteArrayCallback.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
-#include "core/Core.h"
 #include "core/Property.h"
 #include "core/Resource.h"
 #include "controllers/SSLContextService.h"
@@ -64,7 +59,7 @@ class PutOPCProcessor : public BaseOPCProcessor {
   static core::Relationship Success;
   static core::Relationship Failure;
 
-  PutOPCProcessor(const std::string& name, const utils::Identifier& uuid = {})
+  explicit PutOPCProcessor(const std::string& name, const utils::Identifier& uuid = {})
   : BaseOPCProcessor(name, uuid), nameSpaceIdx_(0), parentExists_(false) {
     logger_ = logging::LoggerFactory<PutOPCProcessor>::getLogger();
   }
@@ -76,14 +71,13 @@ class PutOPCProcessor : public BaseOPCProcessor {
   void initialize(void) override;
 
  private:
-
   class ReadCallback : public InputStreamCallback {
-  public:
-    ReadCallback(std::shared_ptr<logging::Logger> logger) : logger_(logger) {}
+   public:
+    explicit ReadCallback(std::shared_ptr<logging::Logger> logger) : logger_(logger) {}
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
     const std::vector<uint8_t>& getContent() const { return buf_; }
 
-  private:
+   private:
     std::vector<uint8_t> buf_;
     std::shared_ptr<logging::Logger> logger_;
   };
@@ -107,5 +101,3 @@ REGISTER_RESOURCE(PutOPCProcessor, "Creates/updates  OPC nodes");
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_PUTOPC_H
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index b8ce1fd..3778215 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -207,8 +207,8 @@ namespace processors {
           variablesFound_++;
         }
       } catch (const std::exception& exception) {
-        std::string browsename((char*)ref->browseName.name.data, ref->browseName.name.length);
-        logger_->log_warn("Caught Exception while trying to get data from node %s: %s", path + "/" + browsename,  exception.what());
+        std::string browse_name(reinterpret_cast<char*>(ref->browseName.name.data), ref->browseName.name.length);
+        logger_->log_warn("Caught Exception while trying to get data from node %s: %s", path + "/" + browse_name, exception.what());
       }
     }
     return true;
diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp
index da86e79..a85d36b 100644
--- a/extensions/opc/src/opc.cpp
+++ b/extensions/opc/src/opc.cpp
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 
-// OPC includes
 #include "opc.h"
 
-// MiNiFi includes
+#include <stdlib.h>
+#include <memory>
+#include <vector>
+#include <string>
+#include <functional>
+
 #include "utils/StringUtils.h"
 #include "logging/Logger.h"
 #include "Exception.h"
 
 #include "utils/gsl.h"
 
-// Standard includes
-#include <stdlib.h>
-#include <iostream>
-#include <memory>
-#include <vector>
-#include <string>
-#include <functional>
+#include "open62541/client_highlevel.h"
+#include "open62541/client_config_default.h"
+
 
 namespace org {
 namespace apache {
@@ -45,70 +45,70 @@ namespace opc {
 
 namespace {
 
-  void add_value_to_variant(UA_Variant *variant, std::string &value) {
-    UA_String ua_value = UA_STRING(&value[0]);
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_STRING]);
-  }
+void add_value_to_variant(UA_Variant *variant, std::string &value) {
+  UA_String ua_value = UA_STRING(&value[0]);
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_STRING]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, const char *value) {
-    std::string strvalue(value);
-    add_value_to_variant(variant, strvalue);
-  }
+void add_value_to_variant(UA_Variant *variant, const char *value) {
+  std::string strvalue(value);
+  add_value_to_variant(variant, strvalue);
+}
 
-  void add_value_to_variant(UA_Variant *variant, int64_t value) {
-    UA_Int64 ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_INT64]);
-  }
+void add_value_to_variant(UA_Variant *variant, int64_t value) {
+  UA_Int64 ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_INT64]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, uint64_t value) {
-    UA_UInt64 ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_UINT64]);
-  }
+void add_value_to_variant(UA_Variant *variant, uint64_t value) {
+  UA_UInt64 ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_UINT64]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, int32_t value) {
-    UA_Int32 ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_INT32]);
-  }
+void add_value_to_variant(UA_Variant *variant, int32_t value) {
+  UA_Int32 ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_INT32]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, uint32_t value) {
-    UA_UInt32 ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_UINT32]);
-  }
+void add_value_to_variant(UA_Variant *variant, uint32_t value) {
+  UA_UInt32 ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_UINT32]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, bool value) {
-    UA_Boolean ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_BOOLEAN]);
-  }
+void add_value_to_variant(UA_Variant *variant, bool value) {
+  UA_Boolean ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_BOOLEAN]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, float value) {
-    UA_Float ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_FLOAT]);
-  }
+void add_value_to_variant(UA_Variant *variant, float value) {
+  UA_Float ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_FLOAT]);
+}
 
-  void add_value_to_variant(UA_Variant *variant, double value) {
-    UA_Double ua_value = value;
-    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_DOUBLE]);
-  }
+void add_value_to_variant(UA_Variant *variant, double value) {
+  UA_Double ua_value = value;
+  UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_DOUBLE]);
+}
 
-  core::logging::LOG_LEVEL MapOPCLogLevel(UA_LogLevel ualvl) {
-    switch (ualvl) {
-      case UA_LOGLEVEL_TRACE:
-        return core::logging::trace;
-      case UA_LOGLEVEL_DEBUG:
-        return core::logging::debug;
-      case UA_LOGLEVEL_INFO:
-        return core::logging::info;
-      case UA_LOGLEVEL_WARNING:
-        return core::logging::warn;
-      case UA_LOGLEVEL_ERROR:
-        return core::logging::err;
-      case UA_LOGLEVEL_FATAL:
-        return core::logging::critical;
-      default:
-        return core::logging::critical;
-    }
+core::logging::LOG_LEVEL MapOPCLogLevel(UA_LogLevel ualvl) {
+  switch (ualvl) {
+    case UA_LOGLEVEL_TRACE:
+      return core::logging::trace;
+    case UA_LOGLEVEL_DEBUG:
+      return core::logging::debug;
+    case UA_LOGLEVEL_INFO:
+      return core::logging::info;
+    case UA_LOGLEVEL_WARNING:
+      return core::logging::warn;
+    case UA_LOGLEVEL_ERROR:
+      return core::logging::err;
+    case UA_LOGLEVEL_FATAL:
+      return core::logging::critical;
+    default:
+      return core::logging::critical;
   }
 }
+}  // namespace
 
 /*
  * End of internal functions
@@ -127,13 +127,13 @@ Client::Client(std::shared_ptr<core::logging::Logger> logger, const std::string&
     // Certificate
     UA_ByteString certByteString = UA_STRING_NULL;
     certByteString.length = certBuffer.size();
-    certByteString.data = (UA_Byte*)UA_malloc(certByteString.length * sizeof(UA_Byte));
+    certByteString.data = reinterpret_cast<UA_Byte*>(UA_malloc(certByteString.length * sizeof(UA_Byte)));
     memcpy(certByteString.data, certBuffer.data(), certByteString.length);
 
     // Key
     UA_ByteString keyByteString = UA_STRING_NULL;
     keyByteString.length = keyBuffer.size();
-    keyByteString.data = (UA_Byte*)UA_malloc(keyByteString.length * sizeof(UA_Byte));
+    keyByteString.data = reinterpret_cast<UA_Byte*>(UA_malloc(keyByteString.length * sizeof(UA_Byte)));
     memcpy(keyByteString.data, keyBuffer.data(), keyByteString.length);
 
     // Trusted certificates
@@ -142,7 +142,7 @@ Client::Client(std::shared_ptr<core::logging::Logger> logger, const std::string&
     for (size_t i = 0; i < trustBuffers.size(); i++) {
       trustList[i] = UA_STRING_NULL;
       trustList[i].length = trustBuffers[i].size();
-      trustList[i].data = (UA_Byte*)UA_malloc(trustList[i].length * sizeof(UA_Byte));
+      trustList[i].data = reinterpret_cast<UA_Byte*>(UA_malloc(trustList[i].length * sizeof(UA_Byte)));
       memcpy(trustList[i].data, trustBuffers[i].data(), trustList[i].length);
     }
     UA_StatusCode sc = UA_ClientConfig_setDefaultEncryption(cc, certByteString, keyByteString,
@@ -313,8 +313,8 @@ void Client::traverse(UA_NodeId nodeId, std::function<nodeFoundCallBackFunc> cb,
       UA_ReferenceDescription *ref = &(bResp.results[i].references[j]);
       if (cb(*this, ref, basePath)) {
         if (ref->nodeClass == UA_NODECLASS_VARIABLE || ref->nodeClass == UA_NODECLASS_OBJECT) {
-          std::string browsename((char *) ref->browseName.name.data, ref->browseName.name.length);
-          traverse(ref->nodeId.nodeId, cb, basePath + browsename, maxDepth, false);
+          std::string browse_name(reinterpret_cast<char *>(ref->browseName.name.data), ref->browseName.name.length);
+          traverse(ref->nodeId.nodeId, cb, basePath + browse_name, maxDepth, false);
         }
       } else {
         return;
@@ -347,7 +347,7 @@ UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& pa
   UA_BrowsePath_init(&browsePath);
   browsePath.startingNode = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
 
-  browsePath.relativePath.elements = (UA_RelativePathElement*)UA_Array_new(tokens.size(), &UA_TYPES[UA_TYPES_RELATIVEPATHELEMENT]);
+  browsePath.relativePath.elements = reinterpret_cast<UA_RelativePathElement*>(UA_Array_new(tokens.size(), &UA_TYPES[UA_TYPES_RELATIVEPATHELEMENT]));
   browsePath.relativePath.elementsSize = tokens.size();
 
   for (size_t i = 0; i < tokens.size(); ++i) {
@@ -381,7 +381,7 @@ UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& pa
       UA_NodeId resultId;
       UA_NodeId_copy(&res.targets[j].targetId.nodeId, &resultId);
       foundNodeIDs.push_back(resultId);
-      std::string namespaceUri((char*)res.targets[j].targetId.namespaceUri.data, res.targets[j].targetId.namespaceUri.length);
+      std::string namespaceUri(reinterpret_cast<char*>(res.targets[j].targetId.namespaceUri.data), res.targets[j].targetId.namespaceUri.length);
     }
   }
 
@@ -450,8 +450,10 @@ template UA_StatusCode Client::add_node<uint32_t>(const UA_NodeId parentNodeId,
 template UA_StatusCode Client::add_node<float>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, float value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
 template UA_StatusCode Client::add_node<double>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, double value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
 template UA_StatusCode Client::add_node<bool>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, bool value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<const char *>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, const char * value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<std::string>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, std::string value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<const char *>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName,
+    const char * value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<std::string>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName,
+    std::string value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
 
 int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt) {
   switch (dt) {
@@ -482,7 +484,7 @@ std::string nodeValue2String(const NodeData& nd) {
     case UA_TYPES_STRING:
     case UA_TYPES_LOCALIZEDTEXT:
     case UA_TYPES_BYTESTRING: {
-      UA_String value = *(UA_String *)(nd.var_->data);
+      UA_String value = *reinterpret_cast<UA_String *>(nd.var_->data);
       ret_val = std::string(reinterpret_cast<const char *>(value.data), value.length);
       break;
     }
@@ -572,7 +574,7 @@ std::string OPCDateTime2String(UA_DateTime raw_date) {
 
 void logFunc(void *context, UA_LogLevel level, UA_LogCategory /*category*/, const char *msg, va_list args) {
   char buffer[1024];
-  vsnprintf(buffer, 1024, msg, args);
+  vsnprintf(buffer, sizeof buffer, msg, args);
   auto loggerPtr = reinterpret_cast<core::logging::BaseLogger*>(context);
   loggerPtr->log_string(MapOPCLogLevel(level), buffer);
 }
diff --git a/extensions/opencv/CMakeLists.txt b/extensions/opencv/CMakeLists.txt
index 49d5e99..c1fcd84 100644
--- a/extensions/opencv/CMakeLists.txt
+++ b/extensions/opencv/CMakeLists.txt
@@ -75,4 +75,5 @@ target_link_libraries(minifi-opencv ${OpenCV_LIBS})
 include_directories(${OpenCV_INCLUDE_DIRS})
 
 SET (OPENCV-EXTENSION minifi-opencv PARENT_SCOPE)
-register_extension(minifi-opencv)
\ No newline at end of file
+register_extension(minifi-opencv)
+register_extension_linter(minifi-opencv-linter)
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp
index e3c69da..dc241ba 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <set>
 
 #include "CaptureRTSPFrame.h"
 
@@ -30,32 +31,31 @@ static core::Property rtspURI;
 static core::Property captureFrameRate;
 static core::Property imageEncoding;
 
-core::Property CaptureRTSPFrame::RTSPUsername(  // NOLINT
+core::Property CaptureRTSPFrame::RTSPUsername(
     "RTSP Username",
     "The username for connecting to the RTSP stream", "");
-core::Property CaptureRTSPFrame::RTSPPassword(  // NOLINT
+core::Property CaptureRTSPFrame::RTSPPassword(
     "RTSP Password",
     "Password used to connect to the RTSP stream", "");
-core::Property CaptureRTSPFrame::RTSPHostname(  // NOLINT
+core::Property CaptureRTSPFrame::RTSPHostname(
     "RTSP Hostname",
     "Hostname of the RTSP stream we are trying to connect to", "");
-core::Property CaptureRTSPFrame::RTSPURI(  // NOLINT
+core::Property CaptureRTSPFrame::RTSPURI(
     "RTSP URI",
     "URI that should be appended to the RTSP stream hostname", "");
-core::Property CaptureRTSPFrame::RTSPPort(  // NOLINT
+core::Property CaptureRTSPFrame::RTSPPort(
     "RTSP Port",
     "Port that should be connected to to receive RTSP Frames",
     "");
-core::Property CaptureRTSPFrame::ImageEncoding( // NOLINT
+core::Property CaptureRTSPFrame::ImageEncoding(
     "Image Encoding",
     "The encoding that should be applied the the frame images captured from the RTSP stream",
-    ".jpg"
-    );
+    ".jpg");
 
-core::Relationship CaptureRTSPFrame::Success(  // NOLINT
+core::Relationship CaptureRTSPFrame::Success(
     "success",
     "Successful capture of RTSP frame");
-core::Relationship CaptureRTSPFrame::Failure(  // NOLINT
+core::Relationship CaptureRTSPFrame::Failure(
     "failure",
     "Failures to capture RTSP frame");
 
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index 0262478..3c16743 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -15,18 +15,22 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_CAPTURERTSPFRAME_H
-#define NIFI_MINIFI_CPP_CAPTURERTSPFRAME_H
+#pragma once
 
 #include <atomic>
 #include <iomanip>
 #include <ctime>
+#include <utility>
+#include <vector>
+#include <memory>
+#include <string>
 #include <opencv2/opencv.hpp>
 
 #include "core/Resource.h"
 #include "core/Processor.h"
 #include "utils/gsl.h"
 
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -35,7 +39,6 @@ namespace processors {
 
 class CaptureRTSPFrame : public core::Processor {
  public:
-
   explicit CaptureRTSPFrame(const std::string &name, const utils::Identifier &uuid = {})
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<CaptureRTSPFrame>::getLogger()) {
@@ -63,8 +66,8 @@ class CaptureRTSPFrame : public core::Processor {
 
   class CaptureRTSPFrameWriteCallback : public OutputStreamCallback {
    public:
-    explicit CaptureRTSPFrameWriteCallback(cv::Mat image_mat, std::string image_encoding_)
-        : image_mat_(std::move(image_mat)), image_encoding_(image_encoding_) {
+    explicit CaptureRTSPFrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
+        : image_mat_(std::move(image_mat)), image_encoding_(image_encoding) {
     }
     ~CaptureRTSPFrameWriteCallback() override = default;
 
@@ -135,6 +138,3 @@ REGISTER_RESOURCE(CaptureRTSPFrame, "Captures a frame from the RTSP stream at sp
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-
-#endif  // NIFI_MINIFI_CPP_CAPTURERTSPFRAME_H
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
index fa8d192..0f8b4e1 100644
--- a/extensions/opencv/FrameIO.h
+++ b/extensions/opencv/FrameIO.h
@@ -15,8 +15,12 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_FRAMEIO_H
-#define NIFI_MINIFI_CPP_FRAMEIO_H
+#pragma once
+
+#include <utility>
+#include <vector>
+#include <memory>
+#include <string>
 
 #include "utils/gsl.h"
 
@@ -27,46 +31,45 @@ namespace minifi {
 namespace opencv {
 
 class FrameWriteCallback : public OutputStreamCallback {
-  public:
-    explicit FrameWriteCallback(cv::Mat image_mat, std::string image_encoding_)
-    // TODO - Nghia: Check std::move(img_mat).
-        : image_mat_(std::move(image_mat)), image_encoding_(image_encoding_) {
-    }
-    ~FrameWriteCallback() override = default;
+ public:
+  explicit FrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
+      : image_mat_(std::move(image_mat)), image_encoding_(image_encoding) {
+  }
+  ~FrameWriteCallback() override = default;
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      imencode(image_encoding_, image_mat_, image_buf_);
-      const auto ret = stream->write(image_buf_.data(), image_buf_.size());
-      return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
-    }
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    imencode(image_encoding_, image_mat_, image_buf_);
+    const auto ret = stream->write(image_buf_.data(), image_buf_.size());
+    return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
+  }
 
-  private:
-    std::vector<uchar> image_buf_;
-    cv::Mat image_mat_;
-    std::string image_encoding_;
+ private:
+  std::vector<uchar> image_buf_;
+  cv::Mat image_mat_;
+  std::string image_encoding_;
 };
 
 class FrameReadCallback : public InputStreamCallback {
-  public:
-    explicit FrameReadCallback(cv::Mat &image_mat)
-        : image_mat_(image_mat) {
-    }
-    ~FrameReadCallback() override = default;
+ public:
+  explicit FrameReadCallback(cv::Mat &image_mat)
+      : image_mat_(image_mat) {
+  }
+  ~FrameReadCallback() override = default;
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      int64_t ret = 0;
-      image_buf_.resize(stream->size());
-      ret = stream->read(image_buf_.data(), static_cast<int>(stream->size()));
-      if (ret < 0 || static_cast<std::size_t>(ret) != stream->size()) {
-        throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
-      }
-      image_mat_ = cv::imdecode(image_buf_, -1);
-      return ret;
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t ret = 0;
+    image_buf_.resize(stream->size());
+    ret = stream->read(image_buf_.data(), static_cast<int>(stream->size()));
+    if (ret < 0 || static_cast<std::size_t>(ret) != stream->size()) {
+      throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
     }
+    image_mat_ = cv::imdecode(image_buf_, -1);
+    return ret;
+  }
 
-  private:
-    std::vector<uchar> image_buf_;
-    cv::Mat &image_mat_;
+ private:
+  std::vector<uchar> image_buf_;
+  cv::Mat &image_mat_;
 };
 
 } /* namespace opencv */
@@ -74,5 +77,3 @@ class FrameReadCallback : public InputStreamCallback {
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_FRAMEIO_H
diff --git a/extensions/opencv/MotionDetector.cpp b/extensions/opencv/MotionDetector.cpp
index 1888a6c..d67b47a 100644
--- a/extensions/opencv/MotionDetector.cpp
+++ b/extensions/opencv/MotionDetector.cpp
@@ -15,7 +15,12 @@
  * limitations under the License.
  */
 
+#include <set>
+#include <utility>
+#include <vector>
+
 #include "MotionDetector.h"
+#include "FrameIO.h"
 
 namespace org {
 namespace apache {
diff --git a/extensions/opencv/MotionDetector.h b/extensions/opencv/MotionDetector.h
index 0cee7d3..95a7404 100644
--- a/extensions/opencv/MotionDetector.h
+++ b/extensions/opencv/MotionDetector.h
@@ -15,20 +15,16 @@
  * limitations under the License.
  */
 
-#ifndef NIFI_MINIFI_CPP_MOTIONDETECTOR_H
-#define NIFI_MINIFI_CPP_MOTIONDETECTOR_H
-
-#include <atomic>
-
-#include <core/Resource.h>
-#include <core/Processor.h>
-#include <opencv2/opencv.hpp>
-#include <opencv2/objdetect.hpp>
-#include <opencv2/imgproc.hpp>
-#include "FrameIO.h"
+#pragma once
 
 #include <iomanip>
-#include <ctime>
+#include <memory>
+#include <string>
+
+#include "core/Resource.h"
+#include "core/Processor.h"
+#include "opencv2/opencv.hpp"
+#include "opencv2/imgproc.hpp"
 
 namespace org {
 namespace apache {
@@ -38,7 +34,6 @@ namespace processors {
 
 class MotionDetector : public core::Processor {
  public:
-
   explicit MotionDetector(const std::string &name, const utils::Identifier &uuid = {})
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<MotionDetector>::getLogger()) {
@@ -60,7 +55,6 @@ class MotionDetector : public core::Processor {
   void notifyStop() override;
 
  private:
-
   bool detectAndDraw(cv::Mat &frame);
 
   std::shared_ptr<logging::Logger> logger_;
@@ -76,12 +70,10 @@ class MotionDetector : public core::Processor {
   const double IMG_WIDTH = 500.0;
 };
 
-REGISTER_RESOURCE(MotionDetector, "Detect motion from captured images."); // NOLINT
+REGISTER_RESOURCE(MotionDetector, "Detect motion from captured images.");
 
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_MOTIONDETECTOR_H
diff --git a/extensions/opencv/OpenCVLoader.h b/extensions/opencv/OpenCVLoader.h
index 516b87f..84ae4a3 100644
--- a/extensions/opencv/OpenCVLoader.h
+++ b/extensions/opencv/OpenCVLoader.h
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_OPENCVLOADER_H_
-#define EXTENSIONS_OPENCVLOADER_H_
+#pragma once
+
+#include <vector>
+#include <string>
+#include <memory>
 
 #include "CaptureRTSPFrame.h"
 #include "core/ClassLoader.h"
@@ -70,7 +73,7 @@ class OpenCVObjectFactory : public core::ObjectFactory {
     }
   }
 
-  virtual std::unique_ptr<core::ObjectFactoryInitializer> getInitializer() override {
+  std::unique_ptr<core::ObjectFactoryInitializer> getInitializer() override {
     return std::unique_ptr<core::ObjectFactoryInitializer>(new OpenCVObjectFactoryInitializer());
   }
 
@@ -80,4 +83,3 @@ class OpenCVObjectFactory : public core::ObjectFactory {
 extern "C" {
   DLL_EXPORT void *createOpenCVFactory(void);
 }
-#endif /* EXTENSIONS_OPENCVLOADER_H_ */
diff --git a/extensions/opencv/tests/CaptureRTSPFrameTest.cpp b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
index fd7e67c..bd74116 100644
--- a/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
+++ b/extensions/opencv/tests/CaptureRTSPFrameTest.cpp
@@ -37,7 +37,7 @@
 #include "processors/LogAttribute.h"
 #include "../../../libminifi/test/unit/ProvenanceTestHelper.h"
 
-// TODO: valid capture test needs to be fixed
+// TODO(_): valid capture test needs to be fixed
 TEST_CASE("CaptureRTSPFrame::ValidCapture", "[!mayfail]") {
     TestController testController;
 
diff --git a/extensions/openwsman/CMakeLists.txt b/extensions/openwsman/CMakeLists.txt
index a9470d3..0903ba1 100644
--- a/extensions/openwsman/CMakeLists.txt
+++ b/extensions/openwsman/CMakeLists.txt
@@ -30,3 +30,4 @@ target_link_libraries(minifi-openwsman OpenWSMAN::libwsman CIVETWEB::civetweb-cp
 
 SET (OPENWSMAN-EXTENSION minifi-openwsman PARENT_SCOPE)
 register_extension(minifi-openwsman)
+register_extension_linter(minifi-openwsman-linter)
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index 1798d33..8f1c57e 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
-#define __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
+#pragma once
 
 #include <memory>
 #include <string>
@@ -169,5 +168,3 @@ REGISTER_RESOURCE(SourceInitiatedSubscriptionListener, "This processor implement
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif  // __SOURCE_INITIATED_SUBSCRIPTION_PROCESSOR_H__
diff --git a/extensions/pcap/CMakeLists.txt b/extensions/pcap/CMakeLists.txt
index 4698dd0..2d9f58b 100644
--- a/extensions/pcap/CMakeLists.txt
+++ b/extensions/pcap/CMakeLists.txt
@@ -19,7 +19,7 @@
 
 find_package(PCAP REQUIRED)
 
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
 include_directories(/usr/include/netinet)
 
@@ -67,4 +67,4 @@ target_link_libraries(minifi-pcap ${LIBMINIFI} Threads::Threads)
 
 SET (PCAP-EXTENSION minifi-pcap PARENT_SCOPE)
 register_extension(minifi-pcap)
-
+register_extension_linter(minifi-pcap-linter)
diff --git a/extensions/pcap/CapturePacket.cpp b/extensions/pcap/CapturePacket.cpp
index 8ab435b..33cf5d1 100644
--- a/extensions/pcap/CapturePacket.cpp
+++ b/extensions/pcap/CapturePacket.cpp
@@ -51,11 +51,18 @@ namespace minifi {
 namespace processors {
 
 std::shared_ptr<utils::IdGenerator> CapturePacket::id_generator_ = utils::IdGenerator::getIdGenerator();
-core::Property CapturePacket::BaseDir(core::PropertyBuilder::createProperty("Base Directory")->withDescription("Scratch directory for PCAP files")->withDefaultValue<std::string>("/tmp/")->build());
+core::Property CapturePacket::BaseDir(core::PropertyBuilder::createProperty("Base Directory")
+    ->withDescription("Scratch directory for PCAP files")
+    ->withDefaultValue<std::string>("/tmp/")->build());
+
+core::Property CapturePacket::BatchSize(core::PropertyBuilder::createProperty("Batch Size")
+    ->withDescription("The number of packets to combine within a given PCAP")
+    ->withDefaultValue<uint64_t>(50)->build());
 
-core::Property CapturePacket::BatchSize(core::PropertyBuilder::createProperty("Batch Size")->withDescription("The number of packets to combine within a given PCAP")->withDefaultValue<uint64_t>(50)->build());
 core::Property CapturePacket::NetworkControllers("Network Controllers", "Regular expression of the network controller(s) to which we will attach", ".*");
-core::Property CapturePacket::CaptureBluetooth(core::PropertyBuilder::createProperty("Capture Bluetooth")->withDescription("True indicates that we support bluetooth interfaces")->withDefaultValue<bool>(false)->build());
+core::Property CapturePacket::CaptureBluetooth(core::PropertyBuilder::createProperty("Capture Bluetooth")
+    ->withDescription("True indicates that we support bluetooth interfaces")
+    ->withDefaultValue<bool>(false)->build());
 
 const char *CapturePacket::ProcessorName = "CapturePacket";
 
diff --git a/extensions/pcap/CapturePacket.h b/extensions/pcap/CapturePacket.h
index 860c549..c6ce10c 100644
--- a/extensions/pcap/CapturePacket.h
+++ b/extensions/pcap/CapturePacket.h
@@ -16,11 +16,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_CAPTUREPACKET_H
-#define EXTENSIONS_CAPTUREPACKET_H
+#pragma once
 
 #include <memory>
 #include <regex>
+#include <vector>
+#include <string>
 
 #include "PcapLiveDeviceList.h"
 #include "PcapFilter.h"
@@ -73,15 +74,16 @@ class CapturePacketMechanism {
     return file_;
   }
 
-  long getSize() const {
+  int64_t getSize() const {
     return atomic_count_;
   }
+
  protected:
   CapturePacketMechanism &operator=(const CapturePacketMechanism &other) = delete;
   std::string path_;
   std::string file_;
   int64_t *max_size_;
-  std::atomic<long> atomic_count_;
+  std::atomic<int64_t> atomic_count_;
 };
 
 struct PacketMovers {
@@ -92,7 +94,6 @@ struct PacketMovers {
 // CapturePacket Class
 class CapturePacket : public core::Processor {
  public:
-
   // Constructor
   /*!
    * Create a new processor
@@ -122,7 +123,6 @@ class CapturePacket : public core::Processor {
   static void packet_callback(pcpp::RawPacket* packet, pcpp::PcapLiveDevice* dev, void* data);
 
  protected:
-
   void notifyStop() override {
     logger_->log_debug("Stopping capture");
     for (auto dev : device_list_) {
@@ -149,7 +149,6 @@ class CapturePacket : public core::Processor {
   static CapturePacketMechanism *create_new_capture(const std::string &base_path, int64_t *max_size);
 
  private:
-
   inline std::string getPath() {
     return base_dir_ + "/" + base_path_;
   }
@@ -174,5 +173,3 @@ REGISTER_RESOURCE(CapturePacket, "CapturePacket captures and writes one or more
 } /* namespace nifi */
 } /* namespace apache */
 } /* namespace org */
-
-#endif /* EXTENSIONS_CAPTUREPACKET_H */
diff --git a/extensions/pcap/PcapLoader.h b/extensions/pcap/PcapLoader.h
index d361799..5b7c47d 100644
--- a/extensions/pcap/PcapLoader.h
+++ b/extensions/pcap/PcapLoader.h
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_PCAPLOADER_H
-#define EXTENSIONS_PCAPLOADER_H
+#pragma once
+
+#include <vector>
+#include <string>
+#include <memory>
 
 #include "core/ClassLoader.h"
 #include "CapturePacket.h"
@@ -61,4 +64,3 @@ class PcapFactory : public core::ObjectFactory {
 extern "C" {
 DLL_EXPORT void *createPcapFactory(void);
 }
-#endif /* EXTENSIONS_PCAPLOADER_H */
diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h
index da21d8b..eea0fde 100644
--- a/libminifi/include/io/validation.h
+++ b/libminifi/include/io/validation.h
@@ -22,6 +22,7 @@
 #include <type_traits>
 #include <string>
 #include <cstring>
+#include <memory>
 
 /**
  * A checker that will, at compile time, tell us
diff --git a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
index 0657c9f..ff83482 100644
--- a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
+++ b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-#include <iostream>
 #include <map>
-#include <set>
 #include <string>
 #include <algorithm>
 #include <memory>
@@ -30,6 +28,8 @@
 #include "processors/LogAttribute.h"
 #include "processors/PutFile.h"
 #include "ManipulateArchive.h"
+#include "FocusArchiveEntry.h"
+#include "UnfocusArchiveEntry.h"
 
 const char TEST_ARCHIVE_NAME[] = "manipulate_test_archive.tar";
 const int NUM_FILES = 3;