You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/11/10 23:27:58 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-780 - Change GenerateFlowFile to allow 0b content FlowFiles

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new aa65f5d  MINIFICPP-780 - Change GenerateFlowFile to allow 0b content FlowFiles
aa65f5d is described below

commit aa65f5d7aad80d83889152b68daecfdceb7558af
Author: Arpad Boda <ab...@apache.org>
AuthorDate: Wed Aug 28 18:03:05 2019 +0200

    MINIFICPP-780 - Change GenerateFlowFile to allow 0b content FlowFiles
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    Approved by bakaid on GH
    
    This closes #636
---
 .../processors/GenerateFlowFile.cpp                | 121 ++++++++-------------
 .../processors/GenerateFlowFile.h                  |  41 +++----
 .../tests/unit/GenerateFlowFileTests.cpp           | 116 ++++++++++++++++++++
 libminifi/test/TestBase.h                          |   4 +
 4 files changed, 189 insertions(+), 93 deletions(-)

diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index f4b8f20..447f4dd 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -28,10 +28,7 @@
 #include <string>
 #include <set>
 #include <random>
-#ifdef WIN32
-#define srandom srand
-#define random rand
-#endif
+
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -43,7 +40,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
-const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
 const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
 core::Property GenerateFlowFile::FileSize(
     core::PropertyBuilder::createProperty("File Size")->withDescription("The size of the file that will be used")->isRequired(false)->withDefaultValue<core::DataSizeValue>("1 kB")->build());
@@ -60,8 +56,8 @@ core::Property GenerateFlowFile::UniqueFlowFiles(
         ->isRequired(false)->withDefaultValue<bool>(true)->build());
 
 core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
-const unsigned int TEXT_LEN = 90;
-static const char TEXT_CHARS[TEXT_LEN + 1] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ";
+
+static const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ";
 
 void GenerateFlowFile::initialize() {
   // Set the supported properties
@@ -77,88 +73,67 @@ void GenerateFlowFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  uint64_t batchSize = 1;
-  bool uniqueFlowFile = true;
-  uint64_t fileSize = 1024;
-  bool textData = false;
+void generateData(std::vector<char>& data, bool textData = false) {
+  std::random_device rd;
+  std::mt19937 eng(rd());
+  if (textData) {
+    std::uniform_int_distribution<> distr(0, strlen(TEXT_CHARS) - 1);
+    auto rand = std::bind(distr, eng);
+    std::generate_n(data.begin(), data.size(), rand);
+    std::for_each(data.begin(), data.end(), [](char & c) { c = TEXT_CHARS[c];});
+  } else {
+    std::uniform_int_distribution<> distr(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
+    auto rand = std::bind(distr, eng);
+    std::generate_n(data.begin(), data.size(), rand);
+  }
+}
 
-  std::string value;
-  if (context->getProperty(FileSize.getName(), fileSize)) {
-    logger_->log_trace("File size is configured to be %d", fileSize);
+void GenerateFlowFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (context->getProperty(FileSize.getName(), fileSize_)) {
+    logger_->log_trace("File size is configured to be %d", fileSize_);
   }
 
-  if (context->getProperty(BatchSize.getName(), batchSize)) {
-    logger_->log_trace("Batch size is configured to be %d", batchSize);
+  if (context->getProperty(BatchSize.getName(), batchSize_)) {
+    logger_->log_trace("Batch size is configured to be %d", batchSize_);
   }
+
+  std::string value;
   if (context->getProperty(DataFormat.getName(), value)) {
-    textData = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
+    textData_ = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
   }
-  if (context->getProperty(UniqueFlowFiles.getName(), uniqueFlowFile)) {
-    logger_->log_trace("Unique Flow files is configured to be %i", uniqueFlowFile);
+  if (context->getProperty(UniqueFlowFiles.getName(), uniqueFlowFile_)) {
+    logger_->log_trace("Unique Flow files is configured to be %i", uniqueFlowFile_);
   }
 
-  if (uniqueFlowFile) {
-    char *data;
-    data = new char[fileSize];
-    if (!data)
+  if (!uniqueFlowFile_) {
+    data_.resize(fileSize_);
+    generateData(data_, textData_);
+  }
+}
+
+void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  for (uint64_t i = 0; i < batchSize_; i++) {
+    // For each batch
+    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+    if (!flowFile) {
+      logger_->log_error("Failed to create flowfile!");
       return;
-    uint64_t dataSize = fileSize;
-    GenerateFlowFile::WriteCallback callback(data, dataSize);
-    char *current = data;
-    if (textData) {
-      for (uint64_t i = 0; i < fileSize; i++) {
-        int randValue = random();
-        data[i] = TEXT_CHARS[randValue % TEXT_LEN];
-      }
-    } else {
-      for (uint64_t i = 0; i < fileSize; i += sizeof(int)) {
-        int randValue = random();
-        *(reinterpret_cast<int*>(current)) = randValue;
-        current += sizeof(int);
-      }
     }
-    for (uint64_t i = 0; i < batchSize; i++) {
-      // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-      if (!flowFile)
-        return;
-      if (fileSize > 0)
+    if (fileSize_ > 0) {
+      if (uniqueFlowFile_) {
+        std::vector<char> data(fileSize_);
+        generateData(data, textData_);
+        GenerateFlowFile::WriteCallback callback(std::move(data));
         session->write(flowFile, &callback);
-      session->transfer(flowFile, Success);
-    }
-    delete[] data;
-  } else {
-    if (!_data) {
-      // We have not created the data yet
-      _data = new char[fileSize];
-      _dataSize = fileSize;
-      char *current = _data;
-      if (textData) {
-        for (uint64_t i = 0; i < fileSize; i++) {
-          int randValue = random();
-          _data[i] = TEXT_CHARS[randValue % TEXT_LEN];
-        }
       } else {
-        for (uint64_t i = 0; i < fileSize; i += sizeof(int)) {
-          int randValue = random();
-          *(reinterpret_cast<int*>(current)) = randValue;
-          current += sizeof(int);
-        }
-      }
-    }
-    GenerateFlowFile::WriteCallback callback(_data, _dataSize);
-    for (uint64_t i = 0; i < batchSize; i++) {
-      // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-      if (!flowFile)
-        return;
-      if (fileSize > 0)
+        GenerateFlowFile::WriteCallback callback(data_);
         session->write(flowFile, &callback);
-      session->transfer(flowFile, Success);
+      }
     }
+    session->transfer(flowFile, Success);
   }
 }
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h
index 019aca8..2d85c3a 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -40,14 +40,13 @@ class GenerateFlowFile : public core::Processor {
    */
   GenerateFlowFile(std::string name, utils::Identifier uuid = utils::Identifier())
       : Processor(name, uuid), logger_(logging::LoggerFactory<GenerateFlowFile>::getLogger()) {
-    _data = NULL;
-    _dataSize = 0;
+    batchSize_ = 1;
+    uniqueFlowFile_ = true;
+    fileSize_ = 1024;
+    textData_ = false;
   }
   // Destructor
-  virtual ~GenerateFlowFile() {
-    if (_data)
-      delete[] _data;
-  }
+  virtual ~GenerateFlowFile() = default;
   // Processor Name
   static constexpr char const* ProcessorName = "GenerateFlowFile";
   // Supported Properties
@@ -55,40 +54,42 @@ class GenerateFlowFile : public core::Processor {
   static core::Property BatchSize;
   static core::Property DataFormat;
   static core::Property UniqueFlowFiles;
-  static const char *DATA_FORMAT_BINARY;
   static const char *DATA_FORMAT_TEXT;
   // Supported Relationships
   static core::Relationship Success;
   // Nest Callback Class for write stream
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(char *data, uint64_t size)
-        : _data(data),
-          _dataSize(size) {
+    WriteCallback(std::vector<char> && data) : data_(std::move(data)) {
+    }
+    WriteCallback(const std::vector<char>& data) : data_(data) {
     }
-    char *_data;
-    uint64_t _dataSize;
+    std::vector<char> data_;
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
       int64_t ret = 0;
-      if (_data && _dataSize > 0)
-        ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
+      if(data_.size() > 0)
+        ret = stream->write(reinterpret_cast<uint8_t*>(&data_[0]), data_.size());
       return ret;
     }
   };
 
  public:
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   // OnTrigger method, implemented by NiFi GenerateFlowFile
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
   // Initialize, over write by NiFi GenerateFlowFile
-  virtual void initialize(void);
+  virtual void initialize(void) override;
 
  protected:
+  std::vector<char> data_;
+
+  uint64_t batchSize_;
+  bool uniqueFlowFile_;
+  uint64_t fileSize_;
+  bool textData_;
 
  private:
-  // Generated data
-  char * _data;
-  // Size of the generated data
-  uint64_t _dataSize;
+
   // logger instance
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
new file mode 100644
index 0000000..5245a29
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "TestBase.h"
+#include "utils/file/FileUtils.h"
+#include "GenerateFlowFile.h"
+#include "PutFile.h"
+
+TEST_CASE("GenerateFlowFileTest", "[generateflowfiletest]") {
+  TestController testController;
+  LogTestController::getInstance().setTrace<TestPlan>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
+
+  plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir);
+
+  plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "10");
+  plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::BatchSize.getName(), "2");
+  plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "true");
+  plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+
+  plan->runNextProcessor();  // Generate
+  plan->runNextProcessor();  // Put
+  plan->runCurrentProcessor();  // Put
+
+  std::vector<std::vector<char>> fileContents;
+
+  auto lambda = [&fileContents](const std::string& path, const std::string& filename) -> bool {
+    std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+
+    is.seekg(0, is.end);
+    size_t length = is.tellg();
+    is.seekg(0, is.beg);
+
+    std::vector<char> content(length);
+
+    is.read(&content[0], length);
+
+    fileContents.push_back(std::move(content));
+
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(dir, lambda, plan->getLogger(), false);
+
+  REQUIRE(fileContents.size() == 2);
+  REQUIRE(fileContents[0].size() == 10);
+  REQUIRE(fileContents[1].size() == 10);
+  REQUIRE(fileContents[0] != fileContents[1]);
+}
+
+TEST_CASE("GenerateFlowFileTestEmpty", "[generateemptyfiletest]") {
+  TestController testController;
+  LogTestController::getInstance().setTrace<TestPlan>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
+
+  plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir);
+
+  plan->setProperty(genfile, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0");
+
+  plan->runNextProcessor();  // Generate
+  plan->runNextProcessor();  // Put
+
+  size_t counter = 0;
+
+  auto lambda = [&counter](const std::string& path, const std::string& filename) -> bool {
+    std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+
+    is.seekg(0, is.end);
+    REQUIRE(is.tellg() == 0);
+
+    counter++;
+
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(dir, lambda, plan->getLogger(), false);
+
+  REQUIRE(counter == 1);
+}
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 4d7d363..03f9a1a 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -284,6 +284,10 @@ class TestPlan {
     return content_repo_;
   }
 
+  std::shared_ptr<logging::Logger> getLogger() const {
+    return logger_;
+  }
+
  protected:
 
   void finalize();