You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/03/08 15:15:19 UTC

[nifi-minifi-cpp] 01/02: MINIFICPP-1762 Add the attributes from the AttributeProviderService to the flow file

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e1cfc444b407198c76807e8002467eab0e78cdcb
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Tue Mar 1 11:21:51 2022 +0100

    MINIFICPP-1762 Add the attributes from the AttributeProviderService to the flow file
    
    Closes #1274
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../test/integration/features/kubernetes.feature   | 15 +++++++
 .../KubernetesControllerService.h                  |  2 +
 .../standard-processors/processors/TailFile.cpp    | 46 +++++++++++++---------
 .../standard-processors/processors/TailFile.h      |  4 +-
 .../tests/unit/TailFileTests.cpp                   | 16 ++++++++
 .../include/controllers/AttributeProviderService.h |  1 +
 6 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/docker/test/integration/features/kubernetes.feature b/docker/test/integration/features/kubernetes.feature
index af7ac29..5d14b19 100644
--- a/docker/test/integration/features/kubernetes.feature
+++ b/docker/test/integration/features/kubernetes.feature
@@ -56,3 +56,18 @@ Feature: TailFile can collect logs from Kubernetes pods
     And the "success" relationship of the TailFile processor is connected to the PutFile
     When the MiNiFi instance starts up
     Then one flowfile with the contents "Hello again, World!" is placed in the monitored directory in less than 30 seconds
+
+  Scenario: Pod name etc are added as flow file attributes
+    Given a TailFile processor in a Kubernetes cluster
+    And the "tail-mode" property of the TailFile processor is set to "Multiple file"
+    And the "tail-base-directory" property of the TailFile processor is set to "/var/log/pods/${namespace}_${pod}_${uid}/${container}"
+    And the "File to Tail" property of the TailFile processor is set to ".*\.log"
+    And the "Lookup frequency" property of the TailFile processor is set to "1s"
+    And the TailFile processor has an Attribute Provider Service which is a Kubernetes Controller Service with the "Pod Name Filter" property set to ".*one"
+    And a LogAttribute processor in the Kubernetes cluster
+    And the "success" relationship of the TailFile processor is connected to the LogAttribute
+    When the MiNiFi instance starts up
+    Then the Minifi logs contain the following message: "key:kubernetes.namespace value:default" in less than 30 seconds
+    And the Minifi logs contain the following message: "key:kubernetes.pod value:hello-world-one" in less than 1 second
+    And the Minifi logs contain the following message: "key:kubernetes.uid value:" in less than 1 second
+    And the Minifi logs contain the following message: "key:kubernetes.container value:echo-one" in less than 1 second
diff --git a/extensions/kubernetes/controllerservice/KubernetesControllerService.h b/extensions/kubernetes/controllerservice/KubernetesControllerService.h
index b088e3b..e923749 100644
--- a/extensions/kubernetes/controllerservice/KubernetesControllerService.h
+++ b/extensions/kubernetes/controllerservice/KubernetesControllerService.h
@@ -18,6 +18,7 @@
 
 #include <memory>
 #include <string>
+#include <string_view>
 #include <vector>
 
 #include "controllers/AttributeProviderService.h"
@@ -38,6 +39,7 @@ class KubernetesControllerService : public AttributeProviderService {
   void initialize() final;
   void onEnable() override;
   std::optional<std::vector<AttributeMap>> getAttributes() override;
+  std::string_view name() const override { return "kubernetes"; }
 
  private:
   class APIClient;
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index eac0d07..098e794 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -833,9 +833,20 @@ void TailFile::updateFlowFileAttributes(const std::string &full_file_name, const
   flow_file->setAttribute(core::SpecialFlowAttribute::PATH, state.path_);
   flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name);
   flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, logName);
+
   flow_file->setAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, baseName);
   flow_file->setAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, extension);
   flow_file->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(state.position_));
+
+  if (extra_attributes_.contains(state.path_)) {
+    std::string prefix;
+    if (attribute_provider_service_) {
+      prefix = std::string(attribute_provider_service_->name()) + ".";
+    }
+    for (const auto& [key, value] : extra_attributes_.at(state.path_)) {
+      flow_file->setAttribute(prefix + key, value);
+    }
+  }
 }
 
 void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) const {
@@ -878,33 +889,30 @@ void TailFile::checkForNewFiles(core::ProcessContext& context) {
     return true;
   };
 
-  if (attribute_provider_service_) {
-    for (const auto& base_dir : getBaseDirectories(context)) {
-      utils::file::list_dir(base_dir, add_new_files_callback, logger_, recursive_lookup_);
-    }
-  } else {
+  if (!attribute_provider_service_) {
     utils::file::list_dir(base_dir_, add_new_files_callback, logger_, recursive_lookup_);
+    return;
   }
-}
-
-std::vector<std::string> TailFile::getBaseDirectories(core::ProcessContext& context) const {
-  gsl_Expects(attribute_provider_service_);
 
   const auto attribute_maps = attribute_provider_service_->getAttributes();
   if (!attribute_maps) {
     logger_->log_error("Could not get attributes from the Attribute Provider Service");
-    return {};
+    return;
   }
 
-  return attribute_maps.value() |
-      ranges::views::transform([&context](const auto& attribute_map) {
-        auto flow_file = std::make_shared<FlowFileRecord>();
-        for (const auto& [key, value] : attribute_map) {
-          flow_file->setAttribute(key, value);
-        }
-        return context.getProperty(BaseDirectory, flow_file).value();
-      }) |
-      ranges::to<std::vector<std::string>>();
+  for (const auto& attribute_map : *attribute_maps) {
+    std::string base_dir = baseDirectoryFromAttributes(attribute_map, context);
+    extra_attributes_[base_dir] = attribute_map;
+    utils::file::list_dir(base_dir, add_new_files_callback, logger_, recursive_lookup_);
+  }
+}
+
+std::string TailFile::baseDirectoryFromAttributes(const controllers::AttributeProviderService::AttributeMap& attribute_map, core::ProcessContext& context) const {
+  auto flow_file = std::make_shared<FlowFileRecord>();
+  for (const auto& [key, value] : attribute_map) {
+    flow_file->setAttribute(key, value);
+  }
+  return context.getProperty(BaseDirectory, flow_file).value();
 }
 
 std::chrono::milliseconds TailFile::getLookupFrequency() const {
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 9d8ef36..10bb94b 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -24,6 +24,7 @@
 #include <memory>
 #include <utility>
 #include <string>
+#include <unordered_map>
 #include <vector>
 #include <set>
 
@@ -166,7 +167,7 @@ class TailFile : public core::Processor {
   void doMultifileLookup(core::ProcessContext& context);
   void checkForRemovedFiles();
   void checkForNewFiles(core::ProcessContext& context);
-  std::vector<std::string> getBaseDirectories(core::ProcessContext& context) const;
+  std::string baseDirectoryFromAttributes(const controllers::AttributeProviderService::AttributeMap& attribute_map, core::ProcessContext& context) const;
   void updateFlowFileAttributes(const std::string &full_file_name, const TailState &state, const std::string &fileName,
                                 const std::string &baseName, const std::string &extension,
                                 std::shared_ptr<core::FlowFile> &flow_file) const;
@@ -190,6 +191,7 @@ class TailFile : public core::Processor {
   InitialStartPositions initial_start_position_;
   bool first_trigger_{true};
   controllers::AttributeProviderService* attribute_provider_service_ = nullptr;
+  std::unordered_map<std::string, controllers::AttributeProviderService::AttributeMap> extra_attributes_;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger();
 };
 
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index a24d4d3..089d812 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1777,6 +1777,7 @@ class TestAttributeProviderService : public minifi::controllers::AttributeProvid
     return std::vector<AttributeMap>{AttributeMap{{"color", "red"}, {"fruit", "apple"}, {"uid", "001"}, {"animal", "dog"}},
                                      AttributeMap{{"color", "yellow"}, {"fruit", "banana"}, {"uid", "004"}, {"animal", "dolphin"}}};
   }
+  std::string_view name() const override { return "test"; }
 };
 REGISTER_RESOURCE(TestAttributeProviderService, "An attribute provider service which provides a constant set of records.");
 
@@ -1814,5 +1815,20 @@ TEST_CASE("TailFile can use an AttributeProviderService", "[AttributeProviderSer
   CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_red_apple_001" / "dog" / "1.log").string()));
   CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_yellow_banana_004" / "dolphin" / "0.log").string()));
 
+  CHECK(LogTestController::getInstance().contains("key:test.color value:red"));
+  CHECK(LogTestController::getInstance().contains("key:test.color value:yellow"));
+  CHECK(LogTestController::getInstance().contains("key:test.fruit value:apple"));
+  CHECK(LogTestController::getInstance().contains("key:test.fruit value:banana"));
+  CHECK(LogTestController::getInstance().contains("key:test.uid value:001"));
+  CHECK(LogTestController::getInstance().contains("key:test.uid value:004"));
+  CHECK(LogTestController::getInstance().contains("key:test.animal value:dog"));
+  CHECK(LogTestController::getInstance().contains("key:test.animal value:dolphin"));
+
+  CHECK_FALSE(LogTestController::getInstance().contains("key:test.fruit value:strawberry"));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:test.uid value:002"));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:test.uid value:003"));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:test.animal value:elephant"));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:test.animal value:horse"));
+
   LogTestController::getInstance().reset();
 }
diff --git a/libminifi/include/controllers/AttributeProviderService.h b/libminifi/include/controllers/AttributeProviderService.h
index 0c127e4..3aee6a5 100644
--- a/libminifi/include/controllers/AttributeProviderService.h
+++ b/libminifi/include/controllers/AttributeProviderService.h
@@ -35,6 +35,7 @@ class AttributeProviderService : public core::controller::ControllerService {
 
   using AttributeMap = std::unordered_map<std::string, std::string>;
   virtual std::optional<std::vector<AttributeMap>> getAttributes() = 0;
+  virtual std::string_view name() const = 0;
 };
 
 }  // namespace org::apache::nifi::minifi::controllers