You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/09/22 17:47:01 UTC

nifi-minifi-cpp git commit: Create Merge Content processor

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 6283a447e -> 8b3b1c880


Create Merge Content processor

This closes #133.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/8b3b1c88
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/8b3b1c88
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/8b3b1c88

Branch: refs/heads/master
Commit: 8b3b1c880d92dfc8289af80ad883e254a23bc356
Parents: 6283a44
Author: Bin Qiu <be...@gmail.com>
Authored: Thu Aug 24 10:04:32 2017 -0700
Committer: Marc Parisi <ph...@apache.org>
Committed: Fri Sep 22 13:41:23 2017 -0400

----------------------------------------------------------------------
 README.md                                     |   1 +
 libminifi/include/core/FlowConfiguration.h    |   1 +
 libminifi/include/core/ProcessSession.h       |   2 +
 libminifi/include/processors/BinFiles.h       | 296 +++++++++
 libminifi/include/processors/LoadProcessors.h |   1 +
 libminifi/include/processors/MergeContent.h   | 206 ++++++
 libminifi/src/core/ProcessSession.cpp         |   4 +
 libminifi/src/processors/BinFiles.cpp         | 303 +++++++++
 libminifi/src/processors/MergeContent.cpp     | 283 ++++++++
 libminifi/test/unit/MergeFileTests.cpp        | 720 +++++++++++++++++++++
 10 files changed, 1817 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 8830aaa..db10ab4 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * ListenSyslog
   * PutFile
   * TailFile
+  * MergeContent
 * Provenance events generation is supported and are persisted using levelDB.
 
 ## System Requirements

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 43d2bc0..d9ebc72 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -35,6 +35,7 @@
 #include "processors/LogAttribute.h"
 #include "processors/ExecuteProcess.h"
 #include "processors/AppendHostInfo.h"
+#include "processors/MergeContent.h"
 #include "core/Processor.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/ProcessContext.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index d853e9b..3a3b143 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -79,6 +79,8 @@ class ProcessSession {
   std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent) {
     return create(parent);
   }
+  // Add a FlowFile to the session
+  void add(std::shared_ptr<core::FlowFile> &flow);
 // Clone a new UUID FlowFile from parent both for content resource claim and attributes
   std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent);
   // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/processors/BinFiles.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/BinFiles.h b/libminifi/include/processors/BinFiles.h
new file mode 100644
index 0000000..6c619a8
--- /dev/null
+++ b/libminifi/include/processors/BinFiles.h
@@ -0,0 +1,296 @@
+/**
+ * @file BinFiles.h
+ * BinFiles class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __BIN_FILES_H__
+#define __BIN_FILES_H__
+
+#include <climits>
+#include <deque>
+#include <map>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Bin Class
+class Bin {
+ public:
+  // Constructor
+  /*!
+   * Create a new Bin. Note: this object is not thread safe
+   * @param minSize minimum queued bytes required before the bin is ready for merge
+   * @param maxSize maximum queued bytes the bin accepts
+   * @param minEntries minimum number of flow files required before the bin is ready for merge
+   * @param maxEntries maximum number of flow files the bin accepts
+   * @param fileCount name of the flow file attribute that carries the expected entry count
+   *        (empty to disable); when present its value replaces minEntries/maxEntries
+   * @param groupId identifier of the group this bin belongs to
+   */
+  explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries, const int & maxEntries,
+      const std::string &fileCount, const std::string &groupId)
+      : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount),
+        groupId_(groupId), logger_(logging::LoggerFactory<Bin>::getLogger()) {
+    queued_data_size_ = 0;
+    creation_dated_ = getTimeMillis();
+    std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator();
+    char uuidStr[37] = { 0 };
+    id_generator->generate(uuid_);
+    uuid_unparse_lower(uuid_, uuidStr);
+    uuid_str_ = uuidStr;
+    logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_);
+  }
+  virtual ~Bin() {
+    logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_);
+  }
+  // check whether the bin is full (by queued bytes or by number of entries)
+  bool isFull() {
+    // the cast spells out the conversion the int/size_t comparison performed implicitly
+    return queued_data_size_ >= maxSize_ || queue_.size() >= static_cast<size_t>(maxEntries_);
+  }
+  // check whether the bin meet the min required size and entries so that it can be processed for merge
+  bool isReadyForMerge() {
+    return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= static_cast<size_t>(minEntries_));
+  }
+  // check whether the bin is older than the time specified in msec
+  bool isOlderThan(const uint64_t &duration) {
+    uint64_t currentTime = getTimeMillis();
+    // compare the elapsed time instead of (creation + duration) so a very large
+    // duration cannot wrap around and make a fresh bin look expired
+    return currentTime > creation_dated_ && (currentTime - creation_dated_) > duration;
+  }
+  // access the queued flow files (mutable reference to the internal queue)
+  std::deque<std::shared_ptr<core::FlowFile>> & getFlowFile() {
+    return queue_;
+  }
+  // offer the flowfile to the bin
+  // returns true when the flow file was queued, false when it does not fit or
+  // carries an invalid (zero or negative) entry count attribute
+  bool offer(std::shared_ptr<core::FlowFile> flow) {
+    if (!fileCount_.empty()) {
+      std::string value;
+      if (flow->getAttribute(fileCount_, value)) {
+        int count = 0;
+        bool parsed = false;
+        try {
+          // for defrag case using the identification
+          count = std::stoi(value);
+          parsed = true;
+        } catch (...) {
+          // best effort: a non-numeric count leaves the configured limits untouched
+          logger_->log_info("Bin %s for group %s ignore non numeric %s value %s", uuid_str_, groupId_, fileCount_, value);
+        }
+        if (parsed) {
+          if (count <= 0) {
+            // refuse the flow before touching the limits: a zero or negative count
+            // would otherwise change the limits of a bin that already holds entries
+            logger_->log_info("Bin %s for group %s reject invalid %s value %s", uuid_str_, groupId_, fileCount_, value);
+            return false;
+          }
+          maxEntries_ = count;
+          minEntries_ = count;
+        }
+      }
+    }
+
+    if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > static_cast<size_t>(maxEntries_))
+      return false;
+
+    queue_.push_back(flow);
+    queued_data_size_ += flow->getSize();
+    logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d",
+        uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_);
+
+    return true;
+  }
+  // getBinAge
+  // NOTE: despite the name this returns the creation timestamp (value of getTimeMillis()
+  // at construction), not an elapsed age; a smaller value means an older bin
+  uint64_t getBinAge() {
+    return creation_dated_;
+  }
+  // number of flow files currently queued in the bin
+  int getSize() {
+    return static_cast<int>(queue_.size());
+  }
+  // Get the UUID as string
+  std::string getUUIDStr() {
+    return uuid_str_;
+  }
+  // Get the identifier of the group this bin was created for
+  std::string getGroupId() {
+    return groupId_;
+  }
+
+ protected:
+
+ private:
+  uint64_t minSize_;
+  uint64_t maxSize_;
+  int maxEntries_;
+  int minEntries_;
+  // Queued data size
+  uint64_t queued_data_size_;
+   // Queue for the Flow File
+  std::deque<std::shared_ptr<core::FlowFile>> queue_;
+  // creation timestamp as returned by getTimeMillis()
+  uint64_t creation_dated_;
+  // name of the attribute carrying the expected entry count (may be empty)
+  std::string fileCount_;
+  std::string groupId_;
+  std::shared_ptr<logging::Logger> logger_;
+  // A global unique identifier
+  uuid_t uuid_;
+  // UUID string
+  std::string uuid_str_;
+};
+
+// BinManager Class
+// Owns the bins of every group plus the queue of bins that are ready to be processed.
+// purge() takes mutex_; the setters and getBinCount() defined inline below do not.
+class BinManager {
+ public:
+  // Constructor
+   /*!
+    * Create a new BinManager
+    * Defaults: min size 0, max size ULLONG_MAX, min entries 1, max entries INT_MAX,
+    * bin age ULLONG_MAX, no bins.
+    */
+  BinManager()
+      : minSize_(0), maxSize_(ULLONG_MAX), maxEntries_(INT_MAX), minEntries_(1), binAge_(ULLONG_MAX), binCount_(0),
+        logger_(logging::LoggerFactory<BinManager>::getLogger()) {
+  }
+  // Drops every bin still held in the group map
+  virtual ~BinManager() {
+    purge();
+  }
+  // Limits handed to every Bin created afterwards
+  void setMinSize(const uint64_t &size) {
+    minSize_ = size;
+  }
+  void setMaxSize(const uint64_t &size) {
+    maxSize_ = size;
+  }
+  void setMaxEntries(const int &entries) {
+    maxEntries_ = entries;
+  }
+  void setMinEntries(const int &entries) {
+    minEntries_ = entries;
+  }
+  // Bin age in msec
+  void setBinAge(const uint64_t &age) {
+    binAge_ = age;
+  }
+  // Current value of the bin counter (read without taking mutex_)
+  int getBinCount() {
+    return binCount_;
+  }
+  // Name of the flow file attribute that is passed to each Bin as its file count attribute
+  void setFileCount(const std::string &value) {
+    fileCount_ = value;
+  }
+  // Clears the group map and resets the bin counter; readyBin_ is not touched here
+  void purge() {
+    std::lock_guard < std::mutex > lock(mutex_);
+    groupBinMap_.clear();
+    binCount_ = 0;
+  }
+  // Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
+  bool offer(const std::string &group, std::shared_ptr<core::FlowFile> flow);
+  // gather ready bins once the bin are full enough or exceed bin age
+  void gatherReadyBins();
+  // remove oldest bin
+  void removeOldestBin();
+  // get ready bin from binManager
+  void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins);
+
+ protected:
+
+ private:
+  std::mutex mutex_;
+  uint64_t minSize_;
+  uint64_t maxSize_;
+  int maxEntries_;
+  int minEntries_;
+  std::string fileCount_;
+  // Bin Age in msec
+  uint64_t binAge_;
+  // group id -> queue of bins for that group
+  std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>> groupBinMap_;
+  // bins that were moved out of groupBinMap_ and wait to be fetched via getReadyBin()
+  std::deque<std::unique_ptr<Bin>> readyBin_;
+  int binCount_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+// BinFiles Class
+// Base processor that groups flow files into bins through a BinManager; subclasses
+// supply the grouping (getGroupId) and the per-bin processing (processBin).
+class BinFiles : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   * The maximum bin count starts at 100.
+   */
+  explicit BinFiles(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::LoggerFactory<BinFiles>::getLogger()) {
+    maxBinCount_ = 100;
+  }
+  // Destructor
+  virtual ~BinFiles() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "BinFiles";
+  // Supported Properties
+  static core::Property MinSize;
+  static core::Property MaxSize;
+  static core::Property MinEntries;
+  static core::Property MaxEntries;
+  static core::Property MaxBinCount;
+  static core::Property MaxBinAge;
+
+  // Supported Relationships
+  static core::Relationship Failure;
+  static core::Relationship Original;
+
+  // attributes
+  static const char *FRAGMENT_ID_ATTRIBUTE;
+  static const char *FRAGMENT_INDEX_ATTRIBUTE;
+  static const char *FRAGMENT_COUNT_ATTRIBUTE;
+
+  // older segment.* attribute names (see preprocessFlowFile)
+  static const char *SEGMENT_ID_ATTRIBUTE;
+  static const char *SEGMENT_INDEX_ATTRIBUTE;
+  static const char *SEGMENT_COUNT_ATTRIBUTE;
+  static const char *SEGMENT_ORIGINAL_FILENAME;
+
+ public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi BinFiles
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  // Initialize, overridden by NiFi BinFiles
+  virtual void initialize(void);
+
+ protected:
+  // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
+  virtual void preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow);
+  // Returns a group ID representing a bin. This allows flow files to be binned into like groups
+  // The base implementation returns an empty group id.
+  virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) {
+    return "";
+  }
+  // Processes a single bin.
+  // The base implementation does nothing and returns false.
+  virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+    return false;
+  }
+  // transfer flows to failure in bin
+  void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+  // add flows to session
+  void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+
+  BinManager binManager_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  // value of the "Maximum number of Bins" property (100 unless configured)
+  int maxBinCount_;
+};
+
+REGISTER_RESOURCE(BinFiles);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/processors/LoadProcessors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h
index 3e6cfcf..e8d207a 100644
--- a/libminifi/include/processors/LoadProcessors.h
+++ b/libminifi/include/processors/LoadProcessors.h
@@ -29,5 +29,6 @@
 #include "LogAttribute.h"
 #include "PutFile.h"
 #include "TailFile.h"
+#include "MergeContent.h"
 
 #endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/include/processors/MergeContent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/MergeContent.h b/libminifi/include/processors/MergeContent.h
new file mode 100644
index 0000000..3f4c74a
--- /dev/null
+++ b/libminifi/include/processors/MergeContent.h
@@ -0,0 +1,206 @@
+/**
+ * @file MergeContent.h
+ * MergeContent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MERGE_CONTENT_H__
+#define __MERGE_CONTENT_H__
+
+#include "processors/BinFiles.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MERGE_STRATEGY_BIN_PACK "Bin-Packing Algorithm"
+#define MERGE_STRATEGY_DEFRAGMENT "Defragment"
+#define MERGE_FORMAT_TAR_VALUE "TAR"
+#define MERGE_FORMAT_ZIP_VALUE "ZIP"
+#define MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE "FlowFile Stream, v3"
+#define MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE  "FlowFile Stream, v2"
+#define MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE  "FlowFile Tar, v1"
+#define MERGE_FORMAT_CONCAT_VALUE "Binary Concatenation"
+#define MERGE_FORMAT_AVRO_VALUE "Avro"
+#define DELIMITER_STRATEGY_FILENAME "Filename"
+#define DELIMITER_STRATEGY_TEXT "Text"
+
+// MergeBin Class
+// Interface of a merge strategy that turns the flow files of a bin into one flow file.
+class MergeBin {
+public:
+  // Virtual destructor so an implementation can be destroyed safely through a MergeBin pointer
+  virtual ~MergeBin() {
+  }
+  // content type (mime type) of the flow file produced by merge()
+  virtual std::string getMergedContentType() = 0;
+  // merge the flows in the bin
+  // header, footer and demarcator are the texts handed to the implementation for
+  // framing the merged content; returns the merged flow file
+  virtual std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session,
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) = 0;
+};
+
+// BinaryConcatenationMerge Class
+// Merge implementation that concatenates the content of the flow files, optionally
+// framed by a header, a footer and a demarcator between consecutive flow files.
+class BinaryConcatenationMerge : public MergeBin {
+public:
+  static const char *mimeType;
+  std::string getMergedContentType() {
+    return mimeType;
+  }
+  std::shared_ptr<core::FlowFile> merge(core::ProcessContext *context, core::ProcessSession *session,
+          std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator);
+  // Nest Callback Class for read stream
+  // Copies up to `size` bytes from the stream being read into the target stream.
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback(uint64_t size, std::shared_ptr<io::BaseStream> stream)
+        : buffer_size_(size), stream_(stream) {
+    }
+    ~ReadCallback() {
+    }
+    // Returns the result of the write to the target stream, the non-positive read
+    // result when nothing could be read, or 0 when there is nothing to copy.
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      if (buffer_size_ == 0)
+        return 0;
+      // heap buffer: a stack array sized by the flow file is non-standard C++ and
+      // can exhaust the stack for large content
+      std::unique_ptr<uint8_t[]> buffer(new uint8_t[buffer_size_]);
+      // NOTE(review): assumes read() returns the number of bytes read, negative on error - confirm
+      int64_t read_size = stream->read(buffer.get(), buffer_size_);
+      if (read_size <= 0)
+        return read_size;
+      // forward only what was actually read, never the unread tail of the buffer
+      return stream_->write(buffer.get(), read_size);
+    }
+    // number of bytes expected from the source stream
+    uint64_t buffer_size_;
+    // target stream that receives the copied bytes
+    std::shared_ptr<io::BaseStream> stream_;
+  };
+  // Nest Callback Class for write stream
+  // Writes header, every flow file (separated by the demarcator) and footer to the stream.
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(std::string &header, std::string &footer, std::string &demarcator, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) :
+      header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), session_(session) {
+    }
+    std::string &header_;
+    std::string &footer_;
+    std::string &demarcator_;
+    std::deque<std::shared_ptr<core::FlowFile>> &flows_;
+    core::ProcessSession *session_;
+    // Returns the accumulated byte count, or the negative write result as soon as
+    // writing the header, a demarcator or the footer fails.
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
+      int64_t len = writeText(stream, header_);
+      if (len < 0)
+        return len;
+      ret += len;
+      bool isFirst = true;
+      for (auto &flow : flows_) {
+        if (!isFirst) {
+          len = writeText(stream, demarcator_);
+          if (len < 0)
+            return len;
+          ret += len;
+        }
+        ReadCallback readCb(flow->getSize(), stream);
+        session_->read(flow, &readCb);
+        // the flow file size is accounted for, not the callback result
+        ret += flow->getSize();
+        isFirst = false;
+      }
+      len = writeText(stream, footer_);
+      if (len < 0)
+        return len;
+      ret += len;
+      return ret;
+    }
+
+  private:
+    // Writes text to the stream; an empty text writes nothing and yields 0.
+    static int64_t writeText(const std::shared_ptr<io::BaseStream> &stream, const std::string &text) {
+      if (text.empty())
+        return 0;
+      return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(text.data())), text.size());
+    }
+  };
+};
+
+
+// MergeContent Class
+// Processor built on BinFiles that merges the flow files of a bin into one flow file.
+class MergeContent : public processors::BinFiles {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   * Defaults: merge strategy "Defragment", merge format "Binary Concatenation",
+   * delimiter strategy "Filename", keep path false.
+   */
+  explicit MergeContent(std::string name, uuid_t uuid = NULL)
+      : processors::BinFiles(name, uuid),
+        logger_(logging::LoggerFactory<MergeContent>::getLogger()) {
+    mergeStratgey_ = MERGE_STRATEGY_DEFRAGMENT;
+    mergeFormat_ = MERGE_FORMAT_CONCAT_VALUE;
+    delimiterStratgey_ = DELIMITER_STRATEGY_FILENAME;
+    keepPath_ = false;
+  }
+  // Destructor
+  virtual ~MergeContent() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "MergeContent";
+  // Supported Properties
+  static core::Property MergeStrategy;
+  static core::Property MergeFormat;
+  static core::Property CorrelationAttributeName;
+  static core::Property DelimiterStratgey;
+  static core::Property KeepPath;
+  static core::Property Header;
+  static core::Property Footer;
+  static core::Property Demarcator;
+
+  // Supported Relationships
+  static core::Relationship Merge;
+
+ public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi MergeContent
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  // Initialize, overridden by NiFi MergeContent
+  virtual void initialize(void);
+  // Processes a single bin (overrides BinFiles::processBin)
+  virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
+
+ protected:
+  // Returns a group ID representing a bin. This allows flow files to be binned into like groups
+  virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow);
+  // check whether the defragment bin is valid
+  bool checkDefragment(std::unique_ptr<Bin> &bin);
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::string mergeStratgey_;
+  std::string mergeFormat_;
+  std::string correlationAttributeName_;
+  bool keepPath_;
+  std::string delimiterStratgey_;
+  std::string header_;
+  std::string footer_;
+  std::string demarcator_;
+  std::string headerContent_;
+  std::string footerContent_;
+  std::string demarcatorContent_;
+  // readContent
+  // NOTE(review): presumably returns the content of the file at path - confirm in MergeContent.cpp
+  std::string readContent(std::string path);
+};
+
+REGISTER_RESOURCE(MergeContent);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index c69b361..b3035cb 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -50,6 +50,10 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
   return record;
 }
 
+// Registers an existing flow file with this session by storing it in
+// _addedFlowFiles under its UUID string. The record is dereferenced
+// unconditionally, so it must not be null.
+void ProcessSession::add(std::shared_ptr<core::FlowFile> &record) {
+  _addedFlowFiles[record->getUUIDStr()] = record;
+}
+
 std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
   std::map<std::string, std::string> empty;
   std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/src/processors/BinFiles.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/BinFiles.cpp b/libminifi/src/processors/BinFiles.cpp
new file mode 100644
index 0000000..bd4afca
--- /dev/null
+++ b/libminifi/src/processors/BinFiles.cpp
@@ -0,0 +1,303 @@
+/**
+ * @file BinFiles.cpp
+ * BinFiles class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/BinFiles.h"
+#include <stdio.h>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <queue>
+#include <map>
+#include <deque>
+#include <utility>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property BinFiles::MinSize("Minimum Group Size", "The minimum size of for the bundle", "0");
+core::Property BinFiles::MaxSize("Maximum Group Size", "The maximum size for the bundle. If not specified, there is no maximum.", "");
+core::Property BinFiles::MinEntries("Minimum Number of Entries", "The minimum number of files to include in a bundle", "1");
+core::Property BinFiles::MaxEntries("Maximum Number of Entries", "The maximum number of files to include in a bundle. If not specified, there is no maximum.", "");
+core::Property BinFiles::MaxBinAge("Max Bin Age", "The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>", "");
+core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
+core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
+core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure");
+const char *BinFiles::FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
+const char *BinFiles::FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
+const char *BinFiles::FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
+const char *BinFiles::SEGMENT_COUNT_ATTRIBUTE = "segment.count";
+const char *BinFiles::SEGMENT_ID_ATTRIBUTE = "segment.identifier";
+const char *BinFiles::SEGMENT_INDEX_ATTRIBUTE = "segment.index";
+const char *BinFiles::SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+
+// Declares the properties and relationships supported by BinFiles.
+void BinFiles::initialize() {
+  // Supported properties
+  std::set<core::Property> supportedProperties = { MinSize, MaxSize, MinEntries, MaxEntries, MaxBinAge, MaxBinCount };
+  setSupportedProperties(supportedProperties);
+  // Supported relationships
+  std::set<core::Relationship> supportedRelationships = { Original, Failure };
+  setSupportedRelationships(supportedRelationships);
+}
+
+// Reads the bin limits from the processor configuration and pushes them to the
+// bin manager. A property that is unset, empty or not parseable leaves the
+// corresponding setting untouched.
+void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  // Fetches one property and parses it as an integer; true only when a non-empty
+  // value was found and parsed.
+  auto readInt = [context](core::Property &property, int64_t &parsed) -> bool {
+    std::string raw;
+    if (!context->getProperty(property.getName(), raw) || raw.empty())
+      return false;
+    return core::Property::StringToInt(raw, parsed);
+  };
+
+  int64_t number;
+  if (readInt(MinSize, number)) {
+    this->binManager_.setMinSize(number);
+    logger_->log_info("BinFiles: MinSize [%d]", number);
+  }
+  if (readInt(MaxSize, number)) {
+    this->binManager_.setMaxSize(number);
+    logger_->log_info("BinFiles: MaxSize [%d]", number);
+  }
+  if (readInt(MinEntries, number)) {
+    this->binManager_.setMinEntries(number);
+    logger_->log_info("BinFiles: MinEntries [%d]", number);
+  }
+  if (readInt(MaxEntries, number)) {
+    this->binManager_.setMaxEntries(number);
+    logger_->log_info("BinFiles: MaxEntries [%d]", number);
+  }
+  if (readInt(MaxBinCount, number)) {
+    maxBinCount_ = static_cast<int> (number);
+    logger_->log_info("BinFiles: MaxBinCount [%d]", number);
+  }
+
+  // The bin age is a duration with a time unit; it is converted to milliseconds.
+  std::string age;
+  if (context->getProperty(MaxBinAge.getName(), age) && !age.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(age, number, unit) && core::Property::ConvertTimeUnitToMS(number, unit, number)) {
+      this->binManager_.setBinAge(number);
+      logger_->log_info("BinFiles: MaxBinAge [%d]", number);
+    }
+  }
+}
+
+// Backward compatibility with the old segment.* attributes: each fragment.*
+// attribute that is missing is filled from its segment.* counterpart when
+// that one is present.
+void BinFiles::preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow) {
+  auto fillFromSegment = [&flow](const char *fragmentKey, const char *segmentKey) {
+    std::string found;
+    if (flow->getAttribute(fragmentKey, found))
+      return;  // the fragment attribute already exists, nothing to do
+    if (flow->getAttribute(segmentKey, found))
+      flow->setAttribute(fragmentKey, found);
+  };
+  fillFromSegment(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, BinFiles::SEGMENT_COUNT_ATTRIBUTE);
+  fillFromSegment(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, BinFiles::SEGMENT_INDEX_ATTRIBUTE);
+  fillFromSegment(BinFiles::FRAGMENT_ID_ATTRIBUTE, BinFiles::SEGMENT_ID_ATTRIBUTE);
+}
+
+// Moves every leading bin of each group that is ready for merge, or older than
+// the configured bin age, to the ready queue and drops groups left without bins.
+void BinManager::gatherReadyBins() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  auto groupIt = groupBinMap_.begin();
+  while (groupIt != groupBinMap_.end()) {
+    std::deque<std::unique_ptr<Bin>> &bins = *groupIt->second;
+    // only the front bin is examined; the scan of a group stops at the first bin that is not ready
+    while (!bins.empty()) {
+      std::unique_ptr<Bin> &head = bins.front();
+      const bool expired = binAge_ != ULLONG_MAX && head->isOlderThan(binAge_);
+      if (!head->isReadyForMerge() && !expired)
+        break;
+      readyBin_.push_back(std::move(head));
+      bins.pop_front();
+      binCount_--;
+      logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
+    }
+    // erase from the map if the queue is empty for the group
+    if (bins.empty())
+      groupIt = groupBinMap_.erase(groupIt);
+    else
+      ++groupIt;
+  }
+  logger_->log_info("BinManager groupBinMap size %d", groupBinMap_.size());
+}
+
+// Moves the bin with the smallest creation timestamp among the front bins of all
+// groups to the ready queue and drops its group when no bin is left in it.
+void BinManager::removeOldestBin() {
+  std::lock_guard < std::mutex > lock(mutex_);
+  uint64_t olddate = ULLONG_MAX;
+  // iterator of the group holding the oldest bin; end() means none was found, so
+  // no uninitialised pointer is carried around
+  auto oldest = groupBinMap_.end();
+  for (auto it = groupBinMap_.begin(); it != groupBinMap_.end(); ++it) {
+    std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
+    if (!queue->empty()) {
+      // getBinAge() yields the creation timestamp: smaller means older
+      const uint64_t created = queue->front()->getBinAge();
+      if (created < olddate) {
+        olddate = created;
+        oldest = it;
+      }
+    }
+  }
+  if (oldest != groupBinMap_.end()) {
+    std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = oldest->second;
+    readyBin_.push_back(std::move(queue->front()));
+    queue->pop_front();
+    binCount_--;
+    logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
+    if (queue->empty()) {
+      // erase through the iterator: no second lookup by group id is needed
+      groupBinMap_.erase(oldest);
+    }
+  }
+  logger_->log_info("BinManager groupBinMap size %d", groupBinMap_.size());
+}
+
+// Hands every ready bin over to the caller, oldest first, leaving the ready queue empty.
+void BinManager::getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  for (; !readyBin_.empty(); readyBin_.pop_front()) {
+    retBins.push_back(std::move(readyBin_.front()));
+  }
+}
+
+/**
+ * Places a flow file into a bin belonging to the given group.
+ *
+ * A flow larger than maxSize_ gets a dedicated bin that is put on the ready
+ * list immediately. Otherwise the flow is offered to the last bin of the
+ * group's queue; if there is no such bin, or it refuses the flow, a new bin
+ * is created and appended.
+ *
+ * @param group correlation group the flow belongs to
+ * @param flow  flow file to bin
+ * @return false when even a freshly created bin refuses the flow, true otherwise
+ */
+bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile> flow) {
+  std::lock_guard < std::mutex > guard(mutex_);
+
+  if (flow->getSize() > maxSize_) {
+    // Too large for a regular bin: wrap it in its own bin and mark it ready right away.
+    std::unique_ptr<Bin> oversized(new Bin(0, ULLONG_MAX, 1, INT_MAX, "", group));
+    if (!oversized->offer(flow))
+      return false;
+    readyBin_.push_back(std::move(oversized));
+    logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), group);
+    return true;
+  }
+
+  auto found = groupBinMap_.find(group);
+  if (found == groupBinMap_.end()) {
+    // First flow of this group: the queue is registered only after the bin accepted the flow.
+    std::unique_ptr<std::deque<std::unique_ptr<Bin>>> created(new std::deque<std::unique_ptr<Bin>>());
+    std::unique_ptr<Bin> first(new Bin(minSize_, maxSize_, minEntries_, maxEntries_, fileCount_, group));
+    if (!first->offer(flow))
+      return false;
+    created->push_back(std::move(first));
+    logger_->log_info("BinManager add bin %s to group %s", created->back()->getUUIDStr(), group);
+    groupBinMap_.insert(std::make_pair(group, std::move(created)));
+    binCount_++;
+    return true;
+  }
+
+  std::unique_ptr<std::deque<std::unique_ptr<Bin>>> &bins = found->second;
+  // Try the most recent bin of the group first.
+  if (!bins->empty() && bins->back()->offer(flow))
+    return true;
+
+  // Queue is empty or its last bin cannot take the flow: open a new bin.
+  std::unique_ptr<Bin> fresh(new Bin(minSize_, maxSize_, minEntries_, maxEntries_, fileCount_, group));
+  if (!fresh->offer(flow))
+    return false;
+  bins->push_back(std::move(fresh));
+  logger_->log_info("BinManager add bin %s to group %s", bins->back()->getUUIDStr(), group);
+  binCount_++;
+  return true;
+}
+
+/**
+ * One scheduling round: bins the next incoming flow file (if any), promotes
+ * bins to the ready list, and processes every ready bin inside a separate
+ * merge session that is committed at the end.
+ */
+void BinFiles::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> incoming = std::static_pointer_cast < FlowFileRecord > (session->get());
+
+  if (incoming) {
+    preprocessFlowFile(context, session, incoming);
+    std::string groupId = getGroupId(context, incoming);
+
+    if (!this->binManager_.offer(groupId, incoming)) {
+      // no bin accepted the flow file
+      session->transfer(incoming, Failure);
+      context->yield();
+      return;
+    }
+
+    // The flow file now lives in a bin; drop it from this session. It is
+    // added to the merge session once its bin is processed.
+    session->remove(incoming);
+  }
+
+  // promote bins to the ready list
+  this->binManager_.gatherReadyBins();
+  if (this->binManager_.getBinCount() > maxBinCount_) {
+    // more pending bins than allowed: force one of them out
+    context->yield();
+    logger_->log_info("BinFiles reach max bin count %d", this->binManager_.getBinCount());
+    this->binManager_.removeOldestBin();
+  }
+
+  // collect the ready bins
+  std::deque<std::unique_ptr<Bin>> pending;
+  binManager_.getReadyBin(pending);
+  if (pending.empty())
+    return;
+
+  // all ready bins are processed in one dedicated session
+  core::ProcessSession mergeSession(context);
+  while (!pending.empty()) {
+    std::unique_ptr<Bin> bin = std::move(pending.front());
+    pending.pop_front();
+    // register the bin's flow files with the merge session
+    this->addFlowsToSession(context, &mergeSession, bin);
+    logger_->log_info("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
+    const bool processed = this->processBin(context, &mergeSession, bin);
+    if (!processed)
+      this->transferFlowsToFail(context, &mergeSession, bin);
+  }
+  mergeSession.commit();
+}
+
+/**
+ * Routes every flow file of the bin to the Failure relationship and then
+ * empties the bin's flow list.
+ */
+void BinFiles::transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+  std::deque<std::shared_ptr<core::FlowFile>> &binFlows = bin->getFlowFile();
+  for (auto it = binFlows.begin(); it != binFlows.end(); ++it) {
+    // work on a copy of the pointer, as the session call takes it as an argument
+    std::shared_ptr<core::FlowFile> failed = *it;
+    session->transfer(failed, Failure);
+  }
+  binFlows.clear();
+}
+
+/**
+ * Adds every flow file held by the bin to the given session; the bin's
+ * flow list itself is left untouched.
+ */
+void BinFiles::addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+  std::deque<std::shared_ptr<core::FlowFile>> &binFlows = bin->getFlowFile();
+  for (auto it = binFlows.begin(); it != binFlows.end(); ++it) {
+    // work on a copy of the pointer, as the session call takes it as an argument
+    std::shared_ptr<core::FlowFile> member = *it;
+    session->add(member);
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/src/processors/MergeContent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/MergeContent.cpp b/libminifi/src/processors/MergeContent.cpp
new file mode 100644
index 0000000..ee1702f
--- /dev/null
+++ b/libminifi/src/processors/MergeContent.cpp
@@ -0,0 +1,283 @@
+/**
+ * @file MergeContent.cpp
+ * MergeContent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/MergeContent.h"
+#include <stdio.h>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <queue>
+#include <map>
+#include <deque>
+#include <utility>
+#include <algorithm>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property MergeContent::MergeStrategy("Merge Strategy", "Defragment or Bin-Packing Algorithm", MERGE_STRATEGY_DEFRAGMENT);  // Definitions of the MergeContent properties and relationship registered in initialize(); the third argument is presumably the default value (TODO confirm)
+core::Property MergeContent::MergeFormat("Merge Format", "Merge Format", MERGE_FORMAT_CONCAT_VALUE);  // processBin() only handles MERGE_FORMAT_CONCAT_VALUE
+core::Property MergeContent::CorrelationAttributeName("Correlation Attribute Name", "Correlation Attribute Name", "");  // attribute consulted by getGroupId()
+core::Property MergeContent::DelimiterStratgey("Delimiter Strategy", "Determines if Header, Footer, and Demarcator should point to files", DELIMITER_STRATEGY_FILENAME);  // NOTE(review): identifier is misspelled ("Stratgey") but is referenced elsewhere, so it is left unchanged
+core::Property MergeContent::Header("Header File", "Filename specifying the header to use", "");  // onSchedule() uses the value as literal text instead when the delimiter strategy is DELIMITER_STRATEGY_TEXT
+core::Property MergeContent::Footer("Footer File", "Filename specifying the footer to use", "");  // same file/text handling as Header
+core::Property MergeContent::Demarcator("Demarcator File", "Filename specifying the demarcator to use", "");  // same file/text handling as Header
+core::Property MergeContent::KeepPath("Keep Path", "If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry", "false");  // parsed into keepPath_ by onSchedule()
+core::Relationship MergeContent::Merge("merged", "The FlowFile containing the merged content");  // processBin() transfers the merged flow file here
+const char *BinaryConcatenationMerge::mimeType = "application/octet-stream";  // presumably what getMergedContentType() reports - TODO confirm in the header
+
+/**
+ * Registers the properties and relationships this processor supports.
+ */
+void MergeContent::initialize() {
+  // Supported properties
+  std::set<core::Property> supportedProperties {
+    MinSize,
+    MaxSize,
+    MinEntries,
+    MaxEntries,
+    MaxBinAge,
+    MaxBinCount,
+    MergeStrategy,
+    MergeFormat,
+    CorrelationAttributeName,
+    DelimiterStratgey,
+    Header,
+    Footer,
+    Demarcator,
+    KeepPath
+  };
+  setSupportedProperties(supportedProperties);
+
+  // Supported relationships
+  std::set<core::Relationship> supportedRelationships { Original, Failure, Merge };
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Reads a whole file into a string (binary mode).
+ *
+ * @param path file to read
+ * @return the file's bytes; an empty string when the file cannot be opened,
+ *         its size cannot be determined, or it is empty. If fewer bytes than
+ *         the reported size can be read, only the bytes actually read are
+ *         returned.
+ */
+std::string MergeContent::readContent(std::string path) {
+  std::string contents;
+  std::ifstream in(path.c_str(), std::ios::in | std::ios::binary);
+  if (!in) {
+    return contents;
+  }
+  in.seekg(0, std::ios::end);
+  // tellg() yields -1 on failure; never feed that into resize().
+  const std::streamoff length = in.tellg();
+  if (length <= 0) {
+    return contents;
+  }
+  contents.resize(static_cast<std::string::size_type>(length));
+  in.seekg(0, std::ios::beg);
+  in.read(&contents[0], static_cast<std::streamsize>(contents.size()));
+  // Drop the tail that was not filled in case of a short read.
+  contents.resize(static_cast<std::string::size_type>(in.gcount()));
+  return contents;
+}
+
+/**
+ * Reads the MergeContent-specific properties from the context (after the
+ * BinFiles ones), stores every non-empty value in the matching member, and
+ * prepares header/footer/demarcator content according to the delimiter
+ * strategy: loaded from the named files for DELIMITER_STRATEGY_FILENAME,
+ * taken verbatim for DELIMITER_STRATEGY_TEXT.
+ */
+void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  BinFiles::onSchedule(context, sessionFactory);
+
+  // Fetches one property into `value`; true only when it is set and non-empty.
+  auto readProperty = [context, &value](const std::string &name) {
+    value.clear();
+    return context->getProperty(name, value) && !value.empty();
+  };
+
+  if (readProperty(MergeStrategy.getName()))
+    this->mergeStratgey_ = value;
+  if (readProperty(MergeFormat.getName()))
+    this->mergeFormat_ = value;
+  if (readProperty(CorrelationAttributeName.getName()))
+    this->correlationAttributeName_ = value;
+  if (readProperty(DelimiterStratgey.getName()))
+    this->delimiterStratgey_ = value;
+  if (readProperty(Header.getName()))
+    this->header_ = value;
+  if (readProperty(Footer.getName()))
+    this->footer_ = value;
+  if (readProperty(Demarcator.getName()))
+    this->demarcator_ = value;
+  if (readProperty(KeepPath.getName()))
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, keepPath_);
+
+  // In defragment mode the bin manager is told which attribute carries the fragment count.
+  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT) {
+    binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE);
+  }
+
+  logger_->log_info("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStratgey_, mergeFormat_, correlationAttributeName_, delimiterStratgey_);
+  logger_->log_info("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);
+
+  if (delimiterStratgey_ == DELIMITER_STRATEGY_FILENAME) {
+    // the configured values name files whose content is used
+    if (!header_.empty())
+      this->headerContent_ = readContent(header_);
+    if (!footer_.empty())
+      this->footerContent_ = readContent(footer_);
+    if (!demarcator_.empty())
+      this->demarcatorContent_ = readContent(demarcator_);
+  }
+  if (delimiterStratgey_ == DELIMITER_STRATEGY_TEXT) {
+    // the configured values are the content itself
+    this->headerContent_ = header_;
+    this->footerContent_ = footer_;
+    this->demarcatorContent_ = demarcator_;
+  }
+}
+
+/**
+ * Determines the bin group of a flow file.
+ *
+ * @return the value of the configured correlation attribute when it is
+ *         present and non-empty; otherwise, in defragment mode, the value of
+ *         the fragment id attribute; otherwise an empty string.
+ */
+std::string MergeContent::getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) {
+  std::string attr;
+  // first choice: the correlation attribute, if one is configured
+  if (!correlationAttributeName_.empty() && flow->getAttribute(correlationAttributeName_, attr) && !attr.empty())
+    return attr;
+  // fallback for defragmentation: group by fragment id
+  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT && flow->getAttribute(FRAGMENT_ID_ATTRIBUTE, attr))
+    return attr;
+  return "";
+}
+
+/**
+ * Validates the fragment attributes of a bin before defragmenting it.
+ *
+ * @return true only when the bin is non-empty, every flow file carries the
+ *         same fragment id and fragment count as the first one, the count
+ *         parses as an integer, and every fragment index parses as an
+ *         integer in [0, count).
+ */
+bool MergeContent::checkDefragment(std::unique_ptr<Bin> &bin) {
+  std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
+  if (flows.empty())
+    return false;
+
+  // The first flow file provides the reference id and count.
+  std::string expectedId;
+  std::string expectedCount;
+  if (!flows.front()->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, expectedId) ||
+      !flows.front()->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, expectedCount))
+    return false;
+
+  int total = 0;
+  try {
+    total = std::stoi(expectedCount);
+  } catch (...) {
+    return false;
+  }
+
+  for (const std::shared_ptr<core::FlowFile> &member : flows) {
+    std::string attr;
+    if (!member->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, attr) || attr != expectedId)
+      return false;
+    if (!member->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, attr) || attr != expectedCount)
+      return false;
+    if (!member->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, attr))
+      return false;
+    try {
+      const int position = std::stoi(attr);
+      if (position < 0 || position >= total)
+        return false;
+    } catch (...) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {  // Adds nothing of its own: the whole trigger cycle is delegated to BinFiles
+  BinFiles::onTrigger(context, session);
+}
+
+/**
+ * Merges the flow files of one bin.
+ *
+ * In defragment mode the fragments are validated and sorted by ascending
+ * fragment index first. The merged flow file gets the fragment count
+ * attribute set from bin->getSize() and is routed to Merge; the bin's flow
+ * files are routed to Original.
+ *
+ * @return false when the merge strategy is unknown, the defragment check
+ *         fails, the merge format is unsupported, or merging throws; true
+ *         after a successful merge.
+ */
+bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
+  const bool defragment = (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT);
+  if (!defragment && mergeStratgey_ != MERGE_STRATEGY_BIN_PACK)
+    return false;
+
+  if (defragment) {
+    // validate the fragment attributes before relying on them
+    if (!checkDefragment(bin)) {
+      logger_->log_error("Merge Content check defgrament failed");
+      return false;
+    }
+    // order the fragments by their index
+    std::deque<std::shared_ptr<core::FlowFile>> &fragments = bin->getFlowFile();
+    std::sort(fragments.begin(), fragments.end(), [] (const std::shared_ptr<core::FlowFile> &lhs, const std::shared_ptr<core::FlowFile> &rhs) {
+      std::string attr;
+      lhs->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, attr);
+      const int left = std::stoi(attr);
+      rhs->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, attr);
+      const int right = std::stoi(attr);
+      return left < right;
+    });
+  }
+
+  if (mergeFormat_ != MERGE_FORMAT_CONCAT_VALUE) {
+    logger_->log_error("Merge format not supported %s", mergeFormat_);
+    return false;
+  }
+
+  std::unique_ptr<MergeBin> merger(new BinaryConcatenationMerge());
+  std::shared_ptr<core::FlowFile> merged;
+  try {
+    merged = merger->merge(context, session, bin->getFlowFile(), this->headerContent_, this->footerContent_, this->demarcatorContent_);
+  } catch (...) {
+    logger_->log_error("Merge Content merge catch exception");
+    return false;
+  }
+  session->putAttribute(merged, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));
+  // merge succeeded: route the result and the originals
+  session->transfer(merged, Merge);
+  std::deque<std::shared_ptr<core::FlowFile>> &originals = bin->getFlowFile();
+  for (auto source : originals) {
+    session->transfer(source, Original);
+  }
+  logger_->log_info("Merge FlowFile record UUID %s, payload length %d", merged->getUUIDStr(), merged->getSize());
+  return true;
+}
+
+/**
+ * Creates a new flow file in the session and writes the merged content into
+ * it through WriteCallback, which is given the header, footer, demarcator
+ * and the source flow files.
+ *
+ * The MIME type attribute is set from getMergedContentType(). The filename
+ * attribute is copied from the single source flow file, or from the first
+ * source's SEGMENT_ORIGINAL_FILENAME attribute when there are several; it
+ * is left untouched when that value is empty or there are no sources.
+ *
+ * @return the newly created merged flow file
+ * @throws rethrows whatever the session write / attribute update throws,
+ *         after removing the half-built flow file from the session
+ */
+std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+        std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) {
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create());
+  BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, flows, session);
+  try {
+    session->write(flowFile, &callback);
+    session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());
+  } catch (...) {
+    // The caller never receives flowFile on failure, so it could not route
+    // or drop it; remove it here so it is not left in the session without
+    // a relationship, then let the caller handle the error.
+    session->remove(flowFile);
+    throw;
+  }
+  std::string fileName;
+  // front() on an empty deque is undefined behaviour, so guard the lookup
+  if (!flows.empty()) {
+    if (flows.size() == 1) {
+      flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+    } else {
+      flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
+    }
+  }
+  if (!fileName.empty())
+    session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  return flowFile;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8b3b1c88/libminifi/test/unit/MergeFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/MergeFileTests.cpp b/libminifi/test/unit/MergeFileTests.cpp
new file mode 100644
index 0000000..d3e4fa9
--- /dev/null
+++ b/libminifi/test/unit/MergeFileTests.cpp
@@ -0,0 +1,720 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "../TestBase.h"
+#include "core/Core.h"
+#include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "processors/MergeContent.h"
+#include <sstream>
+#include <iostream>
+
+static const char* FLOW_FILE = "/tmp/minifi-mergecontent";  // prefix of the per-fragment input files; the tests append ".<i>.txt"
+static const char* EXPECT_MERGE_CONTENT_FIRST = "/tmp/minifi-expect-mergecontent1.txt";  // expected merged content for fragments 0-2
+static const char* EXPECT_MERGE_CONTENT_SECOND = "/tmp/minifi-expect-mergecontent2.txt";  // expected merged content for fragments 3-5
+static const char* HEADER_FILE = "/tmp/minifi-mergecontent.header";  // header text file written by the delimiter test
+static const char* FOOTER_FILE = "/tmp/minifi-mergecontent.footer";  // footer text file written by the delimiter test
+static const char* DEMARCATOR_FILE = "/tmp/minifi-mergecontent.demarcator";  // demarcator text file written by the delimiter test
+
+/**
+ * Test helper that reads the content of a flow file into a heap buffer.
+ *
+ * buffer_ holds the bytes, buffer_size_ is the capacity requested at
+ * construction, and read_size_ is the number of bytes actually read by the
+ * last process() call.
+ */
+class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit ReadCallback(uint64_t size) :
+      read_size_(0) {
+    buffer_size_ = size;
+    buffer_ = new uint8_t[buffer_size_];
+  }
+  // The buffer is owned through a raw pointer; copying would free it twice.
+  ReadCallback(const ReadCallback &) = delete;
+  ReadCallback &operator=(const ReadCallback &) = delete;
+  ~ReadCallback() {
+    delete[] buffer_;
+  }
+  /**
+   * Reads from the stream until the buffer is full or the stream reports
+   * no more data.
+   * @return the number of bytes read, or the stream's negative return value
+   *         if a read fails
+   */
+  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+    read_size_ = 0;
+    int64_t total = 0;
+    while (read_size_ < buffer_size_) {
+      // assumes read() returns the byte count, 0 at end of data and a
+      // negative value on error - TODO confirm against BaseStream
+      int64_t ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
+      if (ret < 0)
+        return ret;
+      if (ret == 0)
+        break;
+      read_size_ += ret;
+      total += ret;
+    }
+    return total;
+  }
+  uint8_t *buffer_;
+  uint64_t buffer_size_;
+  uint64_t read_size_;
+};
+
+TEST_CASE("MergeFileDefragment", "[mergefiletest1]") {  // Runs six fragments (two fragment ids, three fragments each) through MergeContent in defragment mode and compares both merged outputs with expected files
+  try {
+    std::ofstream expectfileFirst;
+    std::ofstream expectfileSecond;
+
+    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+
+    // Create the six input files (32 copies of the digit i each) and, in parallel, the two expected merge results
+    for (int i = 0; i < 6; i++) {
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str());
+      for (int j = 0; j < 32; j++) {
+        tmpfile << std::to_string(i);
+        if (i < 3)
+          expectfileFirst << std::to_string(i);
+        else
+          expectfileSecond << std::to_string(i);
+      }
+      tmpfile.close();
+    }
+    expectfileFirst.close();
+    expectfileSecond.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // outgoing connection: "merged" relationship of the merge processor -> log attribute processor
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // incoming connection that feeds the merge processor
+    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+    mergeconnection->setDestination(processor);
+    mergeconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(mergeconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;  // "original" and "failure" have no connection in this test, so they are auto-terminated
+    core::Relationship original("original", "");
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(original);
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    core::ProcessorNode node(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+
+    core::ProcessSession sessionGenFlowFile(&context);
+    std::shared_ptr<core::FlowFile> record[6];
+
+    // Generate 6 flow files: the first three are merged into one output, the last three into another
+    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    for (int i = 0; i < 6; i++) {
+      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      sessionGenFlowFile.import(flowFileName, flow, true, 0);
+      // two bundles of three: fragment id 0 for files 0-2, fragment id 1 for files 3-5; the index runs 0-2 within each bundle
+      if (i < 3)
+        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+      else
+        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+      if (i < 3)
+        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+      else
+        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+      record[i] = flow;
+    }
+    income_connection->put(record[0]);  // queued out of index order and interleaved across both fragment ids
+    income_connection->put(record[2]);
+    income_connection->put(record[5]);
+    income_connection->put(record[4]);
+    income_connection->put(record[1]);
+    income_connection->put(record[3]);
+
+    REQUIRE(processor->getName() == "mergecontent");
+    core::ProcessSessionFactory factory(&context);
+    processor->onSchedule(&context, &factory);
+    for (int i = 0; i < 6; i++) {  // one trigger per queued flow file
+      core::ProcessSession session(&context);
+      processor->onTrigger(&context, &session);
+      session.commit();
+    }
+    // validate the merged content: each output is 3 fragments x 32 bytes = 96 bytes
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() == 96);  // NOTE(review): flow1 is dereferenced without a null check - confirm poll() cannot return null here
+    {
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::ifstream file1;
+      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());  // expected bytes, read from the expectation file
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // despite the name, these are the bytes read from the merged flow file
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    REQUIRE(flow2->getSize() == 96);  // NOTE(review): same unchecked dereference as flow1
+    {
+      ReadCallback callback(flow2->getSize());
+      sessionGenFlowFile.read(flow2, &callback);
+      std::ifstream file2;
+      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());  // expected bytes, read from the expectation file
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // despite the name, these are the bytes read from the merged flow file
+      REQUIRE(expectContents == contents);
+      file2.close();
+    }
+    LogTestController::getInstance().reset();
+    for (int i = 0; i < 6; i++) {  // remove the temporary input files
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      unlink(flowFileName.c_str());
+    }
+    unlink(EXPECT_MERGE_CONTENT_FIRST);
+    unlink(EXPECT_MERGE_CONTENT_SECOND);
+  } catch (...) {  // NOTE(review): every exception from the test body is caught and discarded here - confirm this cannot hide a failure; the temp files are also left behind on this path
+  }
+}
+
+TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
+  try {
+    std::ofstream expectfileFirst;
+    std::ofstream expectfileSecond;
+    std::ofstream headerfile, footerfile, demarcatorfile;
+    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+    headerfile.open(HEADER_FILE);
+    headerfile << "header";
+    expectfileFirst << "header";
+    expectfileSecond << "header";
+    headerfile.close();
+    footerfile.open(FOOTER_FILE);
+    footerfile << "footer";
+    footerfile.close();
+    demarcatorfile.open(DEMARCATOR_FILE);
+    demarcatorfile << "demarcator";
+    demarcatorfile.close();
+
+    // Create and write to the test file
+    for (int i = 0; i < 6; i++) {
+      if (i != 0 && i <= 2)
+        expectfileFirst << "demarcator";
+      if (i != 3 && i >= 4)
+        expectfileSecond << "demarcator";
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str());
+      for (int j = 0; j < 32; j++) {
+        tmpfile << std::to_string(i);
+        if (i < 3)
+          expectfileFirst << std::to_string(i);
+        else
+          expectfileSecond << std::to_string(i);
+      }
+      tmpfile.close();
+    }
+    expectfileFirst << "footer";
+    expectfileSecond << "footer";
+    expectfileFirst.close();
+    expectfileSecond.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from merge processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to merge processor
+    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+    mergeconnection->setDestination(processor);
+    mergeconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(mergeconnection);
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship original("original", "");
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(original);
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    core::ProcessorNode node(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header");
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer");
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator");
+
+    core::ProcessSession sessionGenFlowFile(&context);
+    std::shared_ptr<core::FlowFile> record[6];
+
+    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    for (int i = 0; i < 6; i++) {
+      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      sessionGenFlowFile.import(flowFileName, flow, true, 0);
+      // three bundle
+      if (i < 3)
+        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+      else
+        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+      if (i < 3)
+        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+      else
+        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+      record[i] = flow;
+    }
+    income_connection->put(record[0]);
+    income_connection->put(record[2]);
+    income_connection->put(record[5]);
+    income_connection->put(record[4]);
+    income_connection->put(record[1]);
+    income_connection->put(record[3]);
+
+    REQUIRE(processor->getName() == "mergecontent");
+    core::ProcessSessionFactory factory(&context);
+    processor->onSchedule(&context, &factory);
+    for (int i = 0; i < 6; i++) {
+      core::ProcessSession session(&context);
+      processor->onTrigger(&context, &session);
+      session.commit();
+    }
+    // validate the merge content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+    REQUIRE(flow1->getSize() == 128);
+    {
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::ifstream file1;
+      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    REQUIRE(flow2->getSize() == 128);
+    {
+      ReadCallback callback(flow2->getSize());
+      sessionGenFlowFile.read(flow2, &callback);
+      std::ifstream file2;
+      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file2.close();
+    }
+    LogTestController::getInstance().reset();
+    for (int i = 0; i < 6; i++) {
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      unlink(flowFileName.c_str());
+    }
+    unlink(EXPECT_MERGE_CONTENT_FIRST);
+    unlink(EXPECT_MERGE_CONTENT_SECOND);
+    unlink(FOOTER_FILE);
+    unlink(HEADER_FILE);
+    unlink(DEMARCATOR_FILE);
+  } catch (...) {
+  }
+}
+
+// Defragment merge where one fragment of the second bundle never arrives.
+//
+// Five input files are written (indices 0-3 and 5; index 4 is deliberately
+// skipped), each holding 32 copies of its index digit. Flow files 0-2 carry
+// fragment id 0, flow files 3 and 5 carry fragment id 1, and every flow file
+// announces a fragment count of 3. The test expects two merged outputs on the
+// "merged" connection: 96 bytes (files 0,1,2) and 64 bytes (files 3,5).
+TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
+  try {
+    std::ofstream expectfileFirst;
+    std::ofstream expectfileSecond;
+
+    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+
+    // Create and write to the test file, drop record 4
+    for (int i = 0; i < 6; i++) {
+      if (i == 4)
+        continue;
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str());
+      for (int j = 0; j < 32; j++) {
+        tmpfile << std::to_string(i);
+        // mirror the same bytes into the expectation file of the matching bundle
+        if (i < 3)
+          expectfileFirst << std::to_string(i);
+        else
+          expectfileSecond << std::to_string(i);
+      }
+      tmpfile.close();
+    }
+    expectfileFirst.close();
+    expectfileSecond.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from merge processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to merge processor
+    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+    mergeconnection->setDestination(processor);
+    mergeconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(mergeconnection);
+
+    // only the "merged" relationship is wired to a connection; the others are auto-terminated
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship original("original", "");
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(original);
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    core::ProcessorNode node(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+
+    core::ProcessSession sessionGenFlowFile(&context);
+    // record[4] is never assigned and stays null
+    std::shared_ptr<core::FlowFile> record[6];
+
+    // Generate 5 flowfiles (index 4 is dropped): 0-2 form fragment bundle 0,
+    // 3 and 5 belong to fragment bundle 1, which therefore stays incomplete
+    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    for (int i = 0; i < 6; i++) {
+      if (i == 4)
+        continue;
+      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      sessionGenFlowFile.import(flowFileName, flow, true, 0);
+      // three bundle
+      if (i < 3)
+        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+      else
+        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+      if (i < 3)
+        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+      else
+        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+      record[i] = flow;
+    }
+    // queue the fragments out of order, with the two bundles interleaved
+    income_connection->put(record[0]);
+    income_connection->put(record[2]);
+    income_connection->put(record[5]);
+    income_connection->put(record[1]);
+    income_connection->put(record[3]);
+
+    REQUIRE(processor->getName() == "mergecontent");
+    core::ProcessSessionFactory factory(&context);
+    processor->onSchedule(&context, &factory);
+    // one trigger per queued flow file
+    for (int i = 0; i < 6; i++) {
+      if (i == 4)
+        continue;
+      core::ProcessSession session(&context);
+      processor->onTrigger(&context, &session);
+      session.commit();
+    }
+    // NOTE(review): presumably waits past the "1 sec" Max Bin Age so the extra
+    // trigger below flushes the incomplete second bin -- confirm against BinFiles
+    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+    {
+      core::ProcessSession session(&context);
+      processor->onTrigger(&context, &session);
+    }
+    // validate the merge content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+    // fail the test cleanly instead of dereferencing a null pointer when a merged output is missing
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow2 != nullptr);
+    REQUIRE(flow1->getSize() == 96);
+    {
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::ifstream file1;
+      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+      // note the naming: `contents` is the expectation read from disk,
+      // `expectContents` is what was actually read back from the merged flow file
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    REQUIRE(flow2->getSize() == 64);
+    {
+      ReadCallback callback(flow2->getSize());
+      sessionGenFlowFile.read(flow2, &callback);
+      std::ifstream file2;
+      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file2.close();
+    }
+    LogTestController::getInstance().reset();
+    // remove the temporary input and expectation files
+    for (int i = 0; i < 6; i++) {
+      if (i == 4)
+        continue;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      unlink(flowFileName.c_str());
+    }
+    unlink(EXPECT_MERGE_CONTENT_FIRST);
+    unlink(EXPECT_MERGE_CONTENT_SECOND);
+  } catch (...) {
+    // Propagate instead of swallowing: an unexpected exception must be reported
+    // by the test framework rather than letting the test pass silently.
+    throw;
+  }
+}
+
+// Bin-packing merge driven by a minimum bin size.
+//
+// Six input files are written, each holding 32 copies of its index digit, and
+// every flow file carries the same "tag" correlation attribute. With MinSize
+// set to "96" the test expects two merged outputs of 96 bytes each on the
+// "merged" connection: files 0,1,2 followed by files 3,4,5.
+TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
+  try {
+    std::ofstream expectfileFirst;
+    std::ofstream expectfileSecond;
+    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
+    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+
+    // Create and write to the test file
+    for (int i = 0; i < 6; i++) {
+      std::ofstream tmpfile;
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      tmpfile.open(flowFileName.c_str());
+      for (int j = 0; j < 32; j++) {
+        tmpfile << std::to_string(i);
+        // mirror the same bytes into the expectation file of the matching output
+        if (i < 3)
+          expectfileFirst << std::to_string(i);
+        else
+          expectfileSecond << std::to_string(i);
+      }
+      tmpfile.close();
+    }
+    expectfileFirst.close();
+    expectfileSecond.close();
+
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    // connection from merge processor to log attribute
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("merged", "Merge successful output"));
+    connection->setSource(processor);
+    connection->setDestination(logAttributeProcessor);
+    connection->setSourceUUID(processoruuid);
+    connection->setDestinationUUID(logAttributeuuid);
+    processor->addConnection(connection);
+    // connection to merge processor
+    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
+    mergeconnection->setDestination(processor);
+    mergeconnection->setDestinationUUID(processoruuid);
+    processor->addConnection(mergeconnection);
+
+    // only the "merged" relationship is wired to a connection; the others are auto-terminated
+    std::set<core::Relationship> autoTerminatedRelationships;
+    core::Relationship original("original", "");
+    core::Relationship failure("failure", "");
+    autoTerminatedRelationships.insert(original);
+    autoTerminatedRelationships.insert(failure);
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    logAttributeProcessor->incrementActiveTasks();
+    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+
+    core::ProcessorNode node(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+    context.setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+    core::ProcessSession sessionGenFlowFile(&context);
+    std::shared_ptr<core::FlowFile> record[6];
+
+    // Generate 6 flowfiles of 32 bytes each; the first three are expected to be
+    // merged into one output and the last three into another
+    std::shared_ptr<core::Connectable> income = node.getNextIncomingConnection();
+    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
+    for (int i = 0; i < 6; i++) {
+      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      sessionGenFlowFile.import(flowFileName, flow, true, 0);
+      // same value of the correlation attribute on every flow file
+      flow->setAttribute("tag", "tag");
+      record[i] = flow;
+    }
+    // queue the flow files in index order
+    income_connection->put(record[0]);
+    income_connection->put(record[1]);
+    income_connection->put(record[2]);
+    income_connection->put(record[3]);
+    income_connection->put(record[4]);
+    income_connection->put(record[5]);
+
+    REQUIRE(processor->getName() == "mergecontent");
+    core::ProcessSessionFactory factory(&context);
+    processor->onSchedule(&context, &factory);
+    // one trigger per queued flow file
+    for (int i = 0; i < 6; i++) {
+      core::ProcessSession session(&context);
+      processor->onTrigger(&context, &session);
+      session.commit();
+    }
+    // validate the merge content
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
+    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
+    // fail the test cleanly instead of dereferencing a null pointer when a merged output is missing
+    REQUIRE(flow1 != nullptr);
+    REQUIRE(flow2 != nullptr);
+    REQUIRE(flow1->getSize() == 96);
+    {
+      ReadCallback callback(flow1->getSize());
+      sessionGenFlowFile.read(flow1, &callback);
+      std::ifstream file1;
+      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
+      // note the naming: `contents` is the expectation read from disk,
+      // `expectContents` is what was actually read back from the merged flow file
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file1.close();
+    }
+    REQUIRE(flow2->getSize() == 96);
+    {
+      ReadCallback callback(flow2->getSize());
+      sessionGenFlowFile.read(flow2, &callback);
+      std::ifstream file2;
+      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
+      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+      REQUIRE(expectContents == contents);
+      file2.close();
+    }
+    LogTestController::getInstance().reset();
+    // remove the temporary input and expectation files
+    for (int i = 0; i < 6; i++) {
+      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+      unlink(flowFileName.c_str());
+    }
+    unlink(EXPECT_MERGE_CONTENT_FIRST);
+    unlink(EXPECT_MERGE_CONTENT_SECOND);
+  } catch (...) {
+    // Propagate instead of swallowing: an unexpected exception must be reported
+    // by the test framework rather than letting the test pass silently.
+    throw;
+  }
+}
+
+