Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/23 00:34:51 UTC

[nifi-minifi-cpp] branch master updated (86d5d3f -> 6fe6ae8)

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from 86d5d3f  MINIFICPP-1031 - RemoteProcessorGroupPort::setURL fails in case port is not specified in URL
     add decf568  MINIFICPP-621 Add initial tailfile nanofi example
     new 8e40c3f  Merge remote-tracking branch 'apache/master'
     new 6fe6ae8  MINIFICPP-927 Add delimited tailfile processor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 libminifi/include/core/ContentRepository.h         |   2 +-
 libminifi/include/core/ProcessContext.h            |  18 +-
 libminifi/include/core/StreamManager.h             |   2 +-
 libminifi/include/utils/file/FileUtils.h           |  49 +-
 .../src/core/repository/FileSystemRepository.cpp   |   2 +-
 nanofi/CMakeLists.txt                              |   9 +-
 .../bootstrap => nanofi/ecu}/CMakeLists.txt        |  29 +-
 nanofi/ecu/log_aggregator.c                        |  82 ++++
 nanofi/ecu/tailfile_chunk.c                        |  77 +++
 nanofi/ecu/tailfile_delimited.c                    |  77 +++
 nanofi/examples/CMakeLists.txt                     |   4 -
 nanofi/examples/tail_file.c                        | 257 ----------
 nanofi/include/api/ecu.h                           |  95 ++++
 nanofi/include/api/nanofi.h                        |  38 +-
 nanofi/include/core/cstructs.h                     |  37 +-
 nanofi/include/core/file_utils.h                   |  78 +++
 nanofi/include/core/flowfiles.h                    |  51 ++
 nanofi/include/core/string_utils.h                 |  61 ++-
 nanofi/include/cxx/Instance.h                      |  19 +-
 nanofi/src/api/ecu.c                               | 530 +++++++++++++++++++++
 nanofi/src/api/nanofi.cpp                          |  96 +++-
 nanofi/src/core/file_utils.c                       | 141 ++++++
 nanofi/src/core/flowfiles.c                        | 164 +++++++
 nanofi/src/core/string_utils.c                     | 154 ++++--
 nanofi/tests/CAPITests.cpp                         |  12 +-
 nanofi/tests/CLogAggregatorTests.cpp               | 365 ++++++++++++++
 nanofi/tests/CTailFileChunkTests.cpp               | 135 ++++++
 nanofi/tests/CTailFileDelimitedTests.cpp           | 256 ++++++++++
 nanofi/tests/CTestsBase.h                          | 141 ++++++
 29 files changed, 2611 insertions(+), 370 deletions(-)
 copy {extensions/bootstrap => nanofi/ecu}/CMakeLists.txt (54%)
 create mode 100644 nanofi/ecu/log_aggregator.c
 create mode 100644 nanofi/ecu/tailfile_chunk.c
 create mode 100644 nanofi/ecu/tailfile_delimited.c
 delete mode 100644 nanofi/examples/tail_file.c
 create mode 100644 nanofi/include/api/ecu.h
 create mode 100644 nanofi/include/core/file_utils.h
 create mode 100644 nanofi/include/core/flowfiles.h
 create mode 100644 nanofi/src/api/ecu.c
 create mode 100644 nanofi/src/core/file_utils.c
 create mode 100644 nanofi/src/core/flowfiles.c
 create mode 100644 nanofi/tests/CLogAggregatorTests.cpp
 create mode 100644 nanofi/tests/CTailFileChunkTests.cpp
 create mode 100644 nanofi/tests/CTailFileDelimitedTests.cpp
 create mode 100644 nanofi/tests/CTestsBase.h


[nifi-minifi-cpp] 02/02: MINIFICPP-927 Add delimited tailfile processor

Posted by ab...@apache.org.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6fe6ae8d4743643b474214ace79e86d8b60c724b
Author: Murtuza Shareef <ms...@cloudera.com>
AuthorDate: Mon Jun 10 18:15:48 2019 -0400

    MINIFICPP-927 Add delimited tailfile processor
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #613
---
 libminifi/include/core/ContentRepository.h         |   2 +-
 libminifi/include/core/ProcessContext.h            |  18 +-
 libminifi/include/core/StreamManager.h             |   2 +-
 libminifi/include/utils/file/FileUtils.h           |  49 +-
 .../src/core/repository/FileSystemRepository.cpp   |   2 +-
 nanofi/CMakeLists.txt                              |   6 +-
 nanofi/ecu/CMakeLists.txt                          |  39 +-
 nanofi/ecu/log_aggregator.c                        |  82 ++++
 nanofi/ecu/tail_file.c                             | 206 --------
 nanofi/ecu/tailfile_chunk.c                        |  77 +++
 nanofi/ecu/tailfile_delimited.c                    |  77 +++
 nanofi/examples/CMakeLists.txt                     |   4 -
 nanofi/examples/tail_file.c                        | 257 ----------
 nanofi/include/api/ecu.h                           |  95 ++++
 nanofi/include/api/nanofi.h                        |  38 +-
 nanofi/include/core/cstructs.h                     |  21 +-
 nanofi/include/core/file_utils.h                   |  50 +-
 nanofi/include/core/flowfiles.h                    |  27 +-
 nanofi/include/cxx/Instance.h                      |  19 +-
 nanofi/src/api/ecu.c                               | 530 +++++++++++++++++++++
 nanofi/src/api/nanofi.cpp                          |  96 +++-
 nanofi/src/core/file_utils.c                       | 136 ++++--
 nanofi/src/core/flowfiles.c                        | 149 +++++-
 nanofi/tests/CAPITests.cpp                         |  12 +-
 ...{CTailFileTests.cpp => CLogAggregatorTests.cpp} | 196 ++++----
 nanofi/tests/CTailFileChunkTests.cpp               | 135 ++++++
 nanofi/tests/CTailFileDelimitedTests.cpp           | 256 ++++++++++
 nanofi/tests/CTestsBase.h                          | 141 ++++++
 28 files changed, 2016 insertions(+), 706 deletions(-)
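
Note on the FileUtils.h hunk in this patch: the non-Boost branch of create_dir now tries mkdir first, and on ENOENT creates the parent path recursively before retrying, while EEXIST is accepted only if the existing entry is a directory. The following is a minimal C sketch of that control flow, not the code from the patch. The name make_dirs is hypothetical, the '/' separator is assumed (the patch uses get_separator(0)), and the tolerance for EEXIST on the final retry is an addition made here.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

/* Returns non-zero if path exists and is a directory, 0 otherwise. */
static int is_directory(const char *path) {
    struct stat st;
    if (stat(path, &st) < 0) {
        return 0;
    }
    return S_ISDIR(st.st_mode);
}

/* Hypothetical helper: create path and any missing parents.
 * Returns 0 on success, -1 on failure. Assumes '/' as separator. */
int make_dirs(const char *path) {
    assert(path != NULL);

    if (mkdir(path, 0700) == 0) {
        return 0;
    }
    if (errno == EEXIST) {
        /* Something is already there; succeed only if it is a directory. */
        return is_directory(path) ? 0 : -1;
    }
    if (errno != ENOENT) {
        return -1;
    }

    /* A parent component is missing: create it first, then retry. */
    const char *sep = strrchr(path, '/');
    if (sep == NULL || sep == path) {
        return -1;
    }
    size_t len = (size_t)(sep - path);
    char *parent = malloc(len + 1);
    if (parent == NULL) {
        return -1;
    }
    memcpy(parent, path, len);
    parent[len] = '\0';

    int res = make_dirs(parent);
    free(parent);
    if (res < 0) {
        return -1;
    }

    /* Retry; EEXIST is tolerated here (e.g. for a trailing separator). */
    if (mkdir(path, 0700) == 0 || (errno == EEXIST && is_directory(path))) {
        return 0;
    }
    return -1;
}
```

Calling make_dirs("/tmp/demo/a/b/c") creates each missing level with mode 0700. A second call on the same path also returns 0, because the EEXIST case is treated as success when the entry is a directory.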

diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 348725e..c3bec9f 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -46,7 +46,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
    */
   virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
 
-  virtual std::string getStoragePath() {
+  virtual std::string getStoragePath() const {
     return directory_;
   }
 
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 424e306..be8ed91 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -59,7 +59,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
         flow_repo_(flow_repo),
         content_repo_(content_repo),
         processor_node_(processor),
-        logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+        logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+        configure_(std::make_shared<minifi::Configure>()),
+        initialized_(false) {
     repo_ = repo;
   }
 
@@ -75,7 +77,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
         flow_repo_(flow_repo),
         content_repo_(content_repo),
         processor_node_(processor),
-        logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+        logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+        initialized_(false) {
     repo_ = repo;
   }
   // Destructor
@@ -197,6 +200,15 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
     return controller_service_provider_->getControllerServiceName(identifier);
   }
 
+  void initializeContentRepository(const std::string& home) {
+      configure_->setHome(home);
+      content_repo_->initialize(configure_);
+      initialized_ = true;
+  }
+
+  bool isInitialized() const {
+      return initialized_;
+  }
  private:
 
   template<typename T>
@@ -217,7 +229,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
 
   // Logger
   std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<Configure> configure_;
 
+  bool initialized_;
 };
 
 } /* namespace core */
diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h
index 65e0414..a2a2e78 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -41,7 +41,7 @@ class StreamManager {
 
   }
 
-  virtual std::string getStoragePath() = 0;
+  virtual std::string getStoragePath() const = 0;
 
   /**
    * Create a write stream using the streamId as a reference.
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 91fc130..bedfee8 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -282,7 +282,15 @@ class FileUtils {
   }
 #endif
 
-  static int create_dir(const std::string &path, bool create = true) {
+  static int is_directory(const char * path) {
+      struct stat dir_stat;
+      if (stat(path, &dir_stat) < 0) {
+          return 0;
+      }
+      return S_ISDIR(dir_stat.st_mode);
+  }
+
+  static int create_dir(const std::string& path, bool recursive = true) {
 #ifdef BOOST_VERSION
     boost::filesystem::path dir(path);
     if(boost::filesystem::create_directory(dir))
@@ -301,12 +309,41 @@ class FileUtils {
       return 0;
     }
 #else
-    struct stat dir_stat;
-    if (stat(path.c_str(), &dir_stat)) {
-      if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) {
+    if (!recursive) {
+        if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) {
+            return -1;
+        }
+        return 0;
+    }
+
+    int ret = mkdir(path.c_str(), 0700);
+    if (ret == 0) {
+        return 0;
+    }
+
+    switch (errno) {
+    case ENOENT: {
+        size_t found = path.find_last_of(get_separator(0));
+
+        if (found == std::string::npos) {
+            return -1;
+        }
+
+        const std::string dir = path.substr(0, found);
+        int res = create_dir(dir);
+        if (res < 0) {
+            return -1;
+        }
+        return mkdir(path.c_str(), 0700);
+    }
+    case EEXIST: {
+        if (is_directory(path.c_str())) {
+            return 0;
+        }
+        return -1;
+    }
+    default:
         return -1;
-      }
-      return 0;
     }
 #endif
     return -1;
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 4607d74..4f9b83d 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -34,7 +34,7 @@ bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &
   if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) {
     directory_ = value;
   } else {
-    directory_ = configuration->getHome() + "/contentrepository";
+    directory_ = configuration->getHome();
   }
   utils::file::FileUtils::create_dir(directory_);
   return true;
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index 9e4e1e8..7110519 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -33,9 +33,11 @@ else()
 include_directories(../libminifi/opsys/posix)
 endif()
 
-file(GLOB NANOFI_SOURCES  "src/api/*.cpp" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*")
+file(GLOB NANOFI_SOURCES "src/api/*.c*" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*")
 
-file(GLOB NANOFI_EXAMPLES_SOURCES  "examples/*.c" )
+if(WIN32)
+list(REMOVE_ITEM NANOFI_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/api/ecu.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/file_utils.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/flowfiles.c)
+endif()
 
 file(GLOB NANOFI_ECU_SOURCES "ecu/*.c")
 
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
index b28af76..fccb443 100644
--- a/nanofi/ecu/CMakeLists.txt
+++ b/nanofi/ecu/CMakeLists.txt
@@ -19,33 +19,6 @@
 
 cmake_minimum_required(VERSION 2.6)
 
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
-
-include_directories(/include)
-
-include(CheckCXXCompilerFlag)
-if (WIN32)
-  if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900"))
-	    CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported)
-	    if (_cpp_latest_flag_supported)
-	        add_compile_options("/std:c++14")
-	    endif()
-	endif()
-else()
-CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
-CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
-if(COMPILER_SUPPORTS_CXX11)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
-elseif(COMPILER_SUPPORTS_CXX0X)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
-else()
- message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
-endif()
-
-endif()
-
 if (APPLE)
     set(LINK_FLAGS "-Wl,-all_load")
     set(LINK_END_FLAGS "")
@@ -56,8 +29,16 @@ endif ()
 
 if (NOT WIN32)
 
-add_executable(tail_file tail_file.c)
+add_executable(log_aggregator log_aggregator.c)
+
+target_link_libraries(log_aggregator nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(tailfile_chunk tailfile_chunk.c)
+
+target_link_libraries(tailfile_chunk nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+
+add_executable(tailfile_delimited tailfile_delimited.c)
 
-target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
+target_link_libraries(tailfile_delimited nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
 endif()
\ No newline at end of file
diff --git a/nanofi/ecu/log_aggregator.c b/nanofi/ecu/log_aggregator.c
new file mode 100644
index 0000000..52d4f23
--- /dev/null
+++ b/nanofi/ecu/log_aggregator.c
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+int main(int argc, char** argv) {
+
+    if (argc < 7) {
+        printf("Error: must run ./log_aggregator <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n");
+        exit(1);
+    }
+
+    tailfile_input_params input_params = init_logaggregate_input(argv);
+
+    uint64_t intrvl = 0;
+    uint64_t port_num = 0;
+    if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+        return 1;
+    }
+
+    setup_signal_action();
+    nifi_proc_params params = setup_nifi_processor(&input_params, "LogAggregator", on_trigger_logaggregator);
+
+    set_standalone_property(params.processor, "file_path", input_params.file);
+    set_standalone_property(params.processor, "delimiter", input_params.delimiter);
+
+    struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(params.processor, uuid_str);
+
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(params.processor);
+        struct processor_params * pp = NULL;
+        HASH_FIND_STR(procparams, uuid_str, pp);
+        if (pp) {
+            transmit_payload(client, pp->ff_list, 0);
+            delete_all_flow_files_from_proc(uuid_str);
+        }
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("log aggregator processor stopped\n");
+    if (client) {
+        destroyClient(client);
+    }
+    clear_content_repo(params.instance);
+    delete_all_flow_files_from_proc(uuid_str);
+    free_standalone_processor(params.processor);
+    free_instance(params.instance);
+    free_proc_params(uuid_str);
+    return 0;
+}
diff --git a/nanofi/ecu/tail_file.c b/nanofi/ecu/tail_file.c
deleted file mode 100644
index c428be1..0000000
--- a/nanofi/ecu/tail_file.c
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
-    *
-    *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "api/nanofi.h"
-#include "core/string_utils.h"
-#include "core/cstructs.h"
-#include "core/file_utils.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <limits.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-struct flow_file_records * flowfiles = NULL;
-nifi_instance * instance = NULL;
-standalone_processor * proc = NULL;
-int file_offset = 0;
-int stopped = 0;
-flow_file_list ff_list;
-token_list tks;
-
-void signal_handler(int signum) {
-    if (signum == SIGINT || signum == SIGTERM) {
-        stopped = 1;
-    }
-}
-
-void transmit_flow_files(nifi_instance * instance) {
-    flow_file_list_node * head = ff_list.head;
-    while (head) {
-        transmit_flowfile(head->ff_record, instance);
-        head = head->next;
-    }
-}
-
-void set_offset(int offset) {
-    file_offset = offset;
-}
-
-int get_offset() {
-    return file_offset;
-}
-
-void on_trigger_callback(processor_session * ps, processor_context * ctx) {
-
-    char file_path[4096];
-    char delimiter[3];
-
-    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
-        return;
-    }
-
-    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
-        return;
-    }
-
-    if (strlen(delimiter) == 0) {
-        printf("Delimiter not specified or it is empty\n");
-        return;
-    }
-    char delim = delimiter[0];
-
-    if (delim == '\\') {
-          if (strlen(delimiter) > 1) {
-            switch (delimiter[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                break;
-            }
-        }
-    }
-
-    tks = tail_file(file_path, delim, get_offset());
-
-    if (!validate_list(&tks)) return;
-
-    set_offset(get_offset() + tks.total_bytes);
-
-    token_node * head;
-    for (head = tks.head; head && head->data; head = head->next) {
-        flow_file_record * ffr = generate_flow_file(instance, proc);
-        const char * flow_file_path = ffr->contentLocation;
-        FILE * ffp = fopen(flow_file_path, "wb");
-        if (!ffp) {
-            printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
-            break;
-        }
-
-        int count = strlen(head->data);
-        int ret = fwrite(head->data, 1, count, ffp);
-        if (ret < count) {
-            fclose(ffp);
-            break;
-        }
-        fseek(ffp, 0, SEEK_END);
-        ffr->size = ftell(ffp);
-        fclose(ffp);
-        add_flow_file_record(&ff_list, ffr);
-    }
-    free_all_tokens(&tks);
-}
-
-int main(int argc, char** argv) {
-
-    if (argc < 6) {
-        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
-        exit(1);
-    }
-
-    char * file = argv[1];
-    char * interval = argv[2];
-    char * delimiter = argv[3];
-    char * instance_str = argv[4];
-    char * port_str = argv[5];
-
-    if (access(file, F_OK) == -1) {
-        printf("Error: %s doesn't exist!\n", file);
-        exit(1);
-    }
-
-    struct stat stats;
-    errno = 0;
-    int ret = stat(file, &stats);
-
-    if (ret == -1) {
-        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
-        exit(1);
-    }
-    // Check for file existence
-    if (S_ISDIR(stats.st_mode)){
-        printf("Error: %s is a directory!\n", file);
-        exit(1);
-    }
-
-    errno = 0;
-    unsigned long intrvl = strtol(interval, NULL, 10);
-
-    if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
-        printf("Invalid interval value specified\n");
-        return 0;
-    }
-
-    struct sigaction action;
-    memset(&action, 0, sizeof(sigaction));
-    action.sa_handler = signal_handler;
-    sigaction(SIGTERM, &action, NULL);
-    sigaction(SIGINT, &action, NULL);
-
-    nifi_port port;
-
-    port.port_id = port_str;
-
-    instance = create_instance(instance_str, &port);
-
-    const char * processor_name = "TailFile";
-
-    add_custom_processor(processor_name, on_trigger_callback);
-
-    proc = create_processor(processor_name);
-
-    set_standalone_property(proc, "file_path", file);
-    set_standalone_property(proc, "delimiter", delimiter);
-
-    set_offset(0);
-    while (!stopped) {
-        flow_file_record * new_ff = invoke(proc);
-        transmit_flow_files(instance);
-        free_flow_file_list(&ff_list);
-        free_flowfile(new_ff);
-        sleep(intrvl);
-    }
-
-    printf("tail file processor stopped\n");
-    free_standalone_processor(proc);
-    free(instance);
-
-    return 0;
-}
diff --git a/nanofi/ecu/tailfile_chunk.c b/nanofi/ecu/tailfile_chunk.c
new file mode 100644
index 0000000..26b9966
--- /dev/null
+++ b/nanofi/ecu/tailfile_chunk.c
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include <unistd.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+
+    if (argc < 7) {
+        printf("Error: must run ./tailfile_chunk <file> <interval> <chunksize> <hostname> <tcp port number> <nifi port uuid>\n");
+        exit(1);
+    }
+
+    tailfile_input_params input_params = init_tailfile_chunk_input(argv);
+
+    uint64_t intrvl = 0;
+    uint64_t port_num = 0;
+    if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+        return 1;
+    }
+
+    setup_signal_action();
+    nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileChunk", on_trigger_tailfilechunk);
+
+    set_standalone_property(params.processor, "file_path", input_params.file);
+    set_standalone_property(params.processor, "chunk_size", input_params.chunk_size);
+
+    struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(params.processor, uuid_str);
+
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(params.processor);
+        struct processor_params * pp = NULL;
+        HASH_FIND_STR(procparams, uuid_str, pp);
+        if (pp) {
+            transmit_payload(client, pp->ff_list, 0);
+            delete_all_flow_files_from_proc(uuid_str);
+        }
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("processor stopped\n");
+    if (client) {
+        destroyClient(client);
+    }
+    clear_content_repo(params.instance);
+    delete_all_flow_files_from_proc(uuid_str);
+    free_standalone_processor(params.processor);
+    free_instance(params.instance);
+    free_proc_params(uuid_str);
+    return 0;
+}
diff --git a/nanofi/ecu/tailfile_delimited.c b/nanofi/ecu/tailfile_delimited.c
new file mode 100644
index 0000000..3495231
--- /dev/null
+++ b/nanofi/ecu/tailfile_delimited.c
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+    *
+    *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include <unistd.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+
+    if (argc < 7) {
+        printf("Error: must run ./tailfile_delimited <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n");
+        exit(1);
+    }
+
+    tailfile_input_params input_params = init_logaggregate_input(argv);
+
+    uint64_t intrvl = 0;
+    uint64_t port_num = 0;
+    if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+        return 1;
+    }
+
+    setup_signal_action();
+    nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileDelimited", on_trigger_tailfiledelimited);
+
+    set_standalone_property(params.processor, "file_path", input_params.file);
+    set_standalone_property(params.processor, "delimiter", input_params.delimiter);
+
+    struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(params.processor, uuid_str);
+
+    while (!stopped) {
+        flow_file_record * new_ff = invoke(params.processor);
+        struct processor_params * pp = NULL;
+        HASH_FIND_STR(procparams, uuid_str, pp);
+        if (pp) {
+            transmit_payload(client, pp->ff_list, 1);
+            delete_completed_flow_files_from_proc(uuid_str);
+        }
+        free_flowfile(new_ff);
+        sleep(intrvl);
+    }
+
+    printf("tailfile delimited processor stopped\n");
+    if (client) {
+        destroyClient(client);
+    }
+    clear_content_repo(params.instance);
+    delete_all_flow_files_from_proc(uuid_str);
+    free_standalone_processor(params.processor);
+    free_instance(params.instance);
+    free_proc_params(uuid_str);
+    return 0;
+}
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index b9480ed..6a9779c 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -83,8 +83,4 @@ add_executable(monitor_directory monitor_directory.c)
 
 target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
 
-#add_executable(tail_file tail_file.c)
-
-#target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT}  ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS})
-
 endif()
diff --git a/nanofi/examples/tail_file.c b/nanofi/examples/tail_file.c
deleted file mode 100644
index 2200df4..0000000
--- a/nanofi/examples/tail_file.c
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
-    *
-    *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "api/nanofi.h"
-#include "core/string_utils.h"
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <limits.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-typedef struct flow_file_records {
-    flow_file_record ** records;
-    uint64_t len;
-} flow_file_records;
-
-struct flow_file_records * flowfiles = NULL;
-nifi_instance * instance = NULL;
-standalone_processor * proc = NULL;
-int file_offset = 0;
-int stopped = 0;
-
-void signal_handler(int signum) {
-    if (signum == SIGINT || signum == SIGTERM) {
-        stopped = 1;
-    }
-}
-
-void transmit_flow_files(nifi_instance * instance) {
-    NULL_CHECK( ,flowfiles);
-    int i;
-    for (i = 0; i < flowfiles->len; ++i) {
-        NULL_CHECK( ,flowfiles->records[i]);
-        transmit_flowfile(flowfiles->records[i], instance);
-    }
-}
-
-void free_flow_file_records() {
-    NULL_CHECK( ,flowfiles);
-    int i;
-    for (i = 0; i < flowfiles->len; ++i) {
-        free_flowfile(flowfiles->records[i]);
-    }
-    free(flowfiles);
-    flowfiles = NULL;
-}
-
-void set_offset(int offset) {
-    file_offset = offset;
-}
-
-int get_offset() {
-    return file_offset;
-}
-
-void free_all_strings(char ** strings, int num_strings) {
-    int i;
-    for (i = 0; i < num_strings; ++i) {
-        free(strings[i]);
-    }
-}
-
-void on_trigger_callback(processor_session * ps, processor_context * ctx) {
-
-    char file_path[4096];
-    char delimiter[2];
-
-    if (get_property(ctx, "file_path", file_path, 50) != 0) {
-        return;
-    }
-
-    if (get_property(ctx, "delimiter", delimiter, 2) != 0) {
-        return;
-    }
-
-    if (strlen(delimiter) == 0) {
-        printf("Delimiter not specified or it is empty\n");
-        return;
-    }
-    char delim = '\0';
-    if (strlen(delimiter) > 0) {
-        delim = delimiter[0];
-    }
-
-    if (delim == '\0') {
-        printf("Invalid delimiter \n");
-        return;
-    }
-
-    if (delim == '\\') {
-          if (strlen(delimiter) > 1) {
-            switch (delimiter[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                break;
-            }
-        }
-    }
-
-    int curr_offset = get_offset();
-    int max_bytes_read = 4096;
-    char buff[max_bytes_read + 1];
-    memset(buff,'\0', max_bytes_read);
-    FILE * fp = fopen(file_path, "rb");
-    if (!fp) return;
-    fseek(fp, curr_offset, SEEK_SET);
-
-    int bytes_read = 0;
-    while ((bytes_read = fread(buff, 1, max_bytes_read, fp)) > 0) {
-        buff[bytes_read] = '\0';
-        tokenizer_mode_t mode = TAILFILE_MODE;
-        struct tokens tks = tokenize_string(buff, delim, mode);
-
-        if (tks.num_strings == 0) return;
-
-        set_offset(get_offset() + tks.total_bytes);
-
-        flowfiles = (flow_file_records *)malloc(sizeof(flow_file_records));
-        flowfiles->records = malloc(sizeof(flow_file_record *) * tks.num_strings);
-        flowfiles->len = tks.num_strings;
-
-        int i;
-        for (i = 0; i < tks.num_strings; ++i) {
-            flowfiles->records[i] = NULL;
-        }
-
-        for (i = 0; i < tks.num_strings; ++i) {
-            if (tks.str_list[i] && strlen(tks.str_list[i]) > 0) {
-                flow_file_record * ffr = generate_flow_file(instance, proc);
-                const char * flow_file_path = ffr->contentLocation;
-                FILE * ffp = fopen(flow_file_path, "wb");
-                if (!ffp) {
-                    printf("Cannot open flow file at path %s to write content to.\n", flow_file_path);
-                    fclose(fp);
-                    free_tokens(&tks);
-                    return;
-                }
-                int count = strlen(tks.str_list[i]);
-                int ret = fwrite(tks.str_list[i], 1, count, ffp);
-                if (ret < count) {
-                    fclose(ffp);
-                    return;
-                }
-                fseek(ffp, 0, SEEK_END);
-                ffr->size = ftell(ffp);
-                fclose(ffp);
-                flowfiles->records[i] = ffr;
-            }
-        }
-        free_tokens(&tks);
-    }
-    fclose(fp);
-}
-
-int main(int argc, char** argv) {
-
-    if (argc < 6) {
-        printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n");
-        exit(1);
-    }
-
-    char * file = argv[1];
-    char * interval = argv[2];
-    char * delimiter = argv[3];
-    char * instance_str = argv[4];
-    char * port_str = argv[5];
-
-    if (access(file, F_OK) == -1) {
-        printf("Error: %s doesn't exist!\n", file);
-        exit(1);
-    }
-
-    struct stat stats;
-    int ret = stat(file, &stats);
-
-    errno = 0;
-    if (ret == -1) {
-        printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno));
-        exit(1);
-    }
-    // Check for file existence
-    if (S_ISDIR(stats.st_mode)){
-        printf("Error: %s is a directory!\n", file);
-        exit(1);
-    }
-
-    errno = 0;
-    unsigned long intrvl = strtol(interval, NULL, 10);
-
-    if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
-        printf("Invalid interval value specified\n");
-        return 0;
-    }
-
-    struct sigaction action;
-    memset(&action, 0, sizeof(sigaction));
-    action.sa_handler = signal_handler;
-    sigaction(SIGTERM, &action, NULL);
-    sigaction(SIGINT, &action, NULL);
-
-    nifi_port port;
-
-    port.port_id = port_str;
-
-    instance = create_instance(instance_str, &port);
-
-    const char * processor_name = "TailFile";
-
-    add_custom_processor(processor_name, on_trigger_callback);
-
-    proc = create_processor(processor_name);
-
-    set_standalone_property(proc, "file_path", file);
-    set_standalone_property(proc, "delimiter", delimiter);
-
-    set_offset(0);
-    while (!stopped) {
-        flow_file_record * new_ff = invoke(proc);
-        transmit_flow_files(instance);
-        free_flow_file_records();
-        free_flowfile(new_ff);
-        sleep(intrvl);
-    }
-
-    free_standalone_processor(proc);
-    free(instance);
-
-    return 0;
-}
diff --git a/nanofi/include/api/ecu.h b/nanofi/include/api/ecu.h
new file mode 100644
index 0000000..9549491
--- /dev/null
+++ b/nanofi/include/api/ecu.h
@@ -0,0 +1,95 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef NANOFI_INCLUDE_API_ECU_H_
+#define NANOFI_INCLUDE_API_ECU_H_
+
+#include <signal.h>
+#include "api/nanofi.h"
+#include "uthash.h"
+#include "utlist.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct proc_properties {
+    char * file_path;
+    char delimiter;
+    uint64_t chunk_size;
+} proc_properties;
+
+typedef struct processor_params {
+    char uuid_str[37]; //key
+    struct flow_file_list * ff_list;
+    uint64_t curr_offset;
+    struct proc_properties * properties;
+    UT_hash_handle hh;
+} processor_params;
+
+extern processor_params * procparams;
+extern volatile sig_atomic_t stopped;
+
+typedef struct tailfile_input_params {
+    char * file;
+    char * interval;
+    char * delimiter;
+    char * instance;
+    char * tcp_port;
+    char * nifi_port_uuid;
+    char * chunk_size;
+} tailfile_input_params;
+
+typedef struct nifi_proc_params {
+    nifi_instance * instance;
+    standalone_processor * processor;
+} nifi_proc_params;
+
+/**
+ * Tails a delimited file from the processor's current offset up to the end of file
+ * @param file_path the path to the file to tail
+ * @param delim the delimiter character
+ * @param ctx the processor context
+ * The offset is tracked per processor uuid and starts at 0 (beginning of the file)
+ * @return a flow file info holding the list of flow file records
+ * and the total number of bytes consumed from the file
+ */
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx);
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx);
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx);
+void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx);
+void signal_handler(int signum);
+void delete_all_flow_files_from_proc(const char * uuid);
+void delete_completed_flow_files_from_proc(const char * uuid);
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ff);
+processor_params * get_proc_params(const char * uuid);
+
+void init_common_input(tailfile_input_params * input_params, char ** args);
+tailfile_input_params init_logaggregate_input(char ** args);
+tailfile_input_params init_tailfile_chunk_input(char ** args);
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num);
+void setup_signal_action();
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *));
+void free_proc_params(const char * uuid);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* NANOFI_INCLUDE_API_ECU_H_ */
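
The header above exports `stopped` as a `volatile sig_atomic_t` together with `signal_handler` and `setup_signal_action`. A minimal sketch of that stop-flag pattern follows, assuming a POSIX system; the names `stop_requested`, `handle_stop` and `install_stop_handlers` are hypothetical and are not part of nanofi.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <signal.h>
#include <string.h>

/* Hypothetical flag name. A handler may only safely write to
 * objects of type volatile sig_atomic_t. */
static volatile sig_atomic_t stop_requested = 0;

static void handle_stop(int signum) {
    if (signum == SIGINT || signum == SIGTERM) {
        stop_requested = 1;
    }
}

/* Installs handle_stop for SIGTERM and SIGINT.
 * Returns 0 on success, -1 if either sigaction call fails. */
static int install_stop_handlers(void) {
    struct sigaction action;
    /* Size of the variable, not of the sigaction identifier. */
    memset(&action, 0, sizeof(action));
    action.sa_handler = handle_stop;
    sigemptyset(&action.sa_mask);
    if (sigaction(SIGTERM, &action, NULL) != 0) return -1;
    if (sigaction(SIGINT, &action, NULL) != 0) return -1;
    return 0;
}
```

A polling loop would then run `while (!stop_requested) { ... }` and leave cleanup to the code after the loop, rather than doing any work inside the handler.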
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 074ae70..f401073 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -150,7 +150,7 @@ processor *add_python_processor(flow *, processor_logic* logic);
  * @param name the name of the processor to instanciate
  * @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?)
  **/
-standalone_processor *create_processor(const char * name);
+standalone_processor *create_processor(const char * name, nifi_instance * instance);
 
 /**
  * Free a standalone processor
@@ -318,6 +318,13 @@ flow_file_record* create_ff_object_nc();
 flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc);
 
 /**
+ * Generates a new flow file record for the given processor context.
+ * @param ctx the processor context
+ * @return a flow file record
+ */
+flow_file_record * generate_flow(processor_context * ctx);
+
+/**
  * Get incoming flow file. To be used in processor logic callbacks.
  * @param session current processor session
  * @param context current processor context
@@ -441,6 +448,35 @@ int delete_custom_processor(const char * name);
  **/
 int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship);
 
+/**
+ * Write content to a flow file and return a pointer to the flow file record
+ * @param buff the buffer to read content from
+ * @param count the number of bytes to read from buff
+ * @param ctx the processor context
+ */
+flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx);
+
+/**
+ * Initialize content repository
+ * @param ctx the processor context
+ */
+void initialize_content_repo(processor_context * ctx, const char * uuid);
+
+/**
+ * Clear content repository contents
+ */
+void clear_content_repo(const nifi_instance * instance);
+
+/**
+ * Get the processor uuid from processor context
+ */
+void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target);
+
+/**
+ * Get the processor uuid from processor
+ */
+void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target);
+
 /****
  * ##################################################################
  *  Persistence Operations
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index 3be3ce3..846be5b 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -117,9 +117,9 @@ typedef struct {
 
   char * contentLocation; /**< Filesystem location of this object */
 
-  void *attributes; /**< Hash map of attributes */
+  void * attributes; /**< Hash map of attributes */
 
-  void *ffp;
+  void * ffp;
 
   uint8_t keepContent;
 
@@ -175,16 +175,15 @@ typedef struct token_list {
  * ##################################################################
  */
 
-typedef struct flow_file_list_node {
-    flow_file_record * ff_record;
-    struct flow_file_list_node * next;
-} flow_file_list_node;
-
 typedef struct flow_file_list {
-    flow_file_list_node * head;
-    flow_file_list_node * tail;
-    int len;
-    int offset;
+    flow_file_record * ff_record;
+    int complete;
+    struct flow_file_list * next;
 } flow_file_list;
 
+typedef struct flow_file_info {
+    struct flow_file_list * ff_list;
+    uint64_t total_bytes;
+} flow_file_info;
+
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
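
The reworked `flow_file_list` is a plain singly linked node with a `complete` flag, which lets a caller release finished records from the front while keeping a partially written one at the tail. A rough sketch of that idea follows; `record_node`, `append_record` and `drop_completed` are hypothetical stand-ins (the payload is reduced to an `int`), not the nanofi functions.

```c
#include <stdlib.h>

/* Hypothetical stand-in for flow_file_list: payload reduced to an int id. */
typedef struct record_node {
    int id;
    int complete;
    struct record_node * next;
} record_node;

/* Appends a node to the end of the list.
 * Returns the new node, or NULL on allocation failure. */
static record_node * append_record(record_node ** head, int id) {
    record_node * node = calloc(1, sizeof(*node));
    if (!node) return NULL;
    node->id = id;
    if (!*head) {
        *head = node;
        return node;
    }
    record_node * tail = *head;
    while (tail->next) tail = tail->next;
    tail->next = node;
    return node;
}

/* Frees the leading run of complete nodes and stops at the first
 * incomplete one. Returns the number of nodes removed. */
static int drop_completed(record_node ** head) {
    int removed = 0;
    while (*head && (*head)->complete) {
        record_node * done = *head;
        *head = done->next;
        free(done);
        ++removed;
    }
    return removed;
}
```

Stopping at the first incomplete node keeps the record that is still being appended to, so a later read can finish it.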
diff --git a/nanofi/include/core/file_utils.h b/nanofi/include/core/file_utils.h
index 25c3029..15806e4 100644
--- a/nanofi/include/core/file_utils.h
+++ b/nanofi/include/core/file_utils.h
@@ -19,6 +19,7 @@
 #ifndef NANOFI_INCLUDE_CORE_FILE_UTILS_H_
 #define NANOFI_INCLUDE_CORE_FILE_UTILS_H_
 
+#include "utlist.h"
 #include "flowfiles.h"
 
 #ifdef __cplusplus
@@ -26,14 +27,49 @@ extern "C" {
 #endif
 
 /**
- * Tails a delimited file starting from an offset up to the end of file
- * @param file the path to the file to tail
- * @param delim the delimiter character
- * @param curr_offset the offset in the file to tail from.
- * For eg. To tail from beginning of the file curr_offset = 0
- * @return a list of tokens
+ * Recursively deletes a directory tree
+ * @param path the path to the directory
  */
-token_list tail_file(const char * file, char delim, int curr_offset);
+void remove_directory(const char * path);
+
+/**
+ * Determine if the provided directory/file path is a directory
+ * @param path the absolute path to the file/directory
+ * @return 1 if path is directory else 0
+ */
+int is_directory(const char * path);
+
+/**
+ * Get the platform-specific path separator.
+ * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+ * @return the path separator character
+ */
+const char * get_separator(int force_posix);
+
+/**
+ * Joins parent path with child path
+ * @param parent the parent path
+ * @param child the child path
+ * @return concatenated path
+ * @attention this function allocates memory for the returned concatenated path
+ * and it is left for the caller to free the memory
+ */
+char * concat_path(const char * parent, const char * child);
+
+/**
+ * Make a directory tree specified by path
+ * @param path the path to the directory
+ * @return 1 if successful else 0
+ */
+int make_dir(const char * path);
+
+/**
+ * Return the current working directory
+ * @return the current working directory
+ * @attention this function allocates memory on heap
+ * it is left to the caller to free it
+ */
+char * get_current_working_directory();
 
 #ifdef __cplusplus
 }
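
The `concat_path` contract above (heap-allocated result, caller frees) can be illustrated with a small sketch. `join_path` is a hypothetical helper that assumes a POSIX '/' separator; it is not the nanofi implementation, which also supports other separators through `get_separator`.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper, not the nanofi implementation: joins two path
 * parts with '/'. The caller owns the returned buffer and must free it. */
static char * join_path(const char * parent, const char * child) {
    if (!parent || !child) return NULL;
    size_t parent_len = strlen(parent);
    size_t child_len = strlen(child);
    /* Skip the separator when parent is empty or already ends with one. */
    size_t need_sep = (parent_len > 0 && parent[parent_len - 1] != '/') ? 1 : 0;
    char * joined = malloc(parent_len + need_sep + child_len + 1);
    if (!joined) return NULL;
    memcpy(joined, parent, parent_len);
    if (need_sep) joined[parent_len] = '/';
    /* Copy child including its terminating NUL. */
    memcpy(joined + parent_len + need_sep, child, child_len + 1);
    return joined;
}
```

A caller would pair every successful call with `free`, for example `char * p = join_path("/tmp", "repo"); ... free(p);`.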
diff --git a/nanofi/include/core/flowfiles.h b/nanofi/include/core/flowfiles.h
index 60ac02e..72307ec 100644
--- a/nanofi/include/core/flowfiles.h
+++ b/nanofi/include/core/flowfiles.h
@@ -19,10 +19,33 @@
 #ifndef NANOFI_INCLUDE_CORE_FLOWFILES_H_
 #define NANOFI_INCLUDE_CORE_FLOWFILES_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "cstructs.h"
+#include "api/ecu.h"
+#include "sitetosite/CPeer.h"
+#include "sitetosite/CRawSocketProtocol.h"
+
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record);
+
+void free_flow_file_list(flow_file_list ** ff_list);
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset);
+
+void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset);
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete);
+
+void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete);
+
+uint64_t flow_files_size(flow_file_list * ff_list);
 
-void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record);
+void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client);
 
-void free_flow_file_list(flow_file_list * ff_list);
+#ifdef __cplusplus
+}
+#endif
 
 #endif /* NANOFI_INCLUDE_CORE_FLOWFILES_H_ */
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index b10b95f..0325081 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -27,6 +27,7 @@
 #include "RemoteProcessorGroupPort.h"
 #include "core/ContentRepository.h"
 #include "core/repository/VolatileContentRepository.h"
+#include "core/repository/FileSystemRepository.h"
 #include "core/Repository.h"
 
 #include "C2CallbackAgent.h"
@@ -40,6 +41,8 @@
 #include "ReflexiveSession.h"
 #include "utils/ThreadPool.h"
 #include "core/state/UpdateController.h"
+#include "core/file_utils.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -63,14 +66,24 @@ class ProcessorLink {
 class Instance {
  public:
 
-  explicit Instance(const std::string &url, const std::string &port)
+  explicit Instance(const std::string &url, const std::string &port, const std::string &repo_class_name = "")
       : configure_(std::make_shared<Configure>()),
         url_(url),
         agent_(nullptr),
         rpgInitialized_(false),
         listener_thread_pool_(1),
-        content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
         no_op_repo_(std::make_shared<minifi::core::Repository>()) {
+
+    if (repo_class_name == "filesystemrepository") {
+        content_repo_ = std::make_shared<minifi::core::repository::FileSystemRepository>();
+    } else {
+        content_repo_ = std::make_shared<minifi::core::repository::VolatileContentRepository>();
+    }
+    char * cwd = get_current_working_directory();
+    if (cwd) {
+        configure_->setHome(std::string(cwd));
+        free(cwd);
+    }
     running_ = false;
     stream_factory_ = minifi::io::StreamFactory::getInstance(configure_);
     utils::Identifier uuid;
@@ -118,7 +131,7 @@ class Instance {
     return no_op_repo_;
   }
 
-  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() {
+  std::shared_ptr<minifi::core::ContentRepository> getContentRepository() const {
     return content_repo_;
   }
 
diff --git a/nanofi/src/api/ecu.c b/nanofi/src/api/ecu.c
new file mode 100644
index 0000000..709c681
--- /dev/null
+++ b/nanofi/src/api/ecu.c
@@ -0,0 +1,530 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(const char * uuid) {
+
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        free_flow_file_list(&pp->ff_list);
+        if (pp->properties) free(pp->properties->file_path);
+        free(pp->properties);
+        HASH_DEL(procparams, pp);
+        free(pp);
+    }
+}
+
+void signal_handler(int signum) {
+    if (signum == SIGINT || signum == SIGTERM) {
+        stopped = 1;
+    }
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+    if (args && *args) {
+        input_params->file = args[1];
+        input_params->interval = args[2];
+        input_params->instance = args[4];
+        input_params->tcp_port = args[5];
+        input_params->nifi_port_uuid = args[6];
+    }
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    input_params.delimiter = args[3];
+    return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    input_params.chunk_size = args[3];
+    return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) {
+    if (access(params->file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", params->file);
+        return -1;
+    }
+
+    struct stat stats;
+    int ret = stat(params->file, &stats);
+
+    if (ret == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno));
+        return -1;
+    }
+    // Reject the path if it is a directory
+    if (S_ISDIR(stats.st_mode)){
+        printf("Error: %s is a directory!\n", params->file);
+        return -1;
+    }
+
+    errno = 0;
+    *intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+    if (errno != 0) {
+        printf("Invalid interval value specified\n");
+        return -1;
+    }
+
+    errno = 0;
+    *port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+    if (errno != 0) {
+        printf("Cannot convert tcp port to numeric value\n");
+        return -1;
+    }
+    return 0;
+}
+
+void setup_signal_action() {
+    struct sigaction action;
+    memset(&action, 0, sizeof(action));
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)) {
+    nifi_proc_params params;
+    nifi_port port;
+    port.port_id = input_params->nifi_port_uuid;
+
+    nifi_instance * instance = create_instance(input_params->instance, &port);
+    add_custom_processor(processor_name, callback);
+    standalone_processor * proc = create_processor(processor_name, instance);
+    params.instance = instance;
+    params.processor = proc;
+    return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp == NULL) {
+        pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        strcpy(pp->uuid_str, uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+    }
+
+    add_flow_file_record(&pp->ff_list, ffr);
+    pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        struct flow_file_list * head = pp->ff_list;
+        while (head) {
+            struct flow_file_list * tmp = head;
+            free_flowfile(tmp->ff_record);
+            head = head->next;
+            free(tmp);
+        }
+        pp->ff_list = head;
+    }
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        struct flow_file_list * head = pp->ff_list;
+        while (head) {
+            struct flow_file_list * tmp = head;
+            if (tmp->complete) {
+                free_flowfile(tmp->ff_record);
+                head = head->next;
+                free(tmp);
+            }
+            else {
+                break;
+            }
+        }
+        pp->ff_list = head;
+    }
+}
+
+uint64_t get_current_offset(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        return pp->curr_offset;
+    }
+    return 0;
+}
+
+processor_params * get_proc_params(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    return pp;
+}
+
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ffl) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (!pp) {
+        pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        pp->ff_list = ffl;
+        pp->curr_offset = value;
+        strcpy(pp->uuid_str, uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+        return;
+    }
+    delete_all_flow_files_from_proc(uuid);
+    pp->curr_offset += value;
+    pp->ff_list = ffl;
+}
+
+uint64_t update_curr_offset(const char * uuid, uint64_t value) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (pp) {
+        pp->curr_offset += value;
+        return pp->curr_offset;
+    }
+
+    pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+    memset(pp, 0, sizeof(struct processor_params));
+    strcpy(pp->uuid_str, uuid);
+    pp->curr_offset = value;
+    HASH_ADD_STR(procparams, uuid_str, pp);
+    return pp->curr_offset;
+}
+
+struct proc_properties * get_processor_properties(const char * uuid) {
+    if (!uuid) {
+        return NULL;
+    }
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (!pp) {
+        return NULL;
+    }
+    return pp->properties;
+}
+
+void add_processor_properties(const char * uuid, struct proc_properties * const props) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (pp) {
+        pp->properties = props;
+        return;
+    }
+
+    pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+    memset(pp, 0, sizeof(struct processor_params));
+    strcpy(pp->uuid_str, uuid);
+    pp->properties = props;
+    HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) {
+
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    initialize_content_repo(ctx, uuid_str);
+
+    struct proc_properties * props = get_processor_properties(uuid_str);
+    if (!props) {
+        char file_path[4096];
+        char chunk_size[50];
+        if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+            return;
+        }
+
+        if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) {
+            return;
+        }
+
+        errno = 0;
+        uint64_t chunk_size_value = strtoul(chunk_size, NULL, 10);
+
+        if (errno != 0 || chunk_size_value == 0) {
+            printf("Invalid chunk size specified\n");
+            return;
+        }
+
+        props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
+        memset(props, 0, sizeof(struct proc_properties));
+        int len = strlen(file_path);
+        props->file_path = (char *)malloc((len + 1) * sizeof(char));
+        strncpy(props->file_path, file_path, len);
+        props->file_path[len] = '\0';
+        props->chunk_size = chunk_size_value;
+        add_processor_properties(uuid_str, props);
+    }
+
+    FILE * fp = fopen(props->file_path, "rb");
+
+    if (!fp) {
+        printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
+        return;
+    }
+
+    char * buff = (char *)malloc((props->chunk_size + 1) * sizeof(char));
+    size_t bytes_read = 0;
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+    fseek(fp, curr_offset, SEEK_SET);
+    while ((bytes_read = fread(buff, 1, props->chunk_size, fp)) > 0) {
+        if (bytes_read < props->chunk_size) {
+            break;
+        }
+        buff[props->chunk_size] = '\0';
+        flow_file_record * ffr = write_to_flow(buff, bytes_read, ctx);
+        curr_offset = ftell(fp);
+        add_attributes(ffr, props->file_path, curr_offset);
+        add_to_hash_table(ffr, curr_offset, uuid_str);
+    }
+    free(buff);
+    fclose(fp);
+}
+
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx) {
+    flow_file_info ff_info;
+    memset(&ff_info, 0, sizeof(ff_info));
+
+    if (!file_path) {
+        return ff_info;
+    }
+
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    char buff[MAX_BYTES_READ + 1];
+    errno = 0;
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) {
+        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return ff_info;
+    }
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+
+    fseek(fp, curr_offset, SEEK_SET);
+
+    flow_file_list * ffl = NULL;
+    size_t bytes_read = 0;
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        struct token_list tokens = tokenize_string_tailfile(buff, delim);
+        if (tokens.total_bytes > 0) {
+            ff_info.total_bytes += tokens.total_bytes;
+            curr_offset += tokens.total_bytes;
+            fseek(fp, curr_offset, SEEK_SET);
+        }
+
+        token_node * head;
+        for (head = tokens.head; head && head->data; head = head->next) {
+            flow_file_record * ffr = write_to_flow(head->data, strlen(head->data), ctx);
+            add_attributes(ffr, file_path, curr_offset);
+            add_flow_file_record(&ffl, ffr);
+        }
+        free_all_tokens(&tokens);
+    }
+    fclose(fp);
+    ff_info.ff_list = ffl;
+    return ff_info;
+}
+
+struct proc_properties * get_properties(const char * uuid, processor_context * ctx) {
+    struct proc_properties * props = get_processor_properties(uuid);
+    if (props) {
+        return props;
+    }
+
+    char file_path[4096];
+    char delimiter[3];
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return props;
+    }
+
+    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
+        printf("No delimiter found\n");
+        return props;
+    }
+
+    if (strlen(delimiter) == 0) {
+        printf("Delimiter not specified or it is empty\n");
+        return props;
+    }
+
+    props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
+    memset(props, 0, sizeof(struct proc_properties));
+
+    char delim = delimiter[0];
+
+    if (delim == '\\') {
+        if (strlen(delimiter) > 1) {
+            switch (delimiter[1]) {
+                case 'r':
+                    delim = '\r';
+                    break;
+                case 't':
+                    delim = '\t';
+                    break;
+                case 'n':
+                    delim = '\n';
+                    break;
+                case '\\':
+                    delim = '\\';
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    int len = strlen(file_path);
+    props->file_path = (char *)malloc((len + 1) * sizeof(char));
+    strncpy(props->file_path, file_path, len);
+    props->file_path[len] = '\0';
+    props->delimiter = delim;
+
+    add_processor_properties(uuid, props);
+    return props;
+}
+
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx) {
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    struct proc_properties * props = get_properties(uuid_str, ctx);
+
+    if (!props || !props->file_path) return;
+
+    char delim = props->delimiter;
+
+    initialize_content_repo(ctx, uuid_str);
+    flow_file_info ff_info = log_aggregate(props->file_path, delim, ctx);
+
+    update_proc_params(uuid_str, ff_info.total_bytes, ff_info.ff_list);
+}
+
+void write_flow_file(flow_file_record * ffr, const char * buff, size_t count) {
+    FILE * ffp = fopen(ffr->contentLocation, "ab");
+    if (!ffp) return;
+    if (fwrite(buff, 1, count, ffp) < count) {
+        fclose(ffp);
+        printf("Failed to write flow file content. {file: %s}\n", ffr->contentLocation);
+        return;
+    }
+    fclose(ffp);
+}
+
+flow_file_list * get_last_flow_file(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (!pp) {
+        return NULL;
+    }
+
+    flow_file_list * ff_list = pp->ff_list;
+    flow_file_list * el = NULL;
+    LL_FOREACH(ff_list, el) {
+        if (el && !el->next) {
+            return el;
+        }
+    }
+    return NULL;
+}
+
+flow_file_list * add_flow_file_to_proc_params(const char * uuid, flow_file_record * ffr) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (!pp) {
+        pp = (struct processor_params *)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        strcpy(pp->uuid_str, uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+    }
+    flow_file_list * ffl_node = add_flow_file_record(&pp->ff_list, ffr);
+    ffl_node->complete = 0;
+    return ffl_node;
+}
+
+void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx) {
+    char uuid_str[37];
+    get_proc_uuid_from_context(ctx, uuid_str);
+
+    initialize_content_repo(ctx, uuid_str);
+    struct proc_properties * props = get_properties(uuid_str, ctx);
+
+    if (!props || !props->file_path) return;
+
+    char delim = props->delimiter;
+
+    FILE * fp = fopen(props->file_path, "rb");
+
+    if (!fp) {
+        printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
+        return;
+    }
+
+    char buff[MAX_BYTES_READ + 1];
+    size_t bytes_read = 0;
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+    fseek(fp, curr_offset, SEEK_SET);
+
+    flow_file_list * ffl_node = get_last_flow_file(uuid_str);
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        buff[bytes_read] = '\0';
+        const char * begin = buff;
+        const char * end = NULL;
+
+        while ((end = strchr(begin, delim))) {
+            uint64_t len = end - begin;
+            if (len > 0) {
+                if (!ffl_node || ffl_node->complete) {
+                    ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
+                }
+                write_flow_file(ffl_node->ff_record, begin, len);
+                update_curr_offset(uuid_str, (len + 1));
+            }
+            else {
+                update_curr_offset(uuid_str, 1);
+            }
+            if (ffl_node) {
+                ffl_node->complete = 1;
+                update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
+            }
+            begin = (end + 1);
+        }
+
+        if (!end && *begin != '\0') {
+            if (!ffl_node || ffl_node->complete) {
+                ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
+            }
+            size_t count = strlen(begin);
+            write_flow_file(ffl_node->ff_record, begin, count);
+            update_curr_offset(uuid_str, count);
+            update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
+        }
+    }
+    fclose(fp);
+}
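
Note on the delimited tail loop above: it relies on strchr and strlen, so it treats the read buffer as a C string, and a NUL byte in the tailed file would end the scan of that chunk early. Below is a minimal sketch of a length-aware split using memchr, assuming the same "token followed by delimiter" rule. The names split_delimited, token_cb, token_stats and count_token are hypothetical and are not part of nanofi.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical callback: receives one token (not NUL-terminated) and its length. */
typedef void (*token_cb)(const char *token, size_t len, void *user);

/* Example accumulator used by count_token below. */
struct token_stats {
    size_t tokens;
    size_t bytes;
};

/* Example callback: counts tokens and sums their lengths. */
void count_token(const char *token, size_t len, void *user) {
    (void)token;
    struct token_stats *stats = user;
    stats->tokens++;
    stats->bytes += len;
}

/*
 * Splits buf[0..count) on delim. Calls cb for every non-empty token that is
 * followed by a delimiter and returns the number of bytes consumed, i.e. the
 * offset just past the last delimiter. Bytes after that offset form an
 * unterminated tail that the caller decides how to handle.
 */
size_t split_delimited(const char *buf, size_t count, char delim,
                       token_cb cb, void *user) {
    assert(buf != NULL || count == 0);
    size_t consumed = 0;
    while (consumed < count) {
        const char *begin = buf + consumed;
        const char *end = memchr(begin, delim, count - consumed);
        if (!end) {
            break;  /* no further delimiter: leave the tail to the caller */
        }
        size_t len = (size_t)(end - begin);
        if (len > 0 && cb) {
            cb(begin, len, user);
        }
        consumed += len + 1;  /* skip the token and its delimiter */
    }
    return consumed;
}
```

Because the length is passed explicitly, the buffer needs no terminating '\0', and the return value can be added directly to the stored offset.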
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index d37fc6b..e8ea25a 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -113,7 +113,7 @@ nifi_instance *create_instance(const char *url, nifi_port *port) {
    * This API will gradually move away from C++, hence malloc is used for nifi_instance
    * Since minifi::Instance is currently being used, then we need to use new in that case.
    */
-  instance->instance_ptr = new minifi::Instance(url, port->port_id);
+  instance->instance_ptr = new minifi::Instance(url, port->port_id, "filesystemrepository");
 
   NULL_CHECK(nullptr, instance->instance_ptr);
 
@@ -124,25 +124,53 @@ nifi_instance *create_instance(const char *url, nifi_port *port) {
   return instance;
 }
 
-standalone_processor *create_processor(const char *name) {
+standalone_processor * create_processor(const char *name, nifi_instance * instance) {
   NULL_CHECK(nullptr, name);
   auto ptr = ExecutionPlan::createProcessor(name, name);
   if (!ptr) {
     return nullptr;
   }
-  if (standalone_instance == nullptr) {
+  if (instance == NULL) {
     nifi_port port;
     char portnum[] = "98765";
     port.port_id = portnum;
-    standalone_instance = create_instance("internal_standalone", &port);
+    instance = create_instance("internal_standalone", &port);
   }
-  auto flow = create_new_flow(standalone_instance);
+  auto flow = create_new_flow(instance);
   std::shared_ptr<ExecutionPlan> plan(flow);
   plan->addProcessor(ptr, name);
   ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
   return static_cast<standalone_processor*>(ptr.get());
 }
 
+void initialize_content_repo(processor_context * ctx, const char * uuid) {
+    if (ctx->isInitialized()) {
+        return;
+    }
+    char * cwd = get_current_working_directory();
+    if (cwd) {
+        const char * sep = get_separator(0);
+        const std::string repo_path = std::string(cwd) + sep + "contentrepository" + sep + uuid;
+        ctx->initializeContentRepository(repo_path);
+        free(cwd);
+    }
+}
+
+void clear_content_repo(const nifi_instance * instance) {
+    const auto content_repo = static_cast<minifi::Instance*>(instance->instance_ptr)->getContentRepository();
+    const auto storage_path = content_repo->getStoragePath();
+    remove_directory(storage_path.c_str());
+}
+
+void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) {
+    strcpy(uuid_target, proc->getUUIDStr().c_str());
+}
+
+void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target) {
+    standalone_processor * proc = static_cast<standalone_processor*>(ctx->getProcessorNode()->getProcessor().get());
+    get_proc_uuid_from_processor(proc, uuid_target);
+}
+
 void free_standalone_processor(standalone_processor* proc) {
   NULL_CHECK(, proc);
   ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
@@ -245,28 +273,58 @@ flow_file_record* create_ff_object_nc() {
   return new_ff;
 }
 
-flow_file_record * generate_flow_file(nifi_instance * instance, standalone_processor * proc) {
-    if (!instance || !proc) {
-        return nullptr;
-    }
+flow_file_record * generate_flow(processor_context * ctx) {
     flow_file_record * ffr = create_ff_object_nc();
 
-    auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
-    auto content_repo = minifi_instance_ref->getContentRepository();
+    if (ffr->crp) {
+        delete static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+    }
+    ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(ctx->getContentRepository()));
+
+    auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr());
 
-    ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(content_repo));
-    auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
     if (!plan) {
         return nullptr;
     }
-    ffr->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(plan->getCurrentFlowFile()));
-    ffr->keepContent = 1;
+    ffr->ffp = NULL;
+    ffr->keepContent = 0;
     auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp));
     auto claim = std::make_shared<minifi::ResourceClaim>(*ff_content_repo_ptr);
-    const char * full_path = claim->getContentFullPath().c_str();
-    int len = strlen(full_path);
-    ffr->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
-    snprintf(ffr->contentLocation, len + 1, "%s", full_path);
+
+    size_t len = strlen(claim->getContentFullPath().c_str());
+    ffr->contentLocation = (char *) malloc((len + 1) * sizeof(char));
+    snprintf(ffr->contentLocation, len+1, "%s", claim->getContentFullPath().c_str());
+    return ffr;
+}
+
+flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx) {
+    if (!ctx) {
+        return NULL;
+    }
+
+    flow_file_record * ffr = generate_flow(ctx);
+
+    if (ffr == NULL) {
+        printf("Could not generate flow file\n");
+        return NULL;
+    }
+
+    FILE * ffp = fopen(ffr->contentLocation, "wb");
+    if (!ffp) {
+        printf("Cannot open flow file at path %s to write content to.\n", ffr->contentLocation);
+        free_flowfile(ffr);
+        return NULL;
+    }
+
+    size_t ret = fwrite(buff, 1, count, ffp);
+    if (ret < count) {
+        fclose(ffp);
+        free_flowfile(ffr);
+        return NULL;
+    }
+    fseek(ffp, 0, SEEK_END);
+    ffr->size = ftell(ffp);
+    fclose(ffp);
     return ffr;
 }
 
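
Note on write_to_flow above: it opens the claim path, writes the buffer and treats a short write as a failure. Below is a minimal sketch of that pattern in plain C, assuming nothing about nanofi itself. It keeps the fwrite result as a size_t and also checks fclose, since buffered data may only reach the file when the stream is closed. The name write_all is hypothetical.

```c
#include <assert.h>
#include <stdio.h>

/*
 * Hypothetical helper: writes count bytes from buff to path, truncating any
 * existing file. Returns 0 on success and -1 on failure.
 */
int write_all(const char *path, const char *buff, size_t count) {
    assert(path != NULL && buff != NULL);
    FILE *fp = fopen(path, "wb");
    if (!fp) {
        return -1;
    }
    /* fwrite returns size_t; keeping that type avoids a signed/unsigned compare. */
    size_t written = fwrite(buff, 1, count, fp);
    /* fclose flushes buffered data, so its result matters as well. */
    int close_failed = (fclose(fp) != 0);
    return (written == count && !close_failed) ? 0 : -1;
}
```

On success, the number of bytes stored equals count, so a caller can record the size without a separate fseek/ftell pair.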
diff --git a/nanofi/src/core/file_utils.c b/nanofi/src/core/file_utils.c
index 3f7b79e..1eeedc6 100644
--- a/nanofi/src/core/file_utils.c
+++ b/nanofi/src/core/file_utils.c
@@ -20,44 +20,122 @@
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <unistd.h>
+#include <limits.h>
 
-#include "api/nanofi.h"
 #include "core/string_utils.h"
 #include "core/file_utils.h"
 
-token_list tail_file(const char * file_path, char delim, int curr_offset) {
-    token_list tkn_list;
-    memset(&tkn_list, 0, sizeof(struct token_list));
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
 
-    if (!file_path) {
-        return tkn_list;
+int is_directory(const char * path) {
+    struct stat dir_stat;
+    if (stat(path, &dir_stat) < 0) {
+        return 0;
     }
+    return S_ISDIR(dir_stat.st_mode);
+}
+
+const char * get_separator(int force_posix)
+{
+#ifdef WIN32
+    if (!force_posix) {
+        return "\\";
+    }
+#endif
+    return "/";
+}
+
+char * concat_path(const char * parent, const char * child) {
+    char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * sizeof(char));
+    strcpy(path, parent);
+    const char * sep = get_separator(0);
+    strcat(path, sep);
+    strcat(path, child);
+    return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+    if (!is_directory(dir_path)) {
+        if (unlink(dir_path) == -1) {
+            printf("Could not remove file %s\n", dir_path);
+        }
+        return;
+    }
+
+    struct dirent * dir;
+    DIR * d = opendir(dir_path);
+    if (!d) return;
+
+    while ((dir = readdir(d)) != NULL) {
+        char * entry_name = dir->d_name;
+        if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+            continue;
+        }
+        char * path = concat_path(dir_path, entry_name);
+        remove_directory(path);
+        free(path);
+    }
+
+    closedir(d);
+    rmdir(dir_path);
+}
+
+int make_dir(const char * path) {
+    if (!path) return -1;
 
-    char buff[MAX_BYTES_READ + 1];
-    memset(buff, 0, MAX_BYTES_READ+1);
     errno = 0;
-    FILE * fp = fopen(file_path, "rb");
-    if (!fp) {
-        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
-        return tkn_list;
-    }
-    fseek(fp, curr_offset, SEEK_SET);
-
-    int bytes_read = 0;
-    int i = 0;
-    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
-        buff[bytes_read] = '\0';
-        struct token_list tokens = tokenize_string_tailfile(buff, delim);
-        if (tokens.size > 0) {
-            attach_lists(&tkn_list, &tokens);
+    int ret = mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+    if (ret == 0) {
+        return 0;
+    }
+
+    switch (errno) {
+    case ENOENT: {
+        char * found = strrchr(path, '/');
+        if (!found) {
+            return -1;
         }
-        tkn_list.total_bytes += tokens.total_bytes;
-        if (tokens.total_bytes > 0) {
-            curr_offset += tokens.total_bytes;
-            fseek(fp, curr_offset, SEEK_SET);
+        int len = found - path;
+        char * dir = calloc(len + 1, sizeof(char));
+        strncpy(dir, path, len);
+        dir[len] = '\0';
+        int res = make_dir(dir);
+        free(dir);
+        if (res < 0) {
+            return -1;
         }
-        memset(buff, 0, MAX_BYTES_READ);
+        return mkdir(path, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+    }
+    case EEXIST: {
+        if (is_directory(path)) {
+            return 0;
+        }
+        return -1;
+    }
+    default:
+        return -1;
+    }
+}
+
+char * get_current_working_directory() {
+    char * cwd = (char *)malloc(PATH_MAX * sizeof(char));
+    memset(cwd, 0, PATH_MAX);
+    #ifdef WIN32
+    if (_getcwd(cwd, PATH_MAX) != NULL)
+        return cwd;
+    #else
+    if (getcwd(cwd, PATH_MAX) != NULL) {
+        return cwd;
     }
-    fclose(fp);
-    return tkn_list;
+    #endif
+    free(cwd);
+    return NULL;
 }
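
Note on make_dir above: it creates missing parents by recursing on ENOENT and splitting at the last '/'. Below is a minimal sketch of an iterative alternative that walks the path left to right, assuming a POSIX system and '/' as the only separator. The name make_dirs and the 0755 mode are assumptions made for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Hypothetical helper: creates path and any missing parents (POSIX only).
 * Returns 0 on success or if the directory already exists, -1 otherwise.
 */
int make_dirs(const char *path) {
    assert(path != NULL);
    size_t len = strlen(path);
    char *copy = malloc(len + 1);
    if (!copy) {
        return -1;
    }
    memcpy(copy, path, len + 1);

    int rc = 0;
    /* Create each prefix that ends at a '/' first, then the full path. */
    for (size_t i = 1; i <= len && rc == 0; ++i) {
        if (copy[i] != '/' && copy[i] != '\0') {
            continue;
        }
        char saved = copy[i];
        copy[i] = '\0';
        if (mkdir(copy, 0755) != 0 && errno != EEXIST) {
            rc = -1;
        }
        copy[i] = saved;
    }

    /* EEXIST is only acceptable if the final path is really a directory. */
    struct stat st;
    if (rc == 0 && (stat(copy, &st) != 0 || !S_ISDIR(st.st_mode))) {
        rc = -1;
    }
    free(copy);
    return rc;
}
```

The loop avoids recursion and a heap allocation per path component, at the cost of one mkdir call for every prefix that already exists.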
diff --git a/nanofi/src/core/flowfiles.c b/nanofi/src/core/flowfiles.c
index edf2c5b..6c86e98 100644
--- a/nanofi/src/core/flowfiles.c
+++ b/nanofi/src/core/flowfiles.c
@@ -16,38 +16,149 @@
  * limitations under the License.
  */
 
+#include "api/nanofi.h"
+#include "api/ecu.h"
 #include "core/flowfiles.h"
+
+#include "utlist.h"
 #include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
 
-void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record) {
-    if (!ff_list || !record) return;
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record) {
+    if (!record) {
+        return *ff_list;
+    }
 
-    struct flow_file_list_node * new_node = (struct flow_file_list_node *)malloc(sizeof(struct flow_file_list_node));
+    struct flow_file_list * new_node = (struct flow_file_list *)malloc(sizeof(struct flow_file_list));
     new_node->ff_record = record;
-    new_node->next = NULL;
+    LL_APPEND(*ff_list, new_node);
+    return new_node;
+}
 
-    if (!ff_list->head || !ff_list->tail) {
-        ff_list->head = ff_list->tail = new_node;
-        ff_list->len = 1;
+void free_flow_file_list(flow_file_list ** ff_list) {
+    if (!ff_list || !*ff_list) {
         return;
     }
+    flow_file_list * head = *ff_list;
+    while (head) {
+       flow_file_list * tmp = head;
+       free_flowfile(tmp->ff_record);
+       head = head->next;
+       free(tmp);
+    }
+}
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) {
+    char offset_str[21];
+    snprintf(offset_str, sizeof(offset_str), "%llu", (unsigned long long)curr_offset);
+    add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+    char content_location[strlen(ffr->contentLocation) + 1];
+    snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation);
+    add_attribute(ffr, "content location", content_location, strlen(content_location));
+    add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) {
+    char offset_str[21];
+    snprintf(offset_str, sizeof(offset_str), "%llu", (unsigned long long)curr_offset);
+    update_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+    char content_location[strlen(ffr->contentLocation) + 1];
+    snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation);
+    update_attribute(ffr, "content location", content_location, strlen(content_location));
+    update_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
 
-    ff_list->tail->next = new_node;
-    ff_list->tail = new_node;
-    ff_list->len++;
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete) {
+    if (!instance || !ff_list) {
+        return;
+    }
+    flow_file_list * el = NULL;
+    LL_FOREACH(ff_list, el) {
+        if (!complete || el->complete) {
+            transmit_flowfile(el->ff_record, instance);
+        }
+    }
 }
 
-void free_flow_file_list(flow_file_list * ff_list) {
-    if (!ff_list || !ff_list->head) {
+void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client) {
+    if (!ffl || !client) {
         return;
     }
 
-    flow_file_list_node * head = ff_list->head;
-    while (head) {
-        free_flowfile(head->ff_record);
-        flow_file_list_node * tmp = head;
-        head = head->next;
-        free(tmp);
+    char * file = ffl->ff_record->contentLocation;
+    struct stat statfs;
+    if (stat(file, &statfs) < 0) {
+        return;
     }
-    memset(ff_list, 0, sizeof(struct flow_file_list));
+    size_t file_size = statfs.st_size;
+
+    attribute attr;
+    attr.key = "current offset";
+    if (get_attribute(ffl->ff_record, &attr) < 0) {
+        printf("Error looking up flow file attribute %s\n", attr.key);
+        return;
+    }
+
+    errno = 0;
+    uint64_t offset = strtoull((const char *)attr.value, NULL, 10);
+    if (errno != 0) {
+        printf("Error converting flow file offset value\n");
+        return;
+    }
+    /* Open the content file last so the early returns above cannot leak it. */
+    FILE * fp = fopen(file, "rb");
+    if (!fp) {
+        return;
+    }
+    uint64_t begin_offset =  offset - file_size;
+    char * buff = (char *)malloc(sizeof(char) * 4097);
+    size_t count = 0;
+    while ((count = fread(buff, 1, 4096, fp)) > 0) {
+        buff[count] = '\0';
+        begin_offset += count;
+        char offset_str[21];
+        snprintf(offset_str, sizeof(offset_str), "%llu", (unsigned long long)begin_offset);
+        update_attribute(ffl->ff_record, "current offset", offset_str, strlen(offset_str));
+
+        attribute_set as;
+        uint64_t num_attrs = get_attribute_quantity(ffl->ff_record);
+        as.size = num_attrs;
+        as.attributes = (attribute *)malloc(num_attrs * sizeof(attribute));
+        get_all_attributes(ffl->ff_record, &as);
+
+        if (transmitPayload(client, buff, &as) == 0) {
+            printf("payload of %zu bytes from %s sent successfully\n", count, ffl->ff_record->contentLocation);
+        }
+        else {
+            printf("Failed to send payload, flow file %s\n", ffl->ff_record->contentLocation);
+        }
+        free(as.attributes);
+    }
+    free(buff);
+    fclose(fp);
+}
+
+void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete) {
+    if (!client || !ff_list) {
+        return;
+    }
+    flow_file_list * el = NULL;
+    LL_FOREACH(ff_list, el) {
+        if (!complete || el->complete) {
+            read_payload_and_transmit(el, client);
+        }
+    }
+}
+
+uint64_t flow_files_size(flow_file_list * ff_list) {
+    if (!ff_list) {
+        return 0;
+    }
+
+    uint64_t counter = 0;
+    flow_file_list * el = NULL;
+    LL_COUNT(ff_list, el, counter);
+    return counter;
 }
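
Note on read_payload_and_transmit above: the "current offset" attribute holds the tail position after the flow file's content, so the function subtracts the content size to find where the content starts and then advances the attribute by each chunk it sends. Below is a minimal sketch of that arithmetic on its own, with a guard for the case where the size exceeds the recorded offset and the unsigned subtraction would wrap. The name chunk_offsets and its parameters are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: given the tail offset recorded after a flow file was
 * written (end_offset) and the flow file's size, computes the offset reported
 * after sending each chunk of at most chunk_size bytes. Writes up to max_out
 * values into out and returns the number of chunks, or 0 on invalid input.
 */
size_t chunk_offsets(uint64_t end_offset, uint64_t file_size,
                     uint64_t chunk_size, uint64_t *out, size_t max_out) {
    assert(out != NULL || max_out == 0);
    if (chunk_size == 0 || file_size > end_offset) {
        return 0;  /* would wrap: the content cannot start before byte 0 */
    }
    uint64_t offset = end_offset - file_size;  /* where the content begins */
    uint64_t remaining = file_size;
    size_t n = 0;
    while (remaining > 0) {
        uint64_t step = remaining < chunk_size ? remaining : chunk_size;
        offset += step;
        remaining -= step;
        if (n < max_out) {
            out[n] = offset;
        }
        n++;
    }
    return n;
}
```

For a 5000-byte flow file that ends at offset 10000 and is sent in 4096-byte chunks, the reported offsets are 9096 and 10000.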
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index 9a63c4a..769b1dc 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -318,7 +318,7 @@ TEST_CASE("Test standalone processors", "[testStandalone]") {
 
   create_testfile_for_getfile(sourcedir.c_str());
 
-  standalone_processor* getfile_proc = create_processor("GetFile");
+  standalone_processor* getfile_proc = create_processor("GetFile", NULL);
   REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir.c_str()) == 0);
 
   flow_file_record* ffr = invoke(getfile_proc);
@@ -326,7 +326,7 @@ TEST_CASE("Test standalone processors", "[testStandalone]") {
   REQUIRE(ffr != nullptr);
   REQUIRE(get_attribute_quantity(ffr) > 0);
 
-  standalone_processor* extract_test = create_processor("ExtractText");
+  standalone_processor* extract_test = create_processor("ExtractText", NULL);
   REQUIRE(extract_test != nullptr);
   REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
 
@@ -379,7 +379,7 @@ TEST_CASE("Test interaction of flow and standlone processors", "[testStandaloneW
   flow_file_record *record = get_next_flow_file(instance, test_flow);
   REQUIRE(record != nullptr);
 
-  standalone_processor* putfile_proc = create_processor("PutFile");
+  standalone_processor* putfile_proc = create_processor("PutFile", NULL);
   REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir.c_str()) == 0);
 
   flow_file_record* put_record = invoke_ff(putfile_proc, record);
@@ -409,7 +409,7 @@ TEST_CASE("Test standalone processors with file input", "[testStandaloneWithFile
   auto sourcedir = testController.createTempDirectory(src_format);
   std::string path = create_testfile_for_getfile(sourcedir.c_str());
 
-  standalone_processor* extract_test = create_processor("ExtractText");
+  standalone_processor* extract_test = create_processor("ExtractText", NULL);
   REQUIRE(extract_test != nullptr);
   REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
 
@@ -465,9 +465,9 @@ TEST_CASE("C API robustness test", "[TestRobustness]") {
   free_standalone_processor(nullptr);
   free_instance(nullptr);
 
-  REQUIRE(create_processor(nullptr) == nullptr);
+  REQUIRE(create_processor(nullptr, nullptr) == nullptr);
 
-  standalone_processor *standalone_proc = create_processor("GetFile");
+  standalone_processor *standalone_proc = create_processor("GetFile", NULL);
   REQUIRE(standalone_proc != nullptr);
 
   REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1);
diff --git a/nanofi/tests/CTailFileTests.cpp b/nanofi/tests/CLogAggregatorTests.cpp
similarity index 69%
rename from nanofi/tests/CTailFileTests.cpp
rename to nanofi/tests/CLogAggregatorTests.cpp
index 491e8c0..edf62c9 100644
--- a/nanofi/tests/CTailFileTests.cpp
+++ b/nanofi/tests/CLogAggregatorTests.cpp
@@ -16,20 +16,20 @@
  * limitations under the License.
  */
 
+#ifndef _WIN32
 #include "catch.hpp"
 
 #include <vector>
 #include <string>
-#include <fstream>
 #include <numeric>
 #include <algorithm>
-#include <assert.h>
 #include <unistd.h>
 #include <string.h>
 #include <sys/stat.h>
 #include "core/string_utils.h"
 #include "core/file_utils.h"
 
+#include "CTestsBase.h"
 
 void test_lists_equal(token_list * tknlist, const std::vector<std::string>& sv) {
     REQUIRE(tknlist != NULL);
@@ -174,131 +174,101 @@ TEST_CASE("Test string tokenizer for string starting and ending with delimited c
  * ##################################################################
  */
 
-class FileManager {
-public:
-    FileManager(const std::string& filePath) {
-        assert(!filePath.empty() && "filePath provided cannot be empty!");
-        filePath_ = filePath;
-        outputStream_.open(filePath_, std::ios::binary);
-    }
-
-    ~FileManager() {
-        std::ifstream ifs(filePath_);
-        if (ifs.good()) {
-            remove(filePath_.c_str());
-        }
-    }
-
-    void Write(const std::string& str) {
-        outputStream_ << str;
-    }
-
-    std::string WriteNChars(uint64_t n, char c) {
-        std::string s(n, c);
-        outputStream_ << s;
-        return s;
-    }
-
-    std::string getFilePath() const {
-        return filePath_;
-    }
-
-    void CloseStream() {
-        outputStream_.flush();
-        outputStream_.close();
-    }
-
-    uint64_t GetFileSize() {
-        CloseStream();
-        struct stat buff;
-        if (stat(filePath_.c_str(), &buff) == 0) {
-            return buff.st_size;
-        }
-        return 0;
-    }
-
-private:
-    std::string filePath_;
-    std::ofstream outputStream_;
-};
-
-TEST_CASE("Simple tail file test", "[testTailFile]") {
+TEST_CASE("Simple log aggregator test", "[testLogAggregator]") {
 
+    const char * content = "hello world";
     FileManager fm("test.txt");
-    fm.Write("hello world");
+    fm.Write(content);
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, ';', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 0);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->curr_offset == 0);
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
 }
 
-TEST_CASE("Empty file tail test", "[testEmptyFileTail]") {
+TEST_CASE("Empty file log aggregator test", "[testEmptyFileLogAggregator]") {
     FileManager fm("test.txt");
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, ';', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 0);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
+    REQUIRE(pp->ff_list == NULL);
+    REQUIRE(pp->curr_offset == 0);
 }
 
-TEST_CASE("File containing only delimiters tail test", "[testDelimiterOnlyFileTail]") {
+TEST_CASE("File containing only delimiters test", "[testDelimiterOnlyLogAggregator]") {
     FileManager fm("test.txt");
-    fm.Write("----");
+    fm.Write(";;;;");
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, '-', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 4);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
+    REQUIRE(pp->ff_list == NULL);
+    REQUIRE(pp->curr_offset == 4);
 }
 
-TEST_CASE("File tail test string starting with delimiter", "[testDelimiterOnlyFileTail]") {
+TEST_CASE("File containing string starting with delimiter", "[testDelimiterStartingStrings]") {
     FileManager fm("test.txt");
-    fm.Write("----hello");
+    fm.Write(";;;;hello");
     fm.CloseStream();
 
-    const char * file = fm.getFilePath().c_str();
-    struct token_list tkn_list = tail_file(file, '-', 0);
-    REQUIRE(tkn_list.size == 0);
-    REQUIRE(tkn_list.head == NULL);
-    REQUIRE(tkn_list.total_bytes == 4);
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    auto pp = invoke_processor(mgr, fm.getFilePath().c_str());
+
+    REQUIRE(flow_files_size(pp->ff_list) == 0);
+    REQUIRE(pp->ff_list == NULL);
+    REQUIRE(pp->curr_offset == 4);
 }
 
-TEST_CASE("Test tail file with less than 4096 delimited chars", "[testTailFileDelimitedString]") {
+TEST_CASE("Test tail file with less than 4096 delimited chars", "[testLogAggregateFileLessThan4KB]") {
+    const std::string token1("token1");
+    const std::string token2("token2");
+    const std::string token3("token3");
+    std::vector<std::string> tokens = {token1, token2, token3};
 
-    const std::string delimitedString = "token1--token2--token3";
+    const std::string delimitedString = join_strings(tokens, ";;");
     FileManager fm("test.txt");
     fm.Write(delimitedString);
     const std::string filePath = fm.getFilePath();
     fm.CloseStream();
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"});
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    auto pp = invoke_processor(mgr, filePath.c_str());
+
+    REQUIRE(pp->curr_offset == (token1.size() + token2.size() + (2 * std::string(";;").size())));
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
 }
 
 // Although there is no delimiter within the string that is at least 4096 bytes long,
 // tail_file still creates a flow file for the first 4096 bytes.
-TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testTailFile4096Chars]") {
+TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testLogAggregateFile4096Chars]") {
 
     FileManager fm("test.txt");
     const std::string s = std::move(fm.WriteNChars(4096, 'a'));
     const std::string filePath = fm.getFilePath();
     fm.CloseStream();
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s)});
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(pp->curr_offset == 4096);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
 }
 
 // Although there is no delimiter within the string that is equal to 4096 bytes or longer
 // tail_file creates a flow file for each subsequent 4096 byte chunk. It leaves the last chunk
 // if it is smaller than 4096 bytes and not delimited
-TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testTailFileMoreThan4096Chars]") {
+TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testLogAggregateFileMoreThan4096Chars]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
@@ -307,63 +277,89 @@ TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[test
     fm.Write(s3);
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + s3.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
+    REQUIRE(pp->curr_offset == (s1.size() + s2.size()));
 }
 
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testLogAggregateWithDelimitedString]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
-    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string d1 = std::move(fm.WriteNChars(2, ';'));
     const std::string s2 = std::move(fm.WriteNChars(4096, 'b'));
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+    REQUIRE(pp->curr_offset == totalStringsSize);
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
 }
 
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testLogAggregateDelimited]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
-    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string d1 = std::move(fm.WriteNChars(2, ';'));
     const std::string s2 = std::move(fm.WriteNChars(4000, 'b'));
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(pp->curr_offset == (s1.size() + d1.size()));
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
 }
 
-TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testTailFileWithDelimitedString]") {
+TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testLogAggregateDelimited]") {
 
     FileManager fm("test.txt");
     const std::string s1 = std::move(fm.WriteNChars(4096, 'a'));
-    const std::string d1 = std::move(fm.WriteNChars(2, '-'));
+    const std::string d1 = std::move(fm.WriteNChars(2, ';'));
     const std::string s2 = std::move(fm.WriteNChars(4098, 'b'));
     fm.CloseStream();
 
+    TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator);
     const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size();
     const std::string filePath = fm.getFilePath();
     const uint64_t bytesWrittenToStream = fm.GetFileSize();
     REQUIRE(bytesWrittenToStream == totalStringsSize);
 
-    const std::string s3 = std::string(s2.data(), s2.data()+4096);
-    struct token_list tokens = tail_file(filePath.c_str(), '-', 0);
-    test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s3)});
+    auto pp = invoke_processor(mgr, filePath.c_str());
+    REQUIRE(pp->curr_offset == 8194);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    flow_file_list * el = NULL;
+    LL_FOREACH(pp->ff_list, el) {
+        REQUIRE(el->ff_record->size == 4096);
+    }
 }
+#endif
diff --git a/nanofi/tests/CTailFileChunkTests.cpp b/nanofi/tests/CTailFileChunkTests.cpp
new file mode 100644
index 0000000..ff933c8
--- /dev/null
+++ b/nanofi/tests/CTailFileChunkTests.cpp
@@ -0,0 +1,135 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _WIN32
+#include "catch.hpp"
+
+#include <dirent.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+#include "CTestsBase.h"
+
+/****
+ * ##################################################################
+ *  CTAILFILE CHUNK TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test tailfile chunk size 4096, file size 8KB", "[tailfileChunk8KBFileSize]") {
+
+    TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+    const char * file = "./e.txt";
+    const char * chunksize = "4096";
+
+    //Write 8192 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4096, 'a');
+    fm.WriteNChars(4096, 'b');
+    fm.CloseStream();
+
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "chunk_size", chunksize);
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 8192 bytes
+    REQUIRE(pp->curr_offset == 8192);
+}
+
+TEST_CASE("Test tailfile chunk size 4096, file size less than 8KB", "[tailfileChunkFileSizeLessThan8KB]") {
+
+    TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+    const char * file = "./e.txt";
+    const char * chunksize = "4096";
+
+    //Write 4505 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4096, 'a');
+    fm.WriteNChars(409, 'b');
+    fm.CloseStream();
+
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "chunk_size", chunksize);
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+    //Test that one flow file record was created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+    //Test that the current offset in the file is 4096 bytes
+    REQUIRE(pp->curr_offset == 4096);
+    REQUIRE(pp->ff_list->ff_record->size == 4096);
+
+    struct stat fstat;
+    REQUIRE(stat(pp->ff_list->ff_record->contentLocation, &fstat) == 0);
+    REQUIRE(fstat.st_size == 4096);
+}
+
+TEST_CASE("Test tailfile chunk size 512, file size equal to 4608B", "[tailfileChunkFileSize8KB]") {
+
+    TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk);
+    const char * file = "./e.txt";
+    const char * chunksize = "512";
+
+    //Write 4608 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4608, 'a');
+    fm.CloseStream();
+
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", file);
+    set_standalone_property(proc, "chunk_size", chunksize);
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+
+    //Test that nine flow file records were created (4608 / 512)
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 9);
+
+    //Test that the current offset in the file is 4608 bytes
+    REQUIRE(pp->curr_offset == 4608);
+}
+#endif
diff --git a/nanofi/tests/CTailFileDelimitedTests.cpp b/nanofi/tests/CTailFileDelimitedTests.cpp
new file mode 100644
index 0000000..ed079d1
--- /dev/null
+++ b/nanofi/tests/CTailFileDelimitedTests.cpp
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "catch.hpp"
+
+#include "CTestsBase.h"
+
+/****
+ * ##################################################################
+ *  CTAILFILE DELIMITED TESTS
+ * ##################################################################
+ */
+
+TEST_CASE("Test tailfile delimited. Empty file", "[tailfileDelimitedEmptyFileTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Create empty file
+    FileManager fm(file);
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that no flowfiles were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list == NULL);
+}
+
+TEST_CASE("Test tailfile delimited. File has less than 4096 chars", "[tailfileDelimitedLessThan4096Chars]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    FileManager fm(file);
+    fm.WriteNChars(34, 'a');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //One incomplete flow file is created, since no delimiter was found
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 0);
+
+    //Test that the current offset in the file is 34
+    REQUIRE(pp->curr_offset == 34);
+}
+
+TEST_CASE("Test tailfile delimited. Simple test", "[tailfileDelimitedSimpleTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 42 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(34, 'a');
+    fm.WriteNChars(1, ';');
+    fm.WriteNChars(6, 'b');
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 42 bytes
+    REQUIRE(pp->curr_offset == 42);
+
+    //Test the flow file sizes
+    const char * flowfile1_path = pp->ff_list->ff_record->contentLocation;
+    const char * flowfile2_path = pp->ff_list->next->ff_record->contentLocation;
+
+    struct stat fstat;
+    stat(flowfile1_path, &fstat);
+    REQUIRE(fstat.st_size == 34);
+
+    stat(flowfile2_path, &fstat);
+    REQUIRE(fstat.st_size == 6);
+
+    REQUIRE(pp->ff_list->complete == 1);
+    REQUIRE(pp->ff_list->next->complete == 1);
+}
+
+TEST_CASE("Test tailfile delimited. trailing non delimited string", "[tailfileNonDelimitedTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 67 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(34, 'a');
+    fm.WriteNChars(1, ';');
+    fm.WriteNChars(32, 'b');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 67 bytes
+    REQUIRE(pp->curr_offset == 67);
+    REQUIRE(pp->ff_list->complete == 1);
+    REQUIRE(pp->ff_list->next->complete == 0);
+    struct stat fstat;
+    stat(pp->ff_list->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 34);
+
+    //Append a delimiter at the end of the file
+    fm.OpenStream();
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    stat(pp->ff_list->next->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 32);
+    REQUIRE(pp->ff_list->next->complete == 1);
+}
+
+TEST_CASE("Test tailfile delimited 4096 chars non delimited", "[tailfileDelimitedSimpleTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 4096 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(4096, 'a');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp !=  NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 0);
+    //Test that the current offset in the file is 4096 bytes
+    REQUIRE(pp->curr_offset == 4096);
+
+    //Write another 2048 characters
+    fm.OpenStream();
+    fm.WriteNChars(2048, 'b');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 0);
+
+    //Test that the current offset in the file is (4096 + 2048)
+    REQUIRE(pp->curr_offset == 6144);
+
+    //Write another 2048 characters
+    fm.OpenStream();
+    fm.WriteNChars(2048, 'c');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+
+    //Test that the current offset in the file is 8192 bytes only
+    REQUIRE(pp->curr_offset == 8192);
+
+    //Write a delimiter at the end and expect a flow file size of 8192 bytes
+    fm.OpenStream();
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 1);
+    REQUIRE(pp->ff_list->complete == 1);
+    const char * flowfile_path = pp->ff_list->ff_record->contentLocation;
+    struct stat fstat;
+    stat(flowfile_path, &fstat);
+    REQUIRE(fstat.st_size == 8192);
+}
+
+TEST_CASE("Test tailfile delimited. string starting with delimiter", "[tailfileDelimiterStartStringTest]") {
+
+    TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited);
+    const char * file = "./e.txt";
+    const char * delimiter = ";";
+
+    //Write 75 bytes to the file
+    FileManager fm(file);
+    fm.WriteNChars(5, ';');
+    fm.WriteNChars(34, 'a');
+    fm.WriteNChars(4, ';');
+    fm.WriteNChars(32, 'b');
+    fm.CloseStream();
+
+    auto pp = invoke_processor(mgr, file);
+
+    //Test that two flow file records were created
+    REQUIRE(pp != NULL);
+    REQUIRE(pp->ff_list != NULL);
+    REQUIRE(pp->ff_list->ff_record != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    //Test that the current offset in the file is 75 bytes
+    REQUIRE(pp->curr_offset == 75);
+    REQUIRE(pp->ff_list->complete == 1);
+    REQUIRE(pp->ff_list->next->complete == 0);
+    struct stat fstat;
+    stat(pp->ff_list->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 34);
+
+    //Append a delimiter at the end of the file
+    fm.OpenStream();
+    fm.WriteNChars(1, ';');
+    fm.CloseStream();
+
+    pp = invoke_processor(mgr, file);
+    REQUIRE(pp != NULL);
+    REQUIRE(flow_files_size(pp->ff_list) == 2);
+
+    stat(pp->ff_list->next->ff_record->contentLocation, &fstat);
+    REQUIRE(fstat.st_size == 32);
+    REQUIRE(pp->ff_list->next->complete == 1);
+}
diff --git a/nanofi/tests/CTestsBase.h b/nanofi/tests/CTestsBase.h
new file mode 100644
index 0000000..bcde416
--- /dev/null
+++ b/nanofi/tests/CTestsBase.h
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _WIN32
+#ifndef NANOFI_TESTS_CTESTSBASE_H_
+#define NANOFI_TESTS_CTESTSBASE_H_
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <assert.h>
+#include <sys/stat.h>
+
+#include "core/file_utils.h"
+#include "api/ecu.h"
+#include "api/nanofi.h"
+
+class FileManager {
+public:
+    FileManager(const std::string& filePath) {
+        assert(!filePath.empty() && "filePath provided cannot be empty!");
+        struct stat statbuff;
+        assert(!is_directory(filePath.c_str()) && "Provided path must be a file, not a directory");
+        filePath_ = filePath;
+        remove(filePath_.c_str());
+        outputStream_.open(filePath_, std::ios::binary);
+    }
+
+    ~FileManager() {
+        std::ifstream ifs(filePath_);
+        if (ifs.good()) {
+            remove(filePath_.c_str());
+        }
+    }
+
+    void Write(const std::string& str) {
+        outputStream_ << str;
+    }
+
+    std::string WriteNChars(uint64_t n, char c) {
+        std::string s(n, c);
+        outputStream_ << s;
+        return s;
+    }
+
+    std::string getFilePath() const {
+        return filePath_;
+    }
+
+    void OpenStream() {
+        outputStream_.open(filePath_, std::ios::binary|std::ios::app);
+    }
+
+    void CloseStream() {
+        outputStream_.flush();
+        outputStream_.close();
+    }
+
+    uint64_t GetFileSize() {
+        CloseStream();
+        struct stat buff;
+        if (stat(filePath_.c_str(), &buff) == 0) {
+            return buff.st_size;
+        }
+        return 0;
+    }
+
+private:
+    std::string filePath_;
+    std::ofstream outputStream_;
+};
+
+class TailFileTestResourceManager {
+public:
+    TailFileTestResourceManager(const std::string& processor_name, void(*callback)(processor_session * ps, processor_context * ctx)) {
+        const char * port_str = "uuid";
+        nifi_port port;
+        port.port_id = (char *)port_str;
+        const char * instance_str = "nifi";
+        instance_ = create_instance(instance_str, &port);
+        add_custom_processor(processor_name.c_str(), callback);
+        processor_ = create_processor(processor_name.c_str(), instance_);
+    }
+
+    ~TailFileTestResourceManager() {
+        remove_directory("./contentrepository");
+        char uuid_str[37];
+        get_proc_uuid_from_processor(processor_, uuid_str);
+        delete_all_flow_files_from_proc(uuid_str);
+        struct processor_params * tmp, * pp = NULL;
+        HASH_ITER(hh, procparams, pp, tmp) {
+            HASH_DEL(procparams, pp);
+            free(pp);
+        }
+        free_standalone_processor(processor_);
+        free_instance(instance_);
+    }
+
+    standalone_processor * getProcessor() const {
+        return processor_;
+    }
+
+    nifi_instance * getInstance() const {
+        return instance_;
+    }
+
+private:
+    nifi_instance * instance_;
+    standalone_processor * processor_;
+};
+
+struct processor_params * invoke_processor(TailFileTestResourceManager& mgr, const char * filePath) {
+    standalone_processor * proc = mgr.getProcessor();
+    set_standalone_property(proc, "file_path", filePath);
+    set_standalone_property(proc, "delimiter", ";");
+
+    flow_file_record * new_ff = invoke(proc);
+
+    char uuid_str[37];
+    get_proc_uuid_from_processor(proc, uuid_str);
+    struct processor_params * pp = get_proc_params(uuid_str);
+    return pp;
+}
+
+#endif /* NANOFI_TESTS_CTESTSBASE_H_ */
+#endif

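The assertions in CTailFileDelimitedTests.cpp above pin down a tokenizing behaviour that can be modelled in a few lines. The following is a minimal sketch and not the nanofi implementation: `Token` and `split_delimited` are hypothetical names introduced for illustration, and the sketch works on an in-memory string instead of tailing a file. It assumes only what the tests assert: runs of delimiters produce no empty tokens, a token ended by a delimiter is complete, and trailing data without a delimiter is kept as an incomplete token.

```cpp
#include <string>
#include <vector>

// Hypothetical model type for illustration; not part of nanofi.
struct Token {
    std::string content;
    bool complete;  // true once a delimiter has terminated the token
};

// Hypothetical helper: split `data` on `delim`, skipping empty tokens.
// Trailing data without a closing delimiter is returned as incomplete.
std::vector<Token> split_delimited(const std::string& data, char delim) {
    std::vector<Token> tokens;
    std::string current;
    for (char c : data) {
        if (c == delim) {
            // A delimiter closes the current token; repeated or leading
            // delimiters find `current` empty and are ignored.
            if (!current.empty()) {
                tokens.push_back({current, true});
                current.clear();
            }
        } else {
            current.push_back(c);
        }
    }
    // Whatever is left has not seen its delimiter yet.
    if (!current.empty()) {
        tokens.push_back({current, false});
    }
    return tokens;
}
```

With the input of the "string starting with delimiter" test (5 `;`, 34 `a`, 4 `;`, 32 `b`), this model yields a complete 34-byte token followed by an incomplete 32-byte token, which is the shape the test asserts through `complete` and `st_size`. Appending one more `;` and splitting again marks the second token complete.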

[nifi-minifi-cpp] 01/02: Merge remote-tracking branch 'apache/master'

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 8e40c3fa29d82a89efc995eb0615afd8a7fd92f7
Merge: decf568 86d5d3f
Author: Arpad Boda <ar...@gmail.com>
AuthorDate: Mon Sep 23 00:32:29 2019 +0000

    Merge remote-tracking branch 'apache/master'

 CMakeLists.txt                                     |   2 +
 conf/minifi-log.properties                         |   2 +
 docker/ContainerBuild.sh                           |   4 +
 docker/centos/Dockerfile                           |   2 +-
 docker/test/integration/minifi/__init__.py         |  18 +-
 docker/test/integration/minifi/test/__init__.py    |  53 +++++-
 ...{test_zero_file.py => test_filter_zero_file.py} |   7 +-
 docker/test/integration/test_zero_file.py          |   5 +-
 extensions/civetweb/CMakeLists.txt                 |   1 +
 extensions/civetweb/processors/ListenHTTP.cpp      |  22 +--
 extensions/civetweb/tests/ListenHTTPTests.cpp      |   3 +-
 extensions/http-curl/HTTPCurlLoader.h              |   2 +-
 extensions/http-curl/client/HTTPClient.cpp         |   1 +
 extensions/http-curl/protocols/RESTReceiver.cpp    |   2 +-
 extensions/http-curl/sitetosite/HTTPProtocol.cpp   |  58 +++---
 extensions/http-curl/tests/TestServer.h            |   2 +-
 extensions/librdkafka/CMakeLists.txt               |   1 +
 extensions/opencv/FrameIO.h                        |  77 ++++++++
 extensions/opencv/MotionDetector.cpp               | 203 +++++++++++++++++++++
 extensions/opencv/MotionDetector.h                 |  88 +++++++++
 extensions/opencv/tests/CaptureRTSPFrameTest.cpp   |   5 +-
 libminifi/include/Connection.h                     |  10 +
 libminifi/include/RemoteProcessorGroupPort.h       |   2 +-
 .../include/core/logging/LoggerConfiguration.h     |  33 +++-
 libminifi/include/utils/ClassUtils.h               |  44 +++++
 libminifi/include/utils/HTTPClient.h               |   4 +-
 libminifi/include/utils/StringUtils.h              |   3 +-
 libminifi/src/Connection.cpp                       |   8 +
 libminifi/src/core/FlowConfiguration.cpp           |   2 +-
 libminifi/src/core/logging/LoggerConfiguration.cpp |  15 ++
 libminifi/src/core/yaml/YamlConfiguration.cpp      |   8 +
 libminifi/src/sitetosite/RawSocketProtocol.cpp     |   6 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  64 ++++---
 libminifi/src/utils/ClassUtils.cpp                 |  58 ++++++
 libminifi/src/utils/HTTPClient.cpp                 |  68 +++----
 libminifi/test/TestBase.h                          |  92 ++++++++--
 libminifi/test/unit/ClassUtilsTests.cpp            |  61 +++++++
 libminifi/test/unit/HTTPUtilTests.cpp              |  67 +++++++
 libminifi/test/unit/LoggerTests.cpp                |  36 +++-
 39 files changed, 962 insertions(+), 177 deletions(-)