You are viewing a plain-text version of this content; the canonical link to it was not preserved in this conversion.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/02/25 12:41:59 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-735: Fix issues with delimiter, make changes to facilitate testing this change

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 54007b2  MINIFICPP-735: Fix issues with delimiter, make changes to facilitate testing this change
54007b2 is described below

commit 54007b2ab7371bb8462ac46d89aa9969f3178d6b
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Thu Feb 14 11:17:17 2019 -0500

    MINIFICPP-735: Fix issues with delimiter, make changes to facilitate testing this change
    
    Approval by jdye64 on GH.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 cmake/BuildTests.cmake                             |   2 +
 libminifi/include/core/state/ProcessorController.h |  12 ++-
 libminifi/include/processors/TailFile.h            |  15 +--
 libminifi/src/core/ProcessSession.cpp              |  20 ++--
 libminifi/src/processors/TailFile.cpp              |  33 ++++++-
 libminifi/test/integration/IntegrationBase.h       |   6 +-
 libminifi/test/integration/TailFileTest.cpp        | 108 +++++++++++++++++++++
 libminifi/test/resources/TestTailFile.yml          |  63 ++++++++++++
 libminifi/test/unit/TailFileTests.cpp              |   2 +-
 9 files changed, 236 insertions(+), 25 deletions(-)

diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index c7ec1d6..796f626 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -158,3 +158,5 @@ add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )
 
 add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecure.yml"  "${TEST_RESOURCES}/")
 
+add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml"  "${TEST_RESOURCES}/")
+# Args: flow YAML (argv[1]) and test resource dir (argv[2]), same pattern as SecureSocketGetTCPTest above.
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 1fadd27..2cba897 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -42,13 +42,17 @@ class ProcessorController : public StateController {
 
   virtual ~ProcessorController();
 
-  virtual std::string getComponentName() const{
+  virtual std::string getComponentName() const {
     return processor_->getName();
   }
 
-  virtual std::string getComponentUUID() const{
-      return processor_->getUUIDStr();
-    }
+  virtual std::string getComponentUUID() const {
+    return processor_->getUUIDStr();
+  }
+
+  std::shared_ptr<core::Processor> getProcessor() const {  // const: copies the handle; callers may still mutate the processor
+    return processor_;
+  }
   /**
    * Start the client
    */
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index c781f4d..3714023 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -40,8 +40,9 @@ class TailFile : public core::Processor {
   /*!
    * Create a new processor
    */
-  explicit TailFile(std::string name,  utils::Identifier uuid = utils::Identifier())
+  explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
+        _currentTailFilePosition(0),
         logger_(logging::LoggerFactory<TailFile>::getLogger()) {
     _stateRecovered = false;
   }
@@ -84,7 +85,7 @@ class TailFile : public core::Processor {
   // State related to the tailed file
   std::string _currentTailFileName;
   // Delimiter for the data incoming from the tailed file.
-  std::string _delimiter;
+  std::string delimiter_;
   // determine if state is recovered;
   bool _stateRecovered;
   uint64_t _currentTailFilePosition;
@@ -102,11 +103,11 @@ class TailFile : public core::Processor {
 };
 
 REGISTER_RESOURCE(TailFile, "\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual."
-    " Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\","
-    " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover"
-    " occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds,"
-    " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
-    " does not support ingesting files that have been compressed when 'rolled over'.");
+                  " Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\","
+                  " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover"
+                  " occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds,"
+                  " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
+                  " does not support ingesting files that have been compressed when 'rolled over'.");
 
 // Matched File Item for Roll over check
 typedef struct {
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index ab7e4bf..4a79b21 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -545,13 +545,18 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
           throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
         }
       }
-      while (input.good()) {
+      while (input.good() && !input.eof()) {
         bool invalidWrite = false;
-        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-        claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
         uint64_t startTime = getTimeMillis();
         input.getline(charBuffer.data(), size, inputDelimiter);
 
+        if (input.eof() || input.fail()) {
+          logger_->log_trace("Finished reading input %s", source);
+          break;
+        }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+
         size_t bufsize = strlen(charBuffer.data());
         std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
         if (nullptr == stream) {
@@ -560,7 +565,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
           return;
         }
 
-        if (input) {
+        if (input.good()) {
           if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), bufsize) < 0) {
             invalidWrite = true;
             break;
@@ -582,7 +587,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
           }
           flowFile->setResourceClaim(claim);
           claim->increaseFlowFileRecordOwnedCount();
-          logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(),
+          logger_->log_debug("Import offset %u length %u into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(),
                              flowFile->getUUIDStr());
           stream->closeStream();
           std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
@@ -596,6 +601,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
         }
       }
       input.close();
+      logger_->log_trace("Closed input %s, keeping source ? %i", source, keepSource);
       if (!keepSource)
         std::remove(source.c_str());
     } else {
@@ -729,7 +735,7 @@ void ProcessSession::commit() {
         }
       } else {
         // Can not find relationship for the flow
-        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow " + record->getUUIDStr());
       }
     }
 
@@ -774,7 +780,7 @@ void ProcessSession::commit() {
         }
       } else {
         // Can not find relationship for the flow
-        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow " + record->getUUIDStr());
       }
     }
 
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index df0afc4..bfbf21e 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -86,7 +86,7 @@ void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFac
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-    _delimiter = value;
+    delimiter_ = value;
   }
 }
 
@@ -277,6 +277,7 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
   struct stat statbuf;
 
+  logger_->log_debug("Tailing file %s", fullPath);
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition) {
       // there are no new input for the current tail file
@@ -287,14 +288,36 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
     std::string baseName = _currentTailFileName.substr(0, found);
     std::string extension = _currentTailFileName.substr(found + 1);
 
-    if (!this->_delimiter.empty()) {
-      char delim = this->_delimiter.c_str()[0];
+    if (!delimiter_.empty()) {
+      char delim = delimiter_.c_str()[0];
+      if (delim == '\\') {
+        if (delimiter_.size() > 1) {
+          switch (delimiter_.c_str()[1]) {
+            case 'r':
+              delim = '\r';
+              break;
+            case 't':
+              delim = '\t';
+              break;
+            case 'n':
+              delim = '\n';
+              break;
+            case '\\':
+              delim = '\\';
+              break;
+            default:
+              // previous behavior
+              break;
+          }
+        }
+      }
+      logger_->log_debug("Looking for delimiter 0x%X", delim);
       std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
       session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
-      logger_->log_info("%ll flowfiles were received from TailFile input", flowFiles.size());
+      logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
 
       for (auto ffr : flowFiles) {
-        logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, ffr->getSize());
+        logger_->log_info("TailFile %s for %u bytes", _currentTailFileName, ffr->getSize());
         std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
         ffr->updateKeyedAttribute(PATH, fileLocation);
         ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 95e14c4..ac3d892 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -62,6 +62,10 @@ class IntegrationBase {
 
   }
 
+  virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {  // hook: run() calls it after load(), before start()
+    // Default does nothing; harnesses override it to set processor properties on the loaded flow.
+  }
+
   void configureSecurity();
   std::shared_ptr<minifi::Configure> configuration;
   uint64_t wait_time_;
@@ -116,8 +120,8 @@ void IntegrationBase::run(std::string test_file_location) {
 
   std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
                                                                                                 true);
-
   controller->load();
+  updateProperties(controller);
   controller->start();
   waitToVerifyProcessor();
 
diff --git a/libminifi/test/integration/TailFileTest.cpp b/libminifi/test/integration/TailFileTest.cpp
new file mode 100644
index 0000000..b3c2f31
--- /dev/null
+++ b/libminifi/test/integration/TailFileTest.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../TestBase.h"
+#include "processors/TailFile.h"
+#include "processors/LogAttribute.h"
+#include "state/ProcessorController.h"
+#include "IntegrationBase.h"
+
+class TailFileTestHarness : public IntegrationBase {  // TailFile integration harness: seeds a 5-line file, tails it, checks the logs
+ public:
+  TailFileTestHarness() {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+    // The tailed file (path kept in ss) and TailFile's state file both go in that temp dir.
+    statefile = dir;
+    statefile += "/statefile";
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "Lin\\e1\nli\\nen\nli\\ne3\nli\\ne4\nli\\ne5\n";  // 5 newline-terminated lines, each holding a literal backslash
+    file.close();
+  }
+  // Raise log levels so the messages checked in runAssertions() reach LogTestController.
+  void testSetup() {
+    LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+  }
+  // Deletes the seed and state files; removing the temp dir itself is presumably TestController's job.
+  virtual void cleanup() {
+    unlink(ss.str().c_str());
+    unlink(statefile.c_str());
+  }
+  // Expect one flowfile per line, the "\n" delimiter property resolved to 0xA, and the last line's text in the log.
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
+    assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
+    assert(LogTestController::getInstance().contains("li\\ne5") == true);
+  }
+  // IntegrationBase::run calls this after load() and before start(): point processor "tf" at the test files.
+ protected:
+  virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
+    for (auto &comp : fc->getComponents("tf")) {
+      std::shared_ptr<minifi::state::ProcessorController> proc = std::dynamic_pointer_cast<minifi::state::ProcessorController>(comp);
+      if (nullptr != proc) {
+        proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName, ss.str());
+        proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile, statefile);
+      }
+    }
+  }
+  // Fixture state; dir comes from testController (lifetime assumed owned there), ss holds the tailed file's path.
+  std::string statefile;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  // argv[1]: flow YAML to run; argv[2] (optional): test resource directory.
+  // Each index is checked on its own: reading argv[2] when argc == 2 would
+  // build a std::string from argv[argc], which is a null pointer (UB).
+  std::string key_dir, test_file_location;
+  if (argc > 1) test_file_location = argv[1];
+  if (argc > 2) key_dir = argv[2];
+  TailFileTestHarness harness;
+
+  harness.run(test_file_location);
+
+  return 0;
+}
diff --git a/libminifi/test/resources/TestTailFile.yml b/libminifi/test/resources/TestTailFile.yml
new file mode 100644
index 0000000..a1a78b9
--- /dev/null
+++ b/libminifi/test/resources/TestTailFile.yml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: tf
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.TailFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 100 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+        Input Delimiter: \n  # plain scalar = backslash + 'n' (YAML applies no escape); TailFile maps it to 0xA
+    - name: la
+      id: 2438e3c8-015a-1000-79ca-83af40ec1995
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 500 msec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+       - success
+      Properties:
+        Log Payload: true
+
+Connections:
+    - name: tr1
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: tf
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: la
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1995
+      # relationship is set once above via 'source relationship name'; YAML mapping keys must be unique
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Remote Processing Groups:
+
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
index 379ee40..4966363 100644
--- a/libminifi/test/unit/TailFileTests.cpp
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -70,7 +70,7 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
   std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
   std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
   REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 4);
+  REQUIRE(records.size() == 2);
 
   LogTestController::getInstance().reset();