You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/09/22 17:47:01 UTC
nifi-minifi-cpp git commit: Create Merge Content processor
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 6283a447e -> 8b3b1c880
Create Merge Content processor
This closes #133.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/8b3b1c88
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/8b3b1c88
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/8b3b1c88
Branch: refs/heads/master
Commit: 8b3b1c880d92dfc8289af80ad883e254a23bc356
Parents: 6283a44
Author: Bin Qiu <be...@gmail.com>
Authored: Thu Aug 24 10:04:32 2017 -0700
Committer: Marc Parisi <ph...@apache.org>
Committed: Fri Sep 22 13:41:23 2017 -0400
----------------------------------------------------------------------
README.md | 1 +
libminifi/include/core/FlowConfiguration.h | 1 +
libminifi/include/core/ProcessSession.h | 2 +
libminifi/include/processors/BinFiles.h | 296 +++++++++
libminifi/include/processors/LoadProcessors.h | 1 +
libminifi/include/processors/MergeContent.h | 206 ++++++
libminifi/src/core/ProcessSession.cpp | 4 +
libminifi/src/processors/BinFiles.cpp | 303 +++++++++
libminifi/src/processors/MergeContent.cpp | 283 ++++++++
libminifi/test/unit/MergeFileTests.cpp | 720 +++++++++++++++++++++
10 files changed, 1817 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 8830aaa..db10ab4 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
* ListenSyslog
* PutFile
* TailFile
+ * MergeContent
* Provenance events generation is supported and are persisted using levelDB.
## System Requirements
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 43d2bc0..d9ebc72 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -35,6 +35,7 @@
#include "processors/LogAttribute.h"
#include "processors/ExecuteProcess.h"
#include "processors/AppendHostInfo.h"
+#include "processors/MergeContent.h"
#include "core/Processor.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/ProcessContext.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index d853e9b..3a3b143 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -79,6 +79,8 @@ class ProcessSession {
std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent) {
  // Delegate to the rvalue-reference overload with a copy of `parent`.
  // The previous body, `return create(parent);`, passed an lvalue. An lvalue cannot bind to the
  // `&&` overload, so the call selected this same overload again and recursed until the stack
  // overflowed. Copying (rather than moving) leaves the caller's pointer intact.
  return create(std::shared_ptr<core::FlowFile>(parent));
}
+ // Add an existing FlowFile to the session so it is tracked as one of the session's added
+ // flow files (BinFiles uses this to re-attach binned flow files to its merge session).
+ void add(std::shared_ptr<core::FlowFile> &flow);
// Clone a new UUID FlowFile from parent both for content resource claim and attributes
std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent);
// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/processors/BinFiles.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/BinFiles.h b/libminifi/include/processors/BinFiles.h
new file mode 100644
index 0000000..6c619a8
--- /dev/null
+++ b/libminifi/include/processors/BinFiles.h
@@ -0,0 +1,296 @@
+/**
+ * @file BinFiles.h
+ * BinFiles class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __BIN_FILES_H__
+#define __BIN_FILES_H__
+
+#include <climits>
+#include <deque>
+#include <map>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Bin Class
+// A bounded collection of flow files that belong to one group.
+// Not thread safe; callers must synchronize access.
+class Bin {
+ public:
+  /*!
+   * Create a new Bin. Note: this object is not thread safe
+   * @param minSize minimum queued content size required by isReadyForMerge()
+   * @param maxSize maximum queued content size accepted by offer()
+   * @param minEntries minimum number of flow files required by isReadyForMerge()
+   * @param maxEntries maximum number of flow files accepted by offer()
+   * @param fileCount name of an attribute whose integer value, when present on an offered flow
+   *        file, replaces both minEntries and maxEntries (defragment mode); empty to disable
+   * @param groupId the group this bin belongs to
+   */
+  explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries, const int & maxEntries,
+               const std::string &fileCount, const std::string &groupId)
+      : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount),
+        groupId_(groupId), logger_(logging::LoggerFactory<Bin>::getLogger()) {
+    queued_data_size_ = 0;
+    creation_dated_ = getTimeMillis();
+    std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator();
+    char uuidStr[37] = { 0 };
+    id_generator->generate(uuid_);
+    uuid_unparse_lower(uuid_, uuidStr);
+    uuid_str_ = uuidStr;
+    logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_);
+  }
+  virtual ~Bin() {
+    logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_);
+  }
+  // check whether the bin has reached its maximum size or its maximum number of entries
+  bool isFull() {
+    return queued_data_size_ >= maxSize_ || queue_.size() >= static_cast<size_t>(maxEntries_);
+  }
+  // check whether the bin meets the min required size and entries so that it can be processed for merge
+  bool isReadyForMerge() {
+    return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= static_cast<size_t>(minEntries_));
+  }
+  // check whether more than `duration` msec have elapsed since the bin was created
+  bool isOlderThan(const uint64_t &duration) {
+    uint64_t currentTime = getTimeMillis();
+    // Compare the elapsed time instead of computing creation_dated_ + duration. That sum wraps
+    // around for durations close to ULLONG_MAX and would make a brand-new bin look expired.
+    return currentTime > creation_dated_ && (currentTime - creation_dated_) > duration;
+  }
+  // direct, mutable access to the queued flow files
+  std::deque<std::shared_ptr<core::FlowFile>> & getFlowFile() {
+    return queue_;
+  }
+  // Offer the flowfile to the bin. Returns false, without queuing it, when it would exceed
+  // maxSize_ or maxEntries_. A valid fileCount_ attribute still updates the entry limits first.
+  bool offer(std::shared_ptr<core::FlowFile> flow) {
+    if (!fileCount_.empty()) {
+      std::string value;
+      if (flow->getAttribute(fileCount_, value)) {
+        try {
+          // for defrag case using the identification
+          int count = std::stoi(value);
+          maxEntries_ = count;
+          minEntries_ = count;
+        } catch (...) {
+          // keep the current limits, but do not hide a malformed attribute silently
+          logger_->log_warn("Bin %s for group %s ignoring invalid %s attribute value %s", uuid_str_, groupId_, fileCount_, value);
+        }
+      }
+    }
+
+    if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > static_cast<size_t>(maxEntries_))
+      return false;
+
+    queue_.push_back(flow);
+    queued_data_size_ += flow->getSize();
+    // The logger takes printf-style formats, so size_t/uint64_t must not be passed to %d.
+    logger_->log_info("Bin %s for group %s offer size %llu byte %llu min_entry %d max_entry %d",
+                      uuid_str_, groupId_, static_cast<unsigned long long>(queue_.size()),
+                      static_cast<unsigned long long>(queued_data_size_), minEntries_, maxEntries_);
+
+    return true;
+  }
+  // creation time of the bin in msec (as returned by getTimeMillis())
+  uint64_t getBinAge() {
+    return creation_dated_;
+  }
+  // number of queued flow files
+  int getSize() {
+    return queue_.size();
+  }
+  // Get the UUID as string
+  std::string getUUIDStr() {
+    return uuid_str_;
+  }
+  std::string getGroupId() {
+    return groupId_;
+  }
+
+ protected:
+
+ private:
+  uint64_t minSize_;
+  uint64_t maxSize_;
+  int maxEntries_;
+  int minEntries_;
+  // Queued data size
+  uint64_t queued_data_size_;
+  // Queue for the Flow File
+  std::deque<std::shared_ptr<core::FlowFile>> queue_;
+  uint64_t creation_dated_;
+  std::string fileCount_;
+  std::string groupId_;
+  std::shared_ptr<logging::Logger> logger_;
+  // A global unique identifier
+  uuid_t uuid_;
+  // UUID string
+  std::string uuid_str_;
+};
+
+// BinManager Class
+// Owns the open bins (one queue per group id) and the list of bins that are ready to be processed.
+// The members that touch the bin containers or binCount_ take mutex_. The setters do not lock;
+// NOTE(review): they appear to be called only from BinFiles::onSchedule.
+class BinManager {
+ public:
+  /*!
+   * Create a new BinManager. It starts with no size/entry maximum, a minimum of one entry per bin,
+   * and no maximum bin age (binAge_ == ULLONG_MAX disables the age check).
+   */
+  BinManager()
+      : minSize_(0), maxSize_(ULLONG_MAX), maxEntries_(INT_MAX), minEntries_(1), binAge_(ULLONG_MAX), binCount_(0),
+        logger_(logging::LoggerFactory<BinManager>::getLogger()) {
+  }
+  virtual ~BinManager() {
+    purge();
+  }
+  void setMinSize(const uint64_t &size) {
+    minSize_ = size;
+  }
+  void setMaxSize(const uint64_t &size) {
+    maxSize_ = size;
+  }
+  void setMaxEntries(const int &entries) {
+    maxEntries_ = entries;
+  }
+  void setMinEntries(const int &entries) {
+    minEntries_ = entries;
+  }
+  // maximum bin age in msec
+  void setBinAge(const uint64_t &age) {
+    binAge_ = age;
+  }
+  // Number of open bins, i.e. bins not yet moved to the ready list.
+  int getBinCount() {
+    // Lock like the other accessors. offer(), gatherReadyBins() and removeOldestBin() update
+    // binCount_ under mutex_, so an unlocked read from a concurrent onTrigger would be a data race.
+    std::lock_guard < std::mutex > lock(mutex_);
+    return binCount_;
+  }
+  // name of the attribute that carries the expected entry count for defragmentation
+  void setFileCount(const std::string &value) {
+    fileCount_ = value;
+  }
+  // drop all open bins (the ready list is not touched)
+  void purge() {
+    std::lock_guard < std::mutex > lock(mutex_);
+    groupBinMap_.clear();
+    binCount_ = 0;
+  }
+  // Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
+  bool offer(const std::string &group, std::shared_ptr<core::FlowFile> flow);
+  // gather ready bins once the bin are full enough or exceed bin age
+  void gatherReadyBins();
+  // remove oldest bin
+  void removeOldestBin();
+  // get ready bin from binManager
+  void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins);
+
+ protected:
+
+ private:
+  std::mutex mutex_;
+  uint64_t minSize_;
+  uint64_t maxSize_;
+  int maxEntries_;
+  int minEntries_;
+  std::string fileCount_;
+  // Bin Age in msec
+  uint64_t binAge_;
+  std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>> groupBinMap_;
+  std::deque<std::unique_ptr<Bin>> readyBin_;
+  int binCount_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+// BinFiles Class
+// Base processor that groups incoming flow files into Bins (through binManager_) and passes each
+// ready bin to processBin(). Subclasses such as MergeContent override getGroupId() and processBin().
+class BinFiles : public core::Processor {
+ public:
+ // Constructor
+ /*!
+ * Create a new processor
+ * maxBinCount_ defaults to 100; onSchedule() may override it from the MaxBinCount property.
+ */
+ explicit BinFiles(std::string name, uuid_t uuid = NULL)
+ : core::Processor(name, uuid),
+ logger_(logging::LoggerFactory<BinFiles>::getLogger()) {
+ maxBinCount_ = 100;
+ }
+ // Destructor
+ virtual ~BinFiles() {
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "BinFiles";
+ // Supported Properties
+ static core::Property MinSize;
+ static core::Property MaxSize;
+ static core::Property MinEntries;
+ static core::Property MaxEntries;
+ static core::Property MaxBinCount;
+ static core::Property MaxBinAge;
+
+ // Supported Relationships
+ static core::Relationship Failure;
+ static core::Relationship Original;
+
+ // attributes
+ // fragment.* attribute names, plus the legacy segment.* names that preprocessFlowFile()
+ // copies onto their fragment.* counterparts when only the legacy one is present
+ static const char *FRAGMENT_ID_ATTRIBUTE;
+ static const char *FRAGMENT_INDEX_ATTRIBUTE;
+ static const char *FRAGMENT_COUNT_ATTRIBUTE;
+
+ static const char *SEGMENT_ID_ATTRIBUTE;
+ static const char *SEGMENT_INDEX_ATTRIBUTE;
+ static const char *SEGMENT_COUNT_ATTRIBUTE;
+ static const char *SEGMENT_ORIGINAL_FILENAME;
+
+ public:
+ /**
+ * Function that's executed when the processor is scheduled.
+ * Reads the size/entry/age properties into binManager_ and MaxBinCount into maxBinCount_.
+ * @param context process context.
+ * @param sessionFactory process session factory that is used when creating
+ * ProcessSession objects.
+ */
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ // OnTrigger method, implemented by NiFi BinFiles
+ // Bins at most one incoming flow file, then processes every ready bin in a separate merge session.
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ // Initialize, over write by NiFi BinFiles
+ virtual void initialize(void);
+
+ protected:
+ // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
+ virtual void preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow);
+ // Returns a group ID representing a bin. This allows flow files to be binned into like groups
+ // (default: every flow file goes into the single group "")
+ virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) {
+ return "";
+ }
+ // Processes a single bin.
+ // Returning false makes onTrigger() route the bin's flow files to Failure; the default always does.
+ virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+ return false;
+ }
+ // transfer flows to failure in bin
+ void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+ // add flows to session
+ void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+
+ // bin bookkeeping shared by onSchedule() (configuration) and onTrigger() (binning)
+ BinManager binManager_;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+ // when the open bin count exceeds this, onTrigger() yields and force-readies the oldest bin
+ int maxBinCount_;
+};
+
+REGISTER_RESOURCE(BinFiles);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/processors/LoadProcessors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h
index 3e6cfcf..e8d207a 100644
--- a/libminifi/include/processors/LoadProcessors.h
+++ b/libminifi/include/processors/LoadProcessors.h
@@ -29,5 +29,6 @@
#include "LogAttribute.h"
#include "PutFile.h"
#include "TailFile.h"
+#include "MergeContent.h"
#endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/processors/MergeContent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/MergeContent.h b/libminifi/include/processors/MergeContent.h
new file mode 100644
index 0000000..3f4c74a
--- /dev/null
+++ b/libminifi/include/processors/MergeContent.h
@@ -0,0 +1,206 @@
+/**
+ * @file MergeContent.h
+ * MergeContent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MERGE_CONTENT_H__
+#define __MERGE_CONTENT_H__
+
+#include "processors/BinFiles.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MERGE_STRATEGY_BIN_PACK "Bin-Packing Algorithm"
+#define MERGE_STRATEGY_DEFRAGMENT "Defragment"
+#define MERGE_FORMAT_TAR_VALUE "TAR"
+#define MERGE_FORMAT_ZIP_VALUE "ZIP"
+#define MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE "FlowFile Stream, v3"
+#define MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE "FlowFile Stream, v2"
+#define MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE "FlowFile Tar, v1"
+#define MERGE_FORMAT_CONCAT_VALUE "Binary Concatenation"
+#define MERGE_FORMAT_AVRO_VALUE "Avro"
+#define DELIMITER_STRATEGY_FILENAME "Filename"
+#define DELIMITER_STRATEGY_TEXT "Text"
+
+// MergeBin Class
+// Strategy interface: merges the flow files of one bin into a single flow file.
+class MergeBin {
+public:
+  // Virtual so that implementations deleted through a MergeBin pointer are destroyed correctly.
+  virtual ~MergeBin() {
+  }
+  // MIME type to assign to the merged flow file
+  virtual std::string getMergedContentType() = 0;
+  // merge the flows in the bin, wrapping them with header/footer and separating them with demarcator
+  virtual std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session,
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) = 0;
+};
+
+// BinaryConcatenationMerge Class
+// Produces header + content of each flow file (separated by the demarcator) + footer.
+class BinaryConcatenationMerge : public MergeBin {
+public:
+  static const char *mimeType;
+  std::string getMergedContentType() {
+    return mimeType;
+  }
+  std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session,
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator);
+  // Nest Callback Class for read stream: copies up to `size` bytes of a flow file's content into `stream`
+  class ReadCallback : public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t size, std::shared_ptr<io::BaseStream> stream)
+        : buffer_size_(size), stream_(stream) {
+    }
+    ~ReadCallback() {
+    }
+    // Returns the number of bytes written to stream_, or a negative value if a read/write failed.
+    // Assumes read()/write() return the byte count (0 from read() at end of content) or a
+    // negative value on error. NOTE(review): confirm against io::BaseStream.
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      // Copy through a fixed-size buffer. The previous version had three problems:
+      //  - it declared a variable-length array sized by the whole flow file on the stack
+      //    (non-standard C++, and a stack overflow for large content);
+      //  - it ignored the read result and always wrote buffer_size_ bytes;
+      //  - it dereferenced a null pointer in its `!stream` branch.
+      const int kChunkSize = 4096;
+      uint8_t buffer[kChunkSize];
+      uint64_t remaining = buffer_size_;
+      int64_t total = 0;
+      while (remaining > 0) {
+        int toRead = remaining < static_cast<uint64_t>(kChunkSize) ? static_cast<int>(remaining) : kChunkSize;
+        int64_t readSize = stream->read(buffer, toRead);
+        if (readSize < 0)
+          return readSize;
+        if (readSize == 0)
+          break;  // content ended early: stop rather than write stale buffer bytes
+        int64_t writeSize = stream_->write(buffer, static_cast<int>(readSize));
+        if (writeSize < 0)
+          return writeSize;
+        total += writeSize;
+        remaining -= static_cast<uint64_t>(readSize);
+      }
+      return total;
+    }
+    uint64_t buffer_size_;
+    std::shared_ptr<io::BaseStream> stream_;
+  };
+  // Nest Callback Class for write stream: writes header, each flow's content separated by the demarcator, then footer
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string &header, std::string &footer, std::string &demarcator, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) :
+        header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), session_(session) {
+    }
+    std::string &header_;
+    std::string &footer_;
+    std::string &demarcator_;
+    std::deque<std::shared_ptr<core::FlowFile>> &flows_;
+    core::ProcessSession *session_;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
+      if (!header_.empty()) {
+        int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(header_.data())), header_.size());
+        if (len < 0)
+          return len;
+        ret += len;
+      }
+      bool isFirst = true;
+      for (auto flow : flows_) {
+        if (!isFirst && !demarcator_.empty()) {
+          int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(demarcator_.data())), demarcator_.size());
+          if (len < 0)
+            return len;
+          ret += len;
+        }
+        ReadCallback readCb(flow->getSize(), stream);
+        session_->read(flow, &readCb);
+        ret += flow->getSize();
+        isFirst = false;
+      }
+      if (!footer_.empty()) {
+        int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(footer_.data())), footer_.size());
+        if (len < 0)
+          return len;
+        ret += len;
+      }
+      return ret;
+    }
+  };
+};
+
+
+// MergeContent Class
+// BinFiles subclass that merges each ready bin into one flow file routed to the Merge relationship.
+class MergeContent : public processors::BinFiles {
+ public:
+ // Constructor
+ /*!
+ * Create a new processor
+ * Defaults: Defragment strategy, binary-concatenation format, Filename delimiter strategy, keepPath off.
+ */
+ explicit MergeContent(std::string name, uuid_t uuid = NULL)
+ : processors::BinFiles(name, uuid),
+ logger_(logging::LoggerFactory<MergeContent>::getLogger()) {
+ mergeStratgey_ = MERGE_STRATEGY_DEFRAGMENT;
+ mergeFormat_ = MERGE_FORMAT_CONCAT_VALUE;
+ delimiterStratgey_ = DELIMITER_STRATEGY_FILENAME;
+ keepPath_ = false;
+ }
+ // Destructor
+ virtual ~MergeContent() {
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "MergeContent";
+ // Supported Properties
+ // NOTE(review): "Stratgey" is a misspelling, but these names are part of the class interface.
+ static core::Property MergeStrategy;
+ static core::Property MergeFormat;
+ static core::Property CorrelationAttributeName;
+ static core::Property DelimiterStratgey;
+ static core::Property KeepPath;
+ static core::Property Header;
+ static core::Property Footer;
+ static core::Property Demarcator;
+
+ // Supported Relationships
+ static core::Relationship Merge;
+
+ public:
+ /**
+ * Function that's executed when the processor is scheduled.
+ * @param context process context.
+ * @param sessionFactory process session factory that is used when creating
+ * ProcessSession objects.
+ */
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ // OnTrigger method, implemented by NiFi MergeContent
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ // Initialize, over write by NiFi MergeContent
+ virtual void initialize(void);
+ // Merge one ready bin; returning false makes BinFiles route the bin's flow files to Failure.
+ virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+
+ protected:
+ // Returns a group ID representing a bin. This allows flow files to be binned into like groups
+ virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow);
+ // check whether the defragment bin is valid
+ bool checkDefragment(std::unique_ptr<Bin> &bin);
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+ std::string mergeStratgey_;
+ std::string mergeFormat_;
+ std::string correlationAttributeName_;
+ bool keepPath_;
+ std::string delimiterStratgey_;
+ // header_/footer_/demarcator_ presumably hold the configured file names and the *Content_
+ // members the loaded text. TODO confirm in MergeContent.cpp.
+ std::string header_;
+ std::string footer_;
+ std::string demarcator_;
+ std::string headerContent_;
+ std::string footerContent_;
+ std::string demarcatorContent_;
+ // readContent
+ std::string readContent(std::string path);
+};
+
+REGISTER_RESOURCE(MergeContent);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index c69b361..b3035cb 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -50,6 +50,10 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
return record;
}
+// Track `record` as an added flow file of this session, keyed by its UUID string.
+// The entry is assigned through operator[], so an existing entry for the same UUID is overwritten.
+void ProcessSession::add(std::shared_ptr<core::FlowFile> &record) {
+  const std::string uuidKey = record->getUUIDStr();
+  _addedFlowFiles[uuidKey] = record;
+}
+
std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
std::map<std::string, std::string> empty;
std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/src/processors/BinFiles.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/BinFiles.cpp b/libminifi/src/processors/BinFiles.cpp
new file mode 100644
index 0000000..bd4afca
--- /dev/null
+++ b/libminifi/src/processors/BinFiles.cpp
@@ -0,0 +1,303 @@
+/**
+ * @file BinFiles.cpp
+ * BinFiles class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/BinFiles.h"
+#include <stdio.h>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <queue>
+#include <map>
+#include <deque>
+#include <utility>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Property definitions: name, user-facing description, default value ("" means unset / no limit).
+core::Property BinFiles::MinSize("Minimum Group Size", "The minimum size for the bundle", "0");
+core::Property BinFiles::MaxSize("Maximum Group Size", "The maximum size for the bundle. If not specified, there is no maximum.", "");
+core::Property BinFiles::MinEntries("Minimum Number of Entries", "The minimum number of files to include in a bundle", "1");
+core::Property BinFiles::MaxEntries("Maximum Number of Entries", "The maximum number of files to include in a bundle. If not specified, there is no maximum.", "");
+core::Property BinFiles::MaxBinAge("Max Bin Age", "The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>", "");
+core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
+core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
+core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure");
+// Attribute names for fragment bookkeeping; the segment.* names are the legacy equivalents.
+const char *BinFiles::FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
+const char *BinFiles::FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
+const char *BinFiles::FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
+const char *BinFiles::SEGMENT_COUNT_ATTRIBUTE = "segment.count";
+const char *BinFiles::SEGMENT_ID_ATTRIBUTE = "segment.identifier";
+const char *BinFiles::SEGMENT_INDEX_ATTRIBUTE = "segment.index";
+const char *BinFiles::SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+
+// Register the properties and relationships BinFiles supports.
+void BinFiles::initialize() {
+  std::set<core::Property> supportedProperties = { MinSize, MaxSize, MinEntries, MaxEntries, MaxBinAge, MaxBinCount };
+  setSupportedProperties(supportedProperties);
+
+  std::set<core::Relationship> supportedRelationships = { Original, Failure };
+  setSupportedRelationships(supportedRelationships);
+}
+
+// Read the binning configuration: size/entry limits and max bin age go to binManager_, and the
+// bin count limit goes to maxBinCount_. Unset, empty, or unparsable properties keep their defaults.
+void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  // Reads the named property as an integer; false when it is unset, empty, or not an integer.
+  auto getIntProperty = [context](const std::string &name, int64_t &result) -> bool {
+    std::string value;
+    return context->getProperty(name, value) && !value.empty() && core::Property::StringToInt(value, result);
+  };
+  int64_t valInt;
+  // valInt is 64-bit, so it is logged with %lld; %d would read the wrong width from the varargs.
+  if (getIntProperty(MinSize.getName(), valInt)) {
+    this->binManager_.setMinSize(valInt);
+    logger_->log_info("BinFiles: MinSize [%lld]", static_cast<long long>(valInt));
+  }
+  if (getIntProperty(MaxSize.getName(), valInt)) {
+    this->binManager_.setMaxSize(valInt);
+    logger_->log_info("BinFiles: MaxSize [%lld]", static_cast<long long>(valInt));
+  }
+  if (getIntProperty(MinEntries.getName(), valInt)) {
+    this->binManager_.setMinEntries(valInt);
+    logger_->log_info("BinFiles: MinEntries [%lld]", static_cast<long long>(valInt));
+  }
+  if (getIntProperty(MaxEntries.getName(), valInt)) {
+    this->binManager_.setMaxEntries(valInt);
+    logger_->log_info("BinFiles: MaxEntries [%lld]", static_cast<long long>(valInt));
+  }
+  if (getIntProperty(MaxBinCount.getName(), valInt)) {
+    maxBinCount_ = static_cast<int> (valInt);
+    logger_->log_info("BinFiles: MaxBinCount [%lld]", static_cast<long long>(valInt));
+  }
+  std::string value;
+  if (context->getProperty(MaxBinAge.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      this->binManager_.setBinAge(valInt);
+      logger_->log_info("BinFiles: MaxBinAge [%lld]", static_cast<long long>(valInt));
+    }
+  }
+}
+
+// Backward compatibility: for each count/index/id pair, copy the legacy segment.* attribute onto
+// the fragment.* attribute when only the legacy one is present on the flow file.
+void BinFiles::preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow) {
+  const char *const legacyToCurrent[][2] = {
+    { BinFiles::SEGMENT_COUNT_ATTRIBUTE, BinFiles::FRAGMENT_COUNT_ATTRIBUTE },
+    { BinFiles::SEGMENT_INDEX_ATTRIBUTE, BinFiles::FRAGMENT_INDEX_ATTRIBUTE },
+    { BinFiles::SEGMENT_ID_ATTRIBUTE, BinFiles::FRAGMENT_ID_ATTRIBUTE },
+  };
+  for (const auto &mapping : legacyToCurrent) {
+    std::string legacyValue;
+    const bool hasCurrent = flow->getAttribute(mapping[1], legacyValue);
+    if (hasCurrent)
+      continue;
+    if (flow->getAttribute(mapping[0], legacyValue))
+      flow->setAttribute(mapping[1], legacyValue);
+  }
+}
+
+// Move every bin that is ready for merge, or older than binAge_ when a max age is configured,
+// to the ready list. Groups left with no open bins are removed from groupBinMap_.
+void BinManager::gatherReadyBins() {
+  std::lock_guard < std::mutex > lock(mutex_);
+  for (auto it = groupBinMap_.begin(); it != groupBinMap_.end();) {
+    std::deque<std::unique_ptr<Bin>> &queue = *(it->second);
+    // offer() appends new bins at the back, so scan from the front and stop at the first bin
+    // that is neither ready nor expired.
+    while (!queue.empty()) {
+      std::unique_ptr<Bin> &bin = queue.front();
+      if (!(bin->isReadyForMerge() || (binAge_ != ULLONG_MAX && bin->isOlderThan(binAge_))))
+        break;
+      readyBin_.push_back(std::move(bin));
+      queue.pop_front();
+      binCount_--;
+      logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
+    }
+    // erase from the map if the queue is empty for the group
+    if (queue.empty())
+      it = groupBinMap_.erase(it);
+    else
+      ++it;
+  }
+  // size() is a size_t; it must not be passed to %d
+  logger_->log_info("BinManager groupBinMap size %llu", static_cast<unsigned long long>(groupBinMap_.size()));
+}
+
+// Force the oldest open bin (smallest getBinAge() among the group front bins) onto the ready list.
+// Its group is removed from groupBinMap_ if that leaves the group empty.
+void BinManager::removeOldestBin() {
+  std::lock_guard < std::mutex > lock(mutex_);
+  uint64_t olddate = ULLONG_MAX;
+  // Track the map entry itself. The old code used an uninitialized pointer and erased by the bin's
+  // group id instead of by the entry that was actually found.
+  auto oldest = groupBinMap_.end();
+  for (auto it = groupBinMap_.begin(); it != groupBinMap_.end(); ++it) {
+    std::unique_ptr<std::deque<std::unique_ptr<Bin>>> &queue = it->second;
+    if (!queue->empty() && queue->front()->getBinAge() < olddate) {
+      olddate = queue->front()->getBinAge();
+      oldest = it;
+    }
+  }
+  if (oldest != groupBinMap_.end()) {
+    std::deque<std::unique_ptr<Bin>> &queue = *(oldest->second);
+    readyBin_.push_back(std::move(queue.front()));
+    queue.pop_front();
+    binCount_--;
+    logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
+    if (queue.empty()) {
+      groupBinMap_.erase(oldest);
+    }
+  }
+  // size() is a size_t; it must not be passed to %d
+  logger_->log_info("BinManager groupBinMap size %llu", static_cast<unsigned long long>(groupBinMap_.size()));
+}
+
+// Append every ready bin to retBins, oldest first, and leave the ready list empty.
+void BinManager::getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins) {
+  std::lock_guard < std::mutex > lock(mutex_);
+  for (std::unique_ptr<Bin> &ready : readyBin_)
+    retBins.push_back(std::move(ready));
+  readyBin_.clear();
+}
+
+// Place `flow` in a bin of `group`. Strategy:
+//  1. a flow larger than maxSize_ gets its own unbounded bin that goes straight to the ready list;
+//  2. otherwise it goes into the group's newest bin if that bin accepts it;
+//  3. otherwise a new bin is created for it and appended to the group.
+// Returns false only when even a freshly created bin refuses the flow.
+bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile> flow) {
+  std::lock_guard < std::mutex > lock(mutex_);
+  if (flow->getSize() > maxSize_) {
+    // could not be added to a bin -- too large by itself, so create a separate bin for just this guy.
+    std::unique_ptr<Bin> solo(new Bin(0, ULLONG_MAX, 1, INT_MAX, "", group));
+    if (!solo->offer(flow))
+      return false;
+    readyBin_.push_back(std::move(solo));
+    logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), group);
+    return true;
+  }
+
+  auto found = groupBinMap_.find(group);
+  const bool groupExists = found != groupBinMap_.end();
+  if (groupExists && !found->second->empty() && found->second->back()->offer(flow))
+    return true;
+
+  std::unique_ptr<Bin> fresh(new Bin(minSize_, maxSize_, minEntries_, maxEntries_, fileCount_, group));
+  if (!fresh->offer(flow))
+    return false;
+  if (!groupExists) {
+    std::unique_ptr<std::deque<std::unique_ptr<Bin>>> newQueue(new std::deque<std::unique_ptr<Bin>>());
+    found = groupBinMap_.insert(std::make_pair(group, std::move(newQueue))).first;
+  }
+  found->second->push_back(std::move(fresh));
+  logger_->log_info("BinManager add bin %s to group %s", found->second->back()->getUUIDStr(), group);
+  binCount_++;
+  return true;
+}
+
+// One scheduling pass:
+//  - bin at most one incoming flow file (removing it from `session`);
+//  - collect bins that became ready, forcing out the oldest bin when too many are open;
+//  - process every ready bin inside a dedicated merge session, which is then committed.
+void BinFiles::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> incoming = std::static_pointer_cast < FlowFileRecord > (session->get());
+
+  if (incoming != nullptr) {
+    preprocessFlowFile(context, session, incoming);
+    const std::string groupId = getGroupId(context, incoming);
+    const bool binned = binManager_.offer(groupId, incoming);
+    if (!binned) {
+      session->transfer(incoming, Failure);
+      context->yield();
+      return;
+    }
+    // The flow file now lives in a bin; addFlowsToSession() re-adds it to the merge session below.
+    session->remove(incoming);
+  }
+
+  binManager_.gatherReadyBins();
+  if (binManager_.getBinCount() > maxBinCount_) {
+    context->yield();
+    logger_->log_info("BinFiles reach max bin count %d", binManager_.getBinCount());
+    binManager_.removeOldestBin();
+  }
+
+  std::deque<std::unique_ptr<Bin>> readyBins;
+  binManager_.getReadyBin(readyBins);
+  if (readyBins.empty())
+    return;
+
+  core::ProcessSession mergeSession(context);
+  while (!readyBins.empty()) {
+    // Take ownership so each bin is destroyed at the end of its own iteration.
+    std::unique_ptr<Bin> bin = std::move(readyBins.front());
+    readyBins.pop_front();
+    addFlowsToSession(context, &mergeSession, bin);
+    logger_->log_info("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
+    const bool merged = processBin(context, &mergeSession, bin);
+    if (!merged)
+      transferFlowsToFail(context, &mergeSession, bin);
+  }
+  mergeSession.commit();
+}
+
+// Route every flow file still held by `bin` to Failure, then empty the bin's queue.
+void BinFiles::transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+  std::deque<std::shared_ptr<core::FlowFile>> &pending = bin->getFlowFile();
+  for (std::deque<std::shared_ptr<core::FlowFile>>::iterator it = pending.begin(); it != pending.end(); ++it)
+    session->transfer(*it, Failure);
+  pending.clear();
+}
+
+// Register every flow file held by `bin` with `session` via ProcessSession::add().
+void BinFiles::addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+  std::deque<std::shared_ptr<core::FlowFile>> &queued = bin->getFlowFile();
+  for (size_t i = 0; i < queued.size(); ++i)
+    session->add(queued[i]);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/src/processors/MergeContent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/MergeContent.cpp b/libminifi/src/processors/MergeContent.cpp
new file mode 100644
index 0000000..ee1702f
--- /dev/null
+++ b/libminifi/src/processors/MergeContent.cpp
@@ -0,0 +1,283 @@
+/**
+ * @file MergeContent.cpp
+ * MergeContent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/MergeContent.h"
+#include <stdio.h>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <queue>
+#include <map>
+#include <deque>
+#include <utility>
+#include <algorithm>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// MergeContent-specific configuration properties: (name, description, value). The third argument
+// appears to be the property's default value (e.g. "false" for Keep Path) -- NOTE(review): confirm
+// against core::Property.
+core::Property MergeContent::MergeStrategy("Merge Strategy", "Defragment or Bin-Packing Algorithm", MERGE_STRATEGY_DEFRAGMENT);
+core::Property MergeContent::MergeFormat("Merge Format", "Merge Format", MERGE_FORMAT_CONCAT_VALUE);
+core::Property MergeContent::CorrelationAttributeName("Correlation Attribute Name", "Correlation Attribute Name", "");
+// NOTE: "DelimiterStratgey" is misspelled, but it is the member name callers and tests reference.
+core::Property MergeContent::DelimiterStratgey("Delimiter Strategy", "Determines if Header, Footer, and Demarcator should point to files", DELIMITER_STRATEGY_FILENAME);
+core::Property MergeContent::Header("Header File", "Filename specifying the header to use", "");
+core::Property MergeContent::Footer("Footer File", "Filename specifying the footer to use", "");
+core::Property MergeContent::Demarcator("Demarcator File", "Filename specifying the demarcator to use", "");
+core::Property MergeContent::KeepPath("Keep Path", "If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry", "false");
+// Relationship that receives the newly created merged flow file.
+core::Relationship MergeContent::Merge("merged", "The FlowFile containing the merged content");
+// MIME type of binary-concatenated output (presumably what getMergedContentType() reports -- confirm).
+const char *BinaryConcatenationMerge::mimeType = "application/octet-stream";
+
+void MergeContent::initialize() {
+  // Supported properties: the bin sizing/ageing options (MinSize .. MaxBinCount, presumably inherited
+  // from BinFiles) followed by the merge-specific options defined in this file.
+  std::set<core::Property> supportedProperties{
+      MinSize, MaxSize, MinEntries, MaxEntries, MaxBinAge, MaxBinCount,
+      MergeStrategy, MergeFormat, CorrelationAttributeName, DelimiterStratgey,
+      Header, Footer, Demarcator, KeepPath};
+  setSupportedProperties(supportedProperties);
+  // Supported relationships: Original and Failure (presumably inherited from BinFiles) plus "merged".
+  std::set<core::Relationship> supportedRelationships{Original, Failure, Merge};
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Reads the whole file at `path` in binary mode.
+ * @param path file to read (header/footer/demarcator file with the filename delimiter strategy)
+ * @return the file's bytes, or an empty string when the file cannot be opened, sized or read
+ */
+std::string MergeContent::readContent(std::string path) {
+  std::string contents;
+  std::ifstream in(path.c_str(), std::ios::in | std::ios::binary);
+  if (!in)
+    return contents;
+  in.seekg(0, std::ios::end);
+  const std::streamoff size = in.tellg();
+  // tellg() reports -1 on failure; resizing the string to that value would throw.
+  if (size <= 0)
+    return contents;
+  contents.resize(static_cast<std::string::size_type>(size));
+  in.seekg(0, std::ios::beg);
+  in.read(&contents[0], size);
+  // Keep only the bytes actually read, in case the read came up short.
+  contents.resize(static_cast<std::string::size_type>(in.gcount()));
+  return contents;
+}
+
+void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  // Let the base class read its binning configuration first.
+  BinFiles::onSchedule(context, sessionFactory);
+
+  // Overwrites `target` only when the property is present and non-empty, so an unset or blank
+  // property leaves the member's current value untouched.
+  auto assignIfSet = [context](const core::Property &property, std::string &target) {
+    std::string configured;
+    if (context->getProperty(property.getName(), configured) && !configured.empty())
+      target = configured;
+  };
+  assignIfSet(MergeStrategy, this->mergeStratgey_);
+  assignIfSet(MergeFormat, this->mergeFormat_);
+  assignIfSet(CorrelationAttributeName, this->correlationAttributeName_);
+  assignIfSet(DelimiterStratgey, this->delimiterStratgey_);
+  assignIfSet(Header, this->header_);
+  assignIfSet(Footer, this->footer_);
+  assignIfSet(Demarcator, this->demarcator_);
+
+  std::string keepPathValue;
+  if (context->getProperty(KeepPath.getName(), keepPathValue) && !keepPathValue.empty())
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(keepPathValue, keepPath_);
+
+  // In defragment mode the bin manager is pointed at the fragment count attribute
+  // (presumably so a bin is complete once it holds that many flow files -- confirm in BinManager).
+  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT)
+    binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE);
+  logger_->log_info("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStratgey_, mergeFormat_, correlationAttributeName_, delimiterStratgey_);
+  logger_->log_info("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);
+
+  // Resolve the delimiter contents. With the filename strategy the properties name files to read;
+  // with the text strategy the property values are used verbatim.
+  if (delimiterStratgey_ == DELIMITER_STRATEGY_FILENAME) {
+    auto loadIfNamed = [this](const std::string &path, std::string &content) {
+      if (!path.empty())
+        content = readContent(path);
+    };
+    loadIfNamed(header_, this->headerContent_);
+    loadIfNamed(footer_, this->footerContent_);
+    loadIfNamed(demarcator_, this->demarcatorContent_);
+  } else if (delimiterStratgey_ == DELIMITER_STRATEGY_TEXT) {
+    this->headerContent_ = header_;
+    this->footerContent_ = footer_;
+    this->demarcatorContent_ = demarcator_;
+  }
+}
+
+std::string MergeContent::getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) {
+  std::string value;
+  // A configured correlation attribute wins whenever the flow file carries a non-empty value for it.
+  if (!correlationAttributeName_.empty() && flow->getAttribute(correlationAttributeName_, value) && !value.empty())
+    return value;
+  // Otherwise, when defragmenting, flow files are grouped by their fragment id.
+  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT && flow->getAttribute(FRAGMENT_ID_ATTRIBUTE, value))
+    return value;
+  // No group could be determined.
+  return std::string();
+}
+
+/**
+ * Validates that a bin can be defragmented.
+ * Every flow file must carry the same fragment id and fragment count as the first one, and a
+ * fragment index that parses as an int in [0, fragment count).
+ * @return false for an empty bin or on any missing/mismatched/unparsable attribute
+ */
+bool MergeContent::checkDefragment(std::unique_ptr<Bin> &bin) {
+  std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
+  if (flows.empty())
+    return false;
+
+  // The first flow file defines the id and count every member must share.
+  std::string expectedId;
+  std::string expectedCount;
+  const std::shared_ptr<core::FlowFile> &first = flows.front();
+  if (!first->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, expectedId))
+    return false;
+  if (!first->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, expectedCount))
+    return false;
+  int totalFragments = 0;
+  try {
+    totalFragments = std::stoi(expectedCount);
+  } catch (...) {
+    return false;
+  }
+
+  for (auto &member : flows) {
+    std::string attr;
+    if (!member->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, attr) || attr != expectedId)
+      return false;
+    if (!member->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, attr) || attr != expectedCount)
+      return false;
+    if (!member->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, attr))
+      return false;
+    int position = -1;
+    try {
+      position = std::stoi(attr);
+    } catch (...) {
+      return false;
+    }
+    if (position < 0 || position >= totalFragments)
+      return false;
+  }
+  return true;
+}
+
+void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  // Binning is driven entirely by the base class; BinFiles::onTrigger calls processBin() on each ready bin.
+  this->BinFiles::onTrigger(context, session);
+}
+
+/**
+ * Merges the flow files of one ready bin.
+ * In defragment mode the bin is validated with checkDefragment() and ordered by fragment index first.
+ * On success the merged flow file goes to Merge and the source flow files to Original.
+ * @return false when the strategy/format is unsupported, validation fails or merging throws;
+ *         the caller then routes the bin's flow files to Failure
+ */
+bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+  if (mergeStratgey_ != MERGE_STRATEGY_DEFRAGMENT && mergeStratgey_ != MERGE_STRATEGY_BIN_PACK)
+    return false;
+
+  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT) {
+    // check the flowfile fragment values
+    if (!checkDefragment(bin)) {
+      logger_->log_error("Merge Content check defgrament failed");
+      return false;
+    }
+    // Sort by fragment index; checkDefragment() has verified that every index parses as an int.
+    std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
+    std::sort(flows.begin(), flows.end(), [] (const std::shared_ptr<core::FlowFile> &first, const std::shared_ptr<core::FlowFile> &second) {
+      std::string value;
+      first->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value);
+      int indexFirst = std::stoi(value);
+      second->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value);
+      int indexSecond = std::stoi(value);
+      return indexFirst < indexSecond;
+    });
+  }
+
+  if (mergeFormat_ != MERGE_FORMAT_CONCAT_VALUE) {
+    logger_->log_error("Merge format not supported %s", mergeFormat_);
+    return false;
+  }
+
+  std::unique_ptr<MergeBin> mergeBin(new BinaryConcatenationMerge());
+  std::shared_ptr<core::FlowFile> mergeFlow;
+  try {
+    mergeFlow = mergeBin->merge(context, session, bin->getFlowFile(), this->headerContent_, this->footerContent_, this->demarcatorContent_);
+  } catch (const std::exception &e) {
+    logger_->log_error("Merge Content merge catch exception: %s", e.what());
+    return false;
+  } catch (...) {
+    logger_->log_error("Merge Content merge catch exception");
+    return false;
+  }
+  session->putAttribute(mergeFlow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));
+  // we successfully merge the flow
+  session->transfer(mergeFlow, Merge);
+  std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
+  for (auto &flow : flows) {
+    session->transfer(flow, Original);
+  }
+  // The logger formats printf-style and getSize() is presumably a 64-bit unsigned size, so pass it
+  // explicitly as unsigned long long; %d with a 64-bit argument is undefined behaviour.
+  logger_->log_info("Merge FlowFile record UUID %s, payload length %llu", mergeFlow->getUUIDStr(),
+                    static_cast<unsigned long long>(mergeFlow->getSize()));
+  return true;
+}
+
+/**
+ * Creates a new flow file whose content is written by WriteCallback from header, flows, demarcator
+ * and footer. The unit tests expect: header, each flow's content separated by the demarcator, footer.
+ * The new flow file is tagged with this format's MIME type and named after its source flow file(s).
+ * @return the merged flow file (already created within `session`)
+ */
+std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+    std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) {
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+  BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, flows, session);
+  session->write(flowFile, &callback);
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  // With no source flows there is no filename to inherit; flows.front() on an empty deque is undefined behaviour.
+  if (flows.empty())
+    return flowFile;
+  // A single flow keeps its own filename; several flows take the first flow's original-segment filename.
+  std::string fileName;
+  if (flows.size() == 1) {
+    flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+  }
+  if (!fileName.empty())
+    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  return flowFile;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/test/unit/MergeFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/MergeFileTests.cpp b/libminifi/test/unit/MergeFileTests.cpp
new file mode 100644
index 0000000..d3e4fa9
--- /dev/null
+++ b/libminifi/test/unit/MergeFileTests.cpp
@@ -0,0 +1,720 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "../TestBase.h"
+#include "core/Core.h"
+#include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "processors/MergeContent.h"
+#include <sstream>
+#include <iostream>
+
+// Scratch files shared by the MergeContent tests:
+// - FLOW_FILE is the path prefix of the per-fragment input files.
+// - EXPECT_* hold the expected merge results.
+// - The last three are the header/footer/demarcator files used with the "filename" delimiter strategy.
+// The pointers themselves are const so no test can accidentally retarget them.
+static const char *const FLOW_FILE = "/tmp/minifi-mergecontent";
+static const char *const EXPECT_MERGE_CONTENT_FIRST = "/tmp/minifi-expect-mergecontent1.txt";
+static const char *const EXPECT_MERGE_CONTENT_SECOND = "/tmp/minifi-expect-mergecontent2.txt";
+static const char *const HEADER_FILE = "/tmp/minifi-mergecontent.header";
+static const char *const FOOTER_FILE = "/tmp/minifi-mergecontent.footer";
+static const char *const DEMARCATOR_FILE = "/tmp/minifi-mergecontent.demarcator";
+
+// InputStreamCallback that copies up to `size` bytes of a flow file's content into buffer_.
+// read_size_ is the number of bytes actually read, so callers only look at initialized bytes.
+class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit ReadCallback(uint64_t size) :
+      buffer_(new uint8_t[size]),
+      buffer_size_(size),
+      read_size_(0) {
+  }
+  ~ReadCallback() {
+    delete[] buffer_;
+  }
+  // buffer_ is owned through a raw pointer; a copy would free it twice.
+  ReadCallback(const ReadCallback &) = delete;
+  ReadCallback &operator=(const ReadCallback &) = delete;
+  // Reads into buffer_ and returns the stream's result. A null stream or a failed read leaves
+  // read_size_ at 0; a short read records only the bytes actually delivered.
+  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+    read_size_ = 0;
+    if (!stream)
+      return 0;
+    int64_t ret = stream->read(buffer_, buffer_size_);
+    if (ret > 0) {
+      uint64_t bytesRead = static_cast<uint64_t>(ret);
+      read_size_ = bytesRead < buffer_size_ ? bytesRead : buffer_size_;
+    }
+    return ret;
+  }
+  uint8_t *buffer_;
+  uint64_t buffer_size_;
+  uint64_t read_size_;
+};
+
+// Six fragments arrive out of order in two fragment groups (id 0: inputs 0-2, id 1: inputs 3-5).
+// Defragment + binary concatenation must emit each group's content in fragment-index order.
+// Exceptions are deliberately not caught: a blanket catch (...) would let an unexpected throw
+// make the test pass silently.
+TEST_CASE("MergeFileDefragment", "[mergefiletest1]") {
+  // Removes every scratch file on scope exit, including when a REQUIRE aborts the test early.
+  struct ScratchFiles {
+    ~ScratchFiles() {
+      for (int i = 0; i < 6; i++) {
+        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+        unlink(flowFileName.c_str());
+      }
+      unlink(EXPECT_MERGE_CONTENT_FIRST);
+      unlink(EXPECT_MERGE_CONTENT_SECOND);
+    }
+  } scratchFiles;
+
+  std::ofstream expectfileFirst;
+  std::ofstream expectfileSecond;
+
+  expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+  expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+
+  // Create six 32-byte input files (input i holds 32 copies of the digit i); inputs 0-2 make up the
+  // first expected merge result and inputs 3-5 the second.
+  for (int i = 0; i < 6; i++) {
+    std::ofstream tmpfile;
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    tmpfile.open(flowFileName.c_str());
+    for (int j = 0; j < 32; j++) {
+      tmpfile << std::to_string(i);
+      if (i < 3)
+        expectfileFirst << std::to_string(i);
+      else
+        expectfileSecond << std::to_string(i);
+    }
+    tmpfile.close();
+  }
+  expectfileFirst.close();
+  expectfileSecond.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+  std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  uuid_t processoruuid;
+  REQUIRE(true == processor->getUUID(processoruuid));
+  uuid_t logAttributeuuid;
+  REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+  // connection from merge processor to log attribute
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+  connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+  connection->setSource(processor);
+  connection->setDestination(logAttributeProcessor);
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(logAttributeuuid);
+  processor->addConnection(connection);
+  // connection to merge processor
+  std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+  mergeconnection->setDestination(processor);
+  mergeconnection->setDestinationUUID(processoruuid);
+  processor->addConnection(mergeconnection);
+
+  std::set<core::Relationship> autoTerminatedRelationships;
+  core::Relationship original("original", "");
+  core::Relationship failure("failure", "");
+  autoTerminatedRelationships.insert(original);
+  autoTerminatedRelationships.insert(failure);
+  processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  logAttributeProcessor->incrementActiveTasks();
+  logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+  core::ProcessorNode node(processor);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+
+  core::ProcessSession sessionGenFlowFile(&context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles: the first three are merged into one, the second three into another
+  std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+  std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  // Deliberately out of order, to exercise the fragment-index sort.
+  income_connection->put(record[0]);
+  income_connection->put(record[2]);
+  income_connection->put(record[5]);
+  income_connection->put(record[4]);
+  income_connection->put(record[1]);
+  income_connection->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  core::ProcessSessionFactory factory(&context);
+  processor->onSchedule(&context, &factory);
+  for (int i = 0; i < 6; i++) {
+    core::ProcessSession session(&context);
+    processor->onTrigger(&context, &session);
+    session.commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+  // Fail cleanly instead of dereferencing null when a bin was not merged.
+  REQUIRE(flow1 != nullptr);
+  REQUIRE(flow2 != nullptr);
+  // each merged flow holds three 32-byte fragments
+  REQUIRE(flow1->getSize() == 96);
+  {
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1;
+    file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(expectContents == contents);
+    file1.close();
+  }
+  REQUIRE(flow2->getSize() == 96);
+  {
+    ReadCallback callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2;
+    file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(expectContents == contents);
+    file2.close();
+  }
+  LogTestController::getInstance().reset();
+}
+
+// Same fragment layout as MergeFileDefragment, but header/footer/demarcator are read from files
+// (filename delimiter strategy). Each merged result is expected to be: header, fragments separated
+// by the demarcator, footer.
+// Exceptions are deliberately not caught: a blanket catch (...) would let an unexpected throw
+// make the test pass silently.
+TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
+  // Removes every scratch file on scope exit, including when a REQUIRE aborts the test early.
+  struct ScratchFiles {
+    ~ScratchFiles() {
+      for (int i = 0; i < 6; i++) {
+        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+        unlink(flowFileName.c_str());
+      }
+      unlink(EXPECT_MERGE_CONTENT_FIRST);
+      unlink(EXPECT_MERGE_CONTENT_SECOND);
+      unlink(FOOTER_FILE);
+      unlink(HEADER_FILE);
+      unlink(DEMARCATOR_FILE);
+    }
+  } scratchFiles;
+
+  std::ofstream expectfileFirst;
+  std::ofstream expectfileSecond;
+  std::ofstream headerfile, footerfile, demarcatorfile;
+  expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+  expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  headerfile.open(HEADER_FILE);
+  headerfile << "header";
+  expectfileFirst << "header";
+  expectfileSecond << "header";
+  headerfile.close();
+  footerfile.open(FOOTER_FILE);
+  footerfile << "footer";
+  footerfile.close();
+  demarcatorfile.open(DEMARCATOR_FILE);
+  demarcatorfile << "demarcator";
+  demarcatorfile.close();
+
+  // Create six 32-byte input files. A demarcator precedes every fragment except the first of each group.
+  for (int i = 0; i < 6; i++) {
+    if (i == 1 || i == 2)
+      expectfileFirst << "demarcator";
+    if (i == 4 || i == 5)
+      expectfileSecond << "demarcator";
+    std::ofstream tmpfile;
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    tmpfile.open(flowFileName.c_str());
+    for (int j = 0; j < 32; j++) {
+      tmpfile << std::to_string(i);
+      if (i < 3)
+        expectfileFirst << std::to_string(i);
+      else
+        expectfileSecond << std::to_string(i);
+    }
+    tmpfile.close();
+  }
+  expectfileFirst << "footer";
+  expectfileSecond << "footer";
+  expectfileFirst.close();
+  expectfileSecond.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+  std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  uuid_t processoruuid;
+  REQUIRE(true == processor->getUUID(processoruuid));
+  uuid_t logAttributeuuid;
+  REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+  // connection from merge processor to log attribute
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+  connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+  connection->setSource(processor);
+  connection->setDestination(logAttributeProcessor);
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(logAttributeuuid);
+  processor->addConnection(connection);
+  // connection to merge processor
+  std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+  mergeconnection->setDestination(processor);
+  mergeconnection->setDestinationUUID(processoruuid);
+  processor->addConnection(mergeconnection);
+
+  std::set<core::Relationship> autoTerminatedRelationships;
+  core::Relationship original("original", "");
+  core::Relationship failure("failure", "");
+  autoTerminatedRelationships.insert(original);
+  autoTerminatedRelationships.insert(failure);
+  processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  logAttributeProcessor->incrementActiveTasks();
+  logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+  core::ProcessorNode node(processor);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
+  context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+
+  core::ProcessSession sessionGenFlowFile(&context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles: the first three are merged into one, the second three into another
+  std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+  std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  // Deliberately out of order, to exercise the fragment-index sort.
+  income_connection->put(record[0]);
+  income_connection->put(record[2]);
+  income_connection->put(record[5]);
+  income_connection->put(record[4]);
+  income_connection->put(record[1]);
+  income_connection->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  core::ProcessSessionFactory factory(&context);
+  processor->onSchedule(&context, &factory);
+  for (int i = 0; i < 6; i++) {
+    core::ProcessSession session(&context);
+    processor->onTrigger(&context, &session);
+    session.commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+  // Fail cleanly instead of dereferencing null when a bin was not merged.
+  REQUIRE(flow1 != nullptr);
+  REQUIRE(flow2 != nullptr);
+  // "header" (6) + 3 x 32 content bytes + 2 x "demarcator" (20) + "footer" (6) = 128
+  REQUIRE(flow1->getSize() == 128);
+  {
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1;
+    file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(expectContents == contents);
+    file1.close();
+  }
+  REQUIRE(flow2->getSize() == 128);
+  {
+    ReadCallback callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2;
+    file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(expectContents == contents);
+    file2.close();
+  }
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
+ try {
+ std::ofstream expectfileFirst;
+ std::ofstream expectfileSecond;
+
+ expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+ expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+
+ // Create and write to the test file, drop record 4
+ for (int i = 0; i < 6; i++) {
+ if (i == 4)
+ continue;
+ std::ofstream tmpfile;
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ tmpfile.open(flowFileName.c_str());
+ for (int j = 0; j < 32; j++) {
+ tmpfile << std::to_string(i);
+ if (i < 3)
+ expectfileFirst << std::to_string(i);
+ else
+ expectfileSecond << std::to_string(i);
+ }
+ tmpfile.close();
+ }
+ expectfileFirst.close();
+ expectfileSecond.close();
+
+ TestController testController;
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ uuid_t logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ // connection from merge processor to log attribute
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+ connection->setSource(processor);
+ connection->setDestination(logAttributeProcessor);
+ connection->setSourceUUID(processoruuid);
+ connection->setDestinationUUID(logAttributeuuid);
+ processor->addConnection(connection);
+ // connection to merge processor
+ std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+ mergeconnection->setDestination(processor);
+ mergeconnection->setDestinationUUID(processoruuid);
+ processor->addConnection(mergeconnection);
+
+ std::set<core::Relationship> autoTerminatedRelationships;
+ core::Relationship original("original", "");
+ core::Relationship failure("failure", "");
+ autoTerminatedRelationships.insert(original);
+ autoTerminatedRelationships.insert(failure);
+ processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ logAttributeProcessor->incrementActiveTasks();
+ logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+ core::ProcessorNode node(processor);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+
+ core::ProcessSession sessionGenFlowFile(&context);
+ std::shared_ptr<core::FlowFile> record[6];
+
+ // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+ std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+ std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+ for (int i = 0; i < 6; i++) {
+ if (i == 4)
+ continue;
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ // three bundle
+ if (i < 3)
+ flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+ else
+ flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+ if (i < 3)
+ flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+ else
+ flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+ flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+ record[i] = flow;
+ }
+ income_connection->put(record[0]);
+ income_connection->put(record[2]);
+ income_connection->put(record[5]);
+ income_connection->put(record[1]);
+ income_connection->put(record[3]);
+
+ REQUIRE(processor->getName() == "mergecontent");
+ core::ProcessSessionFactory factory(&context);
+ processor->onSchedule(&context, &factory);
+ for (int i = 0; i < 6; i++) {
+ if (i == 4)
+ continue;
+ core::ProcessSession session(&context);
+ processor->onTrigger(&context, &session);
+ session.commit();
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ {
+ core::ProcessSession session(&context);
+ processor->onTrigger(&context, &session);
+ }
+ // validate the merge content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+ std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+ REQUIRE(flow1->getSize() == 96);
+ {
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::ifstream file1;
+ file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+ std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+ std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ REQUIRE(expectContents == contents);
+ file1.close();
+ }
+ REQUIRE(flow2->getSize() == 64);
+ {
+ ReadCallback callback(flow2->getSize());
+ sessionGenFlowFile.read(flow2, &callback);
+ std::ifstream file2;
+ file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+ std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+ std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ REQUIRE(expectContents == contents);
+ file2.close();
+ }
+ LogTestController::getInstance().reset();
+ for (int i = 0; i < 6; i++) {
+ if (i == 4)
+ continue;
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ unlink(flowFileName.c_str());
+ }
+ unlink(EXPECT_MERGE_CONTENT_FIRST);
+ unlink(EXPECT_MERGE_CONTENT_SECOND);
+ } catch (...) {
+ }
+}
+
+TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
+ // Bin-pack strategy test: six 32-byte input files, all carrying the same "tag" correlation value,
+ // are fed to MergeContent with MinSize 96. The assertions below expect two 96-byte merged outputs:
+ // files 0-2 concatenated first, files 3-5 concatenated second.
+
+ // Resets test logging and removes the generated input and expected-output files. Invoked on both
+ // the success path and the failure path so a failing run does not leave stale files behind.
+ auto cleanup = []() {
+ LogTestController::getInstance().reset();
+ for (int i = 0; i < 6; i++) {
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ unlink(flowFileName.c_str());
+ }
+ unlink(EXPECT_MERGE_CONTENT_FIRST);
+ unlink(EXPECT_MERGE_CONTENT_SECOND);
+ };
+
+ try {
+ std::ofstream expectfileFirst;
+ std::ofstream expectfileSecond;
+ expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+ expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+
+ // Create the six input files (32 copies of the digit i each). Files 0-2 form the first expected
+ // merged content and files 3-5 the second.
+ for (int i = 0; i < 6; i++) {
+ std::ofstream tmpfile;
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ tmpfile.open(flowFileName.c_str());
+ for (int j = 0; j < 32; j++) {
+ tmpfile << std::to_string(i);
+ if (i < 3)
+ expectfileFirst << std::to_string(i);
+ else
+ expectfileSecond << std::to_string(i);
+ }
+ tmpfile.close();
+ }
+ expectfileFirst.close();
+ expectfileSecond.close();
+
+ TestController testController;
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ uuid_t logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ // connection from merge processor to log attribute
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+ connection->setSource(processor);
+ connection->setDestination(logAttributeProcessor);
+ connection->setSourceUUID(processoruuid);
+ connection->setDestinationUUID(logAttributeuuid);
+ processor->addConnection(connection);
+ // connection to merge processor
+ std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+ mergeconnection->setDestination(processor);
+ mergeconnection->setDestinationUUID(processoruuid);
+ processor->addConnection(mergeconnection);
+
+ std::set<core::Relationship> autoTerminatedRelationships;
+ core::Relationship original("original", "");
+ core::Relationship failure("failure", "");
+ autoTerminatedRelationships.insert(original);
+ autoTerminatedRelationships.insert(failure);
+ processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ logAttributeProcessor->incrementActiveTasks();
+ logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+ core::ProcessorNode node(processor);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+ context.setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+ core::ProcessSession sessionGenFlowFile(&context);
+ std::shared_ptr<core::FlowFile> record[6];
+
+ // Generate 6 flowfiles that share the same "tag" correlation value. The first three are expected
+ // to be merged into one flowfile and the last three into another.
+ std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+ std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+ for (int i = 0; i < 6; i++) {
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ flow->setAttribute("tag", "tag");
+ record[i] = flow;
+ }
+ income_connection->put(record[0]);
+ income_connection->put(record[1]);
+ income_connection->put(record[2]);
+ income_connection->put(record[3]);
+ income_connection->put(record[4]);
+ income_connection->put(record[5]);
+
+ REQUIRE(processor->getName() == "mergecontent");
+ core::ProcessSessionFactory factory(&context);
+ processor->onSchedule(&context, &factory);
+ for (int i = 0; i < 6; i++) {
+ core::ProcessSession session(&context);
+ processor->onTrigger(&context, &session);
+ session.commit();
+ }
+ // Validate the merged content. Guard against poll() yielding no flowfile so a missing merge
+ // output fails the test instead of dereferencing a null pointer.
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+ std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+ REQUIRE(flow1 != nullptr);
+ REQUIRE(flow1->getSize() == 96);
+ {
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::ifstream file1;
+ file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+ std::string expected((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+ std::string actual(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ REQUIRE(actual == expected);
+ file1.close();
+ }
+ REQUIRE(flow2 != nullptr);
+ REQUIRE(flow2->getSize() == 96);
+ {
+ ReadCallback callback(flow2->getSize());
+ sessionGenFlowFile.read(flow2, &callback);
+ std::ifstream file2;
+ file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+ std::string expected((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+ std::string actual(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ REQUIRE(actual == expected);
+ file2.close();
+ }
+ cleanup();
+ } catch (...) {
+ // Remove the generated files, then rethrow so an unexpected exception fails the test instead of
+ // being silently swallowed. A failed REQUIRE is reported by Catch before it throws, so rethrowing
+ // that case is harmless.
+ cleanup();
+ throw;
+ }
+}
+
+