You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/07/05 17:59:29 UTC
nifi-minifi-cpp git commit: MINIFI-341: Introduce delimiter to
TailFile to delimit incoming data appropriately.
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 0a2d25c48 -> b20da80eb
MINIFI-341: Introduce delimiter to TailFile to delimit incoming data appropriately.
This closed #116.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b20da80e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b20da80e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b20da80e
Branch: refs/heads/master
Commit: b20da80eb73d49286041ed3f7a2e04afb6b0a7f3
Parents: 0a2d25c
Author: Jeremy Dyer <je...@apache.org>
Authored: Thu Jun 22 22:29:21 2017 -0400
Committer: Marc Parisi <ph...@apache.org>
Committed: Wed Jul 5 13:58:39 2017 -0400
----------------------------------------------------------------------
.gitignore | 3 +
libminifi/include/core/ProcessSession.h | 2 +
libminifi/include/processors/TailFile.h | 10 ++
libminifi/src/core/ProcessSession.cpp | 85 ++++++++++++
libminifi/src/processors/TailFile.cpp | 68 +++++++---
libminifi/test/resources/TestTailFile.txt | 2 +
libminifi/test/unit/TailFileTests.cpp | 171 +++++++++++++++++++++++++
7 files changed, 325 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4a4c064..3032f38 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,9 @@
# Ignore JetBrains project files
.idea
+# Ignore JetBrains CLion project files.
+.project
+
# Ignore kdevelop metadata
nifi-minifi-cpp.kdev4
.kdev4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 4d20f59..ad79d12 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -123,6 +123,8 @@ class ProcessSession {
void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
bool keepSource = true,
uint64_t offset = 0);
+ void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
+ bool keepSource, uint64_t offset, char inputDelimiter);
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index 59cf224..5e166cf 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -54,10 +54,18 @@ class TailFile : public core::Processor {
// Supported Properties
static core::Property FileName;
static core::Property StateFile;
+ static core::Property Delimiter;
// Supported Relationships
static core::Relationship Success;
public:
+ /**
+ * Function that's executed when the processor is scheduled.
+ * @param context process context.
+ * @param sessionFactory process session factory that is used when creating
+ * ProcessSession objects.
+ */
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// OnTrigger method, implemented by NiFi TailFile
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi TailFile
@@ -75,6 +83,8 @@ class TailFile : public core::Processor {
std::string _stateFile;
// State related to the tailed file
std::string _currentTailFileName;
+ // Delimiter for the data incoming from the tailed file.
+ std::string _delimiter;
// determine if state is recovered;
bool _stateRecovered;
uint64_t _currentTailFilePosition;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 037660f..df21a34 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -626,6 +626,91 @@ bool keepSource,
}
}
+/**
+ * Imports the content of a file as a series of flow files, one per
+ * delimiter-separated record.
+ *
+ * Starting at byte `offset` of `source`, every run of bytes terminated by
+ * `inputDelimiter` (or by end of file) is written to a freshly created
+ * resource claim and attached to a newly created flow file. The delimiter
+ * itself is not part of the stored content. A provenance "modify content"
+ * event is reported for each flow file.
+ *
+ * Records are read with std::getline into a std::string, so they may be of
+ * any length and may contain NUL bytes. When nothing at all is left to read
+ * (for example the source ends with the delimiter) no flow file is created.
+ *
+ * NOTE(review): `flows` is taken by value, so the flow files appended here
+ * are not visible to the caller. Making it a reference requires changing the
+ * declaration in ProcessSession.h at the same time - confirm and fix there.
+ *
+ * @param source path of the file to read.
+ * @param flows collection the created flow files are appended to.
+ * @param keepSource when false, the source file is removed after reading.
+ * @param offset byte position in the source at which reading starts.
+ * @param inputDelimiter character that terminates each record.
+ * @throws Exception (FILE_OPERATION_EXCEPTION) when the source cannot be
+ *         opened or the content of a flow file cannot be written.
+ */
+void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows,
+                            bool keepSource, uint64_t offset, char inputDelimiter) {
+  std::shared_ptr<ResourceClaim> claim;
+  std::shared_ptr<FlowFileRecord> flowFile;
+
+  // Holds the current record; reused across iterations to avoid reallocations.
+  std::string record;
+
+  try {
+    // Open the input file and seek to the appropriate location.
+    std::ifstream input;
+    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+    if (!input.is_open()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+    input.seekg(offset, input.beg);
+
+    while (input.good()) {
+      uint64_t startTime = getTimeMillis();
+
+      // std::getline only fails when not a single character (not even the
+      // delimiter) could be extracted, i.e. there is no further record.
+      if (!std::getline(input, record, inputDelimiter)) {
+        break;
+      }
+
+      flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+      claim = std::make_shared<ResourceClaim>();
+
+      std::ofstream fs;
+      fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+
+      // NOTE(review): as before, a claim file that cannot be opened is skipped
+      // silently and the created flow file is not added to `flows`.
+      if (fs.is_open()) {
+        // Use the record's real size rather than strlen() so that embedded
+        // NUL bytes do not cut the content short.
+        fs.write(record.data(), static_cast<std::streamsize>(record.size()));
+
+        if (fs.good() && fs.tellp() >= 0) {
+          flowFile->setSize(fs.tellp());
+          flowFile->setOffset(0);
+          if (flowFile->getResourceClaim() != nullptr) {
+            // Remove the old claim
+            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+            flowFile->clearResourceClaim();
+          }
+          flowFile->setResourceClaim(claim);
+          claim->increaseFlowFileRecordOwnedCount();
+
+          logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
+                             flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(),
+                             flowFile->getUUIDStr().c_str());
+
+          fs.close();
+          std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
+          uint64_t endTime = getTimeMillis();
+          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+          flows.push_back(flowFile);
+        } else {
+          fs.close();
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+      }
+    }
+
+    input.close();
+    if (!keepSource)
+      std::remove(source.c_str());
+  } catch (std::exception &exception) {
+    // Detach the claim from the flow file that was being populated.
+    if (flowFile && flowFile->getResourceClaim() == claim) {
+      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flowFile->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (flowFile && flowFile->getResourceClaim() == claim) {
+      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flowFile->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    throw;
+  }
+}
+
void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
bool keepSource,
uint64_t offset) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index d4f1b80..46ed1fb 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -50,6 +50,8 @@ core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of t
core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
" what data has been ingested so that upon restart NiFi can resume from where it left off",
"TailFileState");
+// Optional input delimiter; the default is the empty string. The two adjacent
+// literals are concatenated, so the second one must start with a space.
+core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
+                                   " from the incoming file.", "");
core::Relationship TailFile::Success("success", "All files are routed to success");
void TailFile::initialize() {
@@ -57,6 +59,7 @@ void TailFile::initialize() {
std::set<core::Property> properties;
properties.insert(FileName);
properties.insert(StateFile);
+ properties.insert(Delimiter);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -64,6 +67,14 @@ void TailFile::initialize() {
setSupportedRelationships(relationships);
}
+// Reads the "Input Delimiter" property once at schedule time and caches it in
+// _delimiter. When the property cannot be read, _delimiter is left untouched.
+void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string configuredDelimiter;
+  const bool propertyFound = context->getProperty(Delimiter.getName(), configuredDelimiter);
+  if (!propertyFound) {
+    return;
+  }
+  _delimiter = configuredDelimiter;
+}
+
std::string TailFile::trimLeft(const std::string& s) {
return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s);
}
@@ -222,27 +233,52 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
checkRollOver(fileLocation, fileName);
std::string fullPath = fileLocation + "/" + _currentTailFileName;
struct stat statbuf;
+
if (stat(fullPath.c_str(), &statbuf) == 0) {
if (statbuf.st_size <= this->_currentTailFilePosition) {
- // there are no new input for the current tail fil
+ // there are no new input for the current tail file
context->yield();
return;
}
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (!flowFile)
- return;
- std::size_t found = _currentTailFileName.find_last_of(".");
- std::string baseName = _currentTailFileName.substr(0, found);
- std::string extension = _currentTailFileName.substr(found + 1);
- flowFile->updateKeyedAttribute(PATH, fileLocation);
- flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
- session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
- session->transfer(flowFile, Success);
- logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
- std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
- flowFile->updateKeyedAttribute(FILENAME, logName);
- this->_currentTailFilePosition += flowFile->getSize();
- storeState();
+
+ std::size_t found = _currentTailFileName.find_last_of(".");
+ std::string baseName = _currentTailFileName.substr(0, found);
+ std::string extension = _currentTailFileName.substr(found + 1);
+
+ if (!this->_delimiter.empty()) {
+ char delim = this->_delimiter.c_str()[0];
+ std::vector<std::shared_ptr<FlowFileRecord>> flowFiles = std::vector<std::shared_ptr<FlowFileRecord>>();
+ session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
+ logger_->log_info("%d flowfiles were received from TailFile input", flowFiles.size());
+
+ for (std::shared_ptr<FlowFileRecord> ffr : flowFiles) {
+ logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize());
+ std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
+ ffr->updateKeyedAttribute(PATH, fileLocation);
+ ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+ ffr->updateKeyedAttribute(FILENAME, logName);
+ session->transfer(ffr, Success);
+ this->_currentTailFilePosition += ffr->getSize() + 1;
+ storeState();
+ }
+
+ } else {
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ if (!flowFile)
+ return;
+ flowFile->updateKeyedAttribute(PATH, fileLocation);
+ flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+ session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
+ session->transfer(flowFile, Success);
+ logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, flowFile->getSize());
+ std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
+ flowFile->updateKeyedAttribute(FILENAME, logName);
+ this->_currentTailFilePosition += flowFile->getSize();
+ storeState();
+ }
+
+ } else {
+ logger_->log_warn("Unable to stat file %s", fullPath.c_str());
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/test/resources/TestTailFile.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestTailFile.txt b/libminifi/test/resources/TestTailFile.txt
new file mode 100644
index 0000000..8b97da0
--- /dev/null
+++ b/libminifi/test/resources/TestTailFile.txt
@@ -0,0 +1,2 @@
+one,two,three
+four,five,six, seven
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/test/unit/TailFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
new file mode 100644
index 0000000..e800b4c
--- /dev/null
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -0,0 +1,171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "../TestBase.h"
+#include "core/Core.h"
+#include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include <iostream>
+
+static const char *NEWLINE_FILE = ""
+ "one,two,three\n"
+ "four,five,six, seven";
+static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
+static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
+
+// Tails a two-line file with "\n" configured as the input delimiter and
+// expects one flow file per line, i.e. four provenance events in total.
+TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
+  try {
+    // Create and write to the test file
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    REQUIRE(tmpfile.is_open());
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+
+    connection->setSourceUUID(processoruuid);
+
+    processor->addConnection(connection);
+
+    core::ProcessorNode node(processor);
+
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+
+    core::ProcessSession session(&context);
+
+    REQUIRE(processor->getName() == "tailfile");
+
+    core::ProcessSessionFactory factory(&context);
+
+    std::shared_ptr<core::FlowFile> record;
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    record = session.get();
+    REQUIRE(record == nullptr);
+    std::shared_ptr<core::FlowFile> ff = session.get();
+    REQUIRE(provRecords.size() == 4);  // 2 creates and 2 modifies for flowfiles
+
+    LogTestController::getInstance().reset();
+  } catch (...) {
+    // Remove the fixtures, then rethrow: swallowing the exception here would
+    // let an unexpected failure inside the processor pass unnoticed.
+    std::remove(TMP_FILE);
+    std::remove(STATE_FILE);
+    throw;
+  }
+
+  // Delete the test and state file.
+  std::remove(TMP_FILE);
+  std::remove(STATE_FILE);
+}
+
+
+// Tails the same two-line file without configuring a delimiter and expects
+// the whole content to arrive as one flow file, i.e. two provenance events.
+TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
+  try {
+    // Create and write to the test file
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    REQUIRE(tmpfile.is_open());
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+    connection->setSourceUUID(processoruuid);
+
+    processor->addConnection(connection);
+
+    core::ProcessorNode node(processor);
+
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+
+    core::ProcessSession session(&context);
+
+    REQUIRE(processor->getName() == "tailfile");
+
+    core::ProcessSessionFactory factory(&context);
+
+    std::shared_ptr<core::FlowFile> record;
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    record = session.get();
+    REQUIRE(record == nullptr);
+    std::shared_ptr<core::FlowFile> ff = session.get();
+    REQUIRE(provRecords.size() == 2);
+
+    LogTestController::getInstance().reset();
+  } catch (...) {
+    // Remove the fixtures, then rethrow: swallowing the exception here would
+    // let an unexpected failure inside the processor pass unnoticed.
+    std::remove(TMP_FILE);
+    std::remove(STATE_FILE);
+    throw;
+  }
+
+  // Delete the test and state file.
+  std::remove(TMP_FILE);
+  std::remove(STATE_FILE);
+}