You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/04/11 16:55:39 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-608: Add Cron example

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a72af  MINIFICPP-608: Add Cron example
37a72af is described below

commit 37a72afb509408bd5a8aca8f544f80bfa2514e5d
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Mon Mar 4 09:08:21 2019 -0500

    MINIFICPP-608: Add Cron example
    
    This closes #495.
    
    Approved on GH by arpadboda
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 CONFIGURE.md                                    |   5 +
 LICENSE                                         |  25 ++++-
 cmake/BuildTests.cmake                          |   4 +
 controller/CMakeLists.txt                       |   2 +-
 extensions/ExtensionHeader.txt                  |   2 +-
 libminifi/CMakeLists.txt                        |   2 +-
 libminifi/include/CronDrivenSchedulingAgent.h   |  74 +++++++++++++++
 libminifi/include/FlowController.h              |   3 +
 libminifi/include/core/ProcessGroup.h           |   5 +-
 libminifi/include/core/Processor.h              |  21 ++++
 libminifi/src/CronDrivenSchedulingAgent.cpp     |  85 +++++++++++++++++
 libminifi/src/FlowController.cpp                |  21 +++-
 libminifi/src/core/ProcessGroup.cpp             |  43 ++++++---
 libminifi/src/core/yaml/YamlConfiguration.cpp   |  12 ++-
 libminifi/test/integration/TailFileCronTest.cpp | 108 +++++++++++++++++++++
 libminifi/test/resources/TestTailFileCron.yml   |  63 ++++++++++++
 main/CMakeLists.txt                             |   2 +-
 nanofi/CMakeLists.txt                           |   4 +-
 thirdparty/cron/Cron.h                          | 121 ++++++++++++++++++++++++
 thirdparty/cron/LICENSE.TXT                     |  21 ++++
 20 files changed, 593 insertions(+), 30 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 2ad8a57..5e6c31c 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -66,6 +66,11 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
                 max concurrent tasks: 1
                 Properties:
 
+### Scheduling strategies
+Currently Apache NiFi MiNiFi C++ supports TIMER_DRIVEN, EVENT_DRIVEN, and CRON_DRIVEN. TIMER_DRIVEN uses periods to execute your processor(s) at given intervals.
+The EVENT_DRIVEN strategy awaits for data be available or some other notification mechanism to trigger execution. CRON_DRIVEN executes at the desired intervals
+based on the CRON periods. Apache NiFi MiNiFi C++ supports standard CRON expressions without intervals ( */5 * * * * ). 
+
 ### SiteToSite Security Configuration
 
     in minifi.properties
diff --git a/LICENSE b/LICENSE
index 8f2a2cd..1c05dee 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1663,7 +1663,6 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 This project bundles `FindFLEX.cmake` and` FindBISON.cmake` from Kitware, Inc.
 These files are available under a 3-Clause BSD license:
-=============================================================================
  Copyright 2009 Kitware, Inc.
  Copyright 2006 Tristan Carel
  Modified 2010 by Jon Siwek, adding HEADER option
@@ -1704,4 +1703,26 @@ These files are available under a 3-Clause BSD license:
  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the License for more information.
-=============================================================================
\ No newline at end of file
+
+This project bundles Borma, which is available under an MIT Licenses:
+MIT License
+
+Copyright (c) 2017 Bosma
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index c57c0b3..7063e65 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -37,6 +37,7 @@ endif()
 
 function(appendIncludes testName)
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
+    target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/cron")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
     target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/include")
@@ -99,6 +100,7 @@ SET(TEST_BASE_LIB test_base)
 add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/")
+target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/cron")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
 if(WIN32)
@@ -161,3 +163,5 @@ add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESO
 
 add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml"  "${TEST_RESOURCES}/")
 
+add_test(NAME TailFileCronTest COMMAND TailFileCronTest "${TEST_RESOURCES}/TestTailFileCron.yml"  "${TEST_RESOURCES}/")
+
diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt
index 6b6c8bc..370197a 100644
--- a/controller/CMakeLists.txt
+++ b/controller/CMakeLists.txt
@@ -25,7 +25,7 @@ ENDIF(POLICY CMP0048)
 
 
 
-include_directories(../main/ ../libminifi/include  ../libminifi/include/c2  ../libminifi/include/c2/protocols/  ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../libminifi/include/core/yaml  ../libminifi/include/core  ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ${CIVET_THIRDPARTY_ROOT}/include ../thirdparty/cxxopts/include  ../thirdparty/)
+include_directories(../main/ ../libminifi/include  ../libminifi/include/c2  ../libminifi/include/c2/protocols/  ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../libminifi/include/core/yaml  ../libminifi/include/core  ../thirdparty/cron ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ${CIVET_THIRDPARTY_ROOT}/include ../thirdparty/cxxopts/include  ../thirdparty/)
 
 
 if(WIN32)
diff --git a/extensions/ExtensionHeader.txt b/extensions/ExtensionHeader.txt
index 920200c..311e561 100644
--- a/extensions/ExtensionHeader.txt
+++ b/extensions/ExtensionHeader.txt
@@ -20,7 +20,7 @@
 cmake_minimum_required(VERSION 2.6)
 
 
-include_directories(../../libminifi/include  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/)
+include_directories(../../libminifi/include  ../../libminifi/include/core/yaml  ../../libminifi/include/core  ../../thirdparty/cron  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/)
 
 if(WIN32)
 	include_directories(../../libminifi/opsys/win)
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 0dc527b..3de1d1a 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -79,7 +79,7 @@ endif()
 
 include_directories(../thirdparty/spdlog-20170710/include)
 include_directories(../thirdparty/yaml-cpp-yaml-cpp-20171024/include)
-
+include_directories(../thirdparty/cron)
 include_directories(../thirdparty/rapidjson-1.1.0/include)
 include_directories(../thirdparty/concurrentqueue/)
 include_directories(include)
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
new file mode 100644
index 0000000..0943570
--- /dev/null
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -0,0 +1,74 @@
+/**
+ * @file CronDrivenSchedulingAgent.h
+ * CronDrivenSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CRON_DRIVEN_SCHEDULING_AGENT_H__
+#define CRON_DRIVEN_SCHEDULING_AGENT_H__
+
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSessionFactory.h"
+#include "ThreadedSchedulingAgent.h"
+#include <chrono>
+
+#include "Cron.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// CronDrivenSchedulingAgent Class
+class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
+ public:
+  // Constructor
+  /*!
+   * Create a new event driven scheduling agent.
+   */
+  CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
+                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+  }
+  // Destructor
+  virtual ~CronDrivenSchedulingAgent() {
+  }
+  // Run function for the thread
+  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+
+  virtual void stop() {
+    std::lock_guard<std::mutex> locK(mutex_);
+    schedules_.clear();
+    last_exec_.clear();
+  }
+
+ private:
+  std::mutex mutex_;
+  std::map<std::string, Bosma::Cron> schedules_;
+  std::map<std::string, std::chrono::system_clock::time_point> last_exec_;
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent &parent);
+  CronDrivenSchedulingAgent &operator=(const CronDrivenSchedulingAgent &parent);
+
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 8a5c7d3..aa3da22 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -43,6 +43,7 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "TimerDrivenSchedulingAgent.h"
 #include "EventDrivenSchedulingAgent.h"
+#include "CronDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
 #include "core/Property.h"
 #include "core/state/nodes/MetricsBase.h"
@@ -385,6 +386,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
   // Flow Event Scheduler
   std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
+  // Cron Schedule
+  std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
   // Controller Service
   // Config
   // Site to Site Server Listener
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 8c2c469..6341523 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -31,6 +31,7 @@
 #include "Exception.h"
 #include "TimerDrivenSchedulingAgent.h"
 #include "EventDrivenSchedulingAgent.h"
+#include "CronDrivenSchedulingAgent.h"
 #include "core/logging/Logger.h"
 #include "controller/ControllerServiceNode.h"
 #include "controller/ControllerServiceMap.h"
@@ -160,9 +161,9 @@ class ProcessGroup {
     return config_version_;
   }
   // Start Processing
-  void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
+  void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
   // Stop Processing
-  void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
+  void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
   // Whether it is root process group
   bool isRootProcessGroup();
   // set parent process group
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index a280031..ee16b62 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -109,6 +109,25 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   uint64_t getSchedulingPeriodNano(void) {
     return scheduling_period_nano_;
   }
+
+
+  /**
+   * Sets the cron period
+   * @param period cron period.
+   */
+  void setCronPeriod(const std::string &period) {
+    cron_period_ = period;
+  }
+
+  /**
+   * Returns the cron period
+   * @return cron period
+   */
+  const std::string getCronPeriod() const {
+    return cron_period_;
+  }
+
+
   // Set Processor Run Duration in Nano Second
   void setRunDurationNano(uint64_t period) {
     run_duration_nano_ = period;
@@ -271,6 +290,8 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   // Trigger the Processor even if the incoming connection is empty
   std::atomic<bool> _triggerWhenEmpty;
 
+  std::string cron_period_;
+
  private:
 
   // Mutex for protection
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
new file mode 100644
index 0000000..41ffa96
--- /dev/null
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -0,0 +1,85 @@
+/**
+ * @file CronDrivenSchedulingAgent.cpp
+ * CronDrivenSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CronDrivenSchedulingAgent.h"
+#include <chrono>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+                                        const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (this->running_ && processor->isRunning()) {
+    std::chrono::system_clock::time_point leap_nanos;
+    auto uuidStr = processor->getUUIDStr();
+    std::chrono::system_clock::time_point result;
+    std::chrono::system_clock::time_point from = std::chrono::system_clock::now();
+    {
+      std::lock_guard<std::mutex> locK(mutex_);
+
+      auto sched_f = schedules_.find(uuidStr);
+      if (sched_f != std::end(schedules_)) {
+        result = last_exec_[uuidStr];
+        if (from >= result) {
+          result = sched_f->second.cron_to_next(from);
+          last_exec_[uuidStr] = result;
+        } else {
+          // we may be woken up a little early so that we can honor our time.
+          // in this case we can return the next time to run with the expectation
+          // that the wakeup mechanism gets more granular.
+          return std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
+        }
+      } else {
+        Bosma::Cron schedule(processor->getCronPeriod());
+        result = schedule.cron_to_next(from);
+        last_exec_[uuidStr] = result;
+        schedules_.insert(std::make_pair(uuidStr, schedule));
+      }
+    }
+
+    if (result > from) {
+      bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+
+      if (processor->isYield()) {
+        // Honor the yield
+        return processor->getYieldTime();
+      } else if (shouldYield && this->bored_yield_duration_ > 0) {
+        // No work to do or need to apply back pressure
+        return this->bored_yield_duration_;
+      }
+    }
+    auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
+    return sleep_time;
+  }
+  return 0;
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 0e55f67..9de7822 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -86,6 +86,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
       timer_scheduler_(nullptr),
       event_scheduler_(nullptr),
+      cron_scheduler_(nullptr),
       controller_service_provider_(nullptr),
       flow_configuration_(std::move(flow_configuration)),
       configuration_(configure),
@@ -239,12 +240,13 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
     if (this->root_)
-      this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // stop after we've attempted to stop the processors.
     this->timer_scheduler_->stop();
     this->event_scheduler_->stop();
+    this->cron_scheduler_->stop();
     running_ = false;
   }
   return 0;
@@ -322,6 +324,12 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
           configuration_);
     }
 
+    if (nullptr == cron_scheduler_ || reload) {
+      cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(
+          std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
+          configuration_);
+    }
+
     std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
     std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
         std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
@@ -378,10 +386,11 @@ int16_t FlowController::start() {
       controller_service_provider_->enableAllControllerServices();
       this->timer_scheduler_->start();
       this->event_scheduler_->start();
+      this->cron_scheduler_->start();
 
       if (this->root_ != nullptr) {
         start_time_ = std::chrono::steady_clock::now();
-        this->root_->startProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+        this->root_->startProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
       }
       initializeC2();
       running_ = true;
@@ -936,6 +945,9 @@ std::vector<std::shared_ptr<state::StateController>> FlowController::getAllCompo
         case core::SchedulingStrategy::EVENT_DRIVEN:
           vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_));
           break;
+        case core::SchedulingStrategy::CRON_DRIVEN:
+          vec.push_back(std::make_shared<state::ProcessorController>(processor, cron_scheduler_));
+          break;
         default:
           break;
       }
@@ -959,6 +971,9 @@ std::vector<std::shared_ptr<state::StateController>> FlowController::getComponen
         case core::SchedulingStrategy::EVENT_DRIVEN:
           vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_));
           break;
+        case core::SchedulingStrategy::CRON_DRIVEN:
+          vec.push_back(std::make_shared<state::ProcessorController>(processor, cron_scheduler_));
+          break;
         default:
           break;
       }
@@ -980,6 +995,8 @@ std::vector<BackTrace> FlowController::getTraces() {
   traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end()));
   auto event_driven = event_scheduler_->getTraces();
   traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end()));
+  auto cron_driven = cron_scheduler_->getTraces();
+  traces.insert(traces.end(), std::make_move_iterator(cron_driven.begin()), std::make_move_iterator(cron_driven.end()));
   // repositories
   auto prov_repo_trace = provenance_repo_->getTraces();
   traces.emplace_back(std::move(prov_repo_trace));
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7df1b09..abc59f4 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -136,24 +136,29 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
   }
 }
 
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
+                                   const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Start all the processor node, input and output ports
-    for (auto processor : processors_) {
+    for (const auto &processor : processors_) {
       logger_->log_debug("Starting %s", processor->getName());
-
-      if (!processor->isRunning() && processor->getScheduledState() != DISABLED) {
-        if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
+      switch (processor->getSchedulingStrategy()) {
+        case TIMER_DRIVEN:
           timeScheduler->schedule(processor);
-        else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+          break;
+        case EVENT_DRIVEN:
           eventScheduler->schedule(processor);
+          break;
+        case CRON_DRIVEN:
+          cronScheduler->schedule(processor);
+          break;
       }
     }
     // Start processing the group
     for (auto processGroup : child_process_groups_) {
-      processGroup->startProcessing(timeScheduler, eventScheduler);
+      processGroup->startProcessing(timeScheduler, eventScheduler, cronScheduler);
     }
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -164,22 +169,30 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev
   }
 }
 
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
+                                  const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Stop all the processor node, input and output ports
-    for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) {
-      std::shared_ptr<Processor> processor(*it);
-      if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-        timeScheduler->unschedule(processor);
-      else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
-        eventScheduler->unschedule(processor);
+    for (const auto &processor : processors_) {
+      logger_->log_debug("Stopping %s", processor->getName());
+      switch (processor->getSchedulingStrategy()) {
+        case TIMER_DRIVEN:
+          timeScheduler->unschedule(processor);
+          break;
+        case EVENT_DRIVEN:
+          eventScheduler->unschedule(processor);
+          break;
+        case CRON_DRIVEN:
+          cronScheduler->unschedule(processor);
+          break;
+      }
     }
 
     for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
       ProcessGroup *processGroup(*it);
-      processGroup->stopProcessing(timeScheduler, eventScheduler);
+      processGroup->stopProcessing(timeScheduler, eventScheduler, cronScheduler);
     }
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 5ebc74e..756b5af 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -175,10 +175,16 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
         }
 
         // Take care of scheduling
+
         core::TimeUnit unit;
-        if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-          logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%ll] ns", schedulingPeriod);
-          processor->setSchedulingPeriodNano(schedulingPeriod);
+
+        if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+          if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+            logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%ll] ns", schedulingPeriod);
+            processor->setSchedulingPeriodNano(schedulingPeriod);
+          }
+        } else {
+          processor->setCronPeriod(procCfg.schedulingPeriod);
         }
 
         if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
diff --git a/libminifi/test/integration/TailFileCronTest.cpp b/libminifi/test/integration/TailFileCronTest.cpp
new file mode 100644
index 0000000..b3c2f31
--- /dev/null
+++ b/libminifi/test/integration/TailFileCronTest.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../TestBase.h"
+#include "processors/TailFile.h"
+#include "processors/LogAttribute.h"
+#include "state/ProcessorController.h"
+#include "IntegrationBase.h"
+
+class TailFileTestHarness : public IntegrationBase {
+ public:
+  TailFileTestHarness() {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+
+    statefile = dir;
+    statefile += "/statefile";
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "Lin\\e1\nli\\nen\nli\\ne3\nli\\ne4\nli\\ne5\n";
+    file.close();
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+  }
+
+  virtual void cleanup() {
+    unlink(ss.str().c_str());
+    unlink(statefile.c_str());
+  }
+
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
+    assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
+    assert(LogTestController::getInstance().contains("li\\ne5") == true);
+  }
+
+ protected:
+  virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
+    for (auto &comp : fc->getComponents("tf")) {
+      std::shared_ptr<minifi::state::ProcessorController> proc = std::dynamic_pointer_cast<minifi::state::ProcessorController>(comp);
+      if (nullptr != proc) {
+        proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName, ss.str());
+        proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile, statefile);
+      }
+    }
+  }
+
+  std::string statefile;
+  char *dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+  }
+
+  TailFileTestHarness harness;
+
+  harness.run(test_file_location);
+
+  return 0;
+}
diff --git a/libminifi/test/resources/TestTailFileCron.yml b/libminifi/test/resources/TestTailFileCron.yml
new file mode 100644
index 0000000..dd30937
--- /dev/null
+++ b/libminifi/test/resources/TestTailFileCron.yml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: tf
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.TailFile
+      max concurrent tasks: 1
+      scheduling strategy: CRON_DRIVEN
+      scheduling period: "* * * * *"
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+        Input Delimiter: \n
+    - name: la
+      id: 2438e3c8-015a-1000-79ca-83af40ec1995
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 500 msec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: 
+       - success
+      Properties:
+        Log Payload: true
+
+Connections:
+    - name: tr1
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: tf
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: la
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1995
+      source relationship name: success
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec      
+
+Remote Processing Groups:
+    
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index b8aef83..49361c6 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/rapidjson-1.1.0/include ../thirdparty/)
+include_directories(../libminifi/include ../thirdparty/cron ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/rapidjson-1.1.0/include ../thirdparty/)
 include_directories(${JEMALLOC_INCLUDE_DIRS})
 
 if(WIN32)
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index 14b4a56..059ab55 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -24,7 +24,7 @@ IF(POLICY CMP0048)
 ENDIF(POLICY CMP0048)
 
 include_directories(include)
-include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include)
+include_directories(../libminifi/include ../thirdparty/cron ../thirdparty/spdlog-20170710/include)
 include_directories(../thirdparty/ut)
 
 if(WIN32)
@@ -98,4 +98,4 @@ endif(ENABLE_PYTHON AND NOT STATIC_BUILD)
 
 if (NOT DISABLE_CURL)
 add_subdirectory(examples)
-endif()
\ No newline at end of file
+endif()
diff --git a/thirdparty/cron/Cron.h b/thirdparty/cron/Cron.h
new file mode 100644
index 0000000..6992d47
--- /dev/null
+++ b/thirdparty/cron/Cron.h
@@ -0,0 +1,121 @@
+#include <chrono>
+#include <string>
+#include <sstream>
+#include <vector>
+#include <iterator>
+
+namespace Bosma {
+    using Clock = std::chrono::system_clock;
+
+    inline void add(std::tm &tm, Clock::duration time) {
+      auto tp = Clock::from_time_t(std::mktime(&tm));
+      auto tp_adjusted = tp + time;
+      auto tm_adjusted = Clock::to_time_t(tp_adjusted);
+      tm = *std::localtime(&tm_adjusted);
+    }
+
+    class BadCronExpression : public std::exception {
+    public:
+        explicit BadCronExpression(std::string msg) : msg_(std::move(msg)) {}
+
+        const char *what() const noexcept override { return (msg_.c_str()); }
+
+    private:
+        std::string msg_;
+    };
+
+    inline void
+    verify_and_set(const std::string &token, const std::string &expression, int &field, const int lower_bound,
+                   const int upper_bound, const bool adjust = false) {
+      if (token == "*")
+        field = -1;
+      else {
+        try {
+          field = std::stoi(token);
+        } catch (const std::invalid_argument &) {
+          throw BadCronExpression("malformed cron string (`" + token + "` not an integer or *): " + expression);
+        } catch (const std::out_of_range &) {
+          throw BadCronExpression("malformed cron string (`" + token + "` not convertable to int): " + expression);
+        }
+        if (field < lower_bound || field > upper_bound) {
+          std::ostringstream oss;
+          oss << "malformed cron string ('" << token << "' must be <= " << upper_bound << " and >= " << lower_bound
+              << "): " << expression;
+          throw BadCronExpression(oss.str());
+        }
+        if (adjust)
+          field--;
+      }
+    }
+
+    class Cron {
+    public:
+        explicit Cron(const std::string &expression) {
+          std::istringstream iss(expression);
+          std::vector<std::string> tokens{std::istream_iterator<std::string>{iss},
+                                          std::istream_iterator<std::string>{}};
+
+          if (tokens.size() != 5) throw BadCronExpression("malformed cron string (must be 5 fields): " + expression);
+
+          verify_and_set(tokens[0], expression, minute, 0, 59);
+          verify_and_set(tokens[1], expression, hour, 0, 23);
+          verify_and_set(tokens[2], expression, day, 1, 31);
+          verify_and_set(tokens[3], expression, month, 1, 12, true);
+          verify_and_set(tokens[4], expression, day_of_week, 0, 6);
+        }
+
+        // http://stackoverflow.com/a/322058/1284550
+        Clock::time_point cron_to_next(const Clock::time_point from = Clock::now()) const {
+          // get current time as a tm object
+          auto now = Clock::to_time_t(from);
+          std::tm next(*std::localtime(&now));
+          // it will always at least run the next minute
+          next.tm_sec = 0;
+          add(next, std::chrono::minutes(1));
+          while (true) {
+            if (month != -1 && next.tm_mon != month) {
+              // add a month
+              // if this will bring us over a year, increment the year instead and reset the month
+              if (next.tm_mon + 1 > 11) {
+                next.tm_mon = 0;
+                next.tm_year++;
+              } else
+                next.tm_mon++;
+
+              next.tm_mday = 1;
+              next.tm_hour = 0;
+              next.tm_min = 0;
+              continue;
+            }
+            if (day != -1 && next.tm_mday != day) {
+              add(next, std::chrono::hours(24));
+              next.tm_hour = 0;
+              next.tm_min = 0;
+              continue;
+            }
+            if (day_of_week != -1 && next.tm_wday != day_of_week) {
+              add(next, std::chrono::hours(24));
+              next.tm_hour = 0;
+              next.tm_min = 0;
+              continue;
+            }
+            if (hour != -1 && next.tm_hour != hour) {
+              add(next, std::chrono::hours(1));
+              next.tm_min = 0;
+              continue;
+            }
+            if (minute != -1 && next.tm_min != minute) {
+              add(next, std::chrono::minutes(1));
+              continue;
+            }
+            break;
+          }
+
+          // telling mktime to figure out dst
+          next.tm_isdst = -1;
+          return Clock::from_time_t(std::mktime(&next));
+        }
+
+        int minute, hour, day, month, day_of_week;
+    };
+}
diff --git a/thirdparty/cron/LICENSE.TXT b/thirdparty/cron/LICENSE.TXT
new file mode 100644
index 0000000..8e86f89
--- /dev/null
+++ b/thirdparty/cron/LICENSE.TXT
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 Bosma
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file