You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ab...@apache.org on 2022/11/02 23:23:47 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1967 Add batch processing of lines in TailFile

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit c5d58e8836723ba232440d457c6ff1be811bd075
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu Oct 20 13:13:46 2022 +0200

    MINIFICPP-1967 Add batch processing of lines in TailFile
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1439
---
 PROCESSORS.md                                      |  1 +
 .../standard-processors/processors/TailFile.cpp    | 14 +++++++++-
 .../standard-processors/processors/TailFile.h      | 23 ++++++-----------
 .../tests/unit/TailFileTests.cpp                   | 30 ++++++++++++++++++++++
 4 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8e7e6e328..8a0aac5de 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -2424,6 +2424,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 | State File                 | TailFileState     |                                                        | Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off                                                                                                                                                                                                                                           [...]
 | tail-base-directory        |                   |                                                        | Base directory used to look for files to tail. This property is required when using Multiple file mode. Can contain expression language placeholders if Attribute Provider Service is set.<br/>**Supports Expression Language: true**                                                                                                                                                            [...]
 | **tail-mode**              | Single file       | Single file<br>Multiple file<br>                       | Specifies the tail file mode. In 'Single file' mode only a single file will be watched. In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, thos [...]
+| **Batch Size**             | 0                 |                                                        | Maximum number of flowfiles emitted in a single trigger. If set to 0 all new content will be processed.                                                                                                                                                                                                                                                                                          [...]
 ### Relationships
 
 | Name    | Description                     |
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 7a3f20194..abfd6784b 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -135,6 +135,13 @@ const core::Property TailFile::AttributeProviderService(
         ->asType<minifi::controllers::AttributeProviderService>()
         ->build());
 
+const core::Property TailFile::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")
+        ->withDescription("Maximum number of flowfiles emitted in a single trigger. If set to 0 all new content will be processed.")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(0)
+        ->build());
+
 const core::Relationship TailFile::Success("success", "All files are routed to success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
@@ -395,6 +402,11 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
   rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
   initial_start_position_ = InitialStartPositions{utils::parsePropertyWithAllowableValuesOrThrow(*context, InitialStartPosition.getName(), InitialStartPositions::values())};
+
+  uint32_t batch_size = 0;
+  if (context->getProperty(BatchSize.getName(), batch_size) && batch_size != 0) {
+    batch_size_ = batch_size;
+  }
 }
 
 void TailFile::parseAttributeProviderServiceProperty(core::ProcessContext& context) {
@@ -784,7 +796,7 @@ void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &se
     FileReaderCallback file_reader{full_file_name, state.position_, delim, state.checksum_};
     TailState state_copy{state};
 
-    while (file_reader.hasMoreToRead()) {
+    while (file_reader.hasMoreToRead() && (!batch_size_ || *batch_size_ > num_flow_files)) {
       auto flow_file = session->create();
       session->write(flow_file, std::ref(file_reader));
 
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 58a9b1122..88cf7c018 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_TAILFILE_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_TAILFILE_H_
+#pragma once
 
 #include <map>
 #include <memory>
@@ -27,6 +26,7 @@
 #include <unordered_map>
 #include <vector>
 #include <set>
+#include <optional>
 
 #include "controllers/AttributeProviderService.h"
 #include "FlowFileRecord.h"
@@ -37,11 +37,7 @@
 #include "utils/Enum.h"
 #include "utils/Export.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 struct TailState {
   TailState(std::string path, std::string file_name, uint64_t position,
@@ -106,6 +102,7 @@ class TailFile : public core::Processor {
   EXTENSIONAPI static const core::Property RollingFilenamePattern;
   EXTENSIONAPI static const core::Property InitialStartPosition;
   EXTENSIONAPI static const core::Property AttributeProviderService;
+  EXTENSIONAPI static const core::Property BatchSize;
 
   static auto properties() {
     return std::array{
@@ -118,7 +115,8 @@ class TailFile : public core::Processor {
       LookupFrequency,
       RollingFilenamePattern,
       InitialStartPosition,
-      AttributeProviderService
+      AttributeProviderService,
+      BatchSize
     };
   }
 
@@ -215,13 +213,8 @@ class TailFile : public core::Processor {
   bool first_trigger_{true};
   controllers::AttributeProviderService* attribute_provider_service_ = nullptr;
   std::unordered_map<std::string, controllers::AttributeProviderService::AttributeMap> extra_attributes_;
+  std::optional<uint32_t> batch_size_;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger();
 };
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_TAILFILE_H_
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 387f579a2..808dbb190 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -44,6 +44,7 @@
 #include "LogAttribute.h"
 #include "utils/TestUtils.h"
 #include "utils/StringUtils.h"
+#include "SingleProcessorTestController.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -1847,3 +1848,32 @@ TEST_CASE("TailFile can use an AttributeProviderService", "[AttributeProviderSer
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("TailFile honors batch size for maximum lines processed", "[batchSize]") {
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+
+  auto tailfile = std::make_shared<minifi::processors::TailFile>("TailFile");
+  minifi::test::SingleProcessorTestController test_controller(tailfile);
+
+  auto dir = test_controller.createTempDirectory();
+  std::stringstream temp_file;
+  temp_file << dir << utils::file::get_separator() << TMP_FILE;
+
+  std::ofstream tmpfile;
+  tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
+  for (auto i = 0; i < 20; ++i) {
+    tmpfile << NEW_TAIL_DATA;
+  }
+  tmpfile.close();
+
+  std::stringstream state_file;
+  state_file << dir << utils::file::get_separator() << STATE_FILE;
+
+  tailfile->setProperty(minifi::processors::TailFile::FileName.getName(), temp_file.str());
+  tailfile->setProperty(minifi::processors::TailFile::Delimiter.getName(), "\n");
+  tailfile->setProperty(minifi::processors::TailFile::BatchSize.getName(), "10");
+
+  const auto result = test_controller.trigger();
+  const auto& file_contents = result.at(minifi::processors::TailFile::Success);
+  REQUIRE(file_contents.size() == 10);
+}