You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/04/11 16:55:39 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-608: Add Cron
example
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 37a72af MINIFICPP-608: Add Cron example
37a72af is described below
commit 37a72afb509408bd5a8aca8f544f80bfa2514e5d
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Mon Mar 4 09:08:21 2019 -0500
MINIFICPP-608: Add Cron example
This closes #495.
Approved on GH by arpadboda
Signed-off-by: Marc Parisi <ph...@apache.org>
---
CONFIGURE.md | 5 +
LICENSE | 25 ++++-
cmake/BuildTests.cmake | 4 +
controller/CMakeLists.txt | 2 +-
extensions/ExtensionHeader.txt | 2 +-
libminifi/CMakeLists.txt | 2 +-
libminifi/include/CronDrivenSchedulingAgent.h | 74 +++++++++++++++
libminifi/include/FlowController.h | 3 +
libminifi/include/core/ProcessGroup.h | 5 +-
libminifi/include/core/Processor.h | 21 ++++
libminifi/src/CronDrivenSchedulingAgent.cpp | 85 +++++++++++++++++
libminifi/src/FlowController.cpp | 21 +++-
libminifi/src/core/ProcessGroup.cpp | 43 ++++++---
libminifi/src/core/yaml/YamlConfiguration.cpp | 12 ++-
libminifi/test/integration/TailFileCronTest.cpp | 108 +++++++++++++++++++++
libminifi/test/resources/TestTailFileCron.yml | 63 ++++++++++++
main/CMakeLists.txt | 2 +-
nanofi/CMakeLists.txt | 4 +-
thirdparty/cron/Cron.h | 121 ++++++++++++++++++++++++
thirdparty/cron/LICENSE.TXT | 21 ++++
20 files changed, 593 insertions(+), 30 deletions(-)
diff --git a/CONFIGURE.md b/CONFIGURE.md
index 2ad8a57..5e6c31c 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -66,6 +66,11 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc
max concurrent tasks: 1
Properties:
+### Scheduling strategies
+Currently Apache NiFi MiNiFi C++ supports the TIMER_DRIVEN, EVENT_DRIVEN, and CRON_DRIVEN scheduling strategies. TIMER_DRIVEN executes your processor(s) at fixed intervals given by the scheduling period.
+The EVENT_DRIVEN strategy waits for data to become available, or for some other notification mechanism, to trigger execution. CRON_DRIVEN executes at the times described
+by the CRON expression given as the scheduling period. Apache NiFi MiNiFi C++ supports standard CRON expressions such as `* * * * *` (every minute), but not interval (step) expressions such as `*/5 * * * *`.
+
### SiteToSite Security Configuration
in minifi.properties
diff --git a/LICENSE b/LICENSE
index 8f2a2cd..1c05dee 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1663,7 +1663,6 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
This project bundles `FindFLEX.cmake` and` FindBISON.cmake` from Kitware, Inc.
These files are available under a 3-Clause BSD license:
-=============================================================================
Copyright 2009 Kitware, Inc.
Copyright 2006 Tristan Carel
Modified 2010 by Jon Siwek, adding HEADER option
@@ -1704,4 +1703,26 @@ These files are available under a 3-Clause BSD license:
This software is distributed WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the License for more information.
-=============================================================================
\ No newline at end of file
+
+This project bundles Bosma Cron (thirdparty/cron), which is available under the MIT License:
+MIT License
+
+Copyright (c) 2017 Bosma
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index c57c0b3..7063e65 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -37,6 +37,7 @@ endif()
function(appendIncludes testName)
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
+ target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/cron")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/include")
@@ -99,6 +100,7 @@ SET(TEST_BASE_LIB test_base)
add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/")
+target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/cron")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
if(WIN32)
@@ -161,3 +163,5 @@ add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESO
add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml" "${TEST_RESOURCES}/")
+add_test(NAME TailFileCronTest COMMAND TailFileCronTest "${TEST_RESOURCES}/TestTailFileCron.yml" "${TEST_RESOURCES}/")
+
diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt
index 6b6c8bc..370197a 100644
--- a/controller/CMakeLists.txt
+++ b/controller/CMakeLists.txt
@@ -25,7 +25,7 @@ ENDIF(POLICY CMP0048)
-include_directories(../main/ ../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ${CIVET_THIRDPARTY_ROOT}/include ../thirdparty/cxxopts/include ../thirdparty/)
+include_directories(../main/ ../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/cron ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ${CIVET_THIRDPARTY_ROOT}/include ../thirdparty/cxxopts/include ../thirdparty/)
if(WIN32)
diff --git a/extensions/ExtensionHeader.txt b/extensions/ExtensionHeader.txt
index 920200c..311e561 100644
--- a/extensions/ExtensionHeader.txt
+++ b/extensions/ExtensionHeader.txt
@@ -20,7 +20,7 @@
cmake_minimum_required(VERSION 2.6)
-include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/cron ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/)
if(WIN32)
include_directories(../../libminifi/opsys/win)
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 0dc527b..3de1d1a 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -79,7 +79,7 @@ endif()
include_directories(../thirdparty/spdlog-20170710/include)
include_directories(../thirdparty/yaml-cpp-yaml-cpp-20171024/include)
-
+include_directories(../thirdparty/cron)
include_directories(../thirdparty/rapidjson-1.1.0/include)
include_directories(../thirdparty/concurrentqueue/)
include_directories(include)
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
new file mode 100644
index 0000000..0943570
--- /dev/null
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -0,0 +1,99 @@
+/**
+ * @file CronDrivenSchedulingAgent.h
+ * CronDrivenSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CRON_DRIVEN_SCHEDULING_AGENT_H__
+#define CRON_DRIVEN_SCHEDULING_AGENT_H__
+
+#include <chrono>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSessionFactory.h"
+#include "ThreadedSchedulingAgent.h"
+
+#include "Cron.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+/**
+ * Scheduling agent that triggers processors according to the cron expression
+ * returned by core::Processor::getCronPeriod().
+ *
+ * A parsed Bosma::Cron schedule and the next planned execution time are cached
+ * per processor UUID; both caches are guarded by mutex_.
+ */
+class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
+ public:
+  /*!
+   * Create a new cron driven scheduling agent.
+   */
+  CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
+                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+  }
+
+  virtual ~CronDrivenSchedulingAgent() {
+  }
+
+  /**
+   * Triggers the processor when its cron schedule is due.
+   * @return delay until this processor should be run again (cron waits are in milliseconds)
+   */
+  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+
+  /**
+   * Stops the agent and discards every cached cron schedule.
+   */
+  virtual void stop() {
+    // FlowController::stop() calls stop() on every scheduling agent, so the inherited
+    // stop logic must not be bypassed here. It runs before mutex_ is taken so that
+    // mutex_ is never held while the base implementation runs.
+    ThreadedSchedulingAgent::stop();
+    std::lock_guard<std::mutex> lock(mutex_);
+    schedules_.clear();
+    last_exec_.clear();
+  }
+
+ private:
+  // Guards schedules_ and last_exec_.
+  std::mutex mutex_;
+  // Parsed cron schedule per processor UUID.
+  std::map<std::string, Bosma::Cron> schedules_;
+  // Despite its name, holds the next planned execution time per processor UUID.
+  std::map<std::string, std::chrono::system_clock::time_point> last_exec_;
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent &parent) = delete;
+  CronDrivenSchedulingAgent &operator=(const CronDrivenSchedulingAgent &parent) = delete;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 8a5c7d3..aa3da22 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -43,6 +43,7 @@
#include "core/controller/ControllerServiceProvider.h"
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
+#include "CronDrivenSchedulingAgent.h"
#include "FlowControlProtocol.h"
#include "core/Property.h"
#include "core/state/nodes/MetricsBase.h"
@@ -385,6 +386,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
// Flow Event Scheduler
std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
+ // Cron Schedule
+ std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
// Controller Service
// Config
// Site to Site Server Listener
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 8c2c469..6341523 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -31,6 +31,7 @@
#include "Exception.h"
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
+#include "CronDrivenSchedulingAgent.h"
#include "core/logging/Logger.h"
#include "controller/ControllerServiceNode.h"
#include "controller/ControllerServiceMap.h"
@@ -160,9 +161,9 @@ class ProcessGroup {
return config_version_;
}
// Start Processing
- void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
+ void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
// Stop Processing
- void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
+ void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
// Whether it is root process group
bool isRootProcessGroup();
// set parent process group
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index a280031..ee16b62 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -109,6 +109,25 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
uint64_t getSchedulingPeriodNano(void) {
return scheduling_period_nano_;
}
+
+
+  /**
+   * Stores the cron expression that CronDrivenSchedulingAgent evaluates for this processor.
+   * @param period cron expression, e.g. "* * * * *"
+   */
+  void setCronPeriod(const std::string &period) {
+    cron_period_.assign(period);
+  }
+
+  /**
+   * Returns the cron expression stored by setCronPeriod().
+   * Returned by plain value (not const value) so callers can move from the result.
+   * @return cron expression
+   */
+  std::string getCronPeriod() const {
+    return cron_period_;
+  }
+
+
// Set Processor Run Duration in Nano Second
void setRunDurationNano(uint64_t period) {
run_duration_nano_ = period;
@@ -271,6 +290,8 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
// Trigger the Processor even if the incoming connection is empty
std::atomic<bool> _triggerWhenEmpty;
+ std::string cron_period_;
+
private:
// Mutex for protection
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
new file mode 100644
index 0000000..41ffa96
--- /dev/null
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -0,0 +1,100 @@
+/**
+ * @file CronDrivenSchedulingAgent.cpp
+ * CronDrivenSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CronDrivenSchedulingAgent.h"
+#include <chrono>
+#include <memory>
+#include <thread>
+#include <iostream>
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+/**
+ * Triggers the processor if its cron schedule is due.
+ *
+ * On the first call for a processor its cron expression is parsed and cached together
+ * with the next matching time; later calls compute a new matching time only once the
+ * cached one has been reached, and otherwise just report how long is left to wait.
+ * NOTE(review): on the first call the processor is triggered right away whenever
+ * Bosma::Cron::cron_to_next() returns a time after `from`, rather than waiting for the
+ * first matching time -- confirm this is intended.
+ *
+ * @return the processor's yield time if it yielded, the bored yield duration if onTrigger
+ *         reported no work or back pressure, otherwise the milliseconds until the next
+ *         matching time; 0 when the agent or the processor is not running
+ */
+uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+                                        const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  if (!this->running_ || !processor->isRunning()) {
+    return 0;
+  }
+
+  auto uuidStr = processor->getUUIDStr();
+  const std::chrono::system_clock::time_point from = std::chrono::system_clock::now();
+  std::chrono::system_clock::time_point result;
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    auto sched_f = schedules_.find(uuidStr);
+    if (sched_f != std::end(schedules_)) {
+      result = last_exec_[uuidStr];
+      if (from >= result) {
+        // The cached time has been reached: compute the following one.
+        result = sched_f->second.cron_to_next(from);
+        last_exec_[uuidStr] = result;
+      } else {
+        // we may be woken up a little early so that we can honor our time.
+        // in this case we can return the next time to run with the expectation
+        // that the wakeup mechanism gets more granular.
+        return std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
+      }
+    } else {
+      // First call for this processor: parse and cache its cron expression.
+      Bosma::Cron schedule(processor->getCronPeriod());
+      result = schedule.cron_to_next(from);
+      last_exec_[uuidStr] = result;
+      schedules_.insert(std::make_pair(uuidStr, schedule));
+    }
+  }
+
+  if (result > from) {
+    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+
+    if (processor->isYield()) {
+      // Honor the yield
+      return processor->getYieldTime();
+    } else if (shouldYield && this->bored_yield_duration_ > 0) {
+      // No work to do or need to apply back pressure
+      return this->bored_yield_duration_;
+    }
+  }
+  return std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 0e55f67..9de7822 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -86,6 +86,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
timer_scheduler_(nullptr),
event_scheduler_(nullptr),
+ cron_scheduler_(nullptr),
controller_service_provider_(nullptr),
flow_configuration_(std::move(flow_configuration)),
configuration_(configure),
@@ -239,12 +240,13 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
// immediately indicate that we are not running
logger_->log_info("Stop Flow Controller");
if (this->root_)
- this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+ this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
// stop after we've attempted to stop the processors.
this->timer_scheduler_->stop();
this->event_scheduler_->stop();
+ this->cron_scheduler_->stop();
running_ = false;
}
return 0;
@@ -322,6 +324,12 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
configuration_);
}
+ if (nullptr == cron_scheduler_ || reload) {
+ cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(
+ std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
+ configuration_);
+ }
+
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
@@ -378,10 +386,11 @@ int16_t FlowController::start() {
controller_service_provider_->enableAllControllerServices();
this->timer_scheduler_->start();
this->event_scheduler_->start();
+ this->cron_scheduler_->start();
if (this->root_ != nullptr) {
start_time_ = std::chrono::steady_clock::now();
- this->root_->startProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+ this->root_->startProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
}
initializeC2();
running_ = true;
@@ -936,6 +945,9 @@ std::vector<std::shared_ptr<state::StateController>> FlowController::getAllCompo
case core::SchedulingStrategy::EVENT_DRIVEN:
vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_));
break;
+ case core::SchedulingStrategy::CRON_DRIVEN:
+ vec.push_back(std::make_shared<state::ProcessorController>(processor, cron_scheduler_));
+ break;
default:
break;
}
@@ -959,6 +971,9 @@ std::vector<std::shared_ptr<state::StateController>> FlowController::getComponen
case core::SchedulingStrategy::EVENT_DRIVEN:
vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_));
break;
+ case core::SchedulingStrategy::CRON_DRIVEN:
+ vec.push_back(std::make_shared<state::ProcessorController>(processor, cron_scheduler_));
+ break;
default:
break;
}
@@ -980,6 +995,8 @@ std::vector<BackTrace> FlowController::getTraces() {
traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end()));
auto event_driven = event_scheduler_->getTraces();
traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end()));
+ auto cron_driven = cron_scheduler_->getTraces();
+ traces.insert(traces.end(), std::make_move_iterator(cron_driven.begin()), std::make_move_iterator(cron_driven.end()));
// repositories
auto prov_repo_trace = provenance_repo_->getTraces();
traces.emplace_back(std::move(prov_repo_trace));
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7df1b09..abc59f4 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -136,24 +136,29 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
}
}
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
+ const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Start all the processor node, input and output ports
-    for (auto processor : processors_) {
+    for (const auto &processor : processors_) {
       logger_->log_debug("Starting %s", processor->getName());
-
+      // Only schedule processors that are neither running nor disabled; the guard applies to
+      // every scheduling strategy, CRON_DRIVEN included.
       if (!processor->isRunning() && processor->getScheduledState() != DISABLED) {
-        if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-          timeScheduler->schedule(processor);
-        else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
-          eventScheduler->schedule(processor);
+        switch (processor->getSchedulingStrategy()) {
+          case TIMER_DRIVEN:
+            timeScheduler->schedule(processor);
+            break;
+          case EVENT_DRIVEN:
+            eventScheduler->schedule(processor);
+            break;
+          case CRON_DRIVEN:
+            cronScheduler->schedule(processor);
+            break;
+        }
       }
     }
// Start processing the group
for (auto processGroup : child_process_groups_) {
- processGroup->startProcessing(timeScheduler, eventScheduler);
+ processGroup->startProcessing(timeScheduler, eventScheduler, cronScheduler);
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
@@ -164,22 +169,30 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev
}
}
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
+ const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Stop all the processor node, input and output ports
- for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) {
- std::shared_ptr<Processor> processor(*it);
- if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
- timeScheduler->unschedule(processor);
- else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
- eventScheduler->unschedule(processor);
+ for (const auto &processor : processors_) {
+ logger_->log_debug("Stopping %s", processor->getName());
+ switch (processor->getSchedulingStrategy()) {
+ case TIMER_DRIVEN:
+ timeScheduler->unschedule(processor);
+ break;
+ case EVENT_DRIVEN:
+ eventScheduler->unschedule(processor);
+ break;
+ case CRON_DRIVEN:
+ cronScheduler->unschedule(processor);
+ break;
+ }
}
for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
ProcessGroup *processGroup(*it);
- processGroup->stopProcessing(timeScheduler, eventScheduler);
+ processGroup->stopProcessing(timeScheduler, eventScheduler, cronScheduler);
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 5ebc74e..756b5af 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -175,10 +175,16 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
}
// Take care of scheduling
+
core::TimeUnit unit;
- if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%ll] ns", schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
+
+ if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+ if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%ll] ns", schedulingPeriod);
+ processor->setSchedulingPeriodNano(schedulingPeriod);
+ }
+ } else {
+ processor->setCronPeriod(procCfg.schedulingPeriod);
}
if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
diff --git a/libminifi/test/integration/TailFileCronTest.cpp b/libminifi/test/integration/TailFileCronTest.cpp
new file mode 100644
index 0000000..b3c2f31
--- /dev/null
+++ b/libminifi/test/integration/TailFileCronTest.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../TestBase.h"
+#include "processors/TailFile.h"
+#include "processors/LogAttribute.h"
+#include "state/ProcessorController.h"
+#include "IntegrationBase.h"
+
+/**
+ * Integration harness for the TailFile flow in TestTailFileCron.yml: it writes a five-line
+ * file into a temporary directory, points the "tf" processor at that file and at a state
+ * file, and checks the logs for the five resulting flow files.
+ */
+class TailFileTestHarness : public IntegrationBase {
+ public:
+  TailFileTestHarness() {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+
+    statefile = std::string(dir) + "/statefile";
+
+    ss << dir << "/" << "tstFile.ext";
+    // Each line contains a literal backslash; the last one reads li\ne5.
+    std::ofstream file(ss.str());
+    file << "Lin\\e1\nli\\nen\nli\\ne3\nli\\ne4\nli\\ne5\n";
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+  }
+
+  virtual void cleanup() {
+    const std::string tailed_file = ss.str();
+    unlink(tailed_file.c_str());
+    unlink(statefile.c_str());
+  }
+
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input"));
+    assert(LogTestController::getInstance().contains("Looking for delimiter 0xA"));
+    assert(LogTestController::getInstance().contains("li\\ne5"));
+  }
+
+ protected:
+  virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
+    for (const auto &comp : fc->getComponents("tf")) {
+      auto proc = std::dynamic_pointer_cast<minifi::state::ProcessorController>(comp);
+      if (proc == nullptr) {
+        continue;
+      }
+      proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName, ss.str());
+      proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile, statefile);
+    }
+  }
+
+  // Path of TailFile's state file inside dir.
+  std::string statefile;
+  // Temporary directory created through testController.
+  char *dir;
+  // Holds the path of the file being tailed.
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Runs the harness against the flow configuration given as argv[1]. CTest also passes the
+ * test resource directory as argv[2]; it is not needed here, so argv is never read past
+ * argv[1] (argv[2] is a null pointer when only one argument is given).
+ */
+int main(int argc, char **argv) {
+  std::string test_file_location;
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+
+  TailFileTestHarness harness;
+
+  harness.run(test_file_location);
+
+  return 0;
+}
diff --git a/libminifi/test/resources/TestTailFileCron.yml b/libminifi/test/resources/TestTailFileCron.yml
new file mode 100644
index 0000000..dd30937
--- /dev/null
+++ b/libminifi/test/resources/TestTailFileCron.yml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+ - name: tf
+ id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ class: org.apache.nifi.processors.standard.TailFile
+ max concurrent tasks: 1
+ scheduling strategy: CRON_DRIVEN
+ scheduling period: "* * * * *"
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ Input Delimiter: \n
+ - name: la
+ id: 2438e3c8-015a-1000-79ca-83af40ec1995
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 500 msec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ Log Payload: true
+
+Connections:
+ - name: tr1
+ id: 2438e3c8-015a-1000-79ca-83af40ec1997
+ source name: tf
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ source relationship name: success
+ destination name: la
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1995
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+
+Remote Processing Groups:
+
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index b8aef83..49361c6 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
CMAKE_POLICY(SET CMP0048 OLD)
ENDIF(POLICY CMP0048)
-include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/rapidjson-1.1.0/include ../thirdparty/)
+include_directories(../libminifi/include ../thirdparty/cron ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/rapidjson-1.1.0/include ../thirdparty/)
include_directories(${JEMALLOC_INCLUDE_DIRS})
if(WIN32)
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index 14b4a56..059ab55 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -24,7 +24,7 @@ IF(POLICY CMP0048)
ENDIF(POLICY CMP0048)
include_directories(include)
-include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include)
+include_directories(../libminifi/include ../thirdparty/cron ../thirdparty/spdlog-20170710/include)
include_directories(../thirdparty/ut)
if(WIN32)
@@ -98,4 +98,4 @@ endif(ENABLE_PYTHON AND NOT STATIC_BUILD)
if (NOT DISABLE_CURL)
add_subdirectory(examples)
-endif()
\ No newline at end of file
+endif()
diff --git a/thirdparty/cron/Cron.h b/thirdparty/cron/Cron.h
new file mode 100644
index 0000000..6992d47
--- /dev/null
+++ b/thirdparty/cron/Cron.h
@@ -0,0 +1,121 @@
+#pragma once
+
+#include <chrono>
+#include <ctime>
+#include <exception>
+#include <iterator>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace Bosma {
+ using Clock = std::chrono::system_clock;
+
+ inline void add(std::tm &tm, Clock::duration time) {
+ auto tp = Clock::from_time_t(std::mktime(&tm));
+ auto tp_adjusted = tp + time;
+ auto tm_adjusted = Clock::to_time_t(tp_adjusted);
+ tm = *std::localtime(&tm_adjusted);
+ }
+
+ class BadCronExpression : public std::exception {
+ public:
+ explicit BadCronExpression(std::string msg) : msg_(std::move(msg)) {}
+
+ const char *what() const noexcept override { return (msg_.c_str()); }
+
+ private:
+ std::string msg_;
+ };
+
+ inline void
+ verify_and_set(const std::string &token, const std::string &expression, int &field, const int lower_bound,
+ const int upper_bound, const bool adjust = false) {
+ if (token == "*")
+ field = -1;
+ else {
+ try {
+ field = std::stoi(token);
+ } catch (const std::invalid_argument &) {
+ throw BadCronExpression("malformed cron string (`" + token + "` not an integer or *): " + expression);
+ } catch (const std::out_of_range &) {
+ throw BadCronExpression("malformed cron string (`" + token + "` not convertable to int): " + expression);
+ }
+ if (field < lower_bound || field > upper_bound) {
+ std::ostringstream oss;
+ oss << "malformed cron string ('" << token << "' must be <= " << upper_bound << " and >= " << lower_bound
+ << "): " << expression;
+ throw BadCronExpression(oss.str());
+ }
+ if (adjust)
+ field--;
+ }
+ }
+
+ class Cron {
+ public:
+ explicit Cron(const std::string &expression) {
+ std::istringstream iss(expression);
+ std::vector<std::string> tokens{std::istream_iterator<std::string>{iss},
+ std::istream_iterator<std::string>{}};
+
+ if (tokens.size() != 5) throw BadCronExpression("malformed cron string (must be 5 fields): " + expression);
+
+ verify_and_set(tokens[0], expression, minute, 0, 59);
+ verify_and_set(tokens[1], expression, hour, 0, 23);
+ verify_and_set(tokens[2], expression, day, 1, 31);
+ verify_and_set(tokens[3], expression, month, 1, 12, true);
+ verify_and_set(tokens[4], expression, day_of_week, 0, 6);
+ }
+
+ // http://stackoverflow.com/a/322058/1284550
+ Clock::time_point cron_to_next(const Clock::time_point from = Clock::now()) const {
+ // get current time as a tm object
+ auto now = Clock::to_time_t(from);
+ std::tm next(*std::localtime(&now));
+ // it will always at least run the next minute
+ next.tm_sec = 0;
+ add(next, std::chrono::minutes(1));
+ while (true) {
+ if (month != -1 && next.tm_mon != month) {
+ // add a month
+ // if this will bring us over a year, increment the year instead and reset the month
+ if (next.tm_mon + 1 > 11) {
+ next.tm_mon = 0;
+ next.tm_year++;
+ } else
+ next.tm_mon++;
+
+ next.tm_mday = 1;
+ next.tm_hour = 0;
+ next.tm_min = 0;
+ continue;
+ }
+ if (day != -1 && next.tm_mday != day) {
+ add(next, std::chrono::hours(24));
+ next.tm_hour = 0;
+ next.tm_min = 0;
+ continue;
+ }
+ if (day_of_week != -1 && next.tm_wday != day_of_week) {
+ add(next, std::chrono::hours(24));
+ next.tm_hour = 0;
+ next.tm_min = 0;
+ continue;
+ }
+ if (hour != -1 && next.tm_hour != hour) {
+ add(next, std::chrono::hours(1));
+ next.tm_min = 0;
+ continue;
+ }
+ if (minute != -1 && next.tm_min != minute) {
+ add(next, std::chrono::minutes(1));
+ continue;
+ }
+ break;
+ }
+
+ // telling mktime to figure out dst
+ next.tm_isdst = -1;
+ return Clock::from_time_t(std::mktime(&next));
+ }
+
+ int minute, hour, day, month, day_of_week;
+ };
+}
diff --git a/thirdparty/cron/LICENSE.TXT b/thirdparty/cron/LICENSE.TXT
new file mode 100644
index 0000000..8e86f89
--- /dev/null
+++ b/thirdparty/cron/LICENSE.TXT
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 Bosma
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file