You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/01/18 20:26:44 UTC
nifi-minifi-cpp git commit: MINIFI-182 Added initial event-based
scheduler implementation
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master c45f05e51 -> 89b9a1987
MINIFI-182 Added initial event-based scheduler implementation
This closes #40.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/89b9a198
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/89b9a198
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/89b9a198
Branch: refs/heads/master
Commit: 89b9a1987766ab18196db676b085305689933b0f
Parents: c45f05e
Author: Andrew Christianson <an...@nextcentury.com>
Authored: Tue Jan 10 19:08:44 2017 +0000
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Jan 18 15:26:10 2017 -0500
----------------------------------------------------------------------
libminifi/include/EventDrivenSchedulingAgent.h | 55 ++++++++++
libminifi/include/FlowControlProtocol.h | 8 +-
libminifi/include/FlowController.h | 5 +-
libminifi/include/ProcessGroup.h | 7 +-
libminifi/include/Processor.h | 14 +++
libminifi/include/ThreadedSchedulingAgent.h | 70 ++++++++++++
libminifi/include/TimerDrivenSchedulingAgent.h | 19 +---
libminifi/src/Connection.cpp | 23 ++--
libminifi/src/EventDrivenSchedulingAgent.cpp | 47 ++++++++
libminifi/src/FlowController.cpp | 12 ++-
libminifi/src/ProcessGroup.cpp | 14 ++-
libminifi/src/Processor.cpp | 61 +++++++++++
libminifi/src/SchedulingAgent.cpp | 3 +-
libminifi/src/ThreadedSchedulingAgent.cpp | 113 ++++++++++++++++++++
libminifi/src/TimerDrivenSchedulingAgent.cpp | 99 +----------------
15 files changed, 418 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
new file mode 100644
index 0000000..f6e6ffb
--- /dev/null
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -0,0 +1,55 @@
+/**
+ * @file EventDrivenSchedulingAgent.h
+ * EventDrivenSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+#define __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ThreadedSchedulingAgent.h"
+
+//! EventDrivenSchedulingAgent Class
+class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent
+{
+public:
+ //! Constructor
+ /*!
+ * Create a new processor
+ */
+ EventDrivenSchedulingAgent()
+ : ThreadedSchedulingAgent()
+ {
+ }
+ //! Destructor
+ virtual ~EventDrivenSchedulingAgent()
+ {
+ }
+ //! Run function for the thread
+ void run(Processor *processor);
+
+private:
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
+ EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
+
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index be32e1e..2e8cc72 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -304,9 +304,9 @@ private:
//! Mutex for protection
std::mutex _mtx;
//! Logger
- Logger *_logger;
+ Logger *_logger = NULL;
//! Configure
- Configure *_configure;
+ Configure *_configure = NULL;
//! NiFi server Name
std::string _serverName;
//! NiFi server port
@@ -322,13 +322,13 @@ private:
//! seq number
uint32_t _seqNumber;
//! FlowController
- FlowController *_controller;
+ FlowController *_controller = NULL;
//! report Blob
char *_reportBlob;
//! report Blob len;
int _reportBlobLen;
//! thread
- std::thread *_thread;
+ std::thread *_thread = NULL;
//! whether it is running
bool _running;
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index ee8bb4f..9635bec 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -45,6 +45,7 @@
#include "LogAttribute.h"
#include "RealTimeDataCollector.h"
#include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
#include "FlowControlProtocol.h"
#include "RemoteProcessorGroupPort.h"
#include "Provenance.h"
@@ -198,8 +199,10 @@ protected:
//! Provenance Repo
ProvenanceRepository *_provenanceRepo;
//! Flow Engines
- //! Flow Scheduler
+ //! Flow Timer Scheduler
TimerDrivenSchedulingAgent _timerScheduler;
+ //! Flow Event Scheduler
+ EventDrivenSchedulingAgent _eventScheduler;
//! Controller Service
//! Config
//! Site to Site Server Listener
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h
index 4dd26f8..304cfe6 100644
--- a/libminifi/include/ProcessGroup.h
+++ b/libminifi/include/ProcessGroup.h
@@ -33,6 +33,7 @@
#include "Processor.h"
#include "Exception.h"
#include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
//! Process Group Type
enum ProcessGroupType
@@ -111,9 +112,11 @@ public:
return false;
}
//! Start Processing
- void startProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+ void startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+ EventDrivenSchedulingAgent *eventScheduler);
//! Stop Processing
- void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+ void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+ EventDrivenSchedulingAgent *eventScheduler);
//! Whether it is root process group
bool isRootProcessGroup();
//! set parent process group
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h
index db26ad0..eb3ef9f 100644
--- a/libminifi/include/Processor.h
+++ b/libminifi/include/Processor.h
@@ -25,9 +25,11 @@
#include <queue>
#include <map>
#include <mutex>
+#include <condition_variable>
#include <atomic>
#include <algorithm>
#include <set>
+#include <chrono>
#include "TimeUtil.h"
#include "Property.h"
@@ -278,6 +280,10 @@ public:
Connection *getNextIncomingConnection();
//! On Trigger
void onTrigger();
+ //! Block until work is available on any input connection, or the given duration elapses
+ void waitForWork(uint64_t timeoutMs);
+ //! Notify this processor that work may be available
+ void notifyWork();
public:
//! OnTrigger method, implemented by NiFi Processor Designer
@@ -334,6 +340,14 @@ private:
std::atomic<uint64_t> _yieldExpiration;
//! Incoming connection Iterator
std::set<Connection *>::iterator _incomingConnectionsIter;
+ //! Condition for whether there is incoming work to do
+ bool _hasWork = false;
+ //! Concurrent condition mutex for whether there is incoming work to do
+ std::mutex _workAvailableMtx;
+ //! Concurrent condition variable for whether there is incoming work to do
+ std::condition_variable _hasWorkCondition;
+ //! Check all incoming connections for work
+ bool isWorkAvailable();
//! Logger
Logger *_logger;
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
new file mode 100644
index 0000000..2b14e3d
--- /dev/null
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -0,0 +1,70 @@
+/**
+ * @file ThreadedSchedulingAgent.h
+ * ThreadedSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __THREADED_SCHEDULING_AGENT_H__
+#define __THREADED_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Configure.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "SchedulingAgent.h"
+
+/**
+ * An abstract scheduling agent which creates and manages a pool of threads for
+ * each processor scheduled.
+ */
+class ThreadedSchedulingAgent : public SchedulingAgent
+{
+public:
+ //! Constructor
+ /*!
+ * Create a new processor
+ */
+ ThreadedSchedulingAgent()
+ : SchedulingAgent()
+ {
+ }
+ //! Destructor
+ virtual ~ThreadedSchedulingAgent()
+ {
+ }
+
+ //! Run function for the thread
+ virtual void run(Processor *processor) = 0;
+
+public:
+ //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+ virtual void schedule(Processor *processor);
+ //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+ virtual void unschedule(Processor *processor);
+
+protected:
+ //! Threads
+ std::map<std::string, std::vector<std::thread *>> _threads;
+
+private:
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
+ ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
+
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 9195745..7fd86f6 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -21,13 +21,12 @@
#define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
#include "Logger.h"
-#include "Configure.h"
#include "Processor.h"
#include "ProcessContext.h"
-#include "SchedulingAgent.h"
+#include "ThreadedSchedulingAgent.h"
//! TimerDrivenSchedulingAgent Class
-class TimerDrivenSchedulingAgent : public SchedulingAgent
+class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent
{
public:
//! Constructor
@@ -35,7 +34,7 @@ public:
* Create a new processor
*/
TimerDrivenSchedulingAgent()
- : SchedulingAgent()
+ : ThreadedSchedulingAgent()
{
}
//! Destructor
@@ -43,19 +42,9 @@ public:
{
}
//! Run function for the thread
- static void run(TimerDrivenSchedulingAgent *agent, Processor *processor);
-
-public:
- //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
- virtual void schedule(Processor *processor);
- //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
- virtual void unschedule(Processor *processor);
-
-protected:
+ void run(Processor *processor);
private:
- //! Threads
- std::map<std::string, std::vector<std::thread *>> _threads;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index e036b89..7beaf7a 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -28,6 +28,7 @@
#include <iostream>
#include "Connection.h"
+#include "Processor.h"
Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
: _name(name)
@@ -81,14 +82,22 @@ bool Connection::isFull()
void Connection::put(FlowFileRecord *flow)
{
- std::lock_guard<std::mutex> lock(_mtx);
-
- _queue.push(flow);
-
- _queuedDataSize += flow->getSize();
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+
+ _queue.push(flow);
+
+ _queuedDataSize += flow->getSize();
+
+ _logger->log_debug("Enqueue flow file UUID %s to connection %s",
+ flow->getUUIDStr().c_str(), _name.c_str());
+ }
- _logger->log_debug("Enqueue flow file UUID %s to connection %s",
- flow->getUUIDStr().c_str(), _name.c_str());
+ // Notify receiving processor that work may be available
+ if(_destProcessor)
+ {
+ _destProcessor->notifyWork();
+ }
}
FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
new file mode 100644
index 0000000..ed1d8ea
--- /dev/null
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -0,0 +1,47 @@
+/**
+ * @file EventDrivenSchedulingAgent.cpp
+ * EventDrivenSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Property.h"
+#include "EventDrivenSchedulingAgent.h"
+
+void EventDrivenSchedulingAgent::run(Processor *processor)
+{
+ while (this->_running)
+ {
+ bool shouldYield = this->onTrigger(processor);
+
+ if (processor->isYield())
+ {
+ // Honor the yield
+ std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+ }
+ else if (shouldYield && this->_boredYieldDuration > 0)
+ {
+ // No work to do or need to apply back pressure
+ std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration));
+ }
+
+ // Block until work is available
+ processor->waitForWork(1000);
+ }
+ return;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index caaa8ea..dce9e34 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -117,10 +117,13 @@ void FlowController::stop(bool force)
{
_logger->log_info("Stop Flow Controller");
this->_timerScheduler.stop();
+ this->_eventScheduler.stop();
// Wait for sometime for thread stop
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (this->_root)
- this->_root->stopProcessing(&this->_timerScheduler);
+ this->_root->stopProcessing(
+ &this->_timerScheduler,
+ &this->_eventScheduler);
_running = false;
}
}
@@ -621,7 +624,7 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro
this->parsePortYaml(&currPort, group, RECEIVE);
} // for node
}
-
+
}
}
}
@@ -1197,8 +1200,11 @@ bool FlowController::start() {
if (!_running) {
_logger->log_info("Start Flow Controller");
this->_timerScheduler.start();
+ this->_eventScheduler.start();
if (this->_root)
- this->_root->startProcessing(&this->_timerScheduler);
+ this->_root->startProcessing(
+ &this->_timerScheduler,
+ &this->_eventScheduler);
_running = true;
this->_protocol->start();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
index 70ee9d7..7c98278 100644
--- a/libminifi/src/ProcessGroup.cpp
+++ b/libminifi/src/ProcessGroup.cpp
@@ -127,7 +127,8 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child)
}
}
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+ EventDrivenSchedulingAgent *eventScheduler)
{
std::lock_guard<std::mutex> lock(_mtx);
@@ -141,13 +142,15 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
{
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->schedule(processor);
+ else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+ eventScheduler->schedule(processor);
}
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
- processGroup->startProcessing(timeScheduler);
+ processGroup->startProcessing(timeScheduler, eventScheduler);
}
}
catch (std::exception &exception)
@@ -162,7 +165,8 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
}
}
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+ EventDrivenSchedulingAgent *eventScheduler)
{
std::lock_guard<std::mutex> lock(_mtx);
@@ -174,12 +178,14 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
Processor *processor(*it);
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->unschedule(processor);
+ else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+ eventScheduler->unschedule(processor);
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
- processGroup->stopProcessing(timeScheduler);
+ processGroup->stopProcessing(timeScheduler, eventScheduler);
}
}
catch (std::exception &exception)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
index cc136dc..8da253a 100644
--- a/libminifi/src/Processor.cpp
+++ b/libminifi/src/Processor.cpp
@@ -449,3 +449,64 @@ void Processor::onTrigger()
throw;
}
}
+
+void Processor::waitForWork(uint64_t timeoutMs)
+{
+ std::unique_lock<std::mutex> lock(_workAvailableMtx);
+ _hasWork = isWorkAvailable();
+
+ if (!_hasWork)
+ {
+ _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork; });
+ }
+
+ lock.unlock();
+}
+
+void Processor::notifyWork()
+{
+ // Do nothing if we are not event-driven
+ if (_strategy != EVENT_DRIVEN)
+ {
+ return;
+ }
+
+ {
+ std::unique_lock<std::mutex> lock(_workAvailableMtx);
+ _hasWork = isWorkAvailable();
+
+ // Keep a scope-local copy of the state to avoid race conditions
+ bool hasWork = _hasWork;
+
+ lock.unlock();
+
+ if (hasWork)
+ {
+ _hasWorkCondition.notify_one();
+ }
+ }
+}
+
+bool Processor::isWorkAvailable()
+{
+ // We have work if any incoming connection has work
+ bool hasWork = false;
+
+ try
+ {
+ for (const auto &conn : getIncomingConnections())
+ {
+ if (conn->getQueueSize() > 0)
+ {
+ hasWork = true;
+ break;
+ }
+ }
+ }
+ catch (...)
+ {
+ _logger->log_error("Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
+ }
+
+ return hasWork;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 211c328..a81cd55 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -82,5 +82,4 @@ bool SchedulingAgent::onTrigger(Processor *processor)
}
return false;
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
new file mode 100644
index 0000000..0338019
--- /dev/null
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -0,0 +1,113 @@
+/**
+ * @file ThreadedSchedulingAgent.cpp
+ * ThreadedSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <thread>
+#include <iostream>
+
+#include "ThreadedSchedulingAgent.h"
+
+void ThreadedSchedulingAgent::schedule(Processor *processor)
+{
+ std::lock_guard<std::mutex> lock(_mtx);
+
+ _administrativeYieldDuration = 0;
+ std::string yieldValue;
+
+ if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
+ {
+ TimeUnit unit;
+ if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
+ Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
+ {
+ _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
+ }
+ }
+
+ _boredYieldDuration = 0;
+ if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
+ {
+ TimeUnit unit;
+ if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
+ Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
+ {
+ _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
+ }
+ }
+
+ if (processor->getScheduledState() != RUNNING)
+ {
+ _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
+ return;
+ }
+
+ std::map<std::string, std::vector<std::thread *>>::iterator it =
+ _threads.find(processor->getUUIDStr());
+ if (it != _threads.end())
+ {
+ _logger->log_info("Can not schedule threads for processor %s because there are existing threads running");
+ return;
+ }
+
+ std::vector<std::thread *> threads;
+ for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
+ {
+ ThreadedSchedulingAgent *agent = this;
+ std::thread *thread = new std::thread([agent, processor] () { agent->run(processor); });
+ thread->detach();
+ threads.push_back(thread);
+ _logger->log_info("Scheduled thread %d running for process %s", thread->get_id(),
+ processor->getName().c_str());
+ }
+ _threads[processor->getUUIDStr().c_str()] = threads;
+
+ return;
+}
+
+void ThreadedSchedulingAgent::unschedule(Processor *processor)
+{
+ std::lock_guard<std::mutex> lock(_mtx);
+
+ _logger->log_info("Shutting down threads for processor %s/%s",
+ processor->getName().c_str(),
+ processor->getUUIDStr().c_str());
+
+ if (processor->getScheduledState() != RUNNING)
+ {
+ _logger->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str());
+ return;
+ }
+
+ std::map<std::string, std::vector<std::thread *>>::iterator it =
+ _threads.find(processor->getUUIDStr());
+
+ if (it == _threads.end())
+ {
+ _logger->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str());
+ return;
+ }
+ for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread)
+ {
+ std::thread *thread = *itThread;
+ _logger->log_info("Scheduled thread %d deleted for process %s", thread->get_id(),
+ processor->getName().c_str());
+ delete thread;
+ }
+ _threads.erase(processor->getUUIDStr());
+ processor->clearActiveTask();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 3ce57ae..09b7ed6 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -23,112 +23,23 @@
#include "Property.h"
#include "TimerDrivenSchedulingAgent.h"
-void TimerDrivenSchedulingAgent::schedule(Processor *processor)
+void TimerDrivenSchedulingAgent::run(Processor *processor)
{
- std::lock_guard<std::mutex> lock(_mtx);
-
- _administrativeYieldDuration = 0;
- std::string yieldValue;
-
- if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
+ while (this->_running)
{
- TimeUnit unit;
- if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
- Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
- {
- _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
- }
- }
-
- _boredYieldDuration = 0;
- if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
- {
- TimeUnit unit;
- if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
- Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
- {
- _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
- }
- }
-
- if (processor->getScheduledState() != RUNNING)
- {
- _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
- return;
- }
-
- std::map<std::string, std::vector<std::thread *>>::iterator it =
- _threads.find(processor->getUUIDStr());
- if (it != _threads.end())
- {
- _logger->log_info("Can not schedule threads for processor %s because there are existed thread running");
- return;
- }
-
- std::vector<std::thread *> threads;
- for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
- {
- std::thread *thread = new std::thread(run, this, processor);
- thread->detach();
- threads.push_back(thread);
- _logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(),
- processor->getName().c_str());
- }
- _threads[processor->getUUIDStr().c_str()] = threads;
-
- return;
-}
-
-void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
-{
- std::lock_guard<std::mutex> lock(_mtx);
-
- if (processor->getScheduledState() != RUNNING)
- {
- _logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str());
- return;
- }
-
- std::map<std::string, std::vector<std::thread *>>::iterator it =
- _threads.find(processor->getUUIDStr());
-
- if (it == _threads.end())
- {
- _logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running");
- return;
- }
- for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread)
- {
- std::thread *thread = *itThread;
- _logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(),
- processor->getName().c_str());
- delete thread;
- }
- _threads.erase(processor->getUUIDStr());
- processor->clearActiveTask();
-
- return;
-}
-
-void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor)
-{
- while (agent->_running)
- {
- bool shouldYield = agent->onTrigger(processor);
+ bool shouldYield = this->onTrigger(processor);
if (processor->isYield())
{
// Honor the yield
std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
}
- else if (shouldYield && agent->_boredYieldDuration > 0)
+ else if (shouldYield && this->_boredYieldDuration > 0)
{
// No work to do or need to apply back pressure
- std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration));
+ std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration));
}
std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
}
return;
}
-
-