You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/02/25 12:41:59 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-735: Fix issues with delimiter, make changes to facilitate testing this change
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 54007b2 MINIFICPP-735: Fix issues with delimiter, make changes to facilitate testing this change
54007b2 is described below
commit 54007b2ab7371bb8462ac46d89aa9969f3178d6b
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Thu Feb 14 11:17:17 2019 -0500
MINIFICPP-735: Fix issues with delimiter, make changes to facilitate testing this change
Approval by jdye64 on GH.
Signed-off-by: Marc Parisi <ph...@apache.org>
---
cmake/BuildTests.cmake | 2 +
libminifi/include/core/state/ProcessorController.h | 12 ++-
libminifi/include/processors/TailFile.h | 15 +--
libminifi/src/core/ProcessSession.cpp | 20 ++--
libminifi/src/processors/TailFile.cpp | 33 ++++++-
libminifi/test/integration/IntegrationBase.h | 6 +-
libminifi/test/integration/TailFileTest.cpp | 108 +++++++++++++++++++++
libminifi/test/resources/TestTailFile.yml | 63 ++++++++++++
libminifi/test/unit/TailFileTests.cpp | 2 +-
9 files changed, 236 insertions(+), 25 deletions(-)
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index c7ec1d6..796f626 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -158,3 +158,5 @@ add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )
add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecure.yml" "${TEST_RESOURCES}/")
+add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml" "${TEST_RESOURCES}/")
+
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 1fadd27..2cba897 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -42,13 +42,17 @@ class ProcessorController : public StateController {
virtual ~ProcessorController();
- virtual std::string getComponentName() const{
+ virtual std::string getComponentName() const {
return processor_->getName();
}
- virtual std::string getComponentUUID() const{
- return processor_->getUUIDStr();
- }
+ virtual std::string getComponentUUID() const {
+ return processor_->getUUIDStr();
+ }
+
+ std::shared_ptr<core::Processor> getProcessor() {
+ return processor_;
+ }
/**
* Start the client
*/
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index c781f4d..3714023 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -40,8 +40,9 @@ class TailFile : public core::Processor {
/*!
* Create a new processor
*/
- explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
+ explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
+ _currentTailFilePosition(0),
logger_(logging::LoggerFactory<TailFile>::getLogger()) {
_stateRecovered = false;
}
@@ -84,7 +85,7 @@ class TailFile : public core::Processor {
// State related to the tailed file
std::string _currentTailFileName;
// Delimiter for the data incoming from the tailed file.
- std::string _delimiter;
+ std::string delimiter_;
// determine if state is recovered;
bool _stateRecovered;
uint64_t _currentTailFilePosition;
@@ -102,11 +103,11 @@ class TailFile : public core::Processor {
};
REGISTER_RESOURCE(TailFile, "\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual."
- " Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\","
- " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover"
- " occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds,"
- " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
- " does not support ingesting files that have been compressed when 'rolled over'.");
+ " Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\","
+ " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover"
+ " occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds,"
+ " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
+ " does not support ingesting files that have been compressed when 'rolled over'.");
// Matched File Item for Roll over check
typedef struct {
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index ab7e4bf..4a79b21 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -545,13 +545,18 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
}
- while (input.good()) {
+ while (input.good() && !input.eof()) {
bool invalidWrite = false;
- flowFile = std::static_pointer_cast<FlowFileRecord>(create());
- claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
uint64_t startTime = getTimeMillis();
input.getline(charBuffer.data(), size, inputDelimiter);
+ if (input.eof() || input.fail()) {
+ logger_->log_trace("Finished reading input %s", source);
+ break;
+ }
+ flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+ claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+
size_t bufsize = strlen(charBuffer.data());
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
if (nullptr == stream) {
@@ -560,7 +565,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
return;
}
- if (input) {
+ if (input.good()) {
if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), bufsize) < 0) {
invalidWrite = true;
break;
@@ -582,7 +587,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
}
flowFile->setResourceClaim(claim);
claim->increaseFlowFileRecordOwnedCount();
- logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(),
+ logger_->log_debug("Import offset %u length %u into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(),
flowFile->getUUIDStr());
stream->closeStream();
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
@@ -596,6 +601,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
}
}
input.close();
+ logger_->log_trace("Closed input %s, keeping source ? %i", source, keepSource);
if (!keepSource)
std::remove(source.c_str());
} else {
@@ -729,7 +735,7 @@ void ProcessSession::commit() {
}
} else {
// Can not find relationship for the flow
- throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow " + record->getUUIDStr());
}
}
@@ -774,7 +780,7 @@ void ProcessSession::commit() {
}
} else {
// Can not find relationship for the flow
- throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow " + record->getUUIDStr());
}
}
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index df0afc4..bfbf21e 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -86,7 +86,7 @@ void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFac
std::string value;
if (context->getProperty(Delimiter.getName(), value)) {
- _delimiter = value;
+ delimiter_ = value;
}
}
@@ -277,6 +277,7 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
std::string fullPath = fileLocation + "/" + _currentTailFileName;
struct stat statbuf;
+ logger_->log_debug("Tailing file %s", fullPath);
if (stat(fullPath.c_str(), &statbuf) == 0) {
if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition) {
// there are no new input for the current tail file
@@ -287,14 +288,36 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
std::string baseName = _currentTailFileName.substr(0, found);
std::string extension = _currentTailFileName.substr(found + 1);
- if (!this->_delimiter.empty()) {
- char delim = this->_delimiter.c_str()[0];
+ if (!delimiter_.empty()) {
+ char delim = delimiter_.c_str()[0];
+ if (delim == '\\') {
+ if (delimiter_.size() > 1) {
+ switch (delimiter_.c_str()[1]) {
+ case 'r':
+ delim = '\r';
+ break;
+ case 't':
+ delim = '\t';
+ break;
+ case 'n':
+ delim = '\n';
+ break;
+ case '\\':
+ delim = '\\';
+ break;
+ default:
+ // previous behavior
+ break;
+ }
+ }
+ }
+ logger_->log_debug("Looking for delimiter 0x%X", delim);
std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
- logger_->log_info("%ll flowfiles were received from TailFile input", flowFiles.size());
+ logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
for (auto ffr : flowFiles) {
- logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, ffr->getSize());
+ logger_->log_info("TailFile %s for %u bytes", _currentTailFileName, ffr->getSize());
std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
ffr->updateKeyedAttribute(PATH, fileLocation);
ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 95e14c4..ac3d892 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -62,6 +62,10 @@ class IntegrationBase {
}
+ virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
+
+ }
+
void configureSecurity();
std::shared_ptr<minifi::Configure> configuration;
uint64_t wait_time_;
@@ -116,8 +120,8 @@ void IntegrationBase::run(std::string test_file_location) {
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
true);
-
controller->load();
+ updateProperties(controller);
controller->start();
waitToVerifyProcessor();
diff --git a/libminifi/test/integration/TailFileTest.cpp b/libminifi/test/integration/TailFileTest.cpp
new file mode 100644
index 0000000..b3c2f31
--- /dev/null
+++ b/libminifi/test/integration/TailFileTest.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../TestBase.h"
+#include "processors/TailFile.h"
+#include "processors/LogAttribute.h"
+#include "state/ProcessorController.h"
+#include "IntegrationBase.h"
+
+class TailFileTestHarness : public IntegrationBase {
+ public:
+  TailFileTestHarness() {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+    // The returned pointer may alias the local 'format' buffer, so 'dir' is a std::string holding its own copy.
+    statefile = dir;
+    statefile += "/statefile";
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "Lin\\e1\nli\\nen\nli\\ne3\nli\\ne4\nli\\ne5\n";
+    file.close();
+  }
+  // Raise log levels so runAssertions() can find the TailFile / ProcessSession messages.
+  void testSetup() override {
+    LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+  }
+  // Remove the tailed file and the state file created for this run.
+  void cleanup() override {
+    unlink(ss.str().c_str());
+    unlink(statefile.c_str());
+  }
+  // The "\n" delimiter must decode to 0xA, giving one flow file per line of the 5-line input.
+  void runAssertions() override {
+    assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
+    assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
+    assert(LogTestController::getInstance().contains("li\\ne5") == true);
+  }
+  // updateProperties() runs after load() and before start(): point processor "tf" at the generated files.
+ protected:
+  void updateProperties(std::shared_ptr<minifi::FlowController> fc) override {
+    for (auto &comp : fc->getComponents("tf")) {
+      std::shared_ptr<minifi::state::ProcessorController> proc = std::dynamic_pointer_cast<minifi::state::ProcessorController>(comp);
+      if (nullptr != proc) {
+        proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName, ss.str());
+        proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile, statefile);
+      }
+    }
+  }
+  // Per-run paths (state file, temp dir, tailed file in 'ss') and the TestController that made the temp dir.
+  std::string statefile;
+  std::string dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  // argv[1]: flow YAML. argv[2] (resource dir) is passed by CMake but not needed; never read past argc.
+  std::string test_file_location;
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+
+  TailFileTestHarness harness;
+
+  harness.run(test_file_location);
+
+  return 0;
+}
diff --git a/libminifi/test/resources/TestTailFile.yml b/libminifi/test/resources/TestTailFile.yml
new file mode 100644
index 0000000..a1a78b9
--- /dev/null
+++ b/libminifi/test/resources/TestTailFile.yml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: tf
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.TailFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 100 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Input Delimiter: \n  # plain scalar = literal backslash-n; TailFile decodes "\n" to a newline
+    - name: la
+      id: 2438e3c8-015a-1000-79ca-83af40ec1995
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 500 msec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+          - success
+      Properties:
+          Log Payload: true
+
+Connections:
+    - name: tr1
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: tf
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: la
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1995
+      # only tf's 'success' relationship (declared once above) feeds this connection
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Remote Processing Groups:
+
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
index 379ee40..4966363 100644
--- a/libminifi/test/unit/TailFileTests.cpp
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -70,7 +70,7 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
- REQUIRE(records.size() == 4);
+ REQUIRE(records.size() == 2);
LogTestController::getInstance().reset();