You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/07/05 17:59:29 UTC

nifi-minifi-cpp git commit: MINIFI-341: Introduce delimiter to TailFile to delimit incoming data appropriately.

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 0a2d25c48 -> b20da80eb


MINIFI-341: Introduce delimiter to TailFile to delimit incoming data appropriately.

This closes #116.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b20da80e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b20da80e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b20da80e

Branch: refs/heads/master
Commit: b20da80eb73d49286041ed3f7a2e04afb6b0a7f3
Parents: 0a2d25c
Author: Jeremy Dyer <je...@apache.org>
Authored: Thu Jun 22 22:29:21 2017 -0400
Committer: Marc Parisi <ph...@apache.org>
Committed: Wed Jul 5 13:58:39 2017 -0400

----------------------------------------------------------------------
 .gitignore                                |   3 +
 libminifi/include/core/ProcessSession.h   |   2 +
 libminifi/include/processors/TailFile.h   |  10 ++
 libminifi/src/core/ProcessSession.cpp     |  85 ++++++++++++
 libminifi/src/processors/TailFile.cpp     |  68 +++++++---
 libminifi/test/resources/TestTailFile.txt |   2 +
 libminifi/test/unit/TailFileTests.cpp     | 171 +++++++++++++++++++++++++
 7 files changed, 325 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4a4c064..3032f38 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,9 @@
 # Ignore JetBrains project files
 .idea
 
+# Ignore JetBrains cLion project files.
+.project
+
 # Ignore kdevelop metadata
 nifi-minifi-cpp.kdev4
 .kdev4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 4d20f59..ad79d12 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -123,6 +123,8 @@ class ProcessSession {
   void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
   bool keepSource = true,
               uint64_t offset = 0);
+  void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
+  bool keepSource, uint64_t offset, char inputDelimiter);
 
 // Prevent default copy constructor and assignment operation
 // Only support pass by reference or pointer

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index 59cf224..5e166cf 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -54,10 +54,18 @@ class TailFile : public core::Processor {
   // Supported Properties
   static core::Property FileName;
   static core::Property StateFile;
+  static core::Property Delimiter;
   // Supported Relationships
   static core::Relationship Success;
 
  public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
   // OnTrigger method, implemented by NiFi TailFile
   virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
   // Initialize, over write by NiFi TailFile
@@ -75,6 +83,8 @@ class TailFile : public core::Processor {
   std::string _stateFile;
   // State related to the tailed file
   std::string _currentTailFileName;
+  // Delimiter for the data incoming from the tailed file.
+  std::string _delimiter;
   // determine if state is recovered;
   bool _stateRecovered;
   uint64_t _currentTailFilePosition;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 037660f..df21a34 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -626,6 +626,113 @@ bool keepSource,
   }
 }

+/**
+ * Imports `source`, starting at byte `offset`, as a sequence of flow files:
+ * one flow file per record terminated by `inputDelimiter`. The delimiter is
+ * not written to the content. The final record is imported even when the file
+ * does not end with the delimiter, and a delimiter at the very end of the file
+ * does not produce an extra empty flow file. Each record is buffered in memory
+ * before it is written to its content claim.
+ *
+ * @param source path of the file to read
+ * @param flows container the created flow files are appended to
+ * @param keepSource when false, `source` is removed after a successful import
+ * @param offset byte position in `source` at which reading starts
+ * @param inputDelimiter character separating records
+ * @throws Exception FILE_OPERATION_EXCEPTION if `source` cannot be opened or a
+ *         record's content cannot be written
+ *
+ * NOTE(review): `flows` is taken by value, so the flow files appended here are
+ * not visible in the caller's vector (TailFile iterates an empty vector). The
+ * parameter should become `std::vector<std::shared_ptr<FlowFileRecord>> &flows`
+ * in both this definition and its declaration in ProcessSession.h; that change
+ * has to be made to both files together.
+ */
+void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
+  bool keepSource, uint64_t offset, char inputDelimiter) {
+  // The flow file/claim pair currently being written; reset once a record is
+  // complete so the error handlers only ever release an unfinished claim.
+  std::shared_ptr<ResourceClaim> claim;
+  std::shared_ptr<FlowFileRecord> flowFile;
+
+  try {
+    // Open the input file and seek to the appropriate location.
+    std::ifstream input;
+    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+    if (!input.is_open()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+    input.seekg(offset, input.beg);
+
+    // std::getline into a std::string accepts records of any length and keeps
+    // embedded NUL bytes. It fails only when nothing at all was extracted, so
+    // an unterminated last record is still returned while a trailing delimiter
+    // at end of file ends the loop instead of yielding an empty record.
+    std::string record;
+    while (true) {
+      uint64_t startTime = getTimeMillis();
+      if (!std::getline(input, record, inputDelimiter))
+        break;
+
+      flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+      claim = std::make_shared<ResourceClaim>();
+
+      std::ofstream fs;
+      fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+      if (!fs.is_open()) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error creating Flowfile");
+      }
+      fs.write(record.data(), record.size());
+      if (!fs.good() || fs.tellp() < 0) {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error creating Flowfile");
+      }
+
+      flowFile->setSize(fs.tellp());
+      flowFile->setOffset(0);
+      if (flowFile->getResourceClaim() != nullptr) {
+        // Remove the old claim
+        flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+        flowFile->clearResourceClaim();
+      }
+      flowFile->setResourceClaim(claim);
+      claim->increaseFlowFileRecordOwnedCount();
+
+      logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
+                         flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(),
+                         flowFile->getUUIDStr().c_str());
+
+      fs.close();
+      std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
+      uint64_t endTime = getTimeMillis();
+      provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+      flows.push_back(flowFile);
+
+      // The record is fully imported; forget it so a failure on a later record
+      // does not release this flow file's claim.
+      flowFile.reset();
+      claim.reset();
+    }
+    input.close();
+    if (!keepSource)
+      std::remove(source.c_str());
+  } catch (std::exception &exception) {
+    if (flowFile && claim && flowFile->getResourceClaim() == claim) {
+      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flowFile->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (flowFile && claim && flowFile->getResourceClaim() == claim) {
+      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flowFile->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    throw;
+  }
+}
+
 void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
 bool keepSource,
                             uint64_t offset) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index d4f1b80..46ed1fb 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -50,6 +50,8 @@ core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of t
 core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
                                    " what data has been ingested so that upon restart NiFi can resume from where it left off",
                                    "TailFileState");
+core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
+                                    " from the incoming file.", "");  // "" (default) imports new data as one flow file; only the first character is used
 core::Relationship TailFile::Success("success", "All files are routed to success");
 
 void TailFile::initialize() {
@@ -57,6 +59,7 @@ void TailFile::initialize() {
   std::set<core::Property> properties;
   properties.insert(FileName);
   properties.insert(StateFile);
+  properties.insert(Delimiter);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -64,6 +67,14 @@ void TailFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
+void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  // Capture the "Input Delimiter" property when the processor is scheduled; if
+  // the context does not provide it, the previously stored delimiter is kept.
+  std::string configured;
+  if (context->getProperty(Delimiter.getName(), configured))
+    _delimiter = configured;
+}
+
 std::string TailFile::trimLeft(const std::string& s) {
   return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s);
 }
@@ -222,27 +233,52 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
   checkRollOver(fileLocation, fileName);
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
   struct stat statbuf;
+
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     if (statbuf.st_size <= this->_currentTailFilePosition) {
-      // there are no new input for the current tail fil
+      // there are no new input for the current tail file
       context->yield();
       return;
     }
-    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-    if (!flowFile)
-      return;
-    std::size_t found = _currentTailFileName.find_last_of(".");
-    std::string baseName = _currentTailFileName.substr(0, found);
-    std::string extension = _currentTailFileName.substr(found + 1);
-    flowFile->updateKeyedAttribute(PATH, fileLocation);
-    flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-    session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
-    session->transfer(flowFile, Success);
-    logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
-    std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
-    flowFile->updateKeyedAttribute(FILENAME, logName);
-    this->_currentTailFilePosition += flowFile->getSize();
-    storeState();
+
+      std::size_t found = _currentTailFileName.find_last_of(".");
+      std::string baseName = _currentTailFileName.substr(0, found);
+      std::string extension = _currentTailFileName.substr(found + 1);
+
+    if (!this->_delimiter.empty()) {
+      char delim = this->_delimiter.c_str()[0];
+      std::vector<std::shared_ptr<FlowFileRecord>> flowFiles = std::vector<std::shared_ptr<FlowFileRecord>>();
+      session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
+      logger_->log_info("%d flowfiles were received from TailFile input", flowFiles.size());
+
+      for (std::shared_ptr<FlowFileRecord> ffr : flowFiles) {
+        logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize());
+        std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
+          ffr->updateKeyedAttribute(PATH, fileLocation);
+          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+        ffr->updateKeyedAttribute(FILENAME, logName);
+          session->transfer(ffr, Success);
+        this->_currentTailFilePosition += ffr->getSize() + 1;
+        storeState();
+      }
+
+    } else {
+        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+        if (!flowFile)
+            return;
+        flowFile->updateKeyedAttribute(PATH, fileLocation);
+        flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+      session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
+      session->transfer(flowFile, Success);
+      logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, flowFile->getSize());
+      std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
+      flowFile->updateKeyedAttribute(FILENAME, logName);
+      this->_currentTailFilePosition += flowFile->getSize();
+      storeState();
+    }
+
+  } else {
+    logger_->log_warn("Unable to stat file %s", fullPath.c_str());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/test/resources/TestTailFile.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestTailFile.txt b/libminifi/test/resources/TestTailFile.txt
new file mode 100644
index 0000000..8b97da0
--- /dev/null
+++ b/libminifi/test/resources/TestTailFile.txt
@@ -0,0 +1,2 @@
+one,two,three
+four,five,six, seven
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/test/unit/TailFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
new file mode 100644
index 0000000..e800b4c
--- /dev/null
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -0,0 +1,171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "../TestBase.h"
+#include "core/Core.h"
+#include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include <iostream>
+
+// Fixture: two comma-separated records separated by one '\n' (no trailing newline).
+static const char *const NEWLINE_FILE = "one,two,three\n" "four,five,six, seven";
+// Scratch locations for the tailed file and for TailFile's persisted state.
+static const char *const TMP_FILE = "/tmp/minifi-tmpfile.txt";
+static const char *const STATE_FILE = "/tmp/minifi-state-file.txt";
+
+TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
+    // Remove the tailed file and the state file however the test ends. A scope
+    // guard is used instead of try/catch(...) so that unexpected exceptions (for
+    // example a FILE_OPERATION_EXCEPTION from the import) fail the test.
+    struct TempFileCleanup {
+        ~TempFileCleanup() {
+            std::remove(TMP_FILE);
+            std::remove(STATE_FILE);
+        }
+    } cleanup;
+
+    // Start without state from an earlier run, then create and write the test file.
+    std::remove(STATE_FILE);
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+    connection->setSourceUUID(processoruuid);
+    processor->addConnection(connection);
+
+    core::ProcessorNode node(processor);
+
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+
+    core::ProcessSession session(&context);
+
+    REQUIRE(processor->getName() == "tailfile");
+    core::ProcessSessionFactory factory(&context);
+
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    std::shared_ptr<core::FlowFile> record = session.get();
+    REQUIRE(record == nullptr);
+    REQUIRE(provRecords.size() == 4);   // 2 creates and 2 modifies for flowfiles
+
+    LogTestController::getInstance().reset();
+}
+
+
+TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
+    // Remove the tailed file and the state file however the test ends. A scope
+    // guard is used instead of try/catch(...) so that unexpected exceptions (for
+    // example a FILE_OPERATION_EXCEPTION from the import) fail the test.
+    struct TempFileCleanup {
+        ~TempFileCleanup() {
+            std::remove(TMP_FILE);
+            std::remove(STATE_FILE);
+        }
+    } cleanup;
+
+    // Start without state from an earlier run, then create and write the test file.
+    std::remove(STATE_FILE);
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+    connection->setSourceUUID(processoruuid);
+    processor->addConnection(connection);
+
+    core::ProcessorNode node(processor);
+
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo);
+    // No Input Delimiter is set, so all new data is imported as a single flow file.
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+
+    core::ProcessSession session(&context);
+
+    REQUIRE(processor->getName() == "tailfile");
+    core::ProcessSessionFactory factory(&context);
+
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    std::shared_ptr<core::FlowFile> record = session.get();
+    REQUIRE(record == nullptr);
+    REQUIRE(provRecords.size() == 2);
+
+    LogTestController::getInstance().reset();
+}