You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/04/24 12:17:03 UTC

[nifi-minifi-cpp] branch master updated (427fd5d -> 0448159)

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from 427fd5d  MINIFICPP-1192 - Add macOS support and in-function offsets to backtrace
     new 9552757  MINIFICPP-1157 Implement light weight heartbeat.   Remove agent manifest from regular heartbeat messages.   Send agent manifest in response to DESCRIBE manifest request
     new 0448159  MINIFICPP-1169 Simplify C2 metrics collection and reporting.   Introduce nifi.c2.full.heartbeat configuration parameter   Remove AgentInformationWithoutManifest, AgentInformationWithManifest classes   Simplify FlowController base classes. Get rid of inheriting from   StateManager class and do not use TreeUpdateListener

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 C2.md                                              |   5 +
 bin/minifi.sh                                      |   8 +-
 conf/minifi.properties                             |   2 +
 extensions/coap/tests/CoapIntegrationBase.h        |   7 +-
 .../http-curl/tests/C2DescribeManifestTest.cpp     |  65 +++++
 extensions/http-curl/tests/C2FailedUpdateTest.cpp  |   4 -
 extensions/http-curl/tests/C2JstackTest.cpp        | 159 +++---------
 .../http-curl/tests/C2VerifyHeartbeatAndStop.cpp   | 181 ++++---------
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          | 102 ++++++++
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  59 ++++-
 libminifi/include/FlowController.h                 |  44 ++--
 libminifi/include/SchedulingAgent.h                |   4 -
 libminifi/include/c2/C2Agent.h                     |  56 ++--
 libminifi/include/core/Core.h                      |   7 +-
 libminifi/include/core/state/StateManager.h        | 140 ----------
 libminifi/include/core/state/UpdateController.h    |  18 +-
 .../include/core/state/nodes/AgentInformation.h    | 109 ++++++--
 libminifi/include/core/state/nodes/MetricsBase.h   |  36 ++-
 .../include/core/state/nodes/TreeUpdateListener.h  |  43 ----
 libminifi/include/properties/Configure.h           |  17 +-
 libminifi/include/properties/Properties.h          |   2 +-
 libminifi/include/utils/BackTrace.h                |  27 +-
 libminifi/include/utils/ThreadPool.h               |   4 +-
 libminifi/src/Configure.cpp                        |   1 +
 libminifi/src/FlowController.cpp                   | 103 ++++----
 libminifi/src/c2/C2Agent.cpp                       | 286 +++++++++------------
 libminifi/src/core/Core.cpp                        |   2 -
 libminifi/src/core/state/StateManager.cpp          | 137 ----------
 libminifi/src/utils/BackTrace.cpp                  |  27 +-
 libminifi/src/utils/ThreadPool.cpp                 |   2 +-
 libminifi/test/integration/IntegrationBase.h       |  14 +-
 main/MiNiFiMain.cpp                                |   2 +
 nanofi/include/cxx/Instance.h                      |   7 +-
 34 files changed, 720 insertions(+), 961 deletions(-)
 create mode 100644 extensions/http-curl/tests/C2DescribeManifestTest.cpp
 delete mode 100644 libminifi/include/core/state/StateManager.h
 delete mode 100644 libminifi/src/core/state/StateManager.cpp


[nifi-minifi-cpp] 02/02: MINIFICPP-1169 Simplify C2 metrics collection and reporting. Introduce nifi.c2.full.heartbeat configuration parameter Remove AgentInformationWithoutManifest, AgentInformationWithManifest classes Simplify FlowController base classes. Get rid of inheriting from StateManager class and do not use TreeUpdateListener

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 04481593ee62e7ad51fa0f40b7fbd46f6a3917c8
Author: Murtuza <ms...@gmail.com>
AuthorDate: Tue Mar 3 14:02:59 2020 -0500

    MINIFICPP-1169 Simplify C2 metrics collection and reporting.
      Introduce nifi.c2.full.heartbeat configuration parameter
      Remove AgentInformationWithoutManifest, AgentInformationWithManifest classes
      Simplify FlowController base classes. Get rid of inheriting from
      StateManager class and do not use TreeUpdateListener
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #743
---
 C2.md                                              |   4 +-
 bin/minifi.sh                                      |   8 +-
 conf/minifi.properties                             |   2 +-
 extensions/coap/tests/CoapIntegrationBase.h        |   5 +-
 .../http-curl/tests/C2DescribeManifestTest.cpp     |  28 +--
 extensions/http-curl/tests/C2FailedUpdateTest.cpp  |   4 -
 extensions/http-curl/tests/C2JstackTest.cpp        |  32 +--
 .../http-curl/tests/C2VerifyHeartbeatAndStop.cpp   |  73 ++-----
 extensions/http-curl/tests/HTTPHandlers.h          |  30 +--
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  33 +--
 libminifi/include/FlowController.h                 |  48 ++---
 libminifi/include/SchedulingAgent.h                |   4 -
 libminifi/include/c2/C2Agent.h                     |  49 +++--
 libminifi/include/core/Core.h                      |   7 +-
 libminifi/include/core/state/StateManager.h        | 140 -------------
 libminifi/include/core/state/UpdateController.h    |  18 +-
 .../include/core/state/nodes/AgentInformation.h    |  92 +++------
 libminifi/include/core/state/nodes/MetricsBase.h   |  36 +---
 .../include/core/state/nodes/TreeUpdateListener.h  |  43 ----
 libminifi/include/properties/Configure.h           |  17 +-
 libminifi/include/utils/BackTrace.h                |  27 ++-
 libminifi/include/utils/ThreadPool.h               |   4 +-
 libminifi/src/Configure.cpp                        |   1 +
 libminifi/src/FlowController.cpp                   | 133 +++++-------
 libminifi/src/c2/C2Agent.cpp                       | 223 +++++++++------------
 libminifi/src/core/Core.cpp                        |   2 -
 libminifi/src/core/state/StateManager.cpp          | 137 -------------
 libminifi/src/utils/BackTrace.cpp                  |  27 +--
 libminifi/src/utils/ThreadPool.cpp                 |   2 +-
 libminifi/test/integration/IntegrationBase.h       |   7 +-
 main/MiNiFiMain.cpp                                |   2 +
 nanofi/include/cxx/Instance.h                      |   7 +-
 32 files changed, 342 insertions(+), 903 deletions(-)

diff --git a/C2.md b/C2.md
index dc99778..560b8ea 100644
--- a/C2.md
+++ b/C2.md
@@ -49,8 +49,8 @@ Release 0.6.0: Please note that all c2 properties now exist as `nifi.c2.*`. If y
 files contain the former naming convention of `c2.*`, we will continue to support that as
 an alternate key, but you are encouraged to switch your configuration options as soon as possible.
 
-Note: In release 0.8.0 there is a configuration option to minizime the heartbeat payload size by excluding agent manifest.
-For that, replace "AgentInformation" with "AgentInformationWithoutManifest in nifi.c2.root.classes property value.
+Note: In release 0.8.0 there is a configuration option to minimize the heartbeat payload size by excluding agent manifest.
+For that, set the `nifi.c2.full.heartbeat` property to `false` in minifi.properties.
 With this change, heartbeat with agent manifest included is sent only for the first time then falls back to sending
 light weight heartbeat. If for some reason the C2 server does not receive the first full heartbeat, the manifest can
 be requested via C2 DESCRIBE manifest command.
diff --git a/bin/minifi.sh b/bin/minifi.sh
index ac0b409..d149507 100755
--- a/bin/minifi.sh
+++ b/bin/minifi.sh
@@ -192,8 +192,8 @@ case "\$1" in
         echo "MiNiFi is not currently running."
       else
         echo "Stopping MiNiFi (PID: \${saved_pid})."
-        # Send a SIGINT to MiNiFi so that the handler begins shutdown.
-        kill -2 \${saved_pid} > /dev/null 2>&1
+        # Send a SIGTERM to MiNiFi so that the handler begins shutdown.
+        kill -15 \${saved_pid} > /dev/null 2>&1
         if [ \$? -ne 0 ]; then
           echo "Could not successfully send termination signal to MiNiFi (PID: \${saved_pid})"
           exit 1;
@@ -321,8 +321,8 @@ case "$1" in
         echo "MiNiFi is not currently running."
       else
         echo "Stopping MiNiFi (PID: ${saved_pid})."
-        # Send a SIGINT to MiNiFi so that the handler begins shutdown.
-        kill -2 ${saved_pid} > /dev/null 2>&1
+        # Send a SIGTERM to MiNiFi so that the handler begins shutdown.
+        kill -15 ${saved_pid} > /dev/null 2>&1
         if [ $? -ne 0 ]; then
           echo "Could not successfully send termination signal to MiNiFi (PID: ${saved_pid})"
           exit 1;
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 249ca7a..3fa2c19 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -53,7 +53,7 @@ nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_reposi
 #nifi.c2.rest.url.ack=
 nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
-#nifi.c2.root.classes=DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation
+#nifi.c2.full.heartbeat=false
 ## heartbeat 4 times a second
 #nifi.c2.agent.heartbeat.period=250
 ## define parameters about your agent 
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index bd7740e..c8f8499 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -41,8 +41,6 @@ class CoapIntegrationBase : public IntegrationBase {
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~CoapIntegrationBase() = default;
-
   void shutdownBeforeFlowController() override {
     stop_webserver(server);
   }
@@ -68,8 +66,6 @@ class CoapIntegrationBase : public IntegrationBase {
 
     queryRootProcessGroup(pg);
 
-    configureC2RootClasses();
-
     ptr.release();
 
     std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
@@ -83,6 +79,7 @@ class CoapIntegrationBase : public IntegrationBase {
 
     shutdownBeforeFlowController();
     controller->waitUnload(wait_time_);
+    controller->stopC2();
     runAssertions();
 
     cleanup();
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
index 5a84c5c..891f6f5 100644
--- a/extensions/http-curl/tests/C2DescribeManifestTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -16,35 +16,9 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
 #include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
 
@@ -55,7 +29,7 @@ public:
       : HeartbeatHandler(isSecure) {
   }
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+  virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
     sendHeartbeatResponse("DESCRIBE", "manifest", "889345", conn);
   }
 
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index f5f206e..35458ec 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -198,22 +198,18 @@ int main(int argc, char **argv) {
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
   ptr.release();
-  auto start = std::chrono::system_clock::now();
 
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
   controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
 
-  auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
   std::string logs = LogTestController::getInstance().log_output.str();
   assert(logs.find("Invalid configuration payload") != std::string::npos);
   assert(logs.find("update failed.") != std::string::npos);
   LogTestController::getInstance().reset();
   utils::file::FileUtils::delete_dir("content_repository",true);
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
 
   return 0;
 }
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index 241af1c..e530444 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -16,35 +16,9 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
 #include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
 
@@ -55,7 +29,7 @@ class VerifyC2DescribeJstack : public VerifyC2Describe {
   }
 
   virtual void runAssertions() {
-    assert(LogTestController::getInstance().contains("SchedulingAgent") == true);
+    assert(LogTestController::getInstance().contains("SchedulingAgent"));
   }
 };
 
@@ -65,12 +39,12 @@ class DescribeJstackHandler : public HeartbeatHandler {
      : HeartbeatHandler(isSecure) {
   }
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+  virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
     sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
   }
 
   virtual void handleAcknowledge(const rapidjson::Document& root) {
-    assert(root.HasMember("SchedulingAgent #0") == true);
+    assert(root.HasMember("Flowcontroller threadpool #0"));
   }
 
 };
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 6e0c1c2..8a15be9 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -16,65 +16,34 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "TestServer.h"
 #include "c2/C2Agent.h"
-#include "protocols/RESTReceiver.h"
+#include "protocols/RESTProtocol.h"
 #include "protocols/RESTSender.h"
+#include "protocols/RESTReceiver.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
-#include "agent/build_description.h"
-#include "processors/LogAttribute.h"
 
 class LightWeightC2Handler : public HeartbeatHandler {
  public:
   explicit LightWeightC2Handler(bool isSecure)
-      : HeartbeatHandler(isSecure),
-        calls_(0) {
+      : HeartbeatHandler(isSecure) {
   }
 
   virtual ~LightWeightC2Handler() = default;
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn)  {
-    (void)conn;
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *)  {
     if (calls_ == 0) {
       verifyJsonHasAgentManifest(root);
     } else {
-      assert(root.HasMember("agentInfo") == true);
-      assert(root["agentInfo"].HasMember("agentManifest") == false);
+      assert(root.HasMember("agentInfo"));
+      assert(!root["agentInfo"].HasMember("agentManifest"));
     }
     calls_++;
   }
  private:
-  std::atomic<size_t> calls_;
+  std::atomic<size_t> calls_{0};
 };
 
 class VerifyC2Heartbeat : public VerifyC2Base {
@@ -83,8 +52,6 @@ class VerifyC2Heartbeat : public VerifyC2Base {
       : VerifyC2Base(isSecure) {
   }
 
-  virtual ~VerifyC2Heartbeat() = default;
-
   virtual void testSetup() {
     LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
@@ -94,15 +61,13 @@ class VerifyC2Heartbeat : public VerifyC2Base {
   }
 
   void runAssertions() {
-    assert(LogTestController::getInstance().contains("Received Ack from Server") == true);
-
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true);
-
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
+    assert(LogTestController::getInstance().contains("Received Ack from Server"));
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke"));
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController"));
   }
 
-  void configureC2RootClasses() {
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
+  void configureFullHeartbeat() {
+    configuration->set("nifi.c2.full.heartbeat", "true");
   }
 };
 
@@ -112,10 +77,8 @@ public:
       : VerifyC2Heartbeat(isSecure) {
   }
 
-  virtual ~VerifyLightWeightC2Heartbeat() = default;
-
-  void configureC2RootClasses() {
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  void configureFullHeartbeat() {
+    configuration->set("nifi.c2.full.heartbeat", "false");
   }
 };
 
@@ -136,24 +99,16 @@ int main(int argc, char **argv) {
   }
   {
     VerifyC2Heartbeat harness(isSecure);
-
     harness.setKeyDir(key_dir);
-
     HeartbeatHandler responder(isSecure);
-
     harness.setUrl(url, &responder);
-
     harness.run(test_file_location);
   }
 
   VerifyLightWeightC2Heartbeat harness(isSecure);
-
   harness.setKeyDir(key_dir);
-
   LightWeightC2Handler responder(isSecure);
-
   harness.setUrl(url, &responder);
-
   harness.run(test_file_location);
 
   return 0;
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 1de39b3..4d3a549 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -353,18 +353,18 @@ class HeartbeatHandler : public CivetHandler {
 
   std::string readPost(struct mg_connection *conn) {
     std::string response;
-    int blockSize = 1024 * sizeof(char), readBytes;
+    int readBytes;
 
     char buffer[1024];
-    while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
-      response.append(buffer, 0, (readBytes / sizeof(char)));
+    while ((readBytes = mg_read(conn, buffer, sizeof(buffer))) > 0) {
+      response.append(buffer, (readBytes / sizeof(char)));
     }
     return response;
   }
 
   void sendStopOperation(struct mg_connection *conn) {
-    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
-        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
+    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\"  }, "
+        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\"  } ]}";
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
               "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
               resp.length());
@@ -385,14 +385,14 @@ class HeartbeatHandler : public CivetHandler {
 
   void verifyJsonHasAgentManifest(const rapidjson::Document& root) {
     bool found = false;
-    assert(root.HasMember("agentInfo") == true);
-    assert(root["agentInfo"].HasMember("agentManifest") == true);
-    assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
+    assert(root.HasMember("agentInfo"));
+    assert(root["agentInfo"].HasMember("agentManifest"));
+    assert(root["agentInfo"]["agentManifest"].HasMember("bundles"));
 
     for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
       assert(bundle.HasMember("artifact"));
       std::string str = bundle["artifact"].GetString();
-      if (str == "minifi-system") {
+      if (str == "minifi-standard-processors") {
 
         std::vector<std::string> classes;
         for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
@@ -407,23 +407,23 @@ class HeartbeatHandler : public CivetHandler {
 
       }
     }
-    assert(found == true);
+    assert(found);
   }
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
-    (void)conn;
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) {
     verifyJsonHasAgentManifest(root);
   }
 
-  virtual void handleAcknowledge(const rapidjson::Document& root) {
+  virtual void handleAcknowledge(const rapidjson::Document&) {
   }
 
   void verify(struct mg_connection *conn) {
     auto post_data = readPost(conn);
-    std::cerr << post_data << std::endl;
+    //std::cerr << post_data << std::endl;
     if (!IsNullOrEmpty(post_data)) {
       rapidjson::Document root;
       rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
+      assert(ok);
       std::string operation = root["operation"].GetString();
       if (operation == "heartbeat") {
         handleHeartbeat(root, conn);
@@ -435,7 +435,7 @@ class HeartbeatHandler : public CivetHandler {
     }
   }
 
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+  bool handlePost(CivetServer *, struct mg_connection *conn) {
     verify(conn);
     sendStopOperation(conn);
     return true;
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 4226630..02e4104 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -91,33 +91,14 @@ class VerifyC2Base : public CoapIntegrationBase {
  public:
   explicit VerifyC2Base(bool isSecure)
       : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
   }
 
   virtual void testSetup() {
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
     LogTestController::getInstance().setDebug<LogTestController>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
   }
 
-  void runAssertions() {
-  }
-
-  virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
+  virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup>) override {
     std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
 
     configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
@@ -126,18 +107,15 @@ class VerifyC2Base : public CoapIntegrationBase {
     configuration->set("nifi.c2.rest.url", c2_url);
     configuration->set("nifi.c2.agent.heartbeat.period", "1000");
     configuration->set("nifi.c2.rest.url.ack", c2_url);
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
   }
 
   void cleanup() {
     LogTestController::getInstance().reset();
-    unlink(ss.str().c_str());
   }
 
  protected:
   bool isSecure;
-  std::string dir;
-  std::stringstream ss;
-  TestController testController;
 };
 
 class VerifyC2Describe : public VerifyC2Base {
@@ -153,8 +131,11 @@ class VerifyC2Describe : public VerifyC2Base {
     VerifyC2Base::testSetup();
   }
 
-  void configureC2RootClasses() {
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  void configureFullHeartbeat() {
+    configuration->set("nifi.c2.full.heartbeat", "false");
+  }
+
+  void runAssertions() {
   }
 };
 #endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index bef607d..5efac83 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -47,8 +47,9 @@
 #include "core/Property.h"
 #include "core/state/nodes/MetricsBase.h"
 #include "utils/Id.h"
-#include "core/state/StateManager.h"
+#include "core/state/UpdateController.h"
 #include "core/state/nodes/FlowInformation.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -61,7 +62,7 @@ namespace minifi {
  * Flow Controller class. Generally used by FlowController factory
  * as a singleton.
  */
-class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager {
+class FlowController : public core::controller::ControllerServiceProvider, public state::response::NodeReporter,  public state::StateMonitor, public std::enable_shared_from_this<FlowController> {
  public:
   /**
    * Flow controller constructor
@@ -285,36 +286,23 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   virtual void enableAllControllerServices();
 
   /**
-   * Retrieves all root response nodes from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
-   */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
-  /**
-   * Retrieves all metrics from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
+   * Retrieves metrics node
+   * @return metrics response node
    */
-  virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
+  virtual std::shared_ptr<state::response::ResponseNode> getMetricsNode(const std::string& metricsClass) const;
 
   /**
-   * Retrieves agent information with manifest only from this source.
-   * @param manifest_vector -- manifest nodes vector.
-   * @return 0 on Success, -1 on failure
+   * Retrieves root nodes configured to be included in heartbeat
+   * @param includeManifest -- determines if manifest is to be included
+   * @return a list of response nodes
    */
-  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const;
+  virtual std::vector<std::shared_ptr<state::response::ResponseNode>> getHeartbeatNodes(bool includeManifest) const;
 
   /**
-   * Returns a response node containing all agent information with manifest and agent status
-   * @return a shared pointer to agent information
+   * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
+   * @return the agent manifest response node
    */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() const;
 
   virtual uint64_t getUptime();
 
@@ -322,6 +310,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
 
   void initializeC2();
 
+  void stopC2();
+
  protected:
 
   void loadC2ResponseConfiguration();
@@ -335,6 +325,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
 
   void initializeExternalComponents();
 
+  std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+
   /**
    * Initializes flow controller paths.
    */
@@ -409,16 +401,14 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
 
   std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
 
-  // manifest cache
-  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> agent_information_;
-
   // metrics last run
   std::chrono::steady_clock::time_point last_metrics_capture_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
   std::string serial_number_;
-  static std::shared_ptr<utils::IdGenerator> id_generator_;
+
+  std::unique_ptr<state::UpdateController> c2_agent_;
 };
 
 }
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 13d7ded..51c7b61 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -101,10 +101,6 @@ class SchedulingAgent {
     running_ = false;
   }
 
-  std::vector<BackTrace> getTraces() {
-    return thread_pool_.getTraces();
-  }
-
   void watchDogFunc();
 
   virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index c39589f..0ea2445 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -34,6 +34,9 @@
 #include "C2Protocol.h"
 #include "io/validation.h"
 #include "HeartBeatReporter.h"
+#include "utils/ThreadPool.h"
+#include "utils/Id.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -52,39 +55,27 @@ namespace c2 {
  *   0 HeartBeat --  RESERVED
  *   1-255 Defined by the configuration file.
  */
-class C2Agent : public state::UpdateController, public state::response::ResponseNodeSink, public std::enable_shared_from_this<C2Agent> {
+class C2Agent : public state::UpdateController {
  public:
 
-  C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+  C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+          const std::shared_ptr<state::StateMonitor> &updateSink,
+          const std::shared_ptr<Configure> &configure);
 
   virtual ~C2Agent() noexcept {
     delete protocol_.load();
   }
 
+  void start() override;
+
+  void stop() override;
+
   /**
    * Sends the heartbeat to ths server. Will include metrics
    * in the payload if they exist.
    */
   void performHeartBeat();
 
-  virtual std::vector<std::function<state::Update()>> getFunctions() {
-    return functions_;
-  }
-
-  /**
-   * Sets the metric within this sink
-   * @param metric metric to set
-   * @param return 0 on success, -1 on failure.
-   */
-  virtual int16_t setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric);
-
-  /**
-    * Sets the metric within this sink
-    * @param metric metric to set
-    * @param return 0 on success, -1 on failure.
-    */
-   virtual int16_t setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric);
-
   int64_t getHeartBeatDelay(){
     std::lock_guard<std::mutex> lock(heartbeat_mutex);
     return heart_beat_period_;
@@ -180,9 +171,9 @@ class C2Agent : public state::UpdateController, public state::response::Response
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
 
   /**
-     * Device information stored in the metrics format
-     */
-    std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
+   * Device information stored in the metrics format
+   */
+  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
 
   /**
    * Device information stored in the metrics format
@@ -210,16 +201,16 @@ class C2Agent : public state::UpdateController, public state::response::Response
   std::chrono::steady_clock::time_point last_run_;
 
   // function that performs the heartbeat
-  std::function<state::Update()> c2_producer_;
+  std::function<utils::TaskRescheduleInfo()> c2_producer_;
 
   // function that acts upon the
-  std::function<state::Update()> c2_consumer_;
+  std::function<utils::TaskRescheduleInfo()> c2_consumer_;
 
   // reference to the update sink, against which we will execute updates.
   std::shared_ptr<state::StateMonitor> update_sink_;
 
   // functions that will be used for the udpate controller.
-  std::vector<std::function<state::Update()>> functions_;
+  std::vector<std::function<utils::TaskRescheduleInfo()>> functions_;
 
   std::shared_ptr<controllers::UpdatePolicyControllerService> update_service_;
 
@@ -250,7 +241,13 @@ class C2Agent : public state::UpdateController, public state::response::Response
 
   std::shared_ptr<logging::Logger> logger_;
 
+  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
+
+  std::vector<std::string> task_ids_;
+
   bool manifest_sent_;
+
+  const uint64_t C2RESPONSE_POLL_MS = 100;
 };
 
 } /* namesapce c2 */
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 51712e3..6c0b6d2 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -153,7 +153,7 @@ class CoreComponent {
       : name_(name) {
     if (uuid == nullptr) {
       // Generate the global UUID for the flow record
-      id_generator_->generate(uuid_);
+      utils::IdGenerator::getIdGenerator()->generate(uuid_);
     } else {
       uuid_ = uuid;
     }
@@ -163,7 +163,7 @@ class CoreComponent {
   explicit CoreComponent(const std::string &name)
       : name_(name) {
     // Generate the global UUID for the flow record
-    id_generator_->generate(uuid_);
+    utils::IdGenerator::getIdGenerator()->generate(uuid_);
     uuidStr_ = uuid_.to_string();
   }
 
@@ -226,9 +226,6 @@ protected:
 
   // Connectable's name
   std::string name_;
-
-private:
-  static std::shared_ptr<utils::IdGenerator> id_generator_;
 };
 
 namespace logging {
diff --git a/libminifi/include/core/state/StateManager.h b/libminifi/include/core/state/StateManager.h
deleted file mode 100644
index 983ed0b..0000000
--- a/libminifi/include/core/state/StateManager.h
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_STATE_MANAGER_H
-#define LIBMINIFI_INCLUDE_STATE_MANAGER_H
-
-#include <map>
-#include <atomic>
-#include <algorithm>
-
-#include "UpdateController.h"
-#include "io/validation.h"
-#include "utils/ThreadPool.h"
-#include "core/Core.h"
-#include "nodes/MetricsBase.h"
-#include "nodes/TreeUpdateListener.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-
-/**
- * State manager is meant to be used by implementing agents of this library. It represents the source and sink for metrics,
- * the sink for external updates, and encapsulates the thread pool that runs the listeners for various update operations
- * that can be performed.
- */
-class StateManager : public response::NodeReporter, public response::ResponseNodeSink, public StateMonitor, public std::enable_shared_from_this<StateManager> {
- public:
-
-  StateManager()
-      : metrics_listener_(nullptr) {
-
-  }
-
-  virtual ~StateManager() {
-
-  }
-
-  /**
-   * Initializes the thread pools.
-   */
-  void initialize();
-
-  /**
-   * State management operations.
-   */
-  /**
-   * Stop this controllable.
-   * @param force force stopping
-   * @param timeToWait time to wait before giving up.
-   * @return status of stopping this controller.
-   */
-  virtual int16_t stop(bool force, uint64_t timeToWait = 0);
-
-  /**
-   * Updates the given flow controller.
-   */
-  int16_t update(const std::shared_ptr<Update> &updateController);
-
-  /**
-   * Passes metrics to the update controllers if they are a metrics sink.
-   * @param metrics metric to pass through
-   */
-  int16_t setResponseNodes(const std::shared_ptr<response::ResponseNode> &metrics);
-
-  /**
-   * Metrics operations
-   */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>> &metric_vector, uint16_t metricsClass);
-
-  virtual std::string getVersion(){
-    return "";
-  }
-
- protected:
-
-  void shutdownState(){
-    listener_thread_pool_.shutdown();
-    metrics_maps_.clear();
-    updateControllers.clear();
-  }
-
-  /**
-   * Function to apply updates for a given  update controller.
-   * @param source  source identifier
-   * @param updateController update controller mechanism.
-   */
-  virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<Update> &updateController) = 0;
-
-  /**
-   * Registers and update controller
-   * @param updateController update controller to add.
-   */
-  bool registerUpdateListener(const std::shared_ptr<UpdateController> &updateController, const int64_t &delay);
-
-
-  /**
-   * Base metrics function will employ the default metrics listener.
-   */
-  virtual bool startMetrics(const int64_t &delay);
-
- private:
-
-  std::timed_mutex mutex_;
-
-  std::map<std::string, std::shared_ptr<response::ResponseNode>> metrics_maps_;
-
-  std::vector<std::shared_ptr<UpdateController> > updateControllers;
-
-  std::unique_ptr<state::response::TreeUpdateListener> metrics_listener_;
-
-  utils::ThreadPool<Update> listener_thread_pool_;
-
-};
-
-
-
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_CONTROLLABLE_H_ */
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 8c76e58..9b8d785 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -256,12 +256,26 @@ class StateMonitor : public StateController {
 class UpdateController {
  public:
 
-  virtual std::vector<std::function<Update()>> getFunctions() = 0;
+  UpdateController()
+      : controller_running_(false) {
+  }
+
+  virtual ~UpdateController() = default;
+
+  virtual std::vector<std::function<utils::TaskRescheduleInfo()>> getFunctions() {
+    return {};
+  }
 
-  virtual ~UpdateController() {
+  virtual void start() = 0;
 
+  virtual void stop() = 0;
+
+  std::atomic<bool>& isControllerRunning() {
+    return controller_running_;
   }
+ protected:
 
+  std::atomic<bool> controller_running_;
 };
 
 } /* namespace state */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 5809a4c..4e163e5 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -504,8 +504,8 @@ class AgentStatus : public StateMonitorNode {
 class AgentIdentifier {
  public:
 
-  AgentIdentifier() {
-
+  AgentIdentifier()
+     : include_agent_manifest_(true) {
   }
 
   void setIdentifier(const std::string &identifier) {
@@ -516,9 +516,14 @@ class AgentIdentifier {
     agent_class_ = agentClass;
   }
 
+  void includeAgentManifest(bool include) {
+    include_agent_manifest_ = include;
+  }
+
  protected:
   std::string identifier_;
   std::string agent_class_;
+  bool include_agent_manifest_;
 };
 
 class AgentMonitor {
@@ -675,31 +680,9 @@ protected:
     serialized.push_back(agentManifest);
     return serialized;
   }
-};
-
-/**
- * This class is used for regular heartbeat without manifest
- * A light weight heartbeat
- */
-class AgentInformationWithoutManifest : public AgentNode {
-public:
-
-  AgentInformationWithoutManifest(const std::string& name, utils::Identifier & uuid)
-      : AgentNode(name, uuid) {
-    setArray(false);
-  }
 
-  explicit AgentInformationWithoutManifest(const std::string &name)
-      : AgentNode(name) {
-    setArray(false);
-  }
-
-  std::string getName() const {
-    return "agentInfo";
-  }
-
-  std::vector<SerializedResponseNode> serialize() {
-    std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
+  std::vector<SerializedResponseNode> getAgentStatus() const {
+    std::vector<SerializedResponseNode> serialized;
 
     AgentStatus status("status");
     status.setRepositories(repositories_);
@@ -716,21 +699,23 @@ public:
   }
 };
 
-
 /**
- * This class is used for sending all agent information including manifest and status
- * A heavy weight heartbeat. Here to maintain backward compatibility
+ * This class is used for sending agent information, optionally including
+ * the agent manifest and the agent status. Both the agent status and the
+ * agent manifest are included by default.
  */
-class AgentInformation : public AgentInformationWithoutManifest {
+class AgentInformation : public AgentNode {
  public:
 
   AgentInformation(const std::string& name, utils::Identifier & uuid)
-      : AgentInformationWithoutManifest(name, uuid) {
+      : AgentNode(name, uuid),
+        include_agent_status_(true) {
     setArray(false);
   }
 
   explicit AgentInformation(const std::string &name)
-      : AgentInformationWithoutManifest(name) {
+      : AgentNode(name),
+        include_agent_status_(true) {
     setArray(false);
   }
 
@@ -738,45 +723,28 @@ class AgentInformation : public AgentInformationWithoutManifest {
     return "agentInfo";
   }
 
-  std::vector<SerializedResponseNode> serialize() {
-    std::vector<SerializedResponseNode> serialized(AgentInformationWithoutManifest::serialize());
-    auto manifest = getAgentManifest();
-    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
-    return serialized;
-  }
-
-};
-
-/**
- * This class is used for response to DESCRIBE manifest request
- * It contains static information only
- */
-class AgentInformationWithManifest : public AgentNode {
-public:
-  AgentInformationWithManifest(const std::string& name, utils::Identifier & uuid)
-      : AgentNode(name, uuid) {
-    setArray(false);
-  }
-
-  explicit AgentInformationWithManifest(const std::string &name)
-      : AgentNode(name) {
-    setArray(false);
-  }
-
-  std::string getName() const {
-    return "agentInfo";
+  void includeAgentStatus(bool include) {
+    include_agent_status_ = include;
   }
 
   std::vector<SerializedResponseNode> serialize() {
     std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
-    auto manifest = getAgentManifest();
-    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    if (include_agent_manifest_) {
+      auto manifest = getAgentManifest();
+      serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    }
+
+    if (include_agent_status_) {
+      auto status = getAgentStatus();
+      serialized.insert(serialized.end(), std::make_move_iterator(status.begin()), std::make_move_iterator(status.end()));
+    }
     return serialized;
   }
+ protected:
+  bool include_agent_status_;
 };
 
 REGISTER_RESOURCE(AgentInformation, "Node part of an AST that defines all agent information, to include the manifest, and bundle information as part of a healthy hearbeat.");
-REGISTER_RESOURCE(AgentInformationWithoutManifest, "Node part of an AST that defines all agent information, without the manifest and bundle information as part of a healthy hearbeat.");
 
 } /* namespace metrics */
 } /* namespace state */
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index d91df6e..eb06ff7 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -209,37 +209,23 @@ class NodeReporter {
   }
 
   /**
-   * Retrieves all root response nodes from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
-   */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
-
-  /**
-   * Retrieves all metrics from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
+   * Retrieves metrics node
+   * @return metrics response node
    */
-  virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
+  virtual std::shared_ptr<ResponseNode> getMetricsNode(const std::string& metricsClass) const = 0;
 
   /**
-   * Retrieves agent information with manifest only from this source.
-   * @param manifest_vector -- manifest nodes vector.
-   * @return 0 on Success, -1 on failure
+   * Retrieves root nodes configured to be included in heartbeat
+   * @param includeManifest -- determines if manifest is to be included
+   * @return a list of response nodes
    */
-  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const = 0;
+  virtual std::vector<std::shared_ptr<ResponseNode>> getHeartbeatNodes(bool includeManifest) const = 0;
 
   /**
-   * Returns a response node containing all agent information with manifest and agent status
-   * @return a shared pointer to agent information
+   * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
+   * @return the agent manifest response node
    */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const = 0;
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() const = 0;
 };
 
 /**
@@ -269,7 +255,7 @@ class ResponseNodeSink {
    *  1 No error condition, but cannot obtain lock in timely manner.
    *  -1 failure
    */
-//  virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> &metrics) = 0;
+  virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> &metrics) = 0;
 };
 
 } /* namespace metrics */
diff --git a/libminifi/include/core/state/nodes/TreeUpdateListener.h b/libminifi/include/core/state/nodes/TreeUpdateListener.h
index 4b7e237..ad22e2a 100644
--- a/libminifi/include/core/state/nodes/TreeUpdateListener.h
+++ b/libminifi/include/core/state/nodes/TreeUpdateListener.h
@@ -75,49 +75,6 @@ class OperationWatcher : public utils::AfterExecute<Update> {
 
 };
 
-class TreeUpdateListener {
- public:
-  TreeUpdateListener(const std::shared_ptr<response::NodeReporter> &source, const std::shared_ptr<response::ResponseNodeSink> &sink)
-      : running_(true),
-        source_(source),
-        sink_(sink){
-
-    function_ = [&]() {
-      while(running_) {
-        std::vector<std::shared_ptr<response::ResponseNode>> metric_vector;
-        // simple pass through for the metrics
-        if (nullptr != source_ && nullptr != sink_) {
-          source_->getResponseNodes(metric_vector,0);
-          for(auto metric : metric_vector) {
-            sink_->setResponseNodes(metric);
-          }
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-      }
-      return MetricsUpdate(UpdateState::READ_COMPLETE);
-    };
-  }
-
-  void stop() {
-    running_ = false;
-  }
-
-  std::function<Update()> &getFunction() {
-    return function_;
-  }
-
-  std::future<Update> &getFuture() {
-    return future_;
-  }
-
- private:
-  std::function<Update()> function_;
-  std::future<Update> future_;
-  std::atomic<bool> running_;
-  std::shared_ptr<response::NodeReporter> source_;
-  std::shared_ptr<response::ResponseNodeSink> sink_;
-};
-
 } /* namespace metrics */
 } /* namespace state */
 } /* namespace minifi */
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 9250b5b..824d386 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -36,10 +36,21 @@ class Configure : public Properties {
     std::lock_guard<std::mutex> lock(mutex_);
     agent_identifier_ = identifier;
   }
-  std::string getAgentIdentifier() {
+  std::string getAgentIdentifier() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return agent_identifier_;
   }
+
+  void setAgentClass(const std::string& agentClass) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    agent_class_ = agentClass;
+  }
+
+  std::string getAgentClass() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return agent_class_;
+  }
+
   // nifi.flow.configuration.file
   static const char *nifi_default_directory;
   static const char *nifi_flow_configuration_file;
@@ -87,10 +98,12 @@ class Configure : public Properties {
   static const char *nifi_c2_flow_id;
   static const char *nifi_c2_flow_url;
   static const char *nifi_c2_flow_base_url;
+  static const char *nifi_c2_full_heartbeat;
 
  private:
   std::string agent_identifier_;
-  std::mutex mutex_;
+  std::string agent_class_;
+  mutable std::mutex mutex_;
 };
 
 } /* namespace minifi */
diff --git a/libminifi/include/utils/BackTrace.h b/libminifi/include/utils/BackTrace.h
index 5bd95cb..8ba57c2 100644
--- a/libminifi/include/utils/BackTrace.h
+++ b/libminifi/include/utils/BackTrace.h
@@ -25,8 +25,10 @@
 #include <utility>
 #include <vector>
 #include <mutex>
+#include <condition_variable>
 #include <iostream>
 #include <sstream>
+#include <memory>
 
 #define TRACE_BUFFER_SIZE 128
 
@@ -87,7 +89,6 @@ void emplace_handler();
  */
 class TraceResolver {
  public:
-
   /**
    * Retrieves the backtrace for the provided thread reference
    * @return BackTrace instance
@@ -145,27 +146,25 @@ class TraceResolver {
     trace_.addLine(line.str());
   }
 
-  /**
-   * Returns the thread handle reference in the native format.
-   */
-  std::thread::native_handle_type getThreadHandle() {
-    return thread_handle_;
+  std::unique_lock<std::mutex> lock() {
+    return std::unique_lock<std::mutex>(trace_mutex_);
   }
 
-  /**
-   * Returns the caller handle reference in the native format.
-   */
-  std::thread::native_handle_type getCallerHandle() {
-    return caller_handle_;
+  void notifyPullTracesDone(std::unique_lock<std::mutex>& lock) {
+    std::unique_lock<std::mutex> tlock(std::move(lock));
+    pull_traces_ = true;
+    trace_condition_.notify_one();
   }
 
  private:
   TraceResolver() = default;
 
   BackTrace trace_;
-  std::thread::native_handle_type thread_handle_{0};
-  std::thread::native_handle_type caller_handle_{0};
-  std::mutex mutex_;
+  mutable std::mutex mutex_;
+
+  bool pull_traces_{false};
+  mutable std::mutex trace_mutex_;
+  std::condition_variable trace_condition_;
 };
 
 #endif /* LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ */
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 86e2abb..56a550e 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -50,7 +50,7 @@ namespace utils {
 template<typename T>
 class Worker {
  public:
-  explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
+  explicit Worker(const std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
       : identifier_(identifier),
         next_exec_time_(std::chrono::steady_clock::now()),
         task(task),
@@ -58,7 +58,7 @@ class Worker {
     promise = std::make_shared<std::promise<T>>();
   }
 
-  explicit Worker(std::function<T()> &task, const std::string &identifier)
+  explicit Worker(const std::function<T()> &task, const std::string &identifier)
       : identifier_(identifier),
         next_exec_time_(std::chrono::steady_clock::now()),
         task(task),
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 38b9c19..fbb0fa9 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -62,6 +62,7 @@ const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch";
 const char *Configure::nifi_c2_flow_id = "nifi.c2.flow.id";
 const char *Configure::nifi_c2_flow_url = "nifi.c2.flow.url";
 const char *Configure::nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
+const char *Configure::nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
 
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 1a251b5..350b64d 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -54,6 +54,7 @@
 #include "core/Connectable.h"
 #include "utils/HTTPClient.h"
 #include "io/NetworkPrioritizer.h"
+#include "io/validation.h"
 
 #ifdef _MSC_VER
 #ifndef PATH_MAX
@@ -66,8 +67,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-std::shared_ptr<utils::IdGenerator> FlowController::id_generator_ = utils::IdGenerator::getIdGenerator();
-
 #define DEFAULT_CONFIG_NAME "conf/config.yml"
 
 FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
@@ -98,7 +97,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
   if (IsNullOrEmpty(configuration_)) {
     throw std::runtime_error("Must supply a configuration.");
   }
-  id_generator_->generate(uuid_);
+  utils::IdGenerator::getIdGenerator()->generate(uuid_);
   setUUID(uuid_);
   flow_update_ = false;
   // Setup the default values
@@ -172,6 +171,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
 
 FlowController::~FlowController() {
   stop(true);
+  stopC2();
   unload();
   if (NULL != protocol_)
     delete protocol_;
@@ -179,6 +179,11 @@ FlowController::~FlowController() {
   provenance_repo_ = nullptr;
 }
 
+void FlowController::stopC2() {
+  if (c2_agent_)
+    c2_agent_->stop();
+}
+
 bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload) {
   std::unique_ptr<core::ProcessGroup> newRoot;
   try {
@@ -243,7 +248,7 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
     this->timer_scheduler_->stop();
     this->event_scheduler_->stop();
     this->cron_scheduler_->stop();
-    this->thread_pool_.shutdown();
+    thread_pool_.shutdown();
     running_ = false;
   }
   return 0;
@@ -282,8 +287,6 @@ void FlowController::unload() {
     initialized_ = false;
     name_ = "";
   }
-
-  return;
 }
 
 void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool reload) {
@@ -398,6 +401,7 @@ int16_t FlowController::start() {
       this->protocol_->start();
       this->provenance_repo_->start();
       this->flow_file_repo_->start();
+      thread_pool_.start();
       logger_->log_info("Started Flow Controller");
     }
     return 0;
@@ -414,6 +418,7 @@ void FlowController::initializeC2() {
 
   // don't need to worry about the return code, only whether class_str is defined.
   configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
+  configuration_->setAgentClass(class_str);
 
   if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) {
     bool enable_c2 = true;
@@ -447,28 +452,17 @@ void FlowController::initializeC2() {
     // set to the flow controller's identifier
     identifier_str = uuidStr_;
   }
+  configuration_->setAgentIdentifier(identifier_str);
 
-  if (!c2_initialized_) {
-    configuration_->setAgentIdentifier(identifier_str);
-    state::StateManager::initialize();
-    std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()),
-                                                                       configuration_);
-    registerUpdateListener(agent, agent->getHeartBeatDelay());
-
-    state::StateManager::startMetrics(agent->getHeartBeatDelay());
-
-    c2_initialized_ = true;
-  } else {
-    if (!flow_update_) {
-      return;
-    }
+  if (c2_initialized_ && !flow_update_) {
+    return;
   }
+
   device_information_.clear();
   component_metrics_.clear();
   component_metrics_by_id_.clear();
-  agent_information_.clear();
-  std::string class_csv;
 
+  std::string class_csv;
   if (root_ != nullptr) {
     std::shared_ptr<state::response::QueueMetrics> queueMetrics = std::make_shared<state::response::QueueMetrics>();
 
@@ -485,15 +479,6 @@ void FlowController::initializeC2() {
     repoMetrics->addRepository(flow_file_repo_);
 
     device_information_[repoMetrics->getName()] = repoMetrics;
-
-    std::shared_ptr<state::response::AgentInformationWithManifest> manifest = std::make_shared<state::response::AgentInformationWithManifest>("agentInformation");
-    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(manifest);
-
-    if (identifier != nullptr) {
-      identifier->setIdentifier(identifier_str);
-      identifier->setAgentClass(class_str);
-      agent_information_[manifest->getName()] = manifest;
-    }
   }
 
   if (configuration_->get("nifi.c2.root.classes", class_csv)) {
@@ -611,6 +596,14 @@ void FlowController::initializeC2() {
   }
 
   loadC2ResponseConfiguration();
+
+  if (!c2_initialized_) {
+    c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+                                                             std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+                                                             configuration_));
+    c2_agent_->start();
+    c2_initialized_ = true;
+  }
 }
 
 void FlowController::loadC2ResponseConfiguration(const std::string &prefix) {
@@ -917,57 +910,44 @@ int16_t FlowController::clearConnection(const std::string &connection) {
   return -1;
 }
 
-int16_t FlowController::getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass) {
-  std::lock_guard<std::mutex> lock(metrics_mutex_);
-
-  for (auto metric : root_response_nodes_) {
-    metric_vector.push_back(metric.second);
-  }
-
-  return 0;
-}
-
-int16_t FlowController::getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass) {
+std::shared_ptr<state::response::ResponseNode> FlowController::getMetricsNode(const std::string& metricsClass) const {
   std::lock_guard<std::mutex> lock(metrics_mutex_);
-  if (metricsClass == 0) {
-    for (auto metric : device_information_) {
-      metric_vector.push_back(metric.second);
+  if (!metricsClass.empty()) {
+    const auto citer = component_metrics_.find(metricsClass);
+    if (citer != component_metrics_.end()) {
+      return citer->second;
     }
   } else {
-    auto metrics = component_metrics_by_id_[metricsClass];
-    for (const auto &metric : metrics) {
-      metric_vector.push_back(metric);
+    const auto iter = root_response_nodes_.find("metrics");
+    if (iter != root_response_nodes_.end()) {
+      return iter->second;
     }
   }
-  return 0;
+  return nullptr;
 }
 
-int16_t FlowController::getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const {
-    std::lock_guard<std::mutex> lock(metrics_mutex_);
-    for (const auto& metric : agent_information_) {
-        manifest_vector.push_back(metric.second);
+std::vector<std::shared_ptr<state::response::ResponseNode>> FlowController::getHeartbeatNodes(bool includeManifest) const {
+  std::string fullHb{"true"};
+  configuration_->get("nifi.c2.full.heartbeat", fullHb);
+  const bool include = includeManifest ? true : (fullHb == "true");
+
+  std::vector<std::shared_ptr<state::response::ResponseNode>> nodes;
+  for (const auto& entry : root_response_nodes_) {
+    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(entry.second);
+    if (identifier) {
+      identifier->includeAgentManifest(include);
     }
-    return 0;
+    nodes.push_back(entry.second);
+  }
+  return nodes;
 }
 
-std::shared_ptr<state::response::ResponseNode> FlowController::getAgentInformation() const {
-    auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
-    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(agentInfo);
-
-    if (identifier != nullptr) {
-      std::string class_str;
-      configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
-
-      std::string identifier_str;
-      if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", identifier_str) || identifier_str.empty()) {
-        identifier_str = uuidStr_;
-      }
-
-      identifier->setIdentifier(identifier_str);
-      identifier->setAgentClass(class_str);
-      return agentInfo;
-    }
-    return nullptr;
+std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() const {
+  auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
+  agentInfo->setIdentifier(configuration_->getAgentIdentifier());
+  agentInfo->setAgentClass(configuration_->getAgentClass());
+  agentInfo->includeAgentStatus(false);
+  return agentInfo;
 }
 
 std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() {
@@ -1029,14 +1009,7 @@ uint64_t FlowController::getUptime() {
 }
 
 std::vector<BackTrace> FlowController::getTraces() {
-  std::vector<BackTrace> traces;
-  auto timer_driven = timer_scheduler_->getTraces();
-  traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end()));
-  auto event_driven = event_scheduler_->getTraces();
-  traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end()));
-  auto cron_driven = cron_scheduler_->getTraces();
-  traces.insert(traces.end(), std::make_move_iterator(cron_driven.begin()), std::make_move_iterator(cron_driven.end()));
-  // repositories
+  std::vector<BackTrace> traces{thread_pool_.getTraces()};
   auto prov_repo_trace = provenance_repo_->getTraces();
   traces.emplace_back(std::move(prov_repo_trace));
   auto flow_repo_trace = flow_file_repo_->getTraces();
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 77d3f7a..6670a73 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -32,13 +32,17 @@
 #include "utils/file/FileUtils.h"
 #include "utils/file/FileManager.h"
 #include "utils/HTTPClient.h"
+#include "utils/GeneralUtils.h"
+#include "utils/Monitors.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace c2 {
 
-C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+                 const std::shared_ptr<state::StateMonitor> &updateSink,
                  const std::shared_ptr<Configure> &configuration)
     : heart_beat_period_(3000),
       max_c2_responses(5),
@@ -47,7 +51,8 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
       controller_(controller),
       configuration_(configuration),
       protocol_(nullptr),
-      logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
+      logger_(logging::LoggerFactory<C2Agent>::getLogger()),
+      thread_pool_(2, false, nullptr, "C2 threadpool") {
   allow_updates_ = true;
 
   manifest_sent_ = false;
@@ -67,12 +72,10 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
   configure(configuration, false);
 
   c2_producer_ = [&]() {
-    auto now = std::chrono::steady_clock::now();
-    auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count();
-
     // place priority on messages to send to the c2 server
-      if ( protocol_.load() != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
-        if (requests.size() > 0) {
+      if (protocol_.load() != nullptr && request_mutex.try_lock_for(std::chrono::seconds(1))) {
+        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
+        if (!requests.empty()) {
           int count = 0;
           do {
             const C2Payload payload(std::move(requests.back()));
@@ -87,48 +90,72 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
             catch(...) {
               logger_->log_error("Unknonwn exception occurred while consuming payload.");
             }
-          }while(requests.size() > 0 && ++count < max_c2_responses);
+          }while(!requests.empty() && ++count < max_c2_responses);
         }
-        request_mutex.unlock();
       }
-
-      if ( time_since > heart_beat_period_ ) {
-        last_run_ = now;
-        try {
-          performHeartBeat();
-        }
-        catch(const std::exception &e) {
-          logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
-        }
-        catch(...) {
-          logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
-        }
+      try {
+        performHeartBeat();
+      }
+      catch(const std::exception &e) {
+        logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
+      }
+      catch(...) {
+        logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
       }
 
       checkTriggers();
 
-      std::this_thread::sleep_for(std::chrono::milliseconds(heart_beat_period_ > 500 ? 500 : heart_beat_period_));
-      return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
+      return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
     };
-
   functions_.push_back(c2_producer_);
 
   c2_consumer_ = [&]() {
-    auto now = std::chrono::steady_clock::now();
-    if ( queue_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
-      if (responses.size() > 0) {
-        const C2Payload payload(std::move(responses.back()));
+    if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
+      C2Payload payload(Operation::HEARTBEAT);
+      {
+        std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
+        if (responses.empty()) {
+          return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+        }
+        payload = std::move(responses.back());
         responses.pop_back();
-        extractPayload(std::move(payload));
       }
-      queue_mutex.unlock();
+      extractPayload(std::move(payload));
     }
-    return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
   };
-
   functions_.push_back(c2_consumer_);
 }
 
+void C2Agent::start() {
+  if (controller_running_) {
+    return;
+  }
+  task_ids_.clear();
+  for (const auto& function : functions_) {
+    utils::Identifier uuid;
+    utils::IdGenerator::getIdGenerator()->generate(uuid);
+    const std::string uuid_str = uuid.to_string();
+    task_ids_.push_back(uuid_str);
+    auto monitor = utils::make_unique<utils::ComplexMonitor>();
+    utils::Worker<utils::TaskRescheduleInfo> functor(function, uuid_str, std::move(monitor));
+    std::future<utils::TaskRescheduleInfo> future;
+    thread_pool_.execute(std::move(functor), future);
+  }
+  controller_running_ = true;
+  thread_pool_.start();
+  logger_->log_info("C2 agent started");
+}
+
+void C2Agent::stop() {
+  controller_running_ = false;
+  for (const auto& id : task_ids_) {
+    thread_pool_.stopTasks(id);
+  }
+  thread_pool_.shutdown();
+  logger_->log_info("C2 agent stopped");
+}
+
 void C2Agent::checkTriggers() {
   logger_->log_debug("Checking %d triggers", triggers_.size());
   for (const auto &trigger : triggers_) {
@@ -275,60 +302,25 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
 
 void C2Agent::performHeartBeat() {
   C2Payload payload(Operation::HEARTBEAT);
-
   logger_->log_trace("Performing heartbeat");
-
-  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_copy;
-  {
-    std::lock_guard<std::timed_mutex> lock(metrics_mutex_);
-    if (metrics_map_.size() > 0) {
-      metrics_copy = std::move(metrics_map_);
-    }
-  }
-
-  if (metrics_copy.size() > 0) {
-    C2Payload metrics(Operation::HEARTBEAT);
-    metrics.setLabel("metrics");
-
-    for (auto metric : metrics_copy) {
-      if (metric.second->serialize().size() == 0)
-        continue;
-      C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric.first);
-      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-      metrics.addPayload(std::move(child_metric_payload));
-    }
-    payload.addPayload(std::move(metrics));
-  }
-
-  for (auto metric : root_response_nodes_) {
-    C2Payload child_metric_payload(Operation::HEARTBEAT);
-    bool isArray{false};
-    std::string metricName;
-    std::vector<state::response::SerializedResponseNode> metrics;
-    std::shared_ptr<state::response::NodeReporter> reporter;
-    std::shared_ptr<state::response::ResponseNode> agentInfo;
-
-    // Send agent manifest in first heartbeat
-    if (!manifest_sent_
-        && (reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_))
-        && (agentInfo = reporter->getAgentInformation())
-        && metric.first == agentInfo->getName()) {
-      metricName = agentInfo->getName();
-      isArray = agentInfo->isArray();
-      metrics = agentInfo->serialize();
+  std::shared_ptr<state::response::NodeReporter> reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
+  std::vector<std::shared_ptr<state::response::ResponseNode>> metrics;
+  if (reporter) {
+    if (!manifest_sent_) {
+      // include agent manifest for the first heartbeat
+      metrics = reporter->getHeartbeatNodes(true);
       manifest_sent_ = true;
     } else {
-      metricName = metric.first;
-      isArray = metric.second->isArray();
-      metrics = metric.second->serialize();
+      metrics = reporter->getHeartbeatNodes(false);
     }
-    child_metric_payload.setLabel(metricName);
-    if (isArray) {
-      child_metric_payload.setContainer(true);
+
+    for (const auto& metric : metrics) {
+      C2Payload child_metric_payload(Operation::HEARTBEAT);
+      child_metric_payload.setLabel(metric->getName());
+      child_metric_payload.setContainer(metric->isArray());
+      serializeMetrics(child_metric_payload, metric->getName(), metric->serialize(), metric->isArray());
+      payload.addPayload(std::move(child_metric_payload));
     }
-    serializeMetrics(child_metric_payload, metricName, metrics, isArray);
-    payload.addPayload(std::move(child_metric_payload));
   }
   C2Payload && response = protocol_.load()->consumePayload(payload);
 
@@ -456,7 +448,7 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
       update_sink_->stop(true);
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
       protocol_.load()->consumePayload(std::move(response));
-      exit(1);
+      restart_agent();
     }
       break;
     case Operation::START:
@@ -466,7 +458,6 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
       }
 
       std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name);
-
       // stop all referenced components.
       for (auto &component : components) {
         logger_->log_debug("Stopping component %s", component->getComponentName());
@@ -516,54 +507,39 @@ C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) co
  * to be put into the acknowledgement
  */
 void C2Agent::handle_describe(const C2ContentResponse &resp) {
+  auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
   if (resp.name == "metrics") {
-    auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
-
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
     if (reporter != nullptr) {
-      auto metricsClass = resp.operation_arguments.find("metricsClass");
-      uint8_t metric_class_id = 0;
-      if (metricsClass != resp.operation_arguments.end()) {
-        // we have a class
-        try {
-          metric_class_id = std::stoi(metricsClass->second.to_string());
-        } catch (...) {
-          logger_->log_error("Could not convert %s into an integer", metricsClass->second.to_string());
-        }
+      auto iter = resp.operation_arguments.find("metricsClass");
+      std::string metricsClass;
+      if (iter != resp.operation_arguments.end()) {
+        metricsClass = iter->second.to_string();
       }
-
-      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-      C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      auto metricsNode = reporter->getMetricsNode(metricsClass);
       C2Payload metrics(Operation::ACKNOWLEDGE);
-      metrics.setLabel("metrics");
-      reporter->getResponseNodes(metrics_vec, 0);
-      for (auto metric : metrics_vec) {
-        serializeMetrics(metrics, metric->getName(), metric->serialize());
+      metricsClass.empty() ? metrics.setLabel("metrics") : metrics.setLabel(metricsClass);
+      if (metricsNode) {
+        serializeMetrics(metrics, metricsNode->getName(), metricsNode->serialize(), metricsNode->isArray());
       }
       response.addPayload(std::move(metrics));
-      enqueue_c2_response(std::move(response));
     }
-
+    enqueue_c2_response(std::move(response));
+    return;
   } else if (resp.name == "configuration") {
     auto configOptions = prepareConfigurationOptions(resp);
     enqueue_c2_response(std::move(configOptions));
     return;
   } else if (resp.name == "manifest") {
     C2Payload response(prepareConfigurationOptions(resp));
-
-    auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
     if (reporter != nullptr) {
-      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-
       C2Payload agentInfo(Operation::ACKNOWLEDGE, resp.ident, false, true);
       agentInfo.setLabel("agentInfo");
 
-      reporter->getManifestNodes(metrics_vec);
-      for (const auto& metric : metrics_vec) {
-          serializeMetrics(agentInfo, metric->getName(), metric->serialize());
-      }
+      const auto manifest = reporter->getAgentManifest();
+      serializeMetrics(agentInfo, manifest->getName(), manifest->serialize());
       response.addPayload(std::move(agentInfo));
     }
-
     enqueue_c2_response(std::move(response));
     return;
   } else if (resp.name == "jstack") {
@@ -839,7 +815,8 @@ void C2Agent::restart_agent() {
   }
 
   std::stringstream command;
-  command << cwd << "/minifi.sh restart";
+  command << cwd << "/bin/minifi.sh restart";
+  system(command.str().c_str());
 }
 
 void C2Agent::update_agent() {
@@ -848,26 +825,6 @@ void C2Agent::update_agent() {
   }
 }
 
-int16_t C2Agent::setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric) {
-  auto now = std::chrono::steady_clock::now();
-  if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) {
-    root_response_nodes_[metric->getName()] = metric;
-    metrics_mutex_.unlock();
-    return 0;
-  }
-  return -1;
-}
-
-int16_t C2Agent::setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric) {
-  auto now = std::chrono::steady_clock::now();
-  if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) {
-    metrics_map_[metric->getName()] = metric;
-    metrics_mutex_.unlock();
-    return 0;
-  }
-  return -1;
-}
-
 } /* namespace c2 */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index 1e95194..0cc6932 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -26,8 +26,6 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-std::shared_ptr<utils::IdGenerator> CoreComponent::id_generator_ = utils::IdGenerator::getIdGenerator();
-
 // Set UUID
 void CoreComponent::setUUID(utils::Identifier &uuid) {
   uuid_ = uuid;
diff --git a/libminifi/src/core/state/StateManager.cpp b/libminifi/src/core/state/StateManager.cpp
deleted file mode 100644
index ce46bc5..0000000
--- a/libminifi/src/core/state/StateManager.cpp
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "core/state/StateManager.h"
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "core/state/nodes/MetricsBase.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-
-void StateManager::initialize() {
-  metrics_listener_ = std::unique_ptr<state::response::TreeUpdateListener>(new state::response::TreeUpdateListener(shared_from_this(), shared_from_this()));
-  // manually add the c2 agent for now
-  listener_thread_pool_.setMaxConcurrentTasks(2);
-  listener_thread_pool_.start();
-  controller_running_ = true;
-}
-/**
- * State management operations.
- */
-int16_t StateManager::stop(bool force, uint64_t timeToWait) {
-  controller_running_ = false;
-  listener_thread_pool_.shutdown();
-  return 1;
-}
-
-int16_t StateManager::update(const std::shared_ptr<Update> &updateController) {
-  // must be stopped to update.
-  if (isStateMonitorRunning()) {
-    return -1;
-  }
-  int16_t ret = applyUpdate("StateManager", updateController);
-  switch (ret) {
-    case -1:
-      return -1;
-    default:
-      return 1;
-  }
-}
-
-/**
- * Passes metrics to the update controllers if they are a metrics sink.
- * @param metrics metric to pass through
- */
-int16_t StateManager::setResponseNodes(const std::shared_ptr<response::ResponseNode> &metrics) {
-  if (IsNullOrEmpty(metrics)) {
-    return -1;
-  }
-  auto now = std::chrono::steady_clock::now();
-  if (mutex_.try_lock_until(now + std::chrono::milliseconds(100))) {
-    // update controllers can be metric sinks too
-    for (auto controller : updateControllers) {
-      std::shared_ptr<response::ResponseNodeSink> sink = std::dynamic_pointer_cast<response::ResponseNodeSink>(controller);
-      if (sink != nullptr) {
-        sink->setResponseNodes(metrics);
-      }
-    }
-    metrics_maps_[metrics->getName()] = metrics;
-    mutex_.unlock();
-  } else {
-    return -1;
-  }
-  return 0;
-}
-/**
- * Metrics operations
- */
-int16_t StateManager::getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>> &metric_vector, uint16_t metricsClass) {
-  auto now = std::chrono::steady_clock::now();
-  const std::chrono::steady_clock::time_point wait_time = now + std::chrono::milliseconds(100);
-  if (mutex_.try_lock_until(wait_time)) {
-    for (auto metric : metrics_maps_) {
-      metric_vector.push_back(metric.second);
-    }
-    mutex_.unlock();
-    return 0;
-  }
-  return -1;
-}
-
-bool StateManager::registerUpdateListener(const std::shared_ptr<UpdateController> &updateController, const int64_t &delay) {
-  auto functions = updateController->getFunctions();
-
-  updateControllers.push_back(updateController);
-  // run all functions independently
-
-  for (auto function : functions) {
-    std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning(), delay));
-    utils::Worker<Update> functor(function, "listeners", std::move(after_execute));
-    std::future<Update> future;
-    if (!listener_thread_pool_.execute(std::move(functor), future)) {
-      // denote failure
-      return false;
-    }
-  }
-  return true;
-}
-
-/**
- * Base metrics function will employ the default metrics listener.
- */
-bool StateManager::startMetrics(const int64_t &delay) {
-  std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning(), delay));
-  utils::Worker<Update> functor(metrics_listener_->getFunction(), "metrics", std::move(after_execute));
-  if (!listener_thread_pool_.execute(std::move(functor), metrics_listener_->getFuture())) {
-    // denote failure
-    return false;
-  }
-  return true;
-}
-
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/utils/BackTrace.cpp b/libminifi/src/utils/BackTrace.cpp
index 282c69f..fba8d9a 100644
--- a/libminifi/src/utils/BackTrace.cpp
+++ b/libminifi/src/utils/BackTrace.cpp
@@ -132,25 +132,21 @@ BackTrace TraceResolver::getBackTrace(std::string thread_name, std::thread::nati
   // lock so that we only perform one backtrace at a time.
 #ifdef HAS_EXECINFO
   std::lock_guard<std::mutex> lock(mutex_);
-
-  caller_handle_ = pthread_self();
-  thread_handle_ = thread_handle;
   trace_ = BackTrace(std::move(thread_name));
 
-  if (0 == thread_handle_ || pthread_equal(caller_handle_, thread_handle)) {
+  if (0 == thread_handle || pthread_equal(pthread_self(), thread_handle)) {
     pull_trace();
   } else {
-    if (thread_handle_ == 0) {
+    if (thread_handle == 0) {
       return std::move(trace_);
     }
     emplace_handler();
-    if (pthread_kill(thread_handle_, SIGUSR2) != 0) {
+    std::unique_lock<std::mutex> ulock(trace_mutex_);
+    if (pthread_kill(thread_handle, SIGUSR2) != 0) {
       return std::move(trace_);
     }
-    sigset_t mask;
-    sigfillset(&mask);
-    sigdelset(&mask, SIGUSR2);
-    sigsuspend(&mask);
+    pull_traces_ = false;
+    trace_condition_.wait(ulock, [this] { return pull_traces_; });
   }
 #else
   // even if tracing is disabled, include thread name into the trace object
@@ -159,15 +155,10 @@ BackTrace TraceResolver::getBackTrace(std::string thread_name, std::thread::nati
   return std::move(trace_);
 }
 #ifdef HAS_EXECINFO
-static void handler(int, siginfo_t*, void*) {
-  // not the intended thread
-  if (!pthread_equal(pthread_self(), TraceResolver::getResolver().getThreadHandle())) {
-    return;
-  }
-
+void handler(int signr, siginfo_t *info, void *secret) {
+  std::unique_lock<std::mutex> lock(TraceResolver::getResolver().lock());
   pull_trace();
-
-  pthread_kill(TraceResolver::getResolver().getCallerHandle(), SIGUSR2);
+  TraceResolver::getResolver().notifyPullTracesDone(lock);
 }
 #endif
 
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 01651d8..7bafd60 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -16,7 +16,7 @@
  */
 
 #include "utils/ThreadPool.h"
-#include "core/state/StateManager.h"
+#include "core/state/UpdateController.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 30f771a..91bafe8 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -75,7 +75,7 @@ class IntegrationBase {
 
   }
 
-  virtual void configureC2RootClasses() {
+  virtual void configureFullHeartbeat() {
 
   }
 
@@ -127,7 +127,7 @@ void IntegrationBase::run(std::string test_file_location) {
 
   queryRootProcessGroup(pg);
 
-  configureC2RootClasses();
+  configureFullHeartbeat();
 
   ptr.release();
 
@@ -142,8 +142,9 @@ void IntegrationBase::run(std::string test_file_location) {
 
   shutdownBeforeFlowController();
   flowController_->unload();
-  runAssertions();
+  flowController_->stopC2();
 
+  runAssertions();
   cleanup();
 }
 
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 9b0e671..465ed34 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -329,6 +329,8 @@ int main(int argc, char **argv) {
    */
   controller->waitUnload(stop_wait_time);
 
+  controller->stopC2();
+
   flow_repo = nullptr;
 
   prov_repo = nullptr;
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 4684cd3..0a594e4 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -155,9 +155,8 @@ class Instance {
     // run all functions independently
 
     for (auto function : functions) {
-      std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
-      utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
-      std::future<state::Update> future;
+      utils::Worker<utils::TaskRescheduleInfo> functor(function, "listeners");
+      std::future<utils::TaskRescheduleInfo> future;
       if (!listener_thread_pool_.execute(std::move(functor), future)) {
         // denote failure
         return false;
@@ -181,7 +180,7 @@ class Instance {
   std::string url_;
   std::shared_ptr<Configure> configure_;
 
-  utils::ThreadPool<state::Update> listener_thread_pool_{ 1 };
+  utils::ThreadPool<utils::TaskRescheduleInfo> listener_thread_pool_;
 };
 
 } /* namespace minifi */


[nifi-minifi-cpp] 01/02: MINIFICPP-1157 Implement light weight heartbeat. Remove agent manifest from regular heartbeat messages. Send agent manifest in response to DESCRIBE manifest request

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9552757b07ce95e22601f85ae661aa62555272d7
Author: Murtuza <ms...@gmail.com>
AuthorDate: Mon Feb 10 16:20:09 2020 -0500

    MINIFICPP-1157 Implement light weight heartbeat.
      Remove agent manifest from regular heartbeat messages.
      Send agent manifest in response to DESCRIBE manifest request
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
---
 C2.md                                              |   5 +
 conf/minifi.properties                             |   2 +
 extensions/coap/tests/CoapIntegrationBase.h        |   8 +-
 .../http-curl/tests/C2DescribeManifestTest.cpp     |  91 +++++++++++++
 extensions/http-curl/tests/C2JstackTest.cpp        | 133 +++++--------------
 .../http-curl/tests/C2VerifyHeartbeatAndStop.cpp   | 144 +++++++--------------
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          | 102 +++++++++++++++
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  78 ++++++++++-
 libminifi/include/FlowController.h                 |  20 ++-
 libminifi/include/c2/C2Agent.h                     |   7 +
 .../include/core/state/nodes/AgentInformation.h    | 137 +++++++++++++++++---
 libminifi/include/core/state/nodes/MetricsBase.h   |  20 ++-
 libminifi/include/properties/Properties.h          |   2 +-
 libminifi/src/FlowController.cpp                   |  38 ++++++
 libminifi/src/c2/C2Agent.cpp                       | 143 ++++++++++----------
 libminifi/test/integration/IntegrationBase.h       |  11 +-
 17 files changed, 631 insertions(+), 311 deletions(-)

diff --git a/C2.md b/C2.md
index b907926..dc99778 100644
--- a/C2.md
+++ b/C2.md
@@ -49,6 +49,11 @@ Release 0.6.0: Please note that all c2 properties now exist as `nifi.c2.*`. If y
 files contain the former naming convention of `c2.*`, we will continue to support that as
 an alternate key, but you are encouraged to switch your configuration options as soon as possible.
 
+Note: In release 0.8.0 there is a configuration option to minimize the heartbeat payload size by excluding the agent manifest.
+For that, replace "AgentInformation" with "AgentInformationWithoutManifest" in the nifi.c2.root.classes property value.
+With this change, a heartbeat that includes the agent manifest is sent only the first time; after that the agent falls back to sending
+lightweight heartbeats. If for some reason the C2 server does not receive the first full heartbeat, the manifest can
+be requested via the C2 DESCRIBE manifest command.
 
 	#in minifi.properties
 
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 4f7166a..249ca7a 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -52,6 +52,8 @@ nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_reposi
 #nifi.c2.rest.url=
 #nifi.c2.rest.url.ack=
 nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
+#nifi.c2.root.classes=DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation
 ## heartbeat 4 times a second
 #nifi.c2.agent.heartbeat.period=250
 ## define parameters about your agent 
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 83935bf..bd7740e 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -41,7 +41,7 @@ class CoapIntegrationBase : public IntegrationBase {
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~CoapIntegrationBase();
+  virtual ~CoapIntegrationBase() = default;
 
   void shutdownBeforeFlowController() override {
     stop_webserver(server);
@@ -68,6 +68,8 @@ class CoapIntegrationBase : public IntegrationBase {
 
     queryRootProcessGroup(pg);
 
+    configureC2RootClasses();
+
     ptr.release();
 
     std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
@@ -90,10 +92,6 @@ class CoapIntegrationBase : public IntegrationBase {
   CivetServer *server;
 };
 
-CoapIntegrationBase::~CoapIntegrationBase() {
-
-}
-
 void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
new file mode 100644
index 0000000..5a84c5c
--- /dev/null
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+
+class DescribeManifestHandler: public HeartbeatHandler {
+public:
+
+  explicit DescribeManifestHandler(bool isSecure)
+      : HeartbeatHandler(isSecure) {
+  }
+
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+    sendHeartbeatResponse("DESCRIBE", "manifest", "889345", conn);
+  }
+
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+    verifyJsonHasAgentManifest(root);
+  }
+};
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";
+  if (argc > 1) {
+    test_file_location = argv[1];
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";
+      key_dir = argv[2];
+    }
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  VerifyC2Describe harness(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  DescribeManifestHandler responder(isSecure);
+
+  harness.setUrl(url, &responder);
+
+  harness.run(test_file_location);
+}
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index dd2ad27..241af1c 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -45,123 +45,60 @@
 #include "CivetServer.h"
 #include <cstring>
 #include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
 
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-
-class ConfigHandler : public CivetHandler {
+class VerifyC2DescribeJstack : public VerifyC2Describe {
  public:
-  ConfigHandler() {
-    calls_ = 0;
-  }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    calls_++;
-    std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
-          "\"operation\" : \"describe\", "
-          "\"operationid\" : \"8675309\", "
-          "\"name\": \"jstack\""
-          "}]}";
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                heartbeat_response.length());
-      mg_printf(conn, "%s", heartbeat_response.c_str());
-
-
-    return true;
+  explicit VerifyC2DescribeJstack(bool isSecure)
+      : VerifyC2Describe(isSecure) {
   }
 
-  bool handleGet(CivetServer *server, struct mg_connection *conn) {
-    std::ifstream myfile(test_file_location_.c_str());
-
-    if (myfile.is_open()) {
-      std::stringstream buffer;
-      buffer << myfile.rdbuf();
-      std::string str = buffer.str();
-      myfile.close();
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                str.length());
-      mg_printf(conn, "%s", str.c_str());
-    } else {
-      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
-    }
-
-    return true;
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("SchedulingAgent") == true);
   }
-  std::string test_file_location_;
-  std::atomic<size_t> calls_;
 };
 
-int main(int argc, char **argv) {
-  mg_init_library(0);
-  LogTestController::getInstance().setInfo<minifi::FlowController>();
-  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
-  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
-  LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+class DescribeJstackHandler : public HeartbeatHandler {
+ public:
+  explicit DescribeJstackHandler(bool isSecure)
+     : HeartbeatHandler(isSecure) {
+  }
 
-  const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+    sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
   }
 
-  CivetServer server(cpp_options);
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+    assert(root.HasMember("SchedulingAgent #0") == true);
+  }
 
-  std::string port_str = std::to_string(server.getListeningPorts()[0]);
+};
 
-  ConfigHandler h_ex;
-  server.addHandler("/update", h_ex);
-  std::string key_dir, test_file_location;
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";
   if (argc > 1) {
-    h_ex.test_file_location_ = test_file_location = argv[1];
-    key_dir = argv[2];
+    test_file_location = argv[1];
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";
+      key_dir = argv[2];
+    }
   }
 
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
 
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
-  std::string c2_rest_url = "http://localhost:" + port_str + "/update";
-
-  configuration->set("c2.rest.url", c2_rest_url);
-  configuration->set("c2.agent.heartbeat.period", "1000");
-
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
-  true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+  VerifyC2DescribeJstack harness(isSecure);
 
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
-  auto start = std::chrono::system_clock::now();
+  harness.setKeyDir(key_dir);
 
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
+  DescribeJstackHandler responder(isSecure);
 
-  controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
+  harness.setUrl(url, &responder);
 
-  auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
-  std::string logs = LogTestController::getInstance().log_output.str();
-  #ifndef WIN32
-  assert(logs.find("SchedulingAgent") != std::string::npos);
-  #endif
-  LogTestController::getInstance().reset();
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
+  harness.run(test_file_location);
 
-  return 0;
 }
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 2eaa10b..6e0c1c2 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -50,98 +50,47 @@
 #include "protocols/RESTReceiver.h"
 #include "protocols/RESTSender.h"
 #include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
 #include "agent/build_description.h"
 #include "processors/LogAttribute.h"
 
-class Responder : public CivetHandler {
+class LightWeightC2Handler : public HeartbeatHandler {
  public:
-  explicit Responder(bool isSecure)
-      : isSecure(isSecure) {
+  explicit LightWeightC2Handler(bool isSecure)
+      : HeartbeatHandler(isSecure),
+        calls_(0) {
   }
 
-  std::string readPost(struct mg_connection *conn) {
-    std::string response;
-    int blockSize = 1024 * sizeof(char), readBytes;
+  virtual ~LightWeightC2Handler() = default;
 
-    char buffer[1024];
-    while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
-      response.append(buffer, 0, (readBytes / sizeof(char)));
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn)  {
+    (void)conn;
+    if (calls_ == 0) {
+      verifyJsonHasAgentManifest(root);
+    } else {
+      assert(root.HasMember("agentInfo") == true);
+      assert(root["agentInfo"].HasMember("agentManifest") == false);
     }
-    return response;
+    calls_++;
   }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    auto post_data = readPost(conn);
-
-    std::cerr << post_data << std::endl;
-
-    if (!IsNullOrEmpty(post_data)) {
-      rapidjson::Document root;
-      rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
-      bool found = false;
-      std::string operation = root["operation"].GetString();
-      if (operation == "heartbeat") {
-        assert(root.HasMember("agentInfo") == true);
-        assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
-
-        for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
-          assert(bundle.HasMember("artifact"));
-          std::string str = bundle["artifact"].GetString();
-          if (str == "minifi-system") {
-
-            std::vector<std::string> classes;
-            for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
-              classes.push_back(proc["type"].GetString());
-            }
-
-            auto group = minifi::BuildDescription::getClassDescriptions(str);
-            for (auto proc : group.processors_) {
-              assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
-              found = true;
-            }
-
-          }
-        }
-        assert(found == true);
-      }
-    }
-    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
-        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
-    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-              resp.length());
-    mg_printf(conn, "%s", resp.c_str());
-    return true;
-  }
-
- protected:
-  bool isSecure;
+ private:
+  std::atomic<size_t> calls_;
 };
 
-class VerifyC2Heartbeat : public CoapIntegrationBase {
+class VerifyC2Heartbeat : public VerifyC2Base {
  public:
   explicit VerifyC2Heartbeat(bool isSecure)
-      : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
+      : VerifyC2Base(isSecure) {
   }
 
-  void testSetup() {
-    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+  virtual ~VerifyC2Heartbeat() = default;
+
+  virtual void testSetup() {
     LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
-    LogTestController::getInstance().setDebug<LogTestController>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-  }
-
-  void cleanup() {
-    LogTestController::getInstance().reset();
-    unlink(ss.str().c_str());
+    VerifyC2Base::testSetup();
   }
 
   void runAssertions() {
@@ -152,32 +101,22 @@ class VerifyC2Heartbeat : public CoapIntegrationBase {
     assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
   }
 
-  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+  void configureC2RootClasses() {
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
+  }
+};
 
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+class VerifyLightWeightC2Heartbeat : public VerifyC2Heartbeat {
+public:
+  explicit VerifyLightWeightC2Heartbeat(bool isSecure)
+      : VerifyC2Heartbeat(isSecure) {
+  }
 
-    std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
+  virtual ~VerifyLightWeightC2Heartbeat() = default;
 
-    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
-    configuration->set("nifi.c2.enable", "true");
-    configuration->set("nifi.c2.agent.class", "test");
-    configuration->set("nifi.c2.rest.url", c2_url);
-    configuration->set("nifi.c2.agent.heartbeat.period", "1000");
-    configuration->set("nifi.c2.rest.url.ack", c2_url);
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
+  void configureC2RootClasses() {
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
   }
-
- protected:
-  bool isSecure;
-  std::string dir;
-  std::stringstream ss;
-  TestController testController;
 };
 
 int main(int argc, char **argv) {
@@ -195,12 +134,23 @@ int main(int argc, char **argv) {
   if (url.find("https") != std::string::npos) {
     isSecure = true;
   }
+  {
+    VerifyC2Heartbeat harness(isSecure);
+
+    harness.setKeyDir(key_dir);
+
+    HeartbeatHandler responder(isSecure);
+
+    harness.setUrl(url, &responder);
+
+    harness.run(test_file_location);
+  }
 
-  VerifyC2Heartbeat harness(isSecure);
+  VerifyLightWeightC2Heartbeat harness(isSecure);
 
   harness.setKeyDir(key_dir);
 
-  Responder responder(isSecure);
+  LightWeightC2Handler responder(isSecure);
 
   harness.setUrl(url, &responder);
 
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index ab9176d..ff848b5 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -72,6 +72,7 @@ add_test(NAME HTTPStreamingCallbackTests COMMAND "HTTPStreamingCallbackTests" WO
 add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml"  "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 282b470..1de39b3 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -20,6 +20,8 @@
 #include "concurrentqueue.h"
 #include "CivetStream.h"
 #include "io/CRCStream.h"
+#include "rapidjson/document.h"
+
 #ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
 #define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
 static std::atomic<int> transaction_id;
@@ -343,4 +345,104 @@ class DeleteTransactionResponder : public CivetHandler {
   std::string response_code;
 };
 
+class HeartbeatHandler : public CivetHandler {  // Test-side C2 endpoint: inspects agent POSTs and replies with canned operations.
+ public:
+  explicit HeartbeatHandler(bool isSecure)
+      : isSecure(isSecure) {
+  }
+  // Reads the complete POST body from conn and returns it as a string.
+  std::string readPost(struct mg_connection *conn) {
+    std::string response;
+    char buffer[1024];
+    int readBytes;
+    // buffer holds raw bytes without a NUL terminator, so append by explicit length.
+    while ((readBytes = mg_read(conn, buffer, sizeof(buffer))) > 0) {
+      response.append(buffer, static_cast<size_t>(readBytes));
+    }
+    return response;
+  }
+  // Replies with a heartbeat response that requests stopping "invoke" and "FlowController".
+  void sendStopOperation(struct mg_connection *conn) {
+    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
+        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(resp.length()));  // cast: %lu expects unsigned long, which size_t is not on every platform
+    mg_printf(conn, "%s", resp.c_str());
+  }
+  // Replies with a heartbeat response carrying one requested operation built from the arguments.
+  void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn) {
+    std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
+          "\"operation\" : \"" + operation + "\","
+          "\"operationid\" : \"" + operationId + "\","
+          "\"operand\": \"" + operand + "\"}]}";
+
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(heartbeat_response.length()));
+    mg_printf(conn, "%s", heartbeat_response.c_str());
+  }
+  // Asserts root has agentInfo.agentManifest.bundles and that the "minifi-system" bundle lists every processor returned by BuildDescription::getClassDescriptions.
+  void verifyJsonHasAgentManifest(const rapidjson::Document& root) {
+    bool found = false;
+    assert(root.HasMember("agentInfo") == true);
+    assert(root["agentInfo"].HasMember("agentManifest") == true);
+    assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
+
+    for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
+      assert(bundle.HasMember("artifact"));
+      std::string str = bundle["artifact"].GetString();
+      if (str == "minifi-system") {
+        // Processor types advertised by the agent for this bundle.
+        std::vector<std::string> classes;
+        for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
+          classes.push_back(proc["type"].GetString());
+        }
+        // Each processor description of this build must appear in the advertised list.
+        auto group = minifi::BuildDescription::getClassDescriptions(str);
+        for (const auto &proc : group.processors_) {
+          assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
+          found = true;
+        }
+
+      }
+    }
+    assert(found == true);
+  }
+  // Default heartbeat check: the payload must contain the agent manifest; nothing is written to conn.
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+    (void)conn;
+    verifyJsonHasAgentManifest(root);
+  }
+  // Hook for acknowledge messages; performs no checks by default.
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+  }
+  // Logs the POST body to stderr and dispatches it by its "operation" member; empty bodies are ignored, unknown operations throw std::runtime_error.
+  void verify(struct mg_connection *conn) {
+    auto post_data = readPost(conn);
+    std::cerr << post_data << std::endl;
+    if (IsNullOrEmpty(post_data)) return;
+    rapidjson::Document root;
+    root.Parse(post_data.data(), post_data.size());
+    assert(!root.HasParseError() && root.IsObject() && root.HasMember("operation") && root["operation"].IsString());  // reject malformed payloads before indexing
+    std::string operation = root["operation"].GetString();
+    if (operation == "heartbeat") {
+      handleHeartbeat(root, conn);
+    } else if (operation == "acknowledge") {
+      handleAcknowledge(root);
+    } else {
+      throw std::runtime_error("operation not supported " + operation);
+    }
+  }
+  // NOTE(review): sendStopOperation() always runs, even when an overriding handleHeartbeat() already wrote a response to conn - confirm the second response is intended.
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    verify(conn);
+    sendStopOperation(conn);
+    return true;
+  }
+
+ protected:
+  bool isSecure;
+};
+
 #endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 4ab623b..4226630 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -21,6 +21,8 @@
 #include "../tests/TestServer.h"
 #include "CivetServer.h"
 #include "integration/IntegrationBase.h"
+#include "c2/C2Agent.h"
+#include "protocols/RESTSender.h"
 
 int log_message(const struct mg_connection *conn, const char *message) {
   puts(message);
@@ -41,8 +43,6 @@ class CoapIntegrationBase : public IntegrationBase {
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~CoapIntegrationBase();
-
   void shutdownBeforeFlowController() {
     stop_webserver(server);
   }
@@ -59,10 +59,6 @@ class CoapIntegrationBase : public IntegrationBase {
   CivetServer *server;
 };
 
-CoapIntegrationBase::~CoapIntegrationBase() {
-
-}
-
 void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
@@ -91,4 +87,74 @@ void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
   }
 }
 
+class VerifyC2Base : public CoapIntegrationBase {  // Shared harness for the C2 tests: temp-file setup plus REST C2 configuration.
+ public:
+  explicit VerifyC2Base(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+  // Enables debug logging for HTTPClient and LogTestController, then writes "tempFile" to <dir>/tstFile.ext.
+  virtual void testSetup() {
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<LogTestController>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";  // ss keeps the path so cleanup() can remove the file
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+  // No assertions by default.
+  void runAssertions() {
+  }
+  // Requires an InvokeHTTP processor named "invoke" in pg, then points the nifi.c2.* REST properties at localhost on getWebPort().
+  virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+    assert(proc != nullptr);
+
+    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+    assert(inv != nullptr);
+    std::string url = "";  // NOTE(review): filled by getProperty() below but not used afterwards
+    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+    // The same URL serves as heartbeat and acknowledge endpoint.
+    std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
+
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.class", "test");
+    configuration->set("nifi.c2.rest.url", c2_url);
+    configuration->set("nifi.c2.agent.heartbeat.period", "1000");
+    configuration->set("nifi.c2.rest.url.ack", c2_url);
+  }
+  // Resets the log controller and unlinks the file whose path was accumulated in ss.
+  void cleanup() {
+    LogTestController::getInstance().reset();
+    unlink(ss.str().c_str());
+  }
+
+ protected:
+  bool isSecure;
+  std::string dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+class VerifyC2Describe : public VerifyC2Base {  // Harness for the DESCRIBE tests built on VerifyC2Base.
+ public:
+  explicit VerifyC2Describe(bool isSecure)
+      : VerifyC2Base(isSecure) {
+  }
+  // Raises log levels for C2Agent, RESTSender and FlowController, then runs the base setup.
+  void testSetup() {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    VerifyC2Base::testSetup();
+  }
+  // Sets nifi.c2.root.classes to a list that uses AgentInformationWithoutManifest.
+  void configureC2RootClasses() {
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  }
+};
 #endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 5602452..bef607d 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -303,6 +303,19 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
    */
   virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
 
+  /**
+   * Retrieves agent information with manifest only from this source.
+   * @param manifest_vector -- manifest nodes vector.
+   * @return 0 on Success, -1 on failure
+   */
+  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const;
+
+  /**
+   * Returns a response node containing all agent information with manifest and agent status
+   * @return a shared pointer to agent information
+   */
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+
   virtual uint64_t getUptime();
 
   virtual std::vector<BackTrace> getTraces();
@@ -384,9 +397,10 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
 
   std::chrono::steady_clock::time_point start_time_;
 
-  std::mutex metrics_mutex_;
+  mutable std::mutex metrics_mutex_;
   // root_nodes cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
+
   // metrics cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_;
 
@@ -394,6 +408,10 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_;
 
   std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
+
+  // manifest cache
+  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> agent_information_;
+
   // metrics last run
   std::chrono::steady_clock::time_point last_metrics_capture_;
 
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index f84c131..c39589f 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -171,6 +171,11 @@ class C2Agent : public state::UpdateController, public state::response::Response
    */
   bool update_property(const std::string &property_name, const std::string &property_value,  bool persist = false);
 
+  /**
+   * Creates configuration options C2 payload for response
+   */
+  C2Payload prepareConfigurationOptions(const C2ContentResponse &resp) const;
+
   std::timed_mutex metrics_mutex_;
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
 
@@ -244,6 +249,8 @@ class C2Agent : public state::UpdateController, public state::response::Response
   std::string bin_location_;
 
   std::shared_ptr<logging::Logger> logger_;
+
+  bool manifest_sent_;
 };
 
 } /* namesapce c2 */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index abf0c6f..5809a4c 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -68,7 +68,7 @@ namespace response {
 
 class ComponentManifest : public DeviceInformation {
  public:
-  ComponentManifest(std::string name, utils::Identifier & uuid)
+  ComponentManifest(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
   }
 
@@ -92,7 +92,7 @@ class ComponentManifest : public DeviceInformation {
   }
  protected:
 
-  void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, SerializedResponseNode &response) {
+  void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, SerializedResponseNode &response) const {
     if (!descriptions.empty()) {
       SerializedResponseNode type;
       type.name = name;
@@ -307,7 +307,7 @@ class ComponentManifest : public DeviceInformation {
 
 class ExternalManifest : public ComponentManifest {
  public:
-  ExternalManifest(std::string name, utils::Identifier & uuid)
+  ExternalManifest(const std::string& name, utils::Identifier & uuid)
       : ComponentManifest(name, uuid) {
   }
 
@@ -329,7 +329,7 @@ class ExternalManifest : public ComponentManifest {
 
 class Bundles : public DeviceInformation {
  public:
-  Bundles(std::string name, utils::Identifier & uuid)
+  Bundles(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
     setArray(true);
   }
@@ -452,8 +452,17 @@ class AgentStatus : public StateMonitorNode {
         queuesize.name = "size";
         queuesize.value = repo.second->getRepoSize();
 
-        repoNode.children.push_back(queuesize);
+        SerializedResponseNode isRunning;
+        isRunning.name = "running";
+        isRunning.value = repo.second->isRunning();
+
+        SerializedResponseNode isFull;
+        isFull.name = "full";
+        isFull.value = repo.second->isFull();
 
+        repoNode.children.push_back(queuesize);
+        repoNode.children.push_back(isRunning);
+        repoNode.children.push_back(isFull);
         repositories.children.push_back(repoNode);
 
       }
@@ -540,14 +549,12 @@ class AgentMonitor {
 class AgentManifest : public DeviceInformation {
  public:
 
-  AgentManifest(std::string name, utils::Identifier & uuid)
+  AgentManifest(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
-    //setArray(true);
   }
 
   AgentManifest(const std::string &name)
       : DeviceInformation(name) {
-    //  setArray(true);
   }
 
   std::string getName() const {
@@ -622,25 +629,21 @@ class AgentManifest : public DeviceInformation {
   }
 };
 
-/**
- * Purpose and Justification: Prints classes along with their properties for the current agent.
- */
-class AgentInformation : public DeviceInformation, public AgentMonitor, public AgentIdentifier {
- public:
 
-  AgentInformation(std::string name, utils::Identifier & uuid)
+class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIdentifier {
+public:
+
+  AgentNode(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
     setArray(false);
   }
 
-  AgentInformation(const std::string &name)
+  explicit AgentNode(const std::string& name)
       : DeviceInformation(name) {
     setArray(false);
   }
 
-  std::string getName() const {
-    return "agentInfo";
-  }
+protected:
 
   std::vector<SerializedResponseNode> serialize() {
     std::vector<SerializedResponseNode> serialized;
@@ -654,6 +657,14 @@ class AgentInformation : public DeviceInformation, public AgentMonitor, public A
     agentClass.name = "agentClass";
     agentClass.value = agent_class_;
 
+    serialized.push_back(ident);
+    serialized.push_back(agentClass);
+
+    return serialized;
+  }
+
+  std::vector<SerializedResponseNode> getAgentManifest() const {
+    std::vector<SerializedResponseNode> serialized;
     AgentManifest manifest("manifest");
 
     SerializedResponseNode agentManifest;
@@ -661,6 +672,34 @@ class AgentInformation : public DeviceInformation, public AgentMonitor, public A
     for (auto &ser : manifest.serialize()) {
       agentManifest.children.push_back(std::move(ser));
     }
+    serialized.push_back(agentManifest);
+    return serialized;
+  }
+};
+
+/**
+ * This class is used for regular heartbeat without manifest
+ * A light weight heartbeat
+ */
+class AgentInformationWithoutManifest : public AgentNode {
+public:
+
+  AgentInformationWithoutManifest(const std::string& name, utils::Identifier & uuid)
+      : AgentNode(name, uuid) {
+    setArray(false);
+  }
+
+  explicit AgentInformationWithoutManifest(const std::string &name)
+      : AgentNode(name) {
+    setArray(false);
+  }
+
+  std::string getName() const {
+    return "agentInfo";
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
 
     AgentStatus status("status");
     status.setRepositories(repositories_);
@@ -672,16 +711,72 @@ class AgentInformation : public DeviceInformation, public AgentMonitor, public A
       agentStatus.children.push_back(std::move(ser));
     }
 
-    serialized.push_back(ident);
-    serialized.push_back(agentClass);
-    serialized.push_back(agentManifest);
     serialized.push_back(agentStatus);
     return serialized;
   }
+};
+
+
+/**
+ * This class is used for sending all agent information including manifest and status
+ * A heavy weight heartbeat. Here to maintain backward compatibility
+ */
+class AgentInformation : public AgentInformationWithoutManifest {
+ public:
+  // Both constructors forward to AgentInformationWithoutManifest and call setArray(false).
+  AgentInformation(const std::string& name, utils::Identifier & uuid)
+      : AgentInformationWithoutManifest(name, uuid) {
+    setArray(false);
+  }
+
+  explicit AgentInformation(const std::string &name)
+      : AgentInformationWithoutManifest(name) {
+    setArray(false);
+  }
+  // Always returns "agentInfo", independent of the name passed to the constructor.
+  std::string getName() const {
+    return "agentInfo";
+  }
+  // Returns the nodes of AgentInformationWithoutManifest::serialize() followed by the nodes from getAgentManifest().
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized(AgentInformationWithoutManifest::serialize());
+    auto manifest = getAgentManifest();
+    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));  // move the manifest nodes instead of copying them
+    return serialized;
+  }
+
+};
+
+/**
+ * This class is used for response to DESCRIBE manifest request
+ * It contains static information only
+ */
+class AgentInformationWithManifest : public AgentNode {  // NOTE(review): no REGISTER_RESOURCE entry for this class is visible in this hunk - confirm whether one is needed.
+public:
+  AgentInformationWithManifest(const std::string& name, utils::Identifier & uuid)
+      : AgentNode(name, uuid) {
+    setArray(false);
+  }
 
+  explicit AgentInformationWithManifest(const std::string &name)
+      : AgentNode(name) {
+    setArray(false);
+  }
+  // Always returns "agentInfo", independent of the name passed to the constructor.
+  std::string getName() const {
+    return "agentInfo";
+  }
+  // Returns the nodes of AgentNode::serialize() followed by the nodes from getAgentManifest(); no agent status node is added here.
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
+    auto manifest = getAgentManifest();
+    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    return serialized;
+  }
 };
 
 REGISTER_RESOURCE(AgentInformation, "Node part of an AST that defines all agent information, to include the manifest, and bundle information as part of a healthy hearbeat.");
+REGISTER_RESOURCE(AgentInformationWithoutManifest, "Node part of an AST that defines all agent information, without the manifest and bundle information as part of a healthy hearbeat.");
 
 } /* namespace metrics */
 } /* namespace state */
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index 40a0c0c..d91df6e 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -43,12 +43,12 @@ class ResponseNode : public core::Connectable {
         is_array_(false) {
   }
 
-  ResponseNode(std::string name)
+  explicit ResponseNode(const std::string& name)  // explicit: no implicit std::string -> ResponseNode conversion
       : core::Connectable(name),
         is_array_(false) {
   }
 
-  ResponseNode(std::string name, utils::Identifier & uuid)
+  ResponseNode(const std::string& name, utils::Identifier & uuid)  // name and uuid go to core::Connectable; is_array_ starts false
       : core::Connectable(name, uuid),
         is_array_(false) {
   }
@@ -90,10 +90,10 @@ class ResponseNode : public core::Connectable {
  */
 class DeviceInformation : public ResponseNode {
  public:
-  DeviceInformation(std::string name, utils::Identifier & uuid)
+  DeviceInformation(const std::string& name, utils::Identifier & uuid)  // forwards name and uuid to ResponseNode
       : ResponseNode(name, uuid) {
   }
-  DeviceInformation(std::string name)
+  explicit DeviceInformation(const std::string& name)  // explicit: no implicit std::string -> DeviceInformation conversion
       : ResponseNode(name) {
   }
 };
@@ -228,6 +228,18 @@ class NodeReporter {
    */
   virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
 
+  /**
+   * Retrieves the agent information nodes that include the agent manifest.
+   * @param manifest_vector -- output vector that receives the manifest nodes.
+   * @return 0 on success, -1 on failure
+   */
+  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const = 0;
+
+  /**
+   * Returns a response node containing all agent information, including the manifest and the agent status.
+   * @return a shared pointer to the agent information node; may be null, so callers should check it
+   */
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const = 0;
 };
 
 /**
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index 2a855ba..648d86f 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -96,7 +96,7 @@ class Properties {
     minifi_home_ = minifiHome;
   }
 
-  std::vector<std::string> getConfiguredKeys() {
+  std::vector<std::string> getConfiguredKeys() const {
     std::vector<std::string> keys;
     for (auto &property : properties_) {
       keys.push_back(property.first);
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 31160c8..1a251b5 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -466,6 +466,7 @@ void FlowController::initializeC2() {
   device_information_.clear();
   component_metrics_.clear();
   component_metrics_by_id_.clear();
+  agent_information_.clear();
   std::string class_csv;
 
   if (root_ != nullptr) {
@@ -484,6 +485,15 @@ void FlowController::initializeC2() {
     repoMetrics->addRepository(flow_file_repo_);
 
     device_information_[repoMetrics->getName()] = repoMetrics;
+
+    std::shared_ptr<state::response::AgentInformationWithManifest> manifest = std::make_shared<state::response::AgentInformationWithManifest>("agentInformation");
+    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(manifest);
+
+    if (identifier != nullptr) {
+      identifier->setIdentifier(identifier_str);
+      identifier->setAgentClass(class_str);
+      agent_information_[manifest->getName()] = manifest;
+    }
   }
 
   if (configuration_->get("nifi.c2.root.classes", class_csv)) {
@@ -932,6 +942,34 @@ int16_t FlowController::getMetricsNodes(std::vector<std::shared_ptr<state::respo
   return 0;
 }
 
+int16_t FlowController::getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const {
+  std::lock_guard<std::mutex> lock(metrics_mutex_);  // held while agent_information_ is read
+  for (const auto& entry : agent_information_) {
+    manifest_vector.push_back(entry.second);  // appended; existing contents are kept
+  }
+  return 0;  // no failure path here: always reports success
+}
+
+// Builds a new AgentInformation node named "agentInfo" and sets its agent
+// identifier and agent class from the configuration. Returns nullptr if the cast fails.
+std::shared_ptr<state::response::ResponseNode> FlowController::getAgentInformation() const {
+  auto agent_info = std::make_shared<state::response::AgentInformation>("agentInfo");
+  auto agent_id = std::dynamic_pointer_cast<state::response::AgentIdentifier>(agent_info);
+  if (agent_id == nullptr) {
+    return nullptr;
+  }
+  std::string agent_class;
+  configuration_->get("nifi.c2.agent.class", "c2.agent.class", agent_class);
+  // Fall back to uuidStr_ when no non-empty identifier is configured.
+  std::string agent_identifier;
+  if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", agent_identifier) || agent_identifier.empty()) {
+    agent_identifier = uuidStr_;
+  }
+  agent_id->setIdentifier(agent_identifier);
+  agent_id->setAgentClass(agent_class);
+  return agent_info;
+}
+
 std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() {
   std::vector<std::shared_ptr<state::StateController>> vec;
   vec.push_back(shared_from_this());
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 7c4b6c2..77d3f7a 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -50,6 +50,8 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
       logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
   allow_updates_ = true;
 
+  manifest_sent_ = false;
+
   running_c2_configuration = std::make_shared<Configure>();
 
   last_run_ = std::chrono::steady_clock::now();
@@ -299,32 +301,34 @@ void C2Agent::performHeartBeat() {
     payload.addPayload(std::move(metrics));
   }
 
-  if (device_information_.size() > 0) {
-    C2Payload deviceInfo(Operation::HEARTBEAT);
-    deviceInfo.setLabel("AgentInformation");
-
-    for (auto metric : device_information_) {
-      C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric.first);
-      if (metric.second->isArray()) {
-        child_metric_payload.setContainer(true);
-      }
-      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-      deviceInfo.addPayload(std::move(child_metric_payload));
+  for (auto metric : root_response_nodes_) {
+    C2Payload child_metric_payload(Operation::HEARTBEAT);
+    bool isArray{false};
+    std::string metricName;
+    std::vector<state::response::SerializedResponseNode> metrics;
+    std::shared_ptr<state::response::NodeReporter> reporter;
+    std::shared_ptr<state::response::ResponseNode> agentInfo;
+
+    // Send agent manifest in first heartbeat
+    if (!manifest_sent_
+        && (reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_))
+        && (agentInfo = reporter->getAgentInformation())
+        && metric.first == agentInfo->getName()) {
+      metricName = agentInfo->getName();
+      isArray = agentInfo->isArray();
+      metrics = agentInfo->serialize();
+      manifest_sent_ = true;
+    } else {
+      metricName = metric.first;
+      isArray = metric.second->isArray();
+      metrics = metric.second->serialize();
     }
-    payload.addPayload(std::move(deviceInfo));
-  }
-
-  if (!root_response_nodes_.empty()) {
-    for (auto metric : root_response_nodes_) {
-      C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric.first);
-      if (metric.second->isArray()) {
-        child_metric_payload.setContainer(true);
-      }
-      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-      payload.addPayload(std::move(child_metric_payload));
+    child_metric_payload.setLabel(metricName);
+    if (isArray) {
+      child_metric_payload.setContainer(true);
     }
+    serializeMetrics(child_metric_payload, metricName, metrics, isArray);
+    payload.addPayload(std::move(child_metric_payload));
   }
   C2Payload && response = protocol_.load()->consumePayload(payload);
 
@@ -485,6 +489,28 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
   }
 }
 
+// Builds the ACKNOWLEDGE payload for `resp` that lists the configured properties
+// under a "configuration_options" child payload. Keys containing "pass" are
+// left out of the response, as are keys whose value cannot be read.
+C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) const {
+  C2Payload options(Operation::ACKNOWLEDGE);
+  options.setLabel("configuration_options");
+  const auto configured_keys = configuration_->getConfiguredKeys();
+  std::string value;
+  for (const auto &key : configured_keys) {  // by reference: no per-key string copy
+    if (key.find("pass") != std::string::npos || !configuration_->get(key, value)) {
+      continue;
+    }
+    C2ContentResponse option(Operation::ACKNOWLEDGE);
+    option.name = key;
+    option.operation_arguments[key] = value;
+    options.addContent(std::move(option));
+  }
+  C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+  response.addPayload(std::move(options));
+  return response;
+}
+
 /**
  * Descriptions are special types of requests that require information
  * to be put into the acknowledgement
@@ -506,67 +532,36 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
       }
 
       std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-
-      reporter->getResponseNodes(metrics_vec, metric_class_id);
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-      response.setLabel("metrics");
+      C2Payload metrics(Operation::ACKNOWLEDGE);
+      metrics.setLabel("metrics");
+      reporter->getResponseNodes(metrics_vec, 0);
       for (auto metric : metrics_vec) {
-        serializeMetrics(response, metric->getName(), metric->serialize());
+        serializeMetrics(metrics, metric->getName(), metric->serialize());
       }
+      response.addPayload(std::move(metrics));
       enqueue_c2_response(std::move(response));
     }
 
   } else if (resp.name == "configuration") {
-    auto unsanitized_keys = configuration_->getConfiguredKeys();
-    std::vector<std::string> keys;
-    std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys), [](std::string key) {return key.find("pass") == std::string::npos;});
-    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    response.setLabel("configuration_options");
-    C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    options.setLabel("configuration_options");
-    std::string value;
-    for (auto key : keys) {
-      C2ContentResponse option(Operation::ACKNOWLEDGE);
-      option.name = key;
-      if (configuration_->get(key, value)) {
-        option.operation_arguments[key] = value;
-        options.addContent(std::move(option));
-      }
-    }
-    response.addPayload(std::move(options));
-    enqueue_c2_response(std::move(response));
+    auto configOptions = prepareConfigurationOptions(resp);
+    enqueue_c2_response(std::move(configOptions));
     return;
   } else if (resp.name == "manifest") {
-    auto keys = configuration_->getConfiguredKeys();
-    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    response.setLabel("configuration_options");
-    C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    options.setLabel("configuration_options");
-    std::string value;
-    for (auto key : keys) {
-      C2ContentResponse option(Operation::ACKNOWLEDGE);
-      option.name = key;
-      if (configuration_->get(key, value)) {
-        option.operation_arguments[key] = value;
-        options.addContent(std::move(option));
-      }
-    }
-    response.addPayload(std::move(options));
+    C2Payload response(prepareConfigurationOptions(resp));
+
+    auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
+    if (reporter != nullptr) {
+      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
 
-    if (device_information_.size() > 0) {
-      C2Payload deviceInfo(Operation::HEARTBEAT);
-      deviceInfo.setLabel("AgentInformation");
+      C2Payload agentInfo(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      agentInfo.setLabel("agentInfo");
 
-      for (auto metric : device_information_) {
-        C2Payload child_metric_payload(Operation::HEARTBEAT);
-        child_metric_payload.setLabel(metric.first);
-        if (metric.second->isArray()) {
-          child_metric_payload.setContainer(true);
-        }
-        serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-        deviceInfo.addPayload(std::move(child_metric_payload));
+      reporter->getManifestNodes(metrics_vec);
+      for (const auto& metric : metrics_vec) {
+          serializeMetrics(agentInfo, metric->getName(), metric->serialize());
       }
-      response.addPayload(std::move(deviceInfo));
+      response.addPayload(std::move(agentInfo));
     }
 
     enqueue_c2_response(std::move(response));
@@ -581,7 +576,6 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
       }
       auto keys = configuration_->getConfiguredKeys();
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-      response.setLabel("configuration_options");
       for (const auto &trace : traces) {
         C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
         options.setLabel(trace.getName());
@@ -596,6 +590,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
       }
       enqueue_c2_response(std::move(response));
     }
+    return;
   }
   C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
   enqueue_c2_response(std::move(response));
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 0cc4cc6..30f771a 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -35,7 +35,7 @@ class IntegrationBase {
  public:
   IntegrationBase(uint64_t waitTime = DEFAULT_WAITTIME_MSECS);
 
-  virtual ~IntegrationBase();
+  virtual ~IntegrationBase() = default;
 
   virtual void run(std::string test_file_location);
 
@@ -75,6 +75,10 @@ class IntegrationBase {
 
   }
 
+  virtual void configureC2RootClasses() {
+    // Intentionally empty: derived classes may override this; run() invokes it.
+  }
+
   virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
 
   }
@@ -92,9 +96,6 @@ IntegrationBase::IntegrationBase(uint64_t waitTime)
       wait_time_(waitTime) {
 }
 
-IntegrationBase::~IntegrationBase() {
-}
-
 void IntegrationBase::configureSecurity() {
   if (!key_dir.empty()) {
     configuration->set(minifi::Configure::nifi_security_client_certificate, key_dir + "cn.crt.pem");
@@ -126,6 +127,8 @@ void IntegrationBase::run(std::string test_file_location) {
 
   queryRootProcessGroup(pg);
 
+  configureC2RootClasses();
+
   ptr.release();
 
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);