You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/01/18 20:26:44 UTC

nifi-minifi-cpp git commit: MINIFI-182 Added initial event-based scheduler implementation

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master c45f05e51 -> 89b9a1987


MINIFI-182 Added initial event-based scheduler implementation

This closes #40.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/89b9a198
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/89b9a198
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/89b9a198

Branch: refs/heads/master
Commit: 89b9a1987766ab18196db676b085305689933b0f
Parents: c45f05e
Author: Andrew Christianson <an...@nextcentury.com>
Authored: Tue Jan 10 19:08:44 2017 +0000
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Jan 18 15:26:10 2017 -0500

----------------------------------------------------------------------
 libminifi/include/EventDrivenSchedulingAgent.h |  55 ++++++++++
 libminifi/include/FlowControlProtocol.h        |   8 +-
 libminifi/include/FlowController.h             |   5 +-
 libminifi/include/ProcessGroup.h               |   7 +-
 libminifi/include/Processor.h                  |  14 +++
 libminifi/include/ThreadedSchedulingAgent.h    |  70 ++++++++++++
 libminifi/include/TimerDrivenSchedulingAgent.h |  19 +---
 libminifi/src/Connection.cpp                   |  23 ++--
 libminifi/src/EventDrivenSchedulingAgent.cpp   |  47 ++++++++
 libminifi/src/FlowController.cpp               |  12 ++-
 libminifi/src/ProcessGroup.cpp                 |  14 ++-
 libminifi/src/Processor.cpp                    |  61 +++++++++++
 libminifi/src/SchedulingAgent.cpp              |   3 +-
 libminifi/src/ThreadedSchedulingAgent.cpp      | 113 ++++++++++++++++++++
 libminifi/src/TimerDrivenSchedulingAgent.cpp   |  99 +----------------
 15 files changed, 418 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
new file mode 100644
index 0000000..f6e6ffb
--- /dev/null
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -0,0 +1,55 @@
+/**
+ * @file EventDrivenSchedulingAgent.h
+ * EventDrivenSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+#define __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ThreadedSchedulingAgent.h"
+
+//! EventDrivenSchedulingAgent Class: a ThreadedSchedulingAgent whose run() loop blocks in Processor::waitForWork() between triggers
+class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new event-driven scheduling agent
+	 */
+	EventDrivenSchedulingAgent()
+	: ThreadedSchedulingAgent()
+	{
+	}
+	//! Destructor
+	virtual ~EventDrivenSchedulingAgent()
+	{
+	}
+	//! Run function executed by each scheduling thread; implements ThreadedSchedulingAgent::run for the given processor
+	void run(Processor *processor);
+
+private:
+	// Copy constructor and copy assignment are declared private to prevent copying
+	// Only support pass by reference or pointer
+	EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
+	EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index be32e1e..2e8cc72 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -304,9 +304,9 @@ private:
 	//! Mutex for protection
 	std::mutex _mtx;
 	//! Logger
-	Logger *_logger;
+	Logger *_logger = NULL;
 	//! Configure
-	Configure *_configure;
+	Configure *_configure = NULL;
 	//! NiFi server Name
 	std::string _serverName;
 	//! NiFi server port
@@ -322,13 +322,13 @@ private:
 	//! seq number
 	uint32_t _seqNumber;
 	//! FlowController
-	FlowController *_controller;
+	FlowController *_controller = NULL;
 	//! report Blob
 	char *_reportBlob;
 	//! report Blob len;
 	int _reportBlobLen;
 	//! thread
-	std::thread *_thread;
+	std::thread *_thread = NULL;
 	//! whether it is running
 	bool _running;
 	// Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index ee8bb4f..9635bec 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -45,6 +45,7 @@
 #include "LogAttribute.h"
 #include "RealTimeDataCollector.h"
 #include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
 #include "RemoteProcessorGroupPort.h"
 #include "Provenance.h"
@@ -198,8 +199,10 @@ protected:
 	//! Provenance Repo
 	ProvenanceRepository *_provenanceRepo;
 	//! Flow Engines
-	//! Flow Scheduler
+	//! Flow Timer Scheduler
 	TimerDrivenSchedulingAgent _timerScheduler;
+	//! Flow Event Scheduler
+	EventDrivenSchedulingAgent _eventScheduler;
 	//! Controller Service
 	//! Config
 	//! Site to Site Server Listener

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h
index 4dd26f8..304cfe6 100644
--- a/libminifi/include/ProcessGroup.h
+++ b/libminifi/include/ProcessGroup.h
@@ -33,6 +33,7 @@
 #include "Processor.h"
 #include "Exception.h"
 #include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
 
 //! Process Group Type
 enum ProcessGroupType
@@ -111,9 +112,11 @@ public:
 			return false;
 	}
 	//! Start Processing
-	void startProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+	void startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+			EventDrivenSchedulingAgent *eventScheduler);
 	//! Stop Processing
-	void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+	void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+			EventDrivenSchedulingAgent *eventScheduler);
 	//! Whether it is root process group
 	bool isRootProcessGroup();
 	//! set parent process group

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h
index db26ad0..eb3ef9f 100644
--- a/libminifi/include/Processor.h
+++ b/libminifi/include/Processor.h
@@ -25,9 +25,11 @@
 #include <queue>
 #include <map>
 #include <mutex>
+#include <condition_variable>
 #include <atomic>
 #include <algorithm>
 #include <set>
+#include <chrono>
 
 #include "TimeUtil.h"
 #include "Property.h"
@@ -278,6 +280,10 @@ public:
 	Connection *getNextIncomingConnection();
 	//! On Trigger
 	void onTrigger();
+	//! Block until work is available on any input connection, or the given duration elapses
+	void waitForWork(uint64_t timeoutMs);
+	//! Notify this processor that work may be available
+	void notifyWork();
 
 public:
 	//! OnTrigger method, implemented by NiFi Processor Designer
@@ -334,6 +340,14 @@ private:
 	std::atomic<uint64_t> _yieldExpiration;
 	//! Incoming connection Iterator
 	std::set<Connection *>::iterator _incomingConnectionsIter;
+	//! Condition for whether there is incoming work to do
+	bool _hasWork = false;
+	//! Concurrent condition mutex for whether there is incoming work to do
+	std::mutex _workAvailableMtx;
+	//! Concurrent condition variable for whether there is incoming work to do
+	std::condition_variable _hasWorkCondition;
+	//! Check all incoming connections for work
+	bool isWorkAvailable();
 	//! Logger
 	Logger *_logger;
 	// Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
new file mode 100644
index 0000000..2b14e3d
--- /dev/null
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -0,0 +1,70 @@
+/**
+ * @file ThreadedSchedulingAgent.h
+ * ThreadedSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __THREADED_SCHEDULING_AGENT_H__
+#define __THREADED_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Configure.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "SchedulingAgent.h"
+
+/**
+ * An abstract scheduling agent which creates and tracks a set of threads for
+ * each processor scheduled; concrete agents supply the per-thread run() loop.
+ */
+class ThreadedSchedulingAgent : public SchedulingAgent
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new threaded scheduling agent
+	 */
+	ThreadedSchedulingAgent()
+	: SchedulingAgent()
+	{
+	}
+	//! Destructor
+	virtual ~ThreadedSchedulingAgent()
+	{
+	}
+	
+	//! Run function executed by every thread started in schedule(); pure virtual, implemented by concrete agents
+	virtual void run(Processor *processor) = 0;
+
+public:
+	//! schedule: start up to getMaxConcurrentTasks() threads that call run() for the processor
+	virtual void schedule(Processor *processor);
+	//! unschedule: delete the thread objects tracked for the processor and forget them
+	virtual void unschedule(Processor *processor);
+
+protected:
+	//! Threads created by schedule(), keyed by the processor's UUID string
+	std::map<std::string, std::vector<std::thread *>> _threads;
+
+private:
+	// Copy constructor and copy assignment are declared private to prevent copying
+	// Only support pass by reference or pointer
+	ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
+	ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 9195745..7fd86f6 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -21,13 +21,12 @@
 #define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
 
 #include "Logger.h"
-#include "Configure.h"
 #include "Processor.h"
 #include "ProcessContext.h"
-#include "SchedulingAgent.h"
+#include "ThreadedSchedulingAgent.h"
 
 //! TimerDrivenSchedulingAgent Class
-class TimerDrivenSchedulingAgent : public SchedulingAgent
+class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent
 {
 public:
 	//! Constructor
@@ -35,7 +34,7 @@ public:
 	 * Create a new processor
 	 */
 	TimerDrivenSchedulingAgent()
-	: SchedulingAgent()
+	: ThreadedSchedulingAgent()
 	{
 	}
 	//! Destructor
@@ -43,19 +42,9 @@ public:
 	{
 	}
 	//! Run function for the thread
-	static void run(TimerDrivenSchedulingAgent *agent, Processor *processor);
-
-public:
-	//! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
-	virtual void schedule(Processor *processor);
-	//! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
-	virtual void unschedule(Processor *processor);
-
-protected:
+	void run(Processor *processor);
 
 private:
-	//! Threads
-	std::map<std::string, std::vector<std::thread *>> _threads;
 	// Prevent default copy constructor and assignment operation
 	// Only support pass by reference or pointer
 	TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index e036b89..7beaf7a 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -28,6 +28,7 @@
 #include <iostream>
 
 #include "Connection.h"
+#include "Processor.h"
 
 Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
 : _name(name)
@@ -81,14 +82,22 @@ bool Connection::isFull()
 
 void Connection::put(FlowFileRecord *flow)
 {
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_queue.push(flow);
-
-	_queuedDataSize += flow->getSize();
+	{
+		std::lock_guard<std::mutex> lock(_mtx);
+	
+		_queue.push(flow);
+	
+		_queuedDataSize += flow->getSize();
+	
+		_logger->log_debug("Enqueue flow file UUID %s to connection %s",
+				flow->getUUIDStr().c_str(), _name.c_str());
+	}
 
-	_logger->log_debug("Enqueue flow file UUID %s to connection %s",
-			flow->getUUIDStr().c_str(), _name.c_str());
+	// Notify receiving processor that work may be available
+	if(_destProcessor)
+	{
+		_destProcessor->notifyWork();
+	}
 }
 
 FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
new file mode 100644
index 0000000..ed1d8ea
--- /dev/null
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -0,0 +1,47 @@
+/**
+ * @file EventDrivenSchedulingAgent.cpp
+ * EventDrivenSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Property.h"
+#include "EventDrivenSchedulingAgent.h"
+
+//! Worker loop for one event-driven thread: trigger, honor any yield, then block for new work
+void EventDrivenSchedulingAgent::run(Processor *processor)
+{
+	while (_running)
+	{
+		const bool triggerAskedToYield = onTrigger(processor);
+		if (processor->isYield())
+		{
+			// The processor is in a yield; sleep for the yield time it reports
+			const auto yieldFor = std::chrono::milliseconds(processor->getYieldTime());
+			std::this_thread::sleep_for(yieldFor);
+		}
+		else if (triggerAskedToYield && _boredYieldDuration > 0)
+		{
+			// onTrigger reported no work (or back pressure); apply the configured bored yield
+			const auto boredFor = std::chrono::milliseconds(_boredYieldDuration);
+			std::this_thread::sleep_for(boredFor);
+		}
+		// Wait (at most 1000 ms) until the processor is told work may be available
+		processor->waitForWork(1000);
+	}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index caaa8ea..dce9e34 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -117,10 +117,13 @@ void FlowController::stop(bool force)
 	{
         _logger->log_info("Stop Flow Controller");
         this->_timerScheduler.stop();
+        this->_eventScheduler.stop();
         // Wait for sometime for thread stop
         std::this_thread::sleep_for(std::chrono::milliseconds(1000));
         if (this->_root)
-            this->_root->stopProcessing(&this->_timerScheduler);
+            this->_root->stopProcessing(
+                    &this->_timerScheduler,
+                    &this->_eventScheduler);
         _running = false;
     }
 }
@@ -621,7 +624,7 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro
                         this->parsePortYaml(&currPort, group, RECEIVE);
                     } // for node
                 }
-				
+
             }
         }
     }
@@ -1197,8 +1200,11 @@ bool FlowController::start() {
         if (!_running) {
             _logger->log_info("Start Flow Controller");
             this->_timerScheduler.start();
+            this->_eventScheduler.start();
             if (this->_root)
-                this->_root->startProcessing(&this->_timerScheduler);
+                this->_root->startProcessing(
+                        &this->_timerScheduler,
+                        &this->_eventScheduler);
             _running = true;
             this->_protocol->start();
         }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
index 70ee9d7..7c98278 100644
--- a/libminifi/src/ProcessGroup.cpp
+++ b/libminifi/src/ProcessGroup.cpp
@@ -127,7 +127,8 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child)
 	}
 }
 
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+		EventDrivenSchedulingAgent *eventScheduler)
 {
 	std::lock_guard<std::mutex> lock(_mtx);
 
@@ -141,13 +142,15 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
 			{
 				if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
 					timeScheduler->schedule(processor);
+				else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+					eventScheduler->schedule(processor);
 			}
 		}
 
 		for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
 		{
 			ProcessGroup *processGroup(*it);
-			processGroup->startProcessing(timeScheduler);
+			processGroup->startProcessing(timeScheduler, eventScheduler);
 		}
 	}
 	catch (std::exception &exception)
@@ -162,7 +165,8 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
 	}
 }
 
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+		EventDrivenSchedulingAgent *eventScheduler)
 {
 	std::lock_guard<std::mutex> lock(_mtx);
 
@@ -174,12 +178,14 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
 			Processor *processor(*it);
 			if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
 					timeScheduler->unschedule(processor);
+			else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
+					eventScheduler->unschedule(processor);
 		}
 
 		for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
 		{
 			ProcessGroup *processGroup(*it);
-			processGroup->stopProcessing(timeScheduler);
+			processGroup->stopProcessing(timeScheduler, eventScheduler);
 		}
 	}
 	catch (std::exception &exception)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
index cc136dc..8da253a 100644
--- a/libminifi/src/Processor.cpp
+++ b/libminifi/src/Processor.cpp
@@ -449,3 +449,64 @@ void Processor::onTrigger()
 		throw;
 	}
 }
+
+//! Block until an incoming connection has work queued, or until timeoutMs milliseconds elapse
+void Processor::waitForWork(uint64_t timeoutMs)
+{
+	std::unique_lock<std::mutex> guard(_workAvailableMtx);
+	// Refresh the flag under the mutex so work queued before this call is noticed
+	_hasWork = isWorkAvailable();
+	if (_hasWork)
+		return;
+	// Sleep until notifyWork() raises the flag or the timeout expires; guard unlocks on scope exit
+	const std::chrono::milliseconds timeout(timeoutMs);
+	_hasWorkCondition.wait_for(guard, timeout, [this] { return _hasWork; });
+}
+
+//! Re-check the incoming connections and wake one waiting thread when work is queued
+void Processor::notifyWork()
+{
+	// Nothing to signal unless this processor is event-driven
+	if (_strategy != EVENT_DRIVEN)
+		return;
+
+	// Snapshot taken while the mutex is held and read after it has been released
+	bool workQueued = false;
+	{
+		std::lock_guard<std::mutex> guard(_workAvailableMtx);
+
+		// Publish the refreshed state for threads blocked in waitForWork()
+		_hasWork = isWorkAvailable();
+		workQueued = _hasWork;
+	}
+
+	// Notify only after the mutex has been released
+	if (workQueued)
+	{
+		_hasWorkCondition.notify_one();
+	}
+}
+
+//! Report whether any incoming connection has a non-zero queue size.
+//! An exception raised during the scan is logged and treated as "no work".
+bool Processor::isWorkAvailable()
+{
+	try
+	{
+		// One non-empty connection is enough; stop scanning at the first hit
+		for (const auto &incoming : getIncomingConnections())
+		{
+			if (incoming->getQueueSize() > 0)
+			{
+				return true;
+			}
+		}
+	}
+	catch (...)
+	{
+		_logger->log_error("Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
+	}
+
+	// Every queue was empty, or the scan threw before finding work
+	return false;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 211c328..a81cd55 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -82,5 +82,4 @@ bool SchedulingAgent::onTrigger(Processor *processor)
 	}
 
 	return false;
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
new file mode 100644
index 0000000..0338019
--- /dev/null
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -0,0 +1,113 @@
+/**
+ * @file ThreadedSchedulingAgent.cpp
+ * ThreadedSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <thread>
+#include <iostream>
+
+#include "ThreadedSchedulingAgent.h"
+
+//! Start getMaxConcurrentTasks() detached threads that each execute run(processor).
+//! The administrative and bored yield durations are re-read from the configuration first.
+//! Logs and returns without starting threads when the processor is not RUNNING or is already scheduled.
+void ThreadedSchedulingAgent::schedule(Processor *processor)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	_administrativeYieldDuration = 0;
+	std::string yieldValue;
+	if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
+	{
+		TimeUnit unit;
+		if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
+					Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
+		{
+			_logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
+		}
+	}
+
+	_boredYieldDuration = 0;
+	if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
+	{
+		TimeUnit unit;
+		if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
+					Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
+		{
+			_logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
+		}
+	}
+
+	if (processor->getScheduledState() != RUNNING)
+	{
+		_logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
+		return;
+	}
+
+	// One thread set per processor UUID; refuse to schedule the same processor twice
+	if (_threads.find(processor->getUUIDStr()) != _threads.end())
+	{
+		// The format string contains a %s conversion, so the processor name has to be supplied
+		_logger->log_info("Can not schedule threads for processor %s because there are existing threads running", processor->getName().c_str());
+		return;
+	}
+	std::vector<std::thread *> threads;
+	for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
+	{
+		ThreadedSchedulingAgent *agent = this;
+		std::thread *thread = new std::thread([agent, processor] () { agent->run(processor); });
+		thread->detach();
+		threads.push_back(thread);
+		// NOTE(review): get_id() is read after detach(), when std::thread reports a default id - confirm what %d logs
+		_logger->log_info("Scheduled thread %d running for process %s", thread->get_id(),
+				processor->getName().c_str());
+	}
+	_threads[processor->getUUIDStr()] = threads;
+}
+
+//! Delete the std::thread objects registered for the processor and drop its _threads entry.
+//! Logs and returns early when the processor is not RUNNING or has no registered threads.
+void ThreadedSchedulingAgent::unschedule(Processor *processor)
+{
+	std::lock_guard<std::mutex> guard(_mtx);
+
+	_logger->log_info("Shutting down threads for processor %s/%s",
+			processor->getName().c_str(),
+			processor->getUUIDStr().c_str());
+
+	if (processor->getScheduledState() != RUNNING)
+	{
+		_logger->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str());
+		return;
+	}
+
+	auto entry = _threads.find(processor->getUUIDStr());
+	if (entry == _threads.end())
+	{
+		_logger->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str());
+		return;
+	}
+	// Free each heap-allocated thread object recorded by schedule()
+	for (std::thread *worker : entry->second)
+	{
+		_logger->log_info("Scheduled thread %d deleted for process %s", worker->get_id(),
+				processor->getName().c_str());
+		delete worker;
+	}
+	_threads.erase(processor->getUUIDStr());
+	processor->clearActiveTask();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/89b9a198/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 3ce57ae..09b7ed6 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -23,112 +23,23 @@
 #include "Property.h"
 #include "TimerDrivenSchedulingAgent.h"
 
-void TimerDrivenSchedulingAgent::schedule(Processor *processor)
+void TimerDrivenSchedulingAgent::run(Processor *processor)
 {
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_administrativeYieldDuration = 0;
-	std::string yieldValue;
-
-	if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
+	while (this->_running)
 	{
-		TimeUnit unit;
-		if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
-					Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
-		{
-			_logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
-		}
-	}
-
-	_boredYieldDuration = 0;
-	if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
-	{
-		TimeUnit unit;
-		if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
-					Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
-		{
-			_logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
-		}
-	}
-
-	if (processor->getScheduledState() != RUNNING)
-	{
-		_logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
-		return;
-	}
-
-	std::map<std::string, std::vector<std::thread *>>::iterator it =
-			_threads.find(processor->getUUIDStr());
-	if (it != _threads.end())
-	{
-		_logger->log_info("Can not schedule threads for processor %s because there are existed thread running");
-		return;
-	}
-
-	std::vector<std::thread *> threads;
-	for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
-	{
-		std::thread *thread = new std::thread(run, this, processor);
-		thread->detach();
-		threads.push_back(thread);
-		_logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(),
-				processor->getName().c_str());
-	}
-	_threads[processor->getUUIDStr().c_str()] = threads;
-
-	return;
-}
-
-void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	if (processor->getScheduledState() != RUNNING)
-	{
-		_logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str());
-		return;
-	}
-
-	std::map<std::string, std::vector<std::thread *>>::iterator it =
-			_threads.find(processor->getUUIDStr());
-
-	if (it == _threads.end())
-	{
-		_logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running");
-		return;
-	}
-	for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread)
-	{
-		std::thread *thread = *itThread;
-		_logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(),
-				processor->getName().c_str());
-		delete thread;
-	}
-	_threads.erase(processor->getUUIDStr());
-	processor->clearActiveTask();
-
-	return;
-}
-
-void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor)
-{
-	while (agent->_running)
-	{
-		bool shouldYield = agent->onTrigger(processor);
+		bool shouldYield = this->onTrigger(processor);
 
 		if (processor->isYield())
 		{
 			// Honor the yield
 			std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
 		}
-		else if (shouldYield && agent->_boredYieldDuration > 0)
+		else if (shouldYield && this->_boredYieldDuration > 0)
 		{
 			// No work to do or need to apply back pressure
-			std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration));
+			std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration));
 		}
 		std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
 	}
 	return;
 }
-
-