You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/11/10 23:27:58 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-780 - Change
GenerateFlowFile to allow 0b content FlowFiles
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new aa65f5d MINIFICPP-780 - Change GenerateFlowFile to allow 0b content FlowFiles
aa65f5d is described below
commit aa65f5d7aad80d83889152b68daecfdceb7558af
Author: Arpad Boda <ab...@apache.org>
AuthorDate: Wed Aug 28 18:03:05 2019 +0200
MINIFICPP-780 - Change GenerateFlowFile to allow 0b content FlowFiles
Signed-off-by: Arpad Boda <ab...@apache.org>
Approved by bakaid on GH
This closes #636
---
.../processors/GenerateFlowFile.cpp | 121 ++++++++-------------
.../processors/GenerateFlowFile.h | 41 +++----
.../tests/unit/GenerateFlowFileTests.cpp | 116 ++++++++++++++++++++
libminifi/test/TestBase.h | 4 +
4 files changed, 189 insertions(+), 93 deletions(-)
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index f4b8f20..447f4dd 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -28,10 +28,7 @@
#include <string>
#include <set>
#include <random>
-#ifdef WIN32
-#define srandom srand
-#define random rand
-#endif
+
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -43,7 +40,6 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
-const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
core::Property GenerateFlowFile::FileSize(
core::PropertyBuilder::createProperty("File Size")->withDescription("The size of the file that will be used")->isRequired(false)->withDefaultValue<core::DataSizeValue>("1 kB")->build());
@@ -60,8 +56,8 @@ core::Property GenerateFlowFile::UniqueFlowFiles(
->isRequired(false)->withDefaultValue<bool>(true)->build());
core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
-const unsigned int TEXT_LEN = 90;
-static const char TEXT_CHARS[TEXT_LEN + 1] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ";
+
+static const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ";
void GenerateFlowFile::initialize() {
// Set the supported properties
@@ -77,88 +73,67 @@ void GenerateFlowFile::initialize() {
setSupportedRelationships(relationships);
}
-void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- uint64_t batchSize = 1;
- bool uniqueFlowFile = true;
- uint64_t fileSize = 1024;
- bool textData = false;
+void generateData(std::vector<char>& data, bool textData = false) {
+ std::random_device rd;
+ std::mt19937 eng(rd());
+ if (textData) {
+ std::uniform_int_distribution<> distr(0, strlen(TEXT_CHARS) - 1);
+ auto rand = std::bind(distr, eng);
+ std::generate_n(data.begin(), data.size(), rand);
+ std::for_each(data.begin(), data.end(), [](char & c) { c = TEXT_CHARS[c];});
+ } else {
+ std::uniform_int_distribution<> distr(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
+ auto rand = std::bind(distr, eng);
+ std::generate_n(data.begin(), data.size(), rand);
+ }
+}
- std::string value;
- if (context->getProperty(FileSize.getName(), fileSize)) {
- logger_->log_trace("File size is configured to be %d", fileSize);
+void GenerateFlowFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ if (context->getProperty(FileSize.getName(), fileSize_)) {
+ logger_->log_trace("File size is configured to be %d", fileSize_);
}
- if (context->getProperty(BatchSize.getName(), batchSize)) {
- logger_->log_trace("Batch size is configured to be %d", batchSize);
+ if (context->getProperty(BatchSize.getName(), batchSize_)) {
+ logger_->log_trace("Batch size is configured to be %d", batchSize_);
}
+
+ std::string value;
if (context->getProperty(DataFormat.getName(), value)) {
- textData = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
+ textData_ = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
}
- if (context->getProperty(UniqueFlowFiles.getName(), uniqueFlowFile)) {
- logger_->log_trace("Unique Flow files is configured to be %i", uniqueFlowFile);
+ if (context->getProperty(UniqueFlowFiles.getName(), uniqueFlowFile_)) {
+ logger_->log_trace("Unique Flow files is configured to be %i", uniqueFlowFile_);
}
- if (uniqueFlowFile) {
- char *data;
- data = new char[fileSize];
- if (!data)
+ if (!uniqueFlowFile_) {
+ data_.resize(fileSize_);
+ generateData(data_, textData_);
+ }
+}
+
+void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+ for (uint64_t i = 0; i < batchSize_; i++) {
+ // For each batch
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ if (!flowFile) {
+ logger_->log_error("Failed to create flowfile!");
return;
- uint64_t dataSize = fileSize;
- GenerateFlowFile::WriteCallback callback(data, dataSize);
- char *current = data;
- if (textData) {
- for (uint64_t i = 0; i < fileSize; i++) {
- int randValue = random();
- data[i] = TEXT_CHARS[randValue % TEXT_LEN];
- }
- } else {
- for (uint64_t i = 0; i < fileSize; i += sizeof(int)) {
- int randValue = random();
- *(reinterpret_cast<int*>(current)) = randValue;
- current += sizeof(int);
- }
}
- for (uint64_t i = 0; i < batchSize; i++) {
- // For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (!flowFile)
- return;
- if (fileSize > 0)
+ if (fileSize_ > 0) {
+ if (uniqueFlowFile_) {
+ std::vector<char> data(fileSize_);
+ generateData(data, textData_);
+ GenerateFlowFile::WriteCallback callback(std::move(data));
session->write(flowFile, &callback);
- session->transfer(flowFile, Success);
- }
- delete[] data;
- } else {
- if (!_data) {
- // We have not created the data yet
- _data = new char[fileSize];
- _dataSize = fileSize;
- char *current = _data;
- if (textData) {
- for (uint64_t i = 0; i < fileSize; i++) {
- int randValue = random();
- _data[i] = TEXT_CHARS[randValue % TEXT_LEN];
- }
} else {
- for (uint64_t i = 0; i < fileSize; i += sizeof(int)) {
- int randValue = random();
- *(reinterpret_cast<int*>(current)) = randValue;
- current += sizeof(int);
- }
- }
- }
- GenerateFlowFile::WriteCallback callback(_data, _dataSize);
- for (uint64_t i = 0; i < batchSize; i++) {
- // For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (!flowFile)
- return;
- if (fileSize > 0)
+ GenerateFlowFile::WriteCallback callback(data_);
session->write(flowFile, &callback);
- session->transfer(flowFile, Success);
+ }
}
+ session->transfer(flowFile, Success);
}
}
+
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h
index 019aca8..2d85c3a 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -40,14 +40,13 @@ class GenerateFlowFile : public core::Processor {
*/
GenerateFlowFile(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid), logger_(logging::LoggerFactory<GenerateFlowFile>::getLogger()) {
- _data = NULL;
- _dataSize = 0;
+ batchSize_ = 1;
+ uniqueFlowFile_ = true;
+ fileSize_ = 1024;
+ textData_ = false;
}
// Destructor
- virtual ~GenerateFlowFile() {
- if (_data)
- delete[] _data;
- }
+ virtual ~GenerateFlowFile() = default;
// Processor Name
static constexpr char const* ProcessorName = "GenerateFlowFile";
// Supported Properties
@@ -55,40 +54,42 @@ class GenerateFlowFile : public core::Processor {
static core::Property BatchSize;
static core::Property DataFormat;
static core::Property UniqueFlowFiles;
- static const char *DATA_FORMAT_BINARY;
static const char *DATA_FORMAT_TEXT;
// Supported Relationships
static core::Relationship Success;
// Nest Callback Class for write stream
class WriteCallback : public OutputStreamCallback {
public:
- WriteCallback(char *data, uint64_t size)
- : _data(data),
- _dataSize(size) {
+ WriteCallback(std::vector<char> && data) : data_(std::move(data)) {
+ }
+ WriteCallback(const std::vector<char>& data) : data_(data) {
}
- char *_data;
- uint64_t _dataSize;
+ std::vector<char> data_;
int64_t process(std::shared_ptr<io::BaseStream> stream) {
int64_t ret = 0;
- if (_data && _dataSize > 0)
- ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
+ if(data_.size() > 0)
+ ret = stream->write(reinterpret_cast<uint8_t*>(&data_[0]), data_.size());
return ret;
}
};
public:
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
// OnTrigger method, implemented by NiFi GenerateFlowFile
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
// Initialize, over write by NiFi GenerateFlowFile
- virtual void initialize(void);
+ virtual void initialize(void) override;
protected:
+ std::vector<char> data_;
+
+ uint64_t batchSize_;
+ bool uniqueFlowFile_;
+ uint64_t fileSize_;
+ bool textData_;
private:
- // Generated data
- char * _data;
- // Size of the generated data
- uint64_t _dataSize;
+
// logger instance
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
new file mode 100644
index 0000000..5245a29
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "TestBase.h"
+#include "utils/file/FileUtils.h"
+#include "GenerateFlowFile.h"
+#include "PutFile.h"
+
+TEST_CASE("GenerateFlowFileTest", "[generateflowfiletest]") {
+ TestController testController;
+ LogTestController::getInstance().setTrace<TestPlan>();
+
+ char format[] = "/tmp/gt.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+ std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+
+ std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
+
+ plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir);
+
+ plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "10");
+ plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::BatchSize.getName(), "2");
+ plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "true");
+ plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+
+ plan->runNextProcessor(); // Generate
+ plan->runNextProcessor(); // Put
+ plan->runCurrentProcessor(); // Put
+
+ std::vector<std::vector<char>> fileContents;
+
+ auto lambda = [&fileContents](const std::string& path, const std::string& filename) -> bool {
+ std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+
+ is.seekg(0, is.end);
+ size_t length = is.tellg();
+ is.seekg(0, is.beg);
+
+ std::vector<char> content(length);
+
+ is.read(&content[0], length);
+
+ fileContents.push_back(std::move(content));
+
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(dir, lambda, plan->getLogger(), false);
+
+ REQUIRE(fileContents.size() == 2);
+ REQUIRE(fileContents[0].size() == 10);
+ REQUIRE(fileContents[1].size() == 10);
+ REQUIRE(fileContents[0] != fileContents[1]);
+}
+
+TEST_CASE("GenerateFlowFileTestEmpty", "[generateemptyfiletest]") {
+ TestController testController;
+ LogTestController::getInstance().setTrace<TestPlan>();
+
+ char format[] = "/tmp/gt.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+ std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+
+ std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
+
+ plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir);
+
+ plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0");
+
+ plan->runNextProcessor(); // Generate
+ plan->runNextProcessor(); // Put
+
+ size_t counter = 0;
+
+ auto lambda = [&counter](const std::string& path, const std::string& filename) -> bool {
+ std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+
+ is.seekg(0, is.end);
+ REQUIRE(is.tellg() == 0);
+
+ counter++;
+
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(dir, lambda, plan->getLogger(), false);
+
+ REQUIRE(counter == 1);
+}
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 4d7d363..03f9a1a 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -284,6 +284,10 @@ class TestPlan {
return content_repo_;
}
+ std::shared_ptr<logging::Logger> getLogger() const {
+ return logger_;
+ }
+
protected:
void finalize();