You are viewing a plain-text version of this content; the canonical link to it is not preserved in this copy.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/08/11 14:39:06 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #866: MINIFICPP-1321 Support attribute strategies in MergeContent processor

szaszm commented on a change in pull request #866:
URL: https://github.com/apache/nifi-minifi-cpp/pull/866#discussion_r468537554



##########
File path: extensions/libarchive/MergeContent.h
##########
@@ -259,9 +259,44 @@ class ZipMerge: public ArchiveMerge, public MergeBin {
   }
 };
 
+class AttributeMerger {
+public:
+  AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
+    : flows_(flows) {}
+  void mergeAttributes(core::ProcessSession *session, std::shared_ptr<core::FlowFile> &mergeFlow);
+  virtual ~AttributeMerger() = default;
+protected:
+  std::map<std::string, std::string> getMergedAttributes();
+  virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow, std::map<std::string, std::string>& mergedAttributes) = 0;
+
+  const std::deque<std::shared_ptr<core::FlowFile>> &flows_;
+};

Review comment:
       I suggest using `std::accumulate` with one function (or function object) per attribute-merging strategy, instead of this class hierarchy — e.g. folding each flow file's attributes into the running merged map. I believe this would make the code simpler.
   
   If you decide to keep the class hierarchy, please make single-argument constructors `explicit`. https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rc-explicit

##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -829,3 +829,152 @@ TEST_CASE("MergeFileOnAttribute", "[mergefiletest5]") {
   }
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+
+    // Create and write to the test file
+    for (int i = 0; i < 3; i++) {
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
+      for (int j = 0; j < 32; j++) {
+        tmpfile << std::to_string(i);
+        expectfileFirst << std::to_string(i);
+      }
+    }
+  }
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[3];
+
+  // Generate 3 flowfiles merging all into one
+  for (int i = 0; i < 3; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    if (i == 1)
+      flow->setAttribute("tagUnique1", "unique1");
+    else if (i == 2)
+      flow->setAttribute("tagUnique2", "unique2");
+    if (i % 2 == 0) {
+      flow->setAttribute("tagUncommon", "uncommon1");
+    } else {
+      flow->setAttribute("tagUncommon", "uncommon2");
+    }
+    flow->setAttribute("tagCommon", "common");
+
+    record[i] = flow;
+  }
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[0]);
+
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 3; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow = output->poll(expiredFlowRecords);
+
+  auto attributes = flow->getAttributes();
+  REQUIRE(attributes.find("tagUncommon") == attributes.end());
+  REQUIRE(attributes.find("tagUnique1") == attributes.end());
+  REQUIRE(attributes.find("tagUnique2") == attributes.end());
+  REQUIRE(attributes["tagCommon"] == "common");
+
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+
+    // Create and write to the test file
+    for (int i = 0; i < 3; i++) {
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);

Review comment:
       ```suggestion
         std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
         std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
   ```

##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -829,3 +829,152 @@ TEST_CASE("MergeFileOnAttribute", "[mergefiletest5]") {
   }
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+
+    // Create and write to the test file
+    for (int i = 0; i < 3; i++) {
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
+      for (int j = 0; j < 32; j++) {
+        tmpfile << std::to_string(i);
+        expectfileFirst << std::to_string(i);
+      }
+    }
+  }
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[3];
+
+  // Generate 3 flowfiles merging all into one
+  for (int i = 0; i < 3; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());

Review comment:
       Please avoid spaces inside and around angle brackets. https://google.github.io/styleguide/cppguide.html#Horizontal_Whitespace
   
   I recommend using `auto` to avoid repeating the type name.
   ```suggestion
       const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
   ```
   
   I also realize that this code is copy-pasted, so the recommendations above are entirely optional.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org