You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2019/02/17 21:49:24 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-558: Initial provisioning for CoAP C2

This is an automated email from the ASF dual-hosted git repository.

aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c99cea  MINIFICPP-558: Initial provisioning for CoAP C2
9c99cea is described below

commit 9c99cea2381b0bea19a6fa46bc11f53e8f87dc67
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Tue Oct 23 11:51:19 2018 -0400

    MINIFICPP-558: Initial provisioning for CoAP C2
    
    This closes #447.
    
    Signed-off-by: Aldrin Piri <al...@apache.org>
---
 C2.md                                              |  25 +-
 CMakeLists.txt                                     |   7 +-
 CMakeSettings.json                                 |  13 +-
 appveyor.yml                                       |   2 +-
 bootstrap.sh                                       |   8 +-
 bstrp_functions.sh                                 |   6 +-
 centos.sh                                          |   2 +
 darwin.sh                                          |   4 +
 debian.sh                                          |   2 +
 docker/test/integration/minifi/test/__init__.py    |   1 -
 extensions/bustache/CMakeLists.txt                 |   4 +-
 extensions/coap/CMakeLists.txt                     |  93 +++++++
 .../PayloadSerializer.cpp => coap/COAPLoader.cpp}  |  24 +-
 extensions/coap/COAPLoader.h                       |  81 ++++++
 .../coap/controllerservice/CoapConnector.cpp       |  99 +++++++
 extensions/coap/controllerservice/CoapConnector.h  | 138 ++++++++++
 extensions/coap/controllerservice/CoapMessaging.h  | 113 ++++++++
 extensions/coap/controllerservice/CoapResponse.h   | 114 ++++++++
 extensions/coap/nanofi/coap_connection.c           | 126 +++++++++
 extensions/coap/nanofi/coap_connection.h           |  68 +++++
 extensions/coap/nanofi/coap_functions.c            | 208 ++++++++++++++
 extensions/coap/nanofi/coap_functions.h            | 126 +++++++++
 .../nanofi/coap_message.c}                         |  25 +-
 .../nanofi/coap_message.h}                         |  49 ++--
 extensions/coap/nanofi/coap_server.c               |  76 ++++++
 extensions/coap/nanofi/coap_server.h               |  83 ++++++
 extensions/coap/protocols/CoapC2Protocol.cpp       | 300 +++++++++++++++++++++
 extensions/coap/protocols/CoapC2Protocol.h         | 149 ++++++++++
 .../server/CoapServer.cpp}                         |  21 +-
 extensions/coap/server/CoapServer.h                | 245 +++++++++++++++++
 extensions/coap/tests/CMakeLists.txt               |  55 ++++
 .../tests/CoapC2VerifyHeartbeat.cpp}               |  92 ++++++-
 .../tests/CoapIntegrationBase.h}                   |  51 +++-
 extensions/gps/CMakeLists.txt                      |   4 +-
 extensions/http-curl/client/HTTPClient.cpp         |  13 +-
 extensions/http-curl/protocols/RESTSender.cpp      |   6 +-
 extensions/http-curl/protocols/RESTSender.h        |  10 +-
 extensions/http-curl/tests/C2NullConfiguration.cpp |   2 +-
 extensions/http-curl/tests/C2UpdateAgentTest.cpp   |   4 +-
 .../http-curl/tests/C2VerifyHeartbeatAndStop.cpp   |   2 +-
 .../http-curl/tests/C2VerifyServeResults.cpp       |   2 +-
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  10 +-
 extensions/http-curl/tests/HTTPSiteToSiteTests.cpp |   4 +-
 .../http-curl/tests/HttpPostIntegrationTest.cpp    |   2 +-
 extensions/http-curl/tests/SiteToSiteRestTest.cpp  |   2 +-
 extensions/mqtt/processors/ConvertBase.cpp         |   2 +-
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |   4 +-
 extensions/mqtt/processors/ConvertJSONAck.cpp      |   4 +-
 extensions/mqtt/protocol/MQTTC2Protocol.cpp        |   4 +-
 extensions/mqtt/protocol/MQTTC2Protocol.h          |   2 +-
 fedora.sh                                          |   3 +
 libminifi/include/c2/C2Payload.h                   |   8 +-
 libminifi/include/c2/C2Protocol.h                  |   2 +-
 libminifi/include/c2/PayloadParser.h               | 188 +++++++++++++
 .../include/c2}/PayloadSerializer.h                |  22 +-
 libminifi/include/c2/protocols/RESTProtocol.h      |  10 +
 libminifi/src/FlowController.cpp                   |   2 +-
 libminifi/src/c2/C2Agent.cpp                       |  50 +++-
 libminifi/src/c2/C2Payload.cpp                     |   8 +-
 .../src/c2}/PayloadSerializer.cpp                  |   4 +-
 libminifi/src/c2/protocols/RESTProtocol.cpp        |   4 +-
 libminifi/test/coap-tests/CMakeLists.txt           |  35 +++
 libminifi/test/integration/IntegrationBase.h       |   3 +-
 libminifi/test/resources/CoapC2VerifyServe.yml     |  73 +++++
 libminifi/test/unit/PayloadParserTests.cpp         | 115 ++++++++
 65 files changed, 2858 insertions(+), 156 deletions(-)

diff --git a/C2.md b/C2.md
index c6c1572..aa7e322 100644
--- a/C2.md
+++ b/C2.md
@@ -141,9 +141,28 @@ configuration produces the following JSON:
 ### Protocols
 
 The default protocol is a RESTFul service; however, there is an MQTT protocol with a translation to use the 
-RESTFul C2 server. This is useful for cases where an MQTT C2 server isn't available, or enclave partioning
-requires a single ingress/egress through a gateway. In these classes of devices, MQTT can be used as the intermediate
-or RESTSender can be used for C2 operations.
+RESTFul C2 server and a CoAP Protocol implementation. The CoAP protocol requires that COAP be enabled either
+through the bootstrap or the cmake flag -DENABLE_COAP=ON .
+
+Once configured, COAP uses a controller service within the flow OR minifi properties entries: nifi.c2.agent.coap.host and nifi.c2.agent.coap.port.
+Note that with CoAP, the payload will be significantly smaller, paring down metrics that are sent in each heartbeat. This will be useful for
+constrained environments. 
+
+	nifi.c2.agent.coap.host=hostname
+	nifi.c2.agent.coap.port=<port number>
+   
+   
+If you wish to use the Controller service you made add a controller service named CoapConnectorService with the properties in the example config
+below. Note that Max Queue Size is the only non-required property:
+
+	Controller Services:
+	- id: 94491a38-015a-1000-0000-000000000001
+	  name: coapservice
+	  class: CoapConnectorService
+	  Properties:
+	    Remote Server: server
+	    Remote Port: port
+	    Max Queue Size: 1000
 
 As defined, above, MQTTC2Protocol can be used for the agent protocol class. If you wish to communicate with a RESTFul C2 server,
 you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAack, and ConvertUpdate classes on an agent to perform the translation.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a500992..a00d4c1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -27,7 +27,7 @@ option(SKIP_TESTS "Skips building all tests." OFF)
 option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON)
 option(USE_SYSTEM_OPENSSL "Instructs the build system to search for and use an SSL library available in the host system" ON)
 option(OPENSSL_OFF "Disables OpenSSL" OFF)
-option(ENABLE_OPS "Enable Operations Tools" ON)
+option(ENABLE_OPS "Enable Operations/zlib Tools" ON)
 option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF)
 option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON)
 option(BUILD_SHARED_LIBS "Build yaml cpp shared lib" OFF)
@@ -419,6 +419,11 @@ if (ENABLE_ALL OR ENABLE_GPS)
 	createExtension(GPS-EXTENSION "GPS EXTENSIONS" "Enables LibGPS Functionality and the GetGPS processor." "extensions/gps" "${TEST_DIR}/gps-tests")
 endif()
 
+option(ENABLE_COAP "Enables the CoAP extension." OFF)
+if (ENABLE_ALL OR ENABLE_COAP STREQUAL "ON")
+	createExtension(COAP-EXTENSION "COAP EXTENSIONS" "Enables LibCOAP Functionality." "extensions/coap" "extensions/coap/tests/")
+endif()
+
 if (WIN32)
 option(ENABLE_WEL "Enables the suite of Windows Event Log extensions." OFF)
 if (ENABLE_ALL OR ENABLE_WEL)
diff --git a/CMakeSettings.json b/CMakeSettings.json
index e58a362..486eff5 100644
--- a/CMakeSettings.json
+++ b/CMakeSettings.json
@@ -44,6 +44,10 @@
           "value": "FALSE"
         },
         {
+            "name": "ENABLE_COAP",
+            "value": "OFF"
+        },
+        {
           "name": "ENABLE_WEL",
           "value": "TRUE"
         },
@@ -75,7 +79,6 @@
           "name": "SKIP_TESTS",
           "value": "TRUE"
         }
-
       ],
       "ctestCommandArgs": ""
     },
@@ -139,6 +142,14 @@
           "value": "OFF"
         },
         {
+          "name": "FORCE_WINDOWS",
+          "value": "ON"
+        },
+        {
+          "name": "ENABLE_COAP",
+          "value": "OFF"
+        },
+        {
           "name": "DISABLE_LIBARCHIVE",
           "value": "TRUE"
         },
diff --git a/appveyor.yml b/appveyor.yml
index 0f1e3af..82e6316 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -22,7 +22,7 @@ build_script:
    - cd C:\projects\nifi-minifi-cpp
    - mkdir build & exit 0
    - cd build
-   - cmake -g"Ninja" -DWIN32=WIN32 -DOPENSSL_OFF=ON -DUSE_SYSTEM_ZLIB=OFF -DFORCE_WINDOWS=ON -DUSE_SYSTEM_CURL=OFF -DUSE_SYSTEM_UUID=OFF -DDISABLE_ROCKSDB=ON -DDISABLE_CURL=ON -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DDISABLE_EXPRESSION_LANGUAGE=ON  -DENABLE_WEL=TRUE -DSKIP_TESTS=ON ..
+   - cmake -g"Ninja" -DWIN32=WIN32 -DOPENSSL_OFF=ON -DUSE_SYSTEM_ZLIB=OFF -DFORCE_WINDOWS=ON -DUSE_SYSTEM_CURL=OFF -DUSE_SYSTEM_UUID=OFF -DDISABLE_ROCKSDB=ON -DDISABLE_CURL=ON -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DDISABLE_EXPRESSION_LANGUAGE=ON -DENABLE_COAP=OFF -DENABLE_WEL=TRUE -DSKIP_TESTS=ON ..
    
     set msbuild_platform=x64 
    - msbuild nifi-minifi-cpp.sln
diff --git a/bootstrap.sh b/bootstrap.sh
index 064d87a..2acd303 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -271,6 +271,10 @@ add_disabled_option MQTT_ENABLED ${FALSE} "ENABLE_MQTT"
 add_disabled_option PYTHON_ENABLED ${FALSE} "ENABLE_PYTHON"
 add_dependency PYTHON_ENABLED "python"
 
+add_disabled_option COAP_ENABLED ${FALSE} "ENABLE_COAP"
+add_dependency COAP_ENABLED "automake"
+add_dependency COAP_ENABLED "libtool"
+
 TESTS_DISABLED=${FALSE}
 add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
 
@@ -377,7 +381,7 @@ build_cmake_command(){
         fi
       done
       if [ "$FOUND" = "1" ]; then
-        CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=TRUE"
+        CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=ON"
       fi
     else
       FOUND=""
@@ -393,7 +397,7 @@ build_cmake_command(){
         done
       fi
       if [ "$FOUND" = "1" ]; then
-        CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=TRUE"
+        CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=ON"
       fi
     fi
   done
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 32bcde1..0938ed0 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -257,6 +257,7 @@ show_supported_features() {
   echo "L. MQTT Support ................$(print_feature_status MQTT_ENABLED)"
   echo "M. SQLite Support ..............$(print_feature_status SQLITE_ENABLED)"
   echo "N. Python Support ..............$(print_feature_status PYTHON_ENABLED)"
+  echo "O. COAP Support ................$(print_feature_status COAP_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -273,7 +274,7 @@ show_supported_features() {
 
 read_feature_options(){
   local choice
-  read -p "Enter choice [ A - N ] " choice
+  read -p "Enter choice [ A - P or 1-2 ] " choice
   choice=$(echo ${choice} | tr '[:upper:]' '[:lower:]')
   case $choice in
     a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -290,6 +291,7 @@ read_feature_options(){
     l) ToggleFeature MQTT_ENABLED ;;
     m) ToggleFeature SQLLITE_ENABLED ;;
     n) ToggleFeature PYTHON_ENABLED ;;
+    o) ToggleFeature COAP_ENABLED ;;
     1) ToggleFeature TESTS_DISABLED ;;
     2) EnableAllFeatures ;;
     p) FEATURES_SELECTED="true" ;;
@@ -298,7 +300,7 @@ read_feature_options(){
       fi
       ;;
     q) exit 0;;
-    *) echo -e "${RED}Please enter an option A-L...${NO_COLOR}" && sleep 2
+    *) echo -e "${RED}Please enter an option A-P or 1-2...${NO_COLOR}" && sleep 2
   esac
 }
 
diff --git a/centos.sh b/centos.sh
index f99d51a..06c77d6 100644
--- a/centos.sh
+++ b/centos.sh
@@ -137,6 +137,8 @@ build_deps(){
             install_bison
           elif [ "$FOUND_VALUE" = "flex" ]; then
             INSTALLED+=("flex")
+          elif [ "$FOUND_VALUE" = "automake" ]; then
+            INSTALLED+=("automake")
           elif [ "$FOUND_VALUE" = "python" ]; then
             INSTALLED+=("python34-devel")
           elif [ "$FOUND_VALUE" = "lua" ]; then
diff --git a/darwin.sh b/darwin.sh
index e02e1eb..d7d574a 100644
--- a/darwin.sh
+++ b/darwin.sh
@@ -102,6 +102,10 @@ build_deps(){
             INSTALLED+=("boost")
           elif [ "$FOUND_VALUE" = "lua" ]; then
             INSTALLED+=("lua")
+          elif [ "$FOUND_VALUE" = "libtool" ]; then
+            INSTALLED+=("libtool")
+          elif [ "$FOUND_VALUE" = "automake" ]; then
+            INSTALLED+=("automake")
           elif [ "$FOUND_VALUE" = "gpsd" ]; then
             INSTALLED+=("gpsd")
           elif [ "$FOUND_VALUE" = "libarchive" ]; then
diff --git a/debian.sh b/debian.sh
index 8a8ae25..ba1ec7d 100644
--- a/debian.sh
+++ b/debian.sh
@@ -69,6 +69,8 @@ build_deps(){
             INSTALLED+=("libpython3-dev")
           elif [ "$FOUND_VALUE" = "lua" ]; then
             INSTALLED+=("liblua5.1-0-dev")
+          elif [ "$FOUND_VALUE" = "automake" ]; then
+            INSTALLED+=("automake")
           elif [ "$FOUND_VALUE" = "gpsd" ]; then
             INSTALLED+=("libgps-dev")
           elif [ "$FOUND_VALUE" = "libarchive" ]; then
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 3687a0f..d25e538 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -228,7 +228,6 @@ class SingleFileOutputValidator(OutputValidator):
 
         return False
 
-
 class SegfaultValidator(OutputValidator):
     """
     Validate that a file was received.
diff --git a/extensions/bustache/CMakeLists.txt b/extensions/bustache/CMakeLists.txt
index 4ae25a0..2efb586 100644
--- a/extensions/bustache/CMakeLists.txt
+++ b/extensions/bustache/CMakeLists.txt
@@ -17,9 +17,9 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
 
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+cmake_minimum_required(VERSION 2.6)
 
 find_package(Boost COMPONENTS system filesystem iostreams REQUIRED)
 include_directories(${Boost_INCLUDE_DIRS})
diff --git a/extensions/coap/CMakeLists.txt b/extensions/coap/CMakeLists.txt
new file mode 100644
index 0000000..3f38328
--- /dev/null
+++ b/extensions/coap/CMakeLists.txt
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+include_directories(protocols nanofi controllerservice server)
+include_directories(../http-curl/)
+
+file(GLOB CSOURCES "nanofi/*.c")
+file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "processors/*.cpp" "controllerservice/*.cpp" "server/*.cpp" )
+
+add_library(nanofi-coap-c STATIC ${CSOURCES})
+add_library(minifi-coap STATIC ${SOURCES})
+set_property(TARGET minifi-coap PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-coap "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+  set(BASE_DIR "${CMAKE_CURRENT_BINARY_DIR}/extensions/coap")
+  # determine version of GNUTLSs
+  if (APPLE)
+  set(BYPRODUCT "${BASE_DIR}/extensions/coap/thirdparty/libcoap-src/.libs/libcoap-2.a")
+  else()
+  set(BYPRODUCT "${BASE_DIR}/extensions/coap/thirdparty/libcoap-src/.libs/libcoap-2.a")
+  endif()
+  set(DIR "${BASE_DIR}/extensions/coap/thirdparty/libcoap-src")
+  ExternalProject_Add(
+    coap-external
+    GIT_REPOSITORY "https://github.com/obgm/libcoap.git"
+    GIT_TAG "00486a4f46e0278dd24a8ff3411416ff420cde29" 
+    PREFIX "${BASE_DIR}/extensions/coap/thirdparty/libcoap"
+    BUILD_IN_SOURCE true
+    SOURCE_DIR "${DIR}"
+    BUILD_COMMAND make
+    CMAKE_COMMAND ""
+    UPDATE_COMMAND ""
+    BUILD_BYPRODUCTS ${BYPRODUCT} 
+    INSTALL_COMMAND cmake -E echo "Skipping install step."
+    CONFIGURE_COMMAND ""
+    PATCH_COMMAND ./autogen.sh && ./configure --disable-examples --disable-tests --disable-documentation
+    STEP_TARGETS build
+    EXCLUDE_FROM_ALL TRUE
+  )
+  add_definitions("-DWITH_POSIX=1")
+
+  add_library(coaplib STATIC IMPORTED)
+  set_target_properties(coaplib PROPERTIES IMPORTED_LOCATION "${BYPRODUCT}")
+  add_dependencies(coaplib coap-external)
+  set(COAP_FOUND "YES" CACHE STRING "" FORCE)
+  set(COAP_INCLUDE_DIRS "${DIR}/include" CACHE STRING "" FORCE)
+  set(COAP_LIBRARIES coaplib CACHE STRING "" FORCE)
+  set(COAP_LIBRARY coaplib CACHE STRING "" FORCE)
+  set(COAP_LIBRARY coaplib CACHE STRING "" FORCE)
+target_link_libraries(minifi-coap ${CMAKE_DL_LIBS})
+
+include_directories(${COAP_INCLUDE_DIRS})
+
+target_link_libraries (nanofi-coap-c ${COAP_LIBRARIES})
+target_link_libraries (nanofi-coap-c ${HTTP-CURL})
+target_link_libraries (minifi-coap nanofi-coap-c ${COAP_LIBRARIES})
+if (WIN32)
+    set_target_properties(minifi-coap PROPERTIES
+        LINK_FLAGS "/WHOLEARCHIVE"
+    )
+elseif (APPLE)
+    set_target_properties(minifi-coap PROPERTIES
+        LINK_FLAGS "-Wl,-all_load"
+    )
+else ()
+    set_target_properties(minifi-coap PROPERTIES
+        LINK_FLAGS "-Wl,--whole-archive"
+    )
+endif ()
+
+SET (COAP-EXTENSION minifi-coap PARENT_SCOPE)
+register_extension(minifi-coap)
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/COAPLoader.cpp
similarity index 69%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/COAPLoader.cpp
index 5f62227..ea49cd6 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/COAPLoader.cpp
@@ -15,24 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "PayloadSerializer.h"
+#include "core/FlowConfiguration.h"
+#include "COAPLoader.h"
+
+bool COAPObjectFactory::added = core::FlowConfiguration::add_static_func("createCOAPFactory");
+extern "C" {
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-namespace mqtt {
 
-PayloadSerializer::PayloadSerializer() {
-}
 
-PayloadSerializer::~PayloadSerializer() {
+void *createCOAPFactory(void) {
+  return new COAPObjectFactory();
 }
 
-} /* namespace mqtt */
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}
diff --git a/extensions/coap/COAPLoader.h b/extensions/coap/COAPLoader.h
new file mode 100644
index 0000000..ce6383f
--- /dev/null
+++ b/extensions/coap/COAPLoader.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAPLOADER_H_
+#define EXTENSIONS_COAPLOADER_H_
+
+#ifdef WIN32
+#pragma comment(lib, "wldap32.lib" )
+#pragma comment(lib, "crypt32.lib" )
+#pragma comment(lib, "Ws2_32.lib")
+#endif
+
+#include "core/ClassLoader.h"
+#include "utils/StringUtils.h"
+#include "protocols/CoapC2Protocol.h"
+
+/**
+ * Object factory class loader for this extension.
+ * Can add extensions to the default class loader through REGISTER_RESOURCE,
+ * but we want to ensure this factory is used specifically for CoAP and not the default loader.
+ */
+class COAPObjectFactory : public core::ObjectFactory {
+ public:
+  COAPObjectFactory() {
+
+  }
+
+  /**
+   * Gets the name of the object.
+   * @return class name of processor
+   */
+  virtual std::string getName() override{
+    return "COAPObjectFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "COAPObjectFactory";
+  }
+  /**
+   * Gets the class name for the object
+   * @return class name for the processor.
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    std::vector<std::string> class_names;
+    class_names.push_back("CoapProtocol");
+    class_names.push_back("CoapConnectorService");
+    return class_names;
+  }
+
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override{
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "CoapProtocol")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::coap::c2::CoapProtocol>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, "CoapConnectorService")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::coap::controllers::CoapConnectorService>());
+    } else {
+      return nullptr;
+    }
+  }
+
+  static bool added;
+
+};
+
+extern "C" {
+	DLL_EXPORT void *createCOAPFactory(void);
+}
+#endif /* EXTENSIONS_COAPLOADER_H_ */
diff --git a/extensions/coap/controllerservice/CoapConnector.cpp b/extensions/coap/controllerservice/CoapConnector.cpp
new file mode 100644
index 0000000..1d152ff
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapConnector.cpp
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CoapConnector.h"
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "CoapConnector.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+
+static core::Property RemoteServer;
+static core::Property Port;
+static core::Property MaxQueueSize;
+
+core::Property CoapConnectorService::RemoteServer(core::PropertyBuilder::createProperty("Remote Server")->withDescription("Remote CoAP server")->isRequired(true)->build());
+core::Property CoapConnectorService::Port(
+    core::PropertyBuilder::createProperty("Remote Port")->withDescription("Remote CoAP server port")->withDefaultValue<uint64_t>(8181)->isRequired(true)->build());
+core::Property CoapConnectorService::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Queue Size")->withDescription("Max queue size for received data ")->withDefaultValue<uint64_t>(1000)->isRequired(false)->build());
+
+void CoapConnectorService::initialize() {
+  if (initialized_)
+    return;
+
+  CoapMessaging::getInstance();
+
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+
+  ControllerService::initialize();
+
+  initializeProperties();
+
+  initialized_ = true;
+}
+
+void CoapConnectorService::onEnable() {
+  std::string port_str;
+  if (getProperty(RemoteServer.getName(), host_) && !host_.empty() && getProperty(Port.getName(), port_str) && !port_str.empty()) {
+    core::Property::StringToInt(port_str, port_);
+  } else {
+    // this is the case where we aren't being used in the context of a single controller service.
+    if (configuration_->get("nifi.c2.agent.coap.host", host_) && configuration_->get("nifi.c2.agent.coap.port", port_str)) {
+      core::Property::StringToInt(port_str, port_);
+    }
+
+  }
+}
+
+CoapResponse CoapConnectorService::sendPayload(uint8_t type, const std::string &endpoint, const CoapMessage *message) {
+  // internally we are dealing with CoAPMessage in the two way communication, but the C++ ControllerService
+  // will provide a CoAPResponse
+  auto pdu = create_connection(type, host_.c_str(), endpoint.c_str(), port_, message);
+  send_pdu(pdu);
+  auto response = CoapMessaging::getInstance().pop(pdu->ctx);
+  free_pdu(pdu);
+  return response;
+}
+
+void CoapConnectorService::initializeProperties() {
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(RemoteServer);
+  supportedProperties.insert(Port);
+  supportedProperties.insert(MaxQueueSize);
+  setSupportedProperties(supportedProperties);
+}
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/coap/controllerservice/CoapConnector.h b/extensions/coap/controllerservice/CoapConnector.h
new file mode 100644
index 0000000..1570b90
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapConnector.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "CoapResponse.h"
+#include "CoapMessaging.h"
+#include "coap_functions.h"
+#include "coap_connection.h"
+#include "coap_message.h"
+#include <memory>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide CoAP services to other components.
+ */
+class CoapConnectorService : public core::controller::ControllerService {
+ public:
+
+  /**
+   * Constructors for the controller service.
+   */
+  explicit CoapConnectorService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        port_(0),
+        initialized_(false),
+        logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) {
+    initialize();
+  }
+
+  explicit CoapConnectorService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+      : ControllerService(name, uuid),
+        port_(0),
+        initialized_(false),
+        logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) {
+    initialize();
+  }
+
+  explicit CoapConnectorService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name),
+        port_(0),
+        initialized_(false),
+        logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  /**
+   * Parameters needed.
+   */
+  static core::Property RemoteServer;
+  static core::Property Port;
+  static core::Property MaxQueueSize;
+
+  virtual void initialize();
+
+  void yield() {
+
+  }
+
+  bool isRunning() {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() {
+    return false;
+  }
+
+  virtual void onEnable();
+
+  /**
+   * Sends the payload to the endpoint, returning the response as we await. Will retry transmission
+   * @param type type of payload to endpoint interaction ( GET, POST, PUT, DELETE ).
+   * @param end endpoint is the connecting endpoint on the server
+   * @param payload is the data to be sent
+   * @param size size of the payload to be sent
+   * @return CoAPMessage that contains the response code and data, if any.
+   */
+  CoapResponse sendPayload(uint8_t type, const std::string &endpoint, const CoapMessage * const message);
+
+ protected:
+
+  void initializeProperties();
+
+  // connector mutex to controll access to the mapping, above.
+  std::mutex connector_mutex_;
+
+  // initialization mutex.
+  std::mutex initialization_mutex_;
+
+  std::atomic<bool> initialized_;
+
+ private:
+
+  // host connecting to.
+  std::string host_;
+  // port connecting to
+  unsigned int port_;
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_ */
diff --git a/extensions/coap/controllerservice/CoapMessaging.h b/extensions/coap/controllerservice/CoapMessaging.h
new file mode 100644
index 0000000..281e5a7
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapMessaging.h
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_CONTROLLERSERVICE_COAPMESSAGING_H_
+#define EXTENSIONS_COAP_CONTROLLERSERVICE_COAPMESSAGING_H_
+
+#include "CoapResponse.h"
+#include "coap_functions.h"
+#include "coap_connection.h"
+#include "coap_message.h"
+#include <memory>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+
+class CoapMessaging {
+ public:
+  static CoapMessaging &getInstance() {
+    static CoapMessaging instance;
+    return instance;
+  }
+
+  /**
+   * Determines if the pointer is present in the internal map.
+   */
+  bool hasResponse(coap_context_t *ctx) const {
+    std::lock_guard<std::mutex> lock(connector_mutex_);
+    return messages_.find(ctx) != messages_.end();
+  }
+  /**
+   * Returns a response if one exists.
+   */
+  CoapResponse pop(const coap_context_t * const ctx) {
+    CoapResponse response(-1);
+    std::lock_guard<std::mutex> lock(connector_mutex_);
+    auto msg = messages_.find(const_cast<coap_context_t*>(ctx));
+    if (msg != std::end(messages_)) {
+      response = std::move(msg->second);
+      messages_.erase(const_cast<coap_context_t*>(ctx));
+    }
+    return response;
+  }
+ protected:
+
+  /**
+   * Intended to receive errors from the context.
+   */
+  static void receiveError(void *receiver_context, coap_context_t *ctx, unsigned int code) {
+    CoapMessaging *connector = static_cast<CoapMessaging*>(receiver_context);
+    CoapResponse message(code);
+    connector->enqueueResponse(ctx, std::move(message));
+  }
+
+  /**
+   * Receives messages from the context.
+   */
+  static void receiveMessage(void *receiver_context, coap_context_t *ctx, CoapMessage * const msg) {
+    CoapMessaging *connector = static_cast<CoapMessaging*>(receiver_context);
+    CoapResponse message(msg);
+    connector->enqueueResponse(ctx, std::move(message));
+  }
+
+  void enqueueResponse(coap_context_t *ctx, CoapResponse &&msg) {
+    std::lock_guard<std::mutex> lock(connector_mutex_);
+    messages_.insert(std::make_pair(ctx, std::move(msg)));
+  }
+
+ private:
+  /**
+   * Private constructor since this is intended to be a singleton
+   */
+  CoapMessaging() {
+    callback_pointers ptrs;
+    ptrs.data_received = receiveMessage;
+    ptrs.received_error = receiveError;
+    init_coap_api(this, &ptrs);
+
+  }
+  // connector mutex. mutable since it's used within hasResponse.
+  mutable std::mutex connector_mutex_;
+  // map of messages based on the context. We only allow a single message per context
+  // at any given time.
+  std::unordered_map<coap_context_t*, CoapResponse> messages_;
+
+};
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_COAP_CONTROLLERSERVICE_COAPMESSAGING_H_ */
diff --git a/extensions/coap/controllerservice/CoapResponse.h b/extensions/coap/controllerservice/CoapResponse.h
new file mode 100644
index 0000000..a8c9e33
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapResponse.h
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_COAPRESPONSE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_COAPRESPONSE_H_
+
+#include "coap_message.h"
+
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+/**
+ * Purpose and Justification :CoapMessage is in internal message format that is sent to and from consumers of this controller service.
+ *
+ */
+class CoapResponse {
+ public:
+
+  /**
+   * Creates a CoAPResponse to a CoAPMessage. Takes ownership of the argument
+   * and copies the data.
+   */
+  explicit CoapResponse(CoapMessage * const msg)
+      : code_(msg->code_),
+        size_(msg->size_) {
+    // we take ownership of data_;
+    data_ = std::unique_ptr<uint8_t>(new uint8_t[msg->size_]);
+    memcpy(data_.get(), msg->data_, size_);
+    free_coap_message(msg);
+  }
+
+  explicit CoapResponse(uint32_t code)
+      : code_(code),
+        size_(0),
+        data_(nullptr) {
+
+  }
+
+  CoapResponse(const CoapResponse &other) = delete;
+
+  CoapResponse(CoapResponse &&other) = default;
+
+  ~CoapResponse() {
+  }
+
+  /**
+   * Retrieve the size of the coap response.
+   * @return size_t of size
+   */
+  size_t getSize() const {
+    return size_;
+  }
+
+  /**
+   * Returns a const pointer to the constant data.
+   * @return data pointer.
+   */
+  const uint8_t * const getData() const {
+    return data_.get();
+  }
+
+  /**
+   * Returns the response code from the data.
+   * @return data.
+   */
+  uint32_t getCode() const {
+    return code_;
+  }
+
+  /**
+   * Ease of use function to take ownership of the CoAPResponse.
+   * @param data, data pointer.
+   * @size_t size of the data.
+   */
+  void takeOwnership(uint8_t **data, size_t &size) {
+    size = size_;
+    *data = data_.release();
+  }
+
+  CoapResponse &operator=(const CoapResponse &other) = delete;
+  CoapResponse &operator=(CoapResponse &&other) = default;
+ private:
+  uint32_t code_;
+  size_t size_;
+  std::unique_ptr<uint8_t> data_;
+};
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_COAPRESPONSE_H_ */
diff --git a/extensions/coap/nanofi/coap_connection.c b/extensions/coap/nanofi/coap_connection.c
new file mode 100644
index 0000000..c49c3e2
--- /dev/null
+++ b/extensions/coap/nanofi/coap_connection.c
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "coap_connection.h"
+
+
+CoapPDU *create_connection(uint8_t type, const char * const server, const char * const endpoint, int port, const CoapMessage * const message) {
+  CoapPDU *pdu = (CoapPDU*)malloc(sizeof(CoapPDU));
+
+  pdu->ctx = NULL;
+  pdu->session = NULL;
+
+  coap_uri_t uri;
+  uri.host.s = (uint8_t*)server; // may be a loss in resolution, but hostnames should be char *
+  uri.host.length = strlen(server);
+  uri.path.s = (uint8_t*)endpoint; // ^ same as above for paths.
+  uri.path.length = strlen(endpoint);
+  uri.port = port;
+  uri.scheme = COAP_URI_SCHEME_COAP;
+
+  fd_set readfds;
+  coap_pdu_t* request;
+  unsigned char get_method = 1;
+
+  int res = resolve_address(&uri.host, &pdu->dst_addr.addr.sa);
+  if (res < 0) {
+    return NULL;
+  }
+
+  pdu->dst_addr.size = res;
+  pdu->dst_addr.addr.sin.sin_port = htons(uri.port);
+
+  void *addrptr = NULL;
+  char port_str[NI_MAXSERV] = "0";
+
+  char node_str[NI_MAXHOST] = "";
+  switch (pdu->dst_addr.addr.sa.sa_family) {
+    case AF_INET:
+      addrptr = &pdu->dst_addr.addr.sin.sin_addr;
+      if (!create_session(&pdu->ctx, &pdu->session, node_str[0] == 0 ? "0.0.0.0" : node_str, port_str, &pdu->dst_addr)) {
+        break;
+      } else {
+        return NULL;
+      }
+    case AF_INET6:
+      addrptr = &pdu->dst_addr.addr.sin6.sin6_addr;
+      if (!create_session(&pdu->ctx, &pdu->session, node_str[0] == 0 ? "::" : node_str, port_str, &pdu->dst_addr)) {
+        break;
+      } else {
+        return NULL;
+      }
+    default:
+      ;
+  }
+
+  // we want to register handlers in the event that an error occurs or nack is returned
+  // from the library
+  coap_register_event_handler(pdu->ctx, coap_event);
+  coap_register_nack_handler(pdu->ctx, no_acknowledgement);
+
+  coap_context_set_keepalive(pdu->ctx, 1);
+
+  coap_str_const_t pld;
+  pld.length = message->size_;
+  pld.s = message->data_;
+
+  coap_register_option(pdu->ctx, COAP_OPTION_BLOCK2);
+
+  // set the response handler
+  coap_register_response_handler(pdu->ctx, response_handler);
+
+  pdu->session->max_retransmit = 1;
+  pdu->optlist = NULL;
+
+  // add the URI option to the options list
+  coap_insert_optlist(&pdu->optlist, coap_new_optlist(COAP_OPTION_URI_PATH, uri.path.length, uri.path.s));
+
+  // next, create the PDU.
+  if (!(request = create_request(pdu->ctx, pdu->session, &pdu->optlist, type, &pld)))
+    return NULL;
+
+  // send the PDU using the session.
+  coap_send(pdu->session, request);
+  return pdu;
+}
+
+int8_t send_pdu(const CoapPDU * const pdu) {
+  uint64_t wait_ms = 1 * 200;
+  // run once will attempt to send the first time
+  int runResponse = coap_run_once(pdu->ctx, wait_ms);
+  // if no data is received, we will attempt re-transmission
+  // until the number of attempts has been reached.
+  while (!coap_can_exit(pdu->ctx)) {
+    runResponse = coap_run_once(pdu->ctx, wait_ms);
+  }
+  if (runResponse < 0)
+    return -1;
+  else
+    return 0;
+}
+
+int8_t free_pdu(CoapPDU * pdu) {
+  if (NULL == pdu) {
+    return -1;
+  }
+  coap_delete_optlist(pdu->optlist);
+  coap_session_release(pdu->session);
+  coap_free_context(pdu->ctx);
+  free(pdu);
+  return 0;
+}
+
diff --git a/extensions/coap/nanofi/coap_connection.h b/extensions/coap/nanofi/coap_connection.h
new file mode 100644
index 0000000..6a762d2
--- /dev/null
+++ b/extensions/coap/nanofi/coap_connection.h
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_
+#define EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <coap2/coap.h>
+#include <netdb.h>
+#include "coap_message.h"
+#include "coap_functions.h"
+
+
+
+typedef struct {
+  struct coap_context_t* ctx;
+  struct coap_session_t* session;
+  coap_address_t dst_addr;
+  coap_address_t src_addr;
+  coap_optlist_t *optlist;
+} CoapPDU;
+
+
+/**
+ * Creates a connection to the server
+ * @param type connection type
+ * @param server server name
+ * @param endpoint endpoint to connect to
+ * @param port CoAP port
+ * @param message message to send
+ * @return CoAPPDU object.
+ */
+CoapPDU *create_connection(uint8_t type, const char * const server, const char * const endpoint, int port, const CoapMessage * const message);
+
+/**
+ * Sends the pdu
+ * @param pdu to send
+ * @return result 0 if success, failure otherwise
+ */
+int8_t send_pdu(const CoapPDU *const pdu);
+/**
+ * Frees the pdu
+ * @param pdu to free
+ * @return result 0 if success, failure otherwise
+ */
+
+int8_t free_pdu(CoapPDU *pdu);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_ */
diff --git a/extensions/coap/nanofi/coap_functions.c b/extensions/coap/nanofi/coap_functions.c
new file mode 100644
index 0000000..c66cf3d
--- /dev/null
+++ b/extensions/coap/nanofi/coap_functions.c
@@ -0,0 +1,208 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "coap_functions.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the API access. Not thread safe.
+ */
+void init_coap_api(void *rcvr, callback_pointers *ptrs) {
+  global_ptrs.data_received = ptrs->data_received;
+  global_ptrs.received_error = ptrs->received_error;
+  receiver = rcvr;
+}
+
+int create_session(coap_context_t **ctx, coap_session_t **session, const char *node, const char *port, coap_address_t *dst_addr) {
+  int getaddrres;
+  struct addrinfo hints;
+  coap_proto_t proto = COAP_PROTO_UDP;
+  struct addrinfo *result, *rp;
+
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_UNSPEC;  // ipv4 or ipv6
+  hints.ai_socktype = COAP_PROTO_RELIABLE(proto) ? SOCK_STREAM : SOCK_DGRAM;
+  hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST | AI_NUMERICSERV | AI_ALL;
+
+  getaddrres = getaddrinfo(node, port, &hints, &result);
+  if (getaddrres != 0) {
+    return -1;
+  }
+
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    coap_address_t addr;
+
+    if (rp->ai_addrlen <= sizeof(addr.addr)) {
+      coap_address_init(&addr);
+      addr.size = rp->ai_addrlen;
+      memcpy(&addr.addr, rp->ai_addr, rp->ai_addrlen);
+
+      *ctx = coap_new_context(0x00);
+
+      *session = coap_new_client_session(*ctx, &addr, dst_addr, proto);
+      if (*ctx && *session) {
+        freeaddrinfo(result);
+        return 0;
+      }
+    }
+  }
+
+  freeaddrinfo(result);
+  return -2;
+}
+
+int create_endpoint_context(coap_context_t **ctx, const char *node, const char *port) {
+  struct addrinfo hints;
+  coap_proto_t proto = COAP_PROTO_UDP;
+  struct addrinfo *result, *rp;
+
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_UNSPEC;  // ipv4 or ipv6
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST ;
+
+  int getaddrres = getaddrinfo(node, port, &hints, &result);
+  if (getaddrres != 0) {
+    return -1;
+  }
+
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    coap_address_t addr;
+
+    if (rp->ai_addrlen <= sizeof(addr.addr)) {
+      coap_address_init(&addr);
+      addr.size = rp->ai_addrlen;
+      memcpy(&addr.addr, rp->ai_addr, rp->ai_addrlen);
+
+      *ctx = coap_new_context(0x00);
+
+      coap_endpoint_t * ep_udp = coap_new_endpoint(*ctx, &addr, COAP_PROTO_UDP);
+
+      if (*ctx && ep_udp) {
+        freeaddrinfo(result);
+        return 0;
+      }
+    }
+  }
+
+  freeaddrinfo(result);
+  return -2;
+}
+
+struct coap_pdu_t *create_request(struct coap_context_t *ctx, struct coap_session_t *session, coap_optlist_t **optlist, unsigned char code, coap_str_const_t *ptr) {
+  coap_pdu_t *pdu;
+
+  if (!(pdu = coap_new_pdu(session)))
+    return NULL;
+
+  pdu->type = COAP_MESSAGE_CON;
+  pdu->tid = coap_new_message_id(session);
+  pdu->code = code;
+
+  if (optlist) {
+    coap_add_optlist_pdu(pdu, optlist);
+  }
+
+  int flags = 0;
+  coap_add_data(pdu, ptr->length, ptr->s);
+  return pdu;
+}
+
+int coap_event(struct coap_context_t *ctx, coap_event_t event, struct coap_session_t *session) {
+  if (event == COAP_EVENT_SESSION_FAILED && global_ptrs.received_error) {
+    global_ptrs.received_error(receiver, ctx, -1);
+  }
+  return 0;
+}
+
+void no_acknowledgement(struct coap_context_t *ctx, coap_session_t *session, coap_pdu_t *sent, coap_nack_reason_t reason, const coap_tid_t id) {
+  if (global_ptrs.received_error) {
+    global_ptrs.received_error(receiver, ctx, -1);
+  }
+}
+
+void response_handler(struct coap_context_t *ctx, struct coap_session_t *session, coap_pdu_t *sent, coap_pdu_t *received, const coap_tid_t id) {
+  unsigned char* data;
+  size_t data_len;
+  coap_opt_iterator_t opt_iter;
+  coap_opt_t * block_opt = coap_check_option(received, COAP_OPTION_BLOCK1, &opt_iter);
+  if (block_opt) {
+    printf("Block option not currently supported");
+  } else {
+    if (!global_ptrs.data_received) {
+      return;
+    }
+
+    if (COAP_RESPONSE_CLASS(received->code) == 2 || received->code == COAP_RESPONSE_400) {
+      if (global_ptrs.data_received) {
+        CoapMessage * const msg = create_coap_message(received);
+        global_ptrs.data_received(receiver, ctx, msg);
+      }
+    } else {
+      if (global_ptrs.received_error)
+        global_ptrs.received_error(receiver, ctx, received->code);
+    }
+  }
+
+}
+
+int resolve_address(const struct coap_str_const_t *server, struct sockaddr *dst) {
+  struct addrinfo *res, *ainfo;
+  struct addrinfo hints;
+  static char addrstr[256];
+  int error, len = -1;
+
+  memset(addrstr, 0, sizeof(addrstr));
+  if (server->length)
+    memcpy(addrstr, server->s, server->length);
+  else
+    memcpy(addrstr, "127.0.0.1", 9);
+
+  memset((char * ) &hints, 0, sizeof(hints));
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_family = AF_UNSPEC;
+
+  error = getaddrinfo(addrstr, NULL, &hints, &res);
+
+  if (error != 0) {
+    return error;
+  }
+
+  for (ainfo = res; ainfo != NULL; ainfo = ainfo->ai_next) {
+    switch (ainfo->ai_family) {
+      case AF_INET6:
+      case AF_INET:
+        len = ainfo->ai_addrlen;
+        memcpy(dst, ainfo->ai_addr, len);
+        freeaddrinfo(res);
+        return len;
+      default:
+        ;
+    }
+  }
+
+  freeaddrinfo(res);
+  return len;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
diff --git a/extensions/coap/nanofi/coap_functions.h b/extensions/coap/nanofi/coap_functions.h
new file mode 100644
index 0000000..c86362d
--- /dev/null
+++ b/extensions/coap/nanofi/coap_functions.h
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_FUNCTIONS_H_
+#define EXTENSIONS_COAP_NANOFI_COAP_FUNCTIONS_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+typedef unsigned char method_t;
+
+#include "coap2/coap.h"
+#include "coap2/uri.h"
+#include "coap2/address.h"
+#include <stdio.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <netdb.h>
+#include "coap_message.h"
+
+
+typedef struct {
+  void (*data_received)(void *receiver_context, struct coap_context_t *ctx, CoapMessage *const);
+  void (*received_error)(void *receiver_context, struct coap_context_t *ctx, unsigned int code);
+} callback_pointers;
+
+// defines the context specific data for the data receiver
+static void *receiver;
+static callback_pointers global_ptrs;
+
+/**
+ * Initialize the API access. Not thread safe.
+ */
+void init_coap_api(void *rcvr, callback_pointers *ptrs);
+
+/**
+ * Creates a CoAP session. Provide it double pointer to the context and session to instantiate those structs.
+ * @param ctx coap context
+ * @param session coap session
+ * @param node device node name
+ * @param dst_addr destination address
+ * @return 0 if success -1 otherwise.
+ */
+int create_session(coap_context_t **ctx, coap_session_t **session, const char *node, const char *port, coap_address_t *dst_addr);
+
+/**
+ * Creates an endpoint context
+ * @param ctx pointer to a context
+ * @param node device node name
+ * @param port device port
+ */
+int create_endpoint_context(coap_context_t **ctx, const char *node, const char *port);
+
+/**
+ * Creates a request object with an already allocated context and session. The option list is sent in as an arry ptr.
+ * @param ctx coap context
+ * @param session coap session
+ * @param optlist option list array
+ * @param code coap request code
+ * @param ptr ptr to the coap payload
+ * @returns pointer to a newly formed PDU ( protocol data unit )
+ */
+struct coap_pdu_t *create_request(struct coap_context_t *ctx, struct coap_session_t *session, coap_optlist_t **optlist, unsigned char code, coap_str_const_t *ptr);
+
+/**
+ * Function can be used to receive coap events
+ * @param ctx context that performed event
+ * @param event coap event launched
+ * @param session session that performed event.
+ * @return 0 as a success ( events in this library aren't noteworthy to PDU )
+ */
+int coap_event(struct coap_context_t *ctx, coap_event_t event, struct coap_session_t *session);
+
+/**
+ * Function can be used when a noack is received from the library
+ * @param ctx context that performed event
+ * @param session session that performed event.
+ * @parma sent pdu that was sent
+ * @param reason reason PDU was not acked
+ * @param event coap event launched
+ * @param id id of ack
+ */
+void no_acknowledgement(struct coap_context_t *ctx, coap_session_t *session, coap_pdu_t *sent, coap_nack_reason_t reason, const coap_tid_t id);
+
+/**
+ * Responser handler is the function launched when data is received
+ * @param ctx context
+ * @param session coap session
+ * @param sent PDU that was sent
+ * @param received PDU that was received
+ * @param id id of ack
+ */
+void response_handler(struct coap_context_t *ctx, struct coap_session_t *session, coap_pdu_t *sent, coap_pdu_t *received, const coap_tid_t id);
+
+/**
+ * Resolves the destination address of the server and places that into dst
+ * @param server server to connect to
+ * @param dst destination pointer
+ * @return 0 if sucess -1 otherwise
+ */
+int resolve_address(const struct coap_str_const_t *server, struct sockaddr *dst);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_FUNCTIONS_H_ */
+
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/nanofi/coap_message.c
similarity index 69%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/nanofi/coap_message.c
index 5f62227..403bf5f 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/nanofi/coap_message.c
@@ -15,24 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "PayloadSerializer.h"
+#include "coap_message.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-namespace mqtt {
-
-PayloadSerializer::PayloadSerializer() {
+CoapMessage * const create_coap_message(const coap_pdu_t * const pdu) {
+  CoapMessage *message = (CoapMessage*) malloc(sizeof(CoapMessage));
+  coap_get_data(pdu, &message->size_, &message->data_);
+  message->code_ = pdu->code;
+  return message;
 }
 
-PayloadSerializer::~PayloadSerializer() {
+void free_coap_message(CoapMessage *msg) {
+  free(msg);
 }
-
-} /* namespace mqtt */
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/nanofi/coap_message.h
similarity index 50%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/nanofi/coap_message.h
index 5f62227..f2c4486 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/nanofi/coap_message.h
@@ -15,24 +15,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "PayloadSerializer.h"
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_MESSAGE_H
+#define EXTENSIONS_COAP_NANOFI_COAP_MESSAGE_H
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-namespace mqtt {
 
-PayloadSerializer::PayloadSerializer() {
-}
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <coap2/coap.h>
+#include <stdint.h>
+#include <stdio.h>
+
+
+
+/**
+ * CoAP-2 in libcoap uses uint8_t *  while the first version uses a different type, so we will have to cast
+ * the data. We have to keep this in mind with the API that we use.
+ */
+typedef struct {
+  uint32_t code_;
+  size_t size_;
+  uint8_t *data_;
+} CoapMessage;
+
+/**
+ * Create a new CoAMessage, taking ownership of the aforementioned buffers
+ */
+CoapMessage * const create_coap_message(const coap_pdu_t * const pdu);
+/**
+ * FRee the CoAP messages that are provided.
+ */
+void free_coap_message(CoapMessage *msg);
 
-PayloadSerializer::~PayloadSerializer() {
+#ifdef __cplusplus
 }
+#endif
 
-} /* namespace mqtt */
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_ */
diff --git a/extensions/coap/nanofi/coap_server.c b/extensions/coap/nanofi/coap_server.c
new file mode 100644
index 0000000..cd94bef
--- /dev/null
+++ b/extensions/coap/nanofi/coap_server.c
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "coap_server.h"
+#include "coap_functions.h"
+/**
+ * Create a new CoAPServer
+ */
+CoapServerContext * const create_server(const char *const server_hostname, const char * const port){
+  CoapServerContext *server = (CoapServerContext*)malloc(sizeof(CoapServerContext));
+  memset(server,0x00, sizeof(CoapServerContext));
+  if ( create_endpoint_context(&server->ctx,server_hostname,port) ) {
+    free_server(server);
+  }
+
+  return server;
+}
+
+CoapEndpoint *const create_endpoint(CoapServerContext * const server, const char * const resource_path, uint8_t method, coap_method_handler_t handler){
+  CoapEndpoint *endpoint = (CoapEndpoint*)malloc(sizeof(CoapEndpoint));
+  memset(endpoint,0x00, sizeof(CoapEndpoint));
+  endpoint->server = server;
+  int8_t flags = COAP_RESOURCE_FLAGS_NOTIFY_CON;
+  coap_str_const_t *path = NULL;
+  if (NULL != resource_path){
+      path = coap_new_str_const((const uint8_t *)resource_path,strlen(resource_path));
+  }
+  endpoint->resource = coap_resource_init(path, flags);
+  coap_add_attr(endpoint->resource, coap_make_str_const("title"), coap_make_str_const("\"Internal Clock\""), 0);
+  assert( !add_endpoint(endpoint,method,handler) );
+  coap_add_resource(server->ctx,endpoint->resource);
+  if (path){
+    coap_delete_str_const(path);
+  }
+  return endpoint;
+
+}
+
+int8_t add_endpoint(CoapEndpoint * const endpoint, uint8_t method, coap_method_handler_t handler){
+  if (endpoint == NULL || handler == NULL)
+    return -1;
+
+  coap_register_handler(endpoint->resource, method, handler);
+  return 0;
+}
+
+
+/**
+ * FRee the CoAP messages that are provided.
+ */
+void free_endpoint(CoapEndpoint * const endpoint){
+  if (endpoint){
+    free((void*)endpoint);
+  }
+}
+void free_server(CoapServerContext * const server){
+  if (server){
+    coap_delete_all_resources( server->ctx );
+    coap_free_context( server->ctx );
+    free(server);
+  }
+}
diff --git a/extensions/coap/nanofi/coap_server.h b/extensions/coap/nanofi/coap_server.h
new file mode 100644
index 0000000..f4292a7
--- /dev/null
+++ b/extensions/coap/nanofi/coap_server.h
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_SERVER_H
+#define EXTENSIONS_COAP_NANOFI_COAP_SERVER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <coap2/coap.h>
+
+#include <stdint.h>
+#include <stdio.h>
+
+/**
+ * CoAP-2 in libcoap uses uint8_t *  while the first version uses a different type, so we will have to cast
+ * the data. We have to keep this in mind with the API that we use.
+ */
+typedef struct {
+  struct coap_context_t* ctx;
+  coap_address_t src_addr;
+  coap_optlist_t *optlist;
+} CoapServerContext;
+
+
+typedef struct {
+  CoapServerContext *server;
+  coap_resource_t *resource;
+} CoapEndpoint;
+
+
+/**
+ * Create a new CoAPServer using the host name provide
+ * @param server_hostname hostname
+ * @param port port requested
+ * @param title title of base resource
+ * @return CoAPServer structure.
+ */
+CoapServerContext * const create_server(const char *const server_hostname, const char * const port);
+
+/**
+ * Creates an endpoint for the provided service context
+ */
+CoapEndpoint * const create_endpoint(CoapServerContext * const, const char * const resource_path, uint8_t method, coap_method_handler_t handler);
+
+/**
+ * Adds an endpoint to the provided CoapEndpoint structure
+ * @param endpoint endpoint we are adding the handler to
+ * @method method we're adding for CoAP messages
+ * @param handler handler we're using for the endpoint.
+ */
+int8_t add_endpoint(CoapEndpoint * const endpoint, uint8_t method, coap_method_handler_t handler);
+
+
+/**
+ * Free the CoAP messages that are provided.
+ */
+void free_endpoint(CoapEndpoint * const);
+/**
+ * Frees the CoAP Server context.
+ */
+void free_server(CoapServerContext * const);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_SERVER_H */
diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp b/extensions/coap/protocols/CoapC2Protocol.cpp
new file mode 100644
index 0000000..1eb8f63
--- /dev/null
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -0,0 +1,300 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CoapC2Protocol.h"
+#include "c2/PayloadSerializer.h"
+#include "c2/PayloadParser.h"
+#include "coap_functions.h"
+#include "coap_message.h"
+#include "io/BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace c2 {
+
+uint8_t CoapProtocol::REGISTRATION_MSG[8] = { 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72 };
+
+CoapProtocol::CoapProtocol(const std::string &name, const utils::Identifier &uuid)
+    : RESTSender(name, uuid),
+      require_registration_(false),
+      logger_(logging::LoggerFactory<CoapProtocol>::getLogger()) {
+}
+
+CoapProtocol::~CoapProtocol() {
+}
+
+void CoapProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  RESTSender::initialize(controller, configure);
+  if (configure->get("nifi.c2.coap.connector.service", controller_service_name_)) {
+    auto service = controller->getControllerService(controller_service_name_);
+    coap_service_ = std::static_pointer_cast<coap::controllers::CoapConnectorService>(service);
+  } else {
+    logger_->log_info("No CoAP connector configured, so using default service");
+    coap_service_ = std::make_shared<coap::controllers::CoapConnectorService>("cs", configure);
+    coap_service_->onEnable();
+  }
+}
+
+minifi::c2::C2Payload CoapProtocol::consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) {
+  return RESTSender::consumePayload(url, payload, direction, false);
+}
+
+int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
+  auto ident = payload.getIdentifier();
+  auto state = payload.getStatus().getState();
+  stream->writeUTF(ident);
+  uint8_t payloadState = 0;
+  switch (state) {
+    case state::UpdateState::NESTED:
+    case state::UpdateState::INITIATE:
+    case state::UpdateState::FULLY_APPLIED:
+    case state::UpdateState::READ_COMPLETE:
+      payloadState = 0;
+      break;
+    case state::UpdateState::NOT_APPLIED:
+    case state::UpdateState::PARTIALLY_APPLIED:
+      payloadState = 1;
+      break;
+    case state::UpdateState::READ_ERROR:
+      payloadState = 2;
+      break;
+    case state::UpdateState::SET_ERROR:
+      payloadState = 3;
+      break;
+  }
+  stream->write(&payloadState, 1);
+  return 0;
+}
+
+int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
+  bool byte;
+  uint16_t size = 0;
+
+  logger_->log_trace("Writing heartbeat");
+
+  try {
+    const std::string deviceIdent = minifi::c2::PayloadParser::getInstance(payload).in("deviceInfo").getAs<std::string>("identifier");
+
+    const std::string agentIdent = minifi::c2::PayloadParser::getInstance(payload).in("agentInfo").getAs<std::string>("identifier");
+
+    stream->writeUTF(deviceIdent, false);
+
+    logger_->log_trace("Writing heartbeat with device Ident %s and agent Ident %s", deviceIdent, agentIdent);
+
+    if (agentIdent.empty()) {
+      return -1;
+    }
+    stream->writeUTF(agentIdent, false);
+
+    try {
+      auto flowInfoParser = minifi::c2::PayloadParser::getInstance(payload).in("flowInfo");
+      auto componentParser = flowInfoParser.in("components");
+      auto queueParser = flowInfoParser.in("queues");
+      auto vfsParser = flowInfoParser.in("versionedFlowSnapshotURI");
+      byte = true;
+      stream->write(byte);
+      size = componentParser.getSize();
+      stream->write(size);
+
+      componentParser.foreach([this, stream](const minifi::c2::C2Payload &component) {
+        auto myParser = minifi::c2::PayloadParser::getInstance(component);
+        bool running = false;
+        stream->writeUTF(component.getLabel());
+        try {
+          running = myParser.getAs<bool>("running");
+        }
+        catch(const minifi::c2::PayloadParseException &e) {
+          logger_->log_error("Could not find running in components");
+        }
+        stream->write(running);
+      });
+      size = queueParser.getSize();
+      stream->write(size);
+      queueParser.foreach([this, stream](const minifi::c2::C2Payload &component) {
+        auto myParser = minifi::c2::PayloadParser::getInstance(component);
+        stream->writeUTF(component.getLabel());
+        uint64_t datasize = 0, datasizemax = 0, qsize = 0, sizemax = 0;
+        try {
+          datasize = myParser.getAs<uint64_t>("dataSize");
+          datasizemax = myParser.getAs<uint64_t>("dataSizeMax");
+          qsize = myParser.getAs<uint64_t>("size");
+          sizemax = myParser.getAs<uint64_t>("sizeMax");
+        }
+        catch(const minifi::c2::PayloadParseException &e) {
+          logger_->log_error("Could not find queue sizes");
+        }
+        stream->write(datasize);
+        stream->write(datasizemax);
+        stream->write(qsize);
+        stream->write(sizemax);
+      });
+
+      auto bucketId = vfsParser.getAs<std::string>("bucketId");
+      auto flowId = vfsParser.getAs<std::string>("flowId");
+
+      stream->writeUTF(bucketId);
+      stream->writeUTF(flowId);
+
+    } catch (const minifi::c2::PayloadParseException &pe) {
+      logger_->log_error("Parser exception occurred, but is ignorable, reason %s", pe.what());
+      // okay to ignore
+      byte = false;
+      stream->write(byte);
+    }
+  } catch (const minifi::c2::PayloadParseException &e) {
+    logger_->log_error("Parser exception occurred, reason %s", e.what());
+    return -1;
+  }
+  return 0;
+}
+
+minifi::c2::Operation CoapProtocol::getOperation(int type) const {
+  switch (type) {
+    case 0:
+      return minifi::c2::ACKNOWLEDGE;
+    case 1:
+      return minifi::c2::HEARTBEAT;
+    case 2:
+      return minifi::c2::CLEAR;
+    case 3:
+      return minifi::c2::DESCRIBE;
+    case 4:
+      return minifi::c2::RESTART;
+    case 5:
+      return minifi::c2::START;
+    case 6:
+      return minifi::c2::UPDATE;
+    case 7:
+      return minifi::c2::STOP;
+  }
+  return minifi::c2::ACKNOWLEDGE;
+}
+
+minifi::c2::C2Payload CoapProtocol::serialize(const minifi::c2::C2Payload &payload) {
+  if (nullptr == coap_service_) {
+    // return an error if we don't have a coap service
+    logger_->log_error("CoAP service requested without any configured hostname or port");
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+  }
+
+  if (require_registration_) {
+    logger_->log_debug("Server requested agent registration, so attempting");
+    auto response = minifi::c2::RESTSender::consumePayload(rest_uri_, payload, minifi::c2::TRANSMIT, false);
+    if (response.getStatus().getState() == state::UpdateState::READ_ERROR) {
+      logger_->log_trace("Could not register");
+      return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+    } else {
+      logger_->log_trace("Registered agent.");
+    }
+    require_registration_ = false;
+
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+
+  }
+
+  uint16_t version = 0;
+  uint8_t payload_type = 0;
+  uint64_t payload_u64 = 0;
+  uint16_t size = 0;
+  io::BaseStream stream;
+
+  stream.write(version);
+  std::string endpoint = "heartbeat";
+  switch (payload.getOperation()) {
+    case minifi::c2::ACKNOWLEDGE:
+      endpoint = "acknowledge";
+      payload_type = 0;
+      stream.write(&payload_type, 1);
+      if (writeAcknowledgement(&stream, payload) != 0) {
+        return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+      }
+      break;
+    case minifi::c2::HEARTBEAT:
+      payload_type = 1;
+      stream.write(&payload_type, 1);
+      if (writeHeartbeat(&stream, payload) != 0) {
+        logger_->log_error("Could not write heartbeat");
+        return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+      }
+      break;
+    default:
+      logger_->log_error("Could not identify operation");
+      return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+  };
+
+  size_t bsize = stream.getSize();
+
+  CoapMessage msg;
+  msg.data_ = const_cast<uint8_t *>(stream.getBuffer());
+  msg.size_ = bsize;
+
+  coap::controllers::CoapResponse message = coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, &msg);
+
+  if (isRegistrationMessage(message)) {
+    require_registration_ = true;
+  } else if (message.getSize() > 0) {
+    io::DataStream byteStream(message.getData(), message.getSize());
+    io::BaseStream responseStream(&byteStream);
+    responseStream.read(version);
+    responseStream.read(size);
+    logger_->log_trace("Received ack. version %d. number of operations %d", version, size);
+    minifi::c2::C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
+    for (int i = 0; i < size; i++) {
+
+      uint8_t operationType;
+      uint16_t argsize = 0;
+      std::string operand, id;
+      REQUIRE_SIZE_IF(1, responseStream.read(operationType))
+      REQUIRE_VALID(responseStream.readUTF(id, false))
+      REQUIRE_VALID(responseStream.readUTF(operand, false))
+
+      logger_->log_trace("Received op %d, with id %s and operand %s", operationType, id, operand);
+      auto newOp = getOperation(operationType);
+      minifi::c2::C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
+      nested_payload.setIdentifier(id);
+      minifi::c2::C2ContentResponse new_command(newOp);
+      new_command.delay = 0;
+      new_command.required = true;
+      new_command.ttl = -1;
+      new_command.name = operand;
+      new_command.ident = id;
+      responseStream.read(argsize);
+      for (int j = 0; j < argsize; j++) {
+        std::string key, value;
+        REQUIRE_VALID(responseStream.readUTF(key))
+        REQUIRE_VALID(responseStream.readUTF(value))
+        new_command.operation_arguments[key] = value;
+      }
+
+      nested_payload.addContent(std::move(new_command));
+      new_payload.addPayload(std::move(nested_payload));
+    }
+    return new_payload;
+  }
+
+  return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+}
+
+} /* namespace c2 */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
new file mode 100644
index 0000000..0fe9395
--- /dev/null
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAPPROTOCOL_H_
+#define EXTENSIONS_COAPPROTOCOL_H_
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "c2/C2Protocol.h"
+#include "io/BaseStream.h"
+#include "agent/agent_version.h"
+#include "CoapConnector.h"
+
+#include "coap2/coap.h"
+#include "coap2/uri.h"
+#include "coap2/address.h"
+#include <stdio.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include "protocols/RESTSender.h"
+
+#undef RAPIDJSON_ASSERT
+#define RAPIDJSON_ASSERT(x) if(!(x)) throw std::logic_error("rapidjson exception"); //NOLINT
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace c2 {
+
+#define REQUIRE_VALID(x) \
+  if (-1 == x){ \
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); \
+  }
+
+#define REQUIRE_SIZE_IF(y,x) \
+  if (y != x){ \
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); \
+  }
+
+/**
+ * CoAP is the Constrained Application Protocol, which defines a specialized web transfer protocol that can be
+ * used on devices with constrained resources.
+ */
+class CoapProtocol : public minifi::c2::RESTSender {
+ public:
+  explicit CoapProtocol(const std::string &name, const utils::Identifier &uuid = utils::Identifier());
+
+  virtual ~CoapProtocol();
+
+  /**
+   * Consume the payload.
+   * @param url to evaluate.
+   * @param payload payload to consume.
+   * @param direction direction of operation.
+   */
+  virtual minifi::c2::C2Payload consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) override;
+
+  virtual minifi::c2::C2Payload consumePayload(const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) override {
+      return serialize(payload);
+  }
+
+  virtual void update(const std::shared_ptr<Configure> &configure) override {
+    // no op.
+  }
+
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+
+  // Supported Properties
+
+ protected:
+
+  bool isRegistrationMessage(controllers::CoapResponse &response) {
+    if (LIKELY(response.getSize() != 8)) {
+      return false;
+    }
+    return response.getCode() == COAP_RESPONSE_400 && !memcmp(response.getData(), REGISTRATION_MSG, response.getSize());
+  }
+
+  /**
+   * Returns the operation for the translated integer
+   * @param type input type
+   * @return Operation
+   */
+  minifi::c2::Operation getOperation(int type) const;
+
+  /**
+   * Writes a heartbeat to the provided BaseStream ptr.
+   * @param stream BaseStream
+   * @param payload payload to serialize
+   * @return result 0 if success failure otherwise
+   */
+  int writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload);
+
+  /**
+   * Writes a acknowledgement to the provided BaseStream ptr.
+   * @param stream BaseStream
+   * @param payload payload to serialize
+   * @return result 0 if success failure otherwise
+   */
+  int writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload);
+
+  minifi::c2::C2Payload serialize(const minifi::c2::C2Payload &payload);
+
+  std::shared_ptr<coap::controllers::CoapConnectorService> coap_service_;
+
+  std::mutex protocol_mutex_;
+
+  bool require_registration_;
+
+  std::string controller_service_name_;
+
+ private:
+
+  static uint8_t REGISTRATION_MSG[8];
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+} /* namespace c2 */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* EXTENSIONS_COAPPROTOCOL_H_ */
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/server/CoapServer.cpp
similarity index 75%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/server/CoapServer.cpp
index 5f62227..d8cbd7a 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/server/CoapServer.cpp
@@ -15,23 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "PayloadSerializer.h"
+#include "CoapServer.h"
+#include <coap2/utlist.h>
+#include <coap2/coap.h>
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace c2 {
-namespace mqtt {
+namespace coap {
 
-PayloadSerializer::PayloadSerializer() {
-}
 
-PayloadSerializer::~PayloadSerializer() {
+std::map<coap_resource_t*, std::function<CoapResponse(CoapQuery)>> CoapServer::functions_;
+CoapServer::~CoapServer() {
+  running_ = false;
+  future.get();
+  if(server_){
+    free_server(server_);
+  }
 }
 
-} /* namespace mqtt */
-} /* namespace c2 */
+
+} /* namespace coap */
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
diff --git a/extensions/coap/server/CoapServer.h b/extensions/coap/server/CoapServer.h
new file mode 100644
index 0000000..e929925
--- /dev/null
+++ b/extensions/coap/server/CoapServer.h
@@ -0,0 +1,245 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_COAP_SERVER_COAPSERVER_H_
+#define EXTENSIONS_COAP_SERVER_COAPSERVER_H_
+#include "core/Connectable.h"
+#include "coap_server.h"
+#include "coap_message.h"
+#include <coap2/coap.h>
+#include <functional>
+#include <thread>
+#include <future>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+
+enum METHOD {
+  GET,
+  POST,
+  PUT,
+  DELETE
+};
+
+/**
+ * CoapQuery is the request message sent by the client
+ */
+class CoapQuery {
+ public:
+  CoapQuery(const std::string &query, std::unique_ptr<CoapMessage, decltype(&free_coap_message)> message)
+      : query_(query),
+        message_(std::move(message)) {
+
+  }
+  virtual ~CoapQuery() {
+  }
+  CoapQuery(const CoapQuery &qry) = delete;
+  CoapQuery(CoapQuery &&qry) = default;
+
+ private:
+  std::string query_;
+  std::unique_ptr<CoapMessage, decltype(&free_coap_message)> message_;
+};
+
+/**
+ * Coap Response that is generated by the CoapServer to the library
+ */
+class CoapResponse {
+  friend class CoapQuery;
+ public:
+  CoapResponse(int code, std::unique_ptr<uint8_t> data, size_t size)
+      : code_(code),
+        data_(std::move(data)),
+        size_(size) {
+
+  }
+
+  int getCode() const {
+    return code_;
+  }
+  size_t getSize() const {
+    return size_;
+  }
+
+  uint8_t * const getData() const {
+    return data_.get();
+  }
+
+  CoapResponse(const CoapResponse &qry) = delete;
+  CoapResponse(CoapResponse &&qry) = default;
+  CoapResponse &operator=(CoapResponse &&qry) = default;
+ private:
+  int code_;
+  std::unique_ptr<uint8_t> data_;
+  size_t size_;
+};
+
+/**
+ * Wrapper for the coap server functionality using an async callback to perform
+ * custom operations. Intended to be used for testing, but may provide capabilities
+ * elsewhere.
+ */
+class CoapServer : public core::Connectable {
+ public:
+  explicit CoapServer(const std::string &name, const utils::Identifier &uuid)
+      : core::Connectable(name, uuid),
+        server_(nullptr),
+        port_(0) {
+    //TODO: this allows this class to be instantiated via the the class loader
+    //need to define this capability in the future.
+  }
+  CoapServer(const std::string &hostname, uint16_t port)
+      : core::Connectable(hostname),
+        hostname_(hostname),
+        server_(nullptr),
+        port_(port) {
+    coap_startup();
+    auto port_str = std::to_string(port_);
+    //coap_set_log_level(coap_log_t::LOG_DEBUG);
+    server_ = create_server(hostname_.c_str(), port_str.c_str());
+  }
+
+  virtual ~CoapServer();
+
+  void start() {
+    running_ = true;
+
+    future = std::async(std::launch::async, [&]() -> uint64_t {
+      while (running_) {
+        int res = coap_run_once(server_->ctx, 100);
+        if (res < 0 ) {
+          break;
+        }
+        coap_check_notify(server_->ctx);
+      }
+      return 0;
+
+    });
+  }
+
+  void add_endpoint(const std::string &path, METHOD method, std::function<CoapResponse(CoapQuery)> functor) {
+    unsigned char mthd = 0;
+    switch (method) {
+      case GET:
+        mthd = COAP_REQUEST_GET;
+        break;
+      case POST:
+        mthd = COAP_REQUEST_POST;
+        break;
+      case PUT:
+        mthd = COAP_REQUEST_PUT;
+        break;
+      case DELETE:
+        mthd = COAP_REQUEST_DELETE;
+        break;
+    }
+    auto current_endpoint = endpoints_.find(path);
+    if (current_endpoint != endpoints_.end()) {
+      ::add_endpoint(current_endpoint->second, mthd, handle_response_with_passthrough);
+    } else {
+      CoapEndpoint * const endpoint = create_endpoint(server_, path.c_str(), mthd, handle_response_with_passthrough);
+      functions_.insert(std::make_pair(endpoint->resource, functor));
+      endpoints_.insert(std::make_pair(path, endpoint));
+    }
+  }
+
+  void add_endpoint(METHOD method, std::function<CoapResponse(CoapQuery)> functor) {
+    unsigned char mthd = 0;
+    switch (method) {
+      case GET:
+        mthd = COAP_REQUEST_GET;
+        break;
+      case POST:
+        mthd = COAP_REQUEST_POST;
+        break;
+      case PUT:
+        mthd = COAP_REQUEST_PUT;
+        break;
+      case DELETE:
+        mthd = COAP_REQUEST_DELETE;
+        break;
+    }
+    CoapEndpoint * const endpoint = create_endpoint(server_, NULL, mthd, handle_response_with_passthrough);
+    functions_.insert(std::make_pair(endpoint->resource, functor));
+    endpoints_.insert(std::make_pair("", endpoint));
+  }
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return running_.load();
+  }
+
+  /**
+   * Block until work is available on any input connection, or the given duration elapses
+   * @param timeoutMs timeout in milliseconds
+   */
+  void waitForWork(uint64_t timeoutMs);
+
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+ protected:
+
+  static void handle_response_with_passthrough(coap_context_t *ctx, struct coap_resource_t *resource, coap_session_t *session, coap_pdu_t *request, coap_binary_t *token, coap_string_t *query,
+                                               coap_pdu_t *response) {
+
+    auto fx = functions_.find(resource);
+    if (fx != functions_.end()) {
+      auto message = create_coap_message(request);
+      CoapQuery qry("", std::unique_ptr<CoapMessage, decltype(&free_coap_message)>(message, free_coap_message));
+      // call the UDF
+      auto udfResponse = fx->second(std::move(qry));
+      response = coap_pdu_init(COAP_MESSAGE_CON, COAP_RESPONSE_CODE(udfResponse.getCode()), coap_new_message_id(session), udfResponse.getSize() + 1);
+      coap_add_data(response, udfResponse.getSize(), udfResponse.getData());
+      if (coap_send(session, response) == COAP_INVALID_TID) {
+        printf("error while returning response");
+      }
+
+    }
+
+  }
+
+  std::future<uint64_t> future;
+  std::atomic<bool> running_;
+  std::string hostname_;
+  CoapServerContext *server_;
+  static std::map<coap_resource_t*, std::function<CoapResponse(CoapQuery)>> functions_;
+  std::map<std::string, CoapEndpoint*> endpoints_;
+  uint16_t port_;
+};
+
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_COAP_SERVER_COAPSERVER_H_ */
diff --git a/extensions/coap/tests/CMakeLists.txt b/extensions/coap/tests/CMakeLists.txt
new file mode 100644
index 0000000..df97e82
--- /dev/null
+++ b/extensions/coap/tests/CMakeLists.txt
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB CURL_UNIT_TESTS  "unit/*.cpp")
+file(GLOB CURL_INTEGRATION_TESTS "*.cpp")
+
+SET(CURL_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${CURL_INTEGRATION_TESTS})
+  	get_filename_component(testfilename "${testfile}" NAME_WE)
+  	add_executable("${testfilename}" "${testfile}")
+  	target_include_directories(${testfilename} BEFORE PRIVATE ${COAP_INCLUDE_DIRS})
+  	target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+  	target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../server")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../nanofi")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../protocols")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../controllerservice")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/client/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/processors/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/protocols/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/sitetosite/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
+	target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+    createTests("${testfilename}")
+    if (APPLE)
+    	target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-coap minifi-civet-extensions)
+	else ()
+  		target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-coap minifi-civet-extensions -Wl,--no-whole-archive)
+  	endif()
+  MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${CURL_INT_TEST_COUNT} CoAP integration test file(s)...")
+
+add_test(NAME CoapC2VerifyHeartbeat COMMAND CoapC2VerifyHeartbeat "${TEST_RESOURCES}/CoapC2VerifyServe.yml" "${TEST_RESOURCES}/" "http://localhost:9888/geturl")
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
similarity index 55%
copy from extensions/http-curl/tests/C2VerifyServeResults.cpp
copy to extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
index 1d3e03f..e363195 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
@@ -45,11 +45,14 @@
 #include "RemoteProcessorGroupPort.h"
 #include "core/ConfigurableComponent.h"
 #include "controllers/SSLContextService.h"
-#include "TestServer.h"
 #include "c2/C2Agent.h"
 #include "protocols/RESTReceiver.h"
-#include "HTTPIntegrationBase.h"
+#include "CoapIntegrationBase.h"
 #include "processors/LogAttribute.h"
+#include "CoapC2Protocol.h"
+#include "CoapServer.h"
+#include "io/BaseStream.h"
+#include "concurrentqueue.h"
 
 class Responder : public CivetHandler {
  public:
@@ -70,9 +73,9 @@ class Responder : public CivetHandler {
   bool isSecure;
 };
 
-class VerifyC2Server : public HTTPIntegrationBase {
+class VerifyCoAPServer : public CoapIntegrationBase {
  public:
-  explicit VerifyC2Server(bool isSecure)
+  explicit VerifyCoAPServer(bool isSecure)
       : isSecure(isSecure) {
     char format[] = "/tmp/ssth.XXXXXX";
     dir = testController.createTempDirectory(format);
@@ -80,11 +83,12 @@ class VerifyC2Server : public HTTPIntegrationBase {
 
   void testSetup() {
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
-    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+    LogTestController::getInstance().setOff<processors::InvokeHTTP>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
     LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-    LogTestController::getInstance().setDebug<processors::LogAttribute>();
-    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<minifi::coap::c2::CoapProtocol>();
+    LogTestController::getInstance().setOff<processors::LogAttribute>();
+    LogTestController::getInstance().setOff<minifi::core::ProcessSession>();
     std::fstream file;
     ss << dir << "/" << "tstFile.ext";
     file.open(ss.str(), std::ios::out);
@@ -97,10 +101,12 @@ class VerifyC2Server : public HTTPIntegrationBase {
   }
 
   void runAssertions() {
-    assert(LogTestController::getInstance().contains("Import offset 0") == true);
-
-    assert(LogTestController::getInstance().contains("Outputting success and response") == true);
 
+    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+    assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 1") == true);
+    assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 0") == true);
+    assert(LogTestController::getInstance().contains("Received error event from protocol") == true);
+    assert(LogTestController::getInstance().contains("Received op 1, with id id and operand operand") == true);
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
@@ -114,16 +120,76 @@ class VerifyC2Server : public HTTPIntegrationBase {
     inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
 
     std::string port, scheme, path;
+
     parse_http_components(url, port, scheme, path);
+    auto new_port_str = std::to_string(std::stoi(port) + 2);
+
+    server = std::unique_ptr<minifi::coap::CoapServer>(new minifi::coap::CoapServer("127.0.0.1", std::stoi(port) + 2));
+
+    server->add_endpoint(minifi::coap::METHOD::POST, [](minifi::coap::CoapQuery)->minifi::coap::CoapResponse {
+      minifi::coap::CoapResponse response(205,0x00,0);
+      return response;
+
+    });
+
+    {
+      // valid response version 3, 0 ops
+      uint8_t *data = new uint8_t[5] { 0x00, 0x03, 0x00, 0x01, 0x00 };
+      minifi::coap::CoapResponse response(205, std::unique_ptr<uint8_t>(data), 5);
+      responses.enqueue(std::move(response));
+    }
+
+    {
+      // valid response
+      uint8_t *data = new uint8_t[5] { 0x00, 0x03, 0x00, 0x00, 0x00 };
+      minifi::coap::CoapResponse response(205, std::unique_ptr<uint8_t>(data), 5);
+      responses.enqueue(std::move(response));
+    }
+
+    {
+      // should result in valid operation
+      minifi::io::BaseStream stream;
+      uint16_t version=0, size=1;
+      uint8_t operation = 1;
+      stream.write(version);
+      stream.write(size);
+      stream.write(&operation,1);
+      stream.writeUTF("id");
+      stream.writeUTF("operand");
+
+      uint8_t *data = new uint8_t[ stream.getSize() ];
+      memcpy(data,stream.getBuffer(), stream.getSize() );
+      minifi::coap::CoapResponse response(205, std::unique_ptr<uint8_t>(data), stream.getSize());
+      responses.enqueue(std::move(response));
+    }
+
+    server->add_endpoint("heartbeat", minifi::coap::METHOD::POST, [&](minifi::coap::CoapQuery)-> minifi::coap::CoapResponse {
+      if (responses.size_approx() > 0) {
+        minifi::coap::CoapResponse resp(500,0,0);;
+        responses.try_dequeue(resp);
+        return resp;
+      }
+      else {
+        minifi::coap::CoapResponse response(500,0,0);
+        return response;
+      }
+
+    });
+
+    server->start();
     configuration->set("c2.enable", "true");
     configuration->set("c2.agent.class", "test");
-    configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
-    configuration->set("c2.rest.listener.port", port);
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation,RepositoryMetrics");
+    configuration->set("nifi.c2.agent.protocol.class", "CoapProtocol");
+    configuration->set("nifi.c2.agent.coap.host", "127.0.0.1");
+    configuration->set("nifi.c2.agent.coap.port", new_port_str);
     configuration->set("c2.agent.heartbeat.period", "10");
     configuration->set("c2.rest.listener.heartbeat.rooturi", path);
   }
 
  protected:
+  moodycamel::ConcurrentQueue<minifi::coap::CoapResponse> responses;
+  std::unique_ptr<minifi::coap::CoapServer> server;
   bool isSecure;
   char *dir;
   std::stringstream ss;
@@ -142,7 +208,7 @@ int main(int argc, char **argv) {
     isSecure = true;
   }
 
-  VerifyC2Server harness(isSecure);
+  VerifyCoAPServer harness(isSecure);
 
   harness.setKeyDir(key_dir);
 
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
similarity index 50%
copy from extensions/http-curl/tests/HTTPIntegrationBase.h
copy to extensions/coap/tests/CoapIntegrationBase.h
index 1bc4c72..85af8c6 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -32,30 +32,69 @@ int ssl_enable(void *ssl_context, void *user_data) {
   return 0;
 }
 
-class HTTPIntegrationBase : public IntegrationBase {
+class CoapIntegrationBase : public IntegrationBase {
  public:
-  HTTPIntegrationBase(uint64_t waitTime = 60000)
+  CoapIntegrationBase(uint64_t waitTime = 60000)
       : IntegrationBase(waitTime),
         server(nullptr) {
   }
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~HTTPIntegrationBase();
+  virtual ~CoapIntegrationBase();
 
-  void shutdownBeforeFlowController() {
+  void shutdownBeforeFlowController() override {
     stop_webserver(server);
   }
 
+  virtual void run(std::string test_file_location) override {
+    testSetup();
+
+    std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+    std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+    configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(configuration);
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+    std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+        new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+
+    core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+    std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+    std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+
+    queryRootProcessGroup(pg);
+
+    ptr.release();
+
+    std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+    std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+                                                                                                  true);
+
+    controller->load();
+    controller->start();
+    waitToVerifyProcessor();
+
+    shutdownBeforeFlowController();
+    controller->waitUnload(wait_time_);
+    runAssertions();
+
+    cleanup();
+  }
+
  protected:
   CivetServer *server;
 };
 
-HTTPIntegrationBase::~HTTPIntegrationBase() {
+CoapIntegrationBase::~CoapIntegrationBase() {
 
 }
 
-void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
+void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
   struct mg_callbacks callback;
diff --git a/extensions/gps/CMakeLists.txt b/extensions/gps/CMakeLists.txt
index 8d6dd35..8a67e8d 100644
--- a/extensions/gps/CMakeLists.txt
+++ b/extensions/gps/CMakeLists.txt
@@ -17,9 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
 find_package(LibGPS REQUIRED)
 
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index d607664..3940918 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -250,7 +250,7 @@ bool HTTPClient::submit() {
   curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
   curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type_str_);
   if (res != CURLE_OK) {
-    logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
+    logger_->log_error("curl_easy_perform() failed %s on %s\n", curl_easy_strerror(res), url_);
     return false;
   }
 
@@ -301,11 +301,11 @@ const char *HTTPClient::getContentType() {
 }
 
 const std::vector<char> &HTTPClient::getResponseBody() {
-  if (response_body_.size() == 0){
-    if (callback && callback->ptr){
+  if (response_body_.size() == 0) {
+    if (callback && callback->ptr) {
       response_body_ = callback->ptr->to_string();
-    }else{
-    response_body_ = read_callback_.to_string();
+    } else {
+      response_body_ = read_callback_.to_string();
     }
   }
   return response_body_;
@@ -384,10 +384,9 @@ bool HTTPClient::isSecure(const std::string &url) {
 }
 
 void HTTPClient::setInterface(const std::string &ifc) {
-	curl_easy_setopt(http_session_, CURLOPT_INTERFACE, ifc.c_str());
+  curl_easy_setopt(http_session_, CURLOPT_INTERFACE, ifc.c_str());
 }
 
-
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 9b4ce5e..11d53e2 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -36,7 +36,7 @@ namespace nifi {
 namespace minifi {
 namespace c2 {
 
-RESTSender::RESTSender(std::string name, utils::Identifier uuid)
+RESTSender::RESTSender(const std::string &name, const utils::Identifier &uuid)
     : C2Protocol(name, uuid),
       logger_(logging::LoggerFactory<Connectable>::getLogger()) {
 }
@@ -83,6 +83,9 @@ void RESTSender::update(const std::shared_ptr<Configure> &configure) {
 }
 
 const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+  if (url.empty()){
+    return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+  }
   utils::HTTPClient client(url, ssl_context_service_);
   client.setConnectionTimeout(2);
   std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
@@ -117,6 +120,7 @@ const C2Payload RESTSender::sendPayload(const std::string url, const Direction d
   }
   bool isOkay = client.submit();
   int64_t respCode = client.getResponseCode();
+  auto rs = client.getResponseBody();
   if (isOkay && respCode) {
     if (payload.isRaw()) {
       C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 4873ae3..ccb7b03 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -34,6 +34,9 @@ namespace nifi {
 namespace minifi {
 namespace c2 {
 
+#undef RAPIDJSON_ASSERT
+#define RAPIDJSON_ASSERT(x) if(!(x)) throw std::logic_error("rapidjson exception"); //NOLINT
+
 /**
  * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
  *
@@ -45,7 +48,7 @@ namespace c2 {
 class RESTSender : public RESTProtocol, public C2Protocol {
  public:
 
-  explicit RESTSender(std::string name, utils::Identifier uuid = utils::Identifier());
+  explicit RESTSender(const std::string &name, const utils::Identifier &uuid = utils::Identifier());
 
   virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
 
@@ -61,10 +64,11 @@ class RESTSender : public RESTProtocol, public C2Protocol {
 
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
 
- private:
-  std::shared_ptr<logging::Logger> logger_;
   std::string rest_uri_;
   std::string ack_uri_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
 };
 
 REGISTER_RESOURCE(RESTSender, "Encapsulates the restful protocol that is built upon C2Protocol.");
diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp
index 02474d1..4850844 100644
--- a/extensions/http-curl/tests/C2NullConfiguration.cpp
+++ b/extensions/http-curl/tests/C2NullConfiguration.cpp
@@ -52,7 +52,7 @@
 #include "processors/LogAttribute.h"
 #include "HTTPIntegrationBase.h"
 
-class VerifyC2Server : public HTTPIntegrationBase {
+class VerifyC2Server : public CoapIntegrationBase {
  public:
   explicit VerifyC2Server(bool isSecure)
       : isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index d1e83aa..049f506 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -98,7 +98,7 @@ class ConfigHandler : public CivetHandler {
 int main(int argc, char **argv) {
   mg_init_library(0);
   LogTestController::getInstance().setInfo<minifi::FlowController>();
-  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+  LogTestController::getInstance().setTrace<minifi::utils::HTTPClient>();
   LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
   LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
 
@@ -142,6 +142,7 @@ int main(int argc, char **argv) {
 
   configuration->set("c2.enable", "true");
   configuration->set("c2.agent.class", "test");
+  configuration->set("c2.agent.update.allow","true");
   configuration->set("c2.rest.url", "http://localhost:7072/update");
   configuration->set("c2.agent.heartbeat.period", "1000");
   mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
@@ -178,6 +179,7 @@ int main(int argc, char **argv) {
   auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
   std::string logs = LogTestController::getInstance().log_output.str();
   assert(logs.find("removing file") != std::string::npos);
+  assert(logs.find("May not have command processor") != std::string::npos);
   LogTestController::getInstance().reset();
   rmdir("./content_repository");
   assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 60738e4..d299851 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -117,7 +117,7 @@ class Responder : public CivetHandler {
   bool isSecure;
 };
 
-class VerifyC2Heartbeat : public HTTPIntegrationBase {
+class VerifyC2Heartbeat : public CoapIntegrationBase {
  public:
   explicit VerifyC2Heartbeat(bool isSecure)
       : isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp
index 1d3e03f..601a5a2 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp
@@ -70,7 +70,7 @@ class Responder : public CivetHandler {
   bool isSecure;
 };
 
-class VerifyC2Server : public HTTPIntegrationBase {
+class VerifyC2Server : public CoapIntegrationBase {
  public:
   explicit VerifyC2Server(bool isSecure)
       : isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 1bc4c72..85cae26 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -32,16 +32,16 @@ int ssl_enable(void *ssl_context, void *user_data) {
   return 0;
 }
 
-class HTTPIntegrationBase : public IntegrationBase {
+class CoapIntegrationBase : public IntegrationBase {
  public:
-  HTTPIntegrationBase(uint64_t waitTime = 60000)
+  CoapIntegrationBase(uint64_t waitTime = 60000)
       : IntegrationBase(waitTime),
         server(nullptr) {
   }
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~HTTPIntegrationBase();
+  virtual ~CoapIntegrationBase();
 
   void shutdownBeforeFlowController() {
     stop_webserver(server);
@@ -51,11 +51,11 @@ class HTTPIntegrationBase : public IntegrationBase {
   CivetServer *server;
 };
 
-HTTPIntegrationBase::~HTTPIntegrationBase() {
+CoapIntegrationBase::~CoapIntegrationBase() {
 
 }
 
-void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
+void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
   struct mg_callbacks callback;
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index b908208..de7113b 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -50,10 +50,10 @@
 #include "HTTPHandlers.h"
 #include "client/HTTPStream.h"
 
-class SiteToSiteTestHarness : public HTTPIntegrationBase {
+class SiteToSiteTestHarness : public CoapIntegrationBase {
  public:
   explicit SiteToSiteTestHarness(bool isSecure)
-      : HTTPIntegrationBase(2000), isSecure(isSecure) {
+      : CoapIntegrationBase(2000), isSecure(isSecure) {
     char format[] = "/tmp/ssth.XXXXXX";
     dir = testController.createTempDirectory(format);
   }
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index 554db26..9b149f9 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -50,7 +50,7 @@
 #include "../tests/TestServer.h"
 #include "HTTPIntegrationBase.h"
 
-class HttpTestHarness : public HTTPIntegrationBase {
+class HttpTestHarness : public CoapIntegrationBase {
  public:
   HttpTestHarness() {
     char format[] = "/tmp/ssth.XXXXXX";
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 4252564..63721e1 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -77,7 +77,7 @@ class Responder : public CivetHandler {
   bool isSecure;
 };
 
-class SiteToSiteTestHarness : public HTTPIntegrationBase {
+class SiteToSiteTestHarness : public CoapIntegrationBase {
  public:
   explicit SiteToSiteTestHarness(bool isSecure)
       : isSecure(isSecure) {
diff --git a/extensions/mqtt/processors/ConvertBase.cpp b/extensions/mqtt/processors/ConvertBase.cpp
index 10571d4..02a8d3d 100644
--- a/extensions/mqtt/processors/ConvertBase.cpp
+++ b/extensions/mqtt/processors/ConvertBase.cpp
@@ -26,7 +26,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "ConvertBase.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
 #include "utils/ByteArrayCallback.h"
 namespace org {
 namespace apache {
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp
index cf5574f..a240d16 100644
--- a/extensions/mqtt/processors/ConvertHeartBeat.cpp
+++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp
@@ -26,7 +26,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "ConvertHeartBeat.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
 #include "utils/ByteArrayCallback.h"
 namespace org {
 namespace apache {
@@ -49,7 +49,7 @@ void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> &co
   // while we have heartbeats we can continue to loop.
   while (mqtt_service_->get(100, listening_topic, heartbeat)) {
     if (heartbeat.size() > 0) {
-      c2::C2Payload payload = c2::mqtt::PayloadSerializer::deserialize(heartbeat);
+      c2::C2Payload payload = c2::PayloadSerializer::deserialize(heartbeat);
       auto serialized = serializeJsonRootPayload(payload);
       logger_->log_debug("Converted JSON output %s", serialized);
       minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1);
diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp b/extensions/mqtt/processors/ConvertJSONAck.cpp
index 6ea3eaa..a8f978f 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.cpp
+++ b/extensions/mqtt/processors/ConvertJSONAck.cpp
@@ -27,7 +27,7 @@
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
 #include "utils/ByteArrayCallback.h"
 namespace org {
 namespace apache {
@@ -98,7 +98,7 @@ void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
     std::string str(callback.buffer_.data(),callback.buffer_.size());
     auto payload = parseJsonResponse(response_payload, callback.buffer_);
 
-    auto stream = c2::mqtt::PayloadSerializer::serialize(payload);
+    auto stream = c2::PayloadSerializer::serialize(1,payload);
 
     mqtt_service_->send(topic, stream->getBuffer(), stream->getSize());
 
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
index 7590617..7a5df3d 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -86,12 +86,12 @@ C2Payload MQTTC2Protocol::serialize(const C2Payload &payload) {
 
   std::lock_guard<std::mutex> lock(input_mutex_);
 
-  auto stream = c2::mqtt::PayloadSerializer::serialize(payload);
+  auto stream = c2::PayloadSerializer::serialize(0x00, payload);
 
   auto transmit_id = mqtt_service_->send(heartbeat_topic_, stream->getBuffer(), stream->getSize());
   std::vector<uint8_t> response;
   if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) {
-    return  c2::mqtt::PayloadSerializer::deserialize(response);
+    return c2::PayloadSerializer::deserialize(response);
   }
   return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
 }
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h b/extensions/mqtt/protocol/MQTTC2Protocol.h
index 45c7938..47e04a6 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -30,7 +30,7 @@
 #include "c2/C2Protocol.h"
 #include "io/BaseStream.h"
 #include "agent/agent_version.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
 
 namespace org {
 namespace apache {
diff --git a/fedora.sh b/fedora.sh
index 0bf7d4b..db45485 100644
--- a/fedora.sh
+++ b/fedora.sh
@@ -68,6 +68,9 @@ build_deps(){
             INSTALLED+=("python3-devel")
           elif [ "$FOUND_VALUE" = "lua" ]; then
             INSTALLED+=("lua-devel")
+          elif [ "$FOUND_VALUE" = "automake" ]; then
+            INSTALLED+=("autoconf")
+            INSTALLED+=("automake")
           elif [ "$FOUND_VALUE" = "gpsd" ]; then
             INSTALLED+=("gpsd-devel")
           elif [ "$FOUND_VALUE" = "libarchive" ]; then
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index f804c2b..974750a 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -44,6 +44,10 @@ enum Operation {
   TRANSFER
 };
 
+#define PAYLOAD_NO_STATUS 0
+#define PAYLOAD_SUCCESS 1
+#define PAYLOAD_FAILURE 2
+
 enum Direction {
   TRANSMIT,
   RECEIVE
@@ -108,7 +112,9 @@ class C2Payload : public state::Update {
 
   }
 
-  C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false);
+  C2Payload(Operation op, const std::string &identifier, bool resp = false, bool isRaw = false);
+
+  C2Payload(Operation op, state::UpdateState state,const std::string &identifier, bool resp = false, bool isRaw = false);
 
   C2Payload(Operation op, bool resp = false, bool isRaw = false);
 
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
index 71cdd17..57e36b9 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -34,7 +34,7 @@ namespace c2 {
 class C2Protocol : public core::Connectable {
  public:
 
-  C2Protocol(std::string name, utils::Identifier &uuid)
+  C2Protocol(const std::string &name, const utils::Identifier &uuid)
       : core::Connectable(name, uuid),
         running_(true) {
 
diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h
new file mode 100644
index 0000000..a64eb09
--- /dev/null
+++ b/libminifi/include/c2/PayloadParser.h
@@ -0,0 +1,188 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PAYLOADPARSER_H_
+#define LIBMINIFI_INCLUDE_C2_PAYLOADPARSER_H_
+
+#include "C2Payload.h"
+#include "core/state/Value.h"
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+class PayloadParseException : public std::runtime_error {
+ public:
+  PayloadParseException(const std::string &msg)
+      : std::runtime_error(msg) {
+  }
+
+};
+
+template<typename T, typename C>
+class convert_if_base {
+ protected:
+  const std::shared_ptr<state::response::Value> node_;
+  explicit convert_if_base(const std::shared_ptr<state::response::Value> &node)
+      : node_(node) {
+  }
+ public:
+  T operator()() const {
+    if (auto sub_type = std::dynamic_pointer_cast<C>(node_))
+      return sub_type->getValue();
+    throw PayloadParseException("No known type");
+  }
+};
+
+template<typename T>
+struct convert_if {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node) {
+  }
+
+
+  std::string operator()() const {
+    throw PayloadParseException("No known type");
+  }
+};
+
+template<>
+struct convert_if<std::string> : public convert_if_base<std::string, state::response::Value> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base(node) {
+  }
+
+  std::string operator()() const {
+    return node_->getStringValue();
+  }
+};
+
+template<>
+struct convert_if<uint64_t> : public convert_if_base<uint64_t, state::response::UInt64Value> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base(node) {
+  }
+};
+
+template<>
+struct convert_if<int64_t> : public convert_if_base<int64_t, state::response::Int64Value> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base(node) {
+  }
+};
+
+template<>
+struct convert_if<int> : public convert_if_base<int, state::response::IntValue> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base(node) {
+  }
+};
+
+template<>
+struct convert_if<bool> : public convert_if_base<bool, state::response::BoolValue> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base(node) {
+  }
+};
+
+/**
+ * Defines a fluent parser that uses Exception management for flow control.
+ *
+ * Note that this isn't functionally complete.
+ */
+class PayloadParser {
+
+ public:
+
+  static PayloadParser getInstance(const C2Payload &payload) {
+    return PayloadParser(payload);
+  }
+
+  inline PayloadParser in(const std::string &payload) {
+    for (const auto &pl : ref_.getNestedPayloads()) {
+      if (pl.getLabel() == payload || pl.getIdentifier() == payload) {
+        return PayloadParser(pl);
+      }
+    }
+    throw PayloadParseException("Invalid payload. Could not find " + payload);
+  }
+
+  template<typename Functor>
+  inline void foreach(Functor f) {
+    for (const auto &component : ref_.getNestedPayloads()) {
+      f(component);
+    }
+  }
+
+  template<typename T>
+  inline T getAs(const std::string &field) {
+    for (const auto &cmd : ref_.getContent()) {
+      auto exists = cmd.operation_arguments.find(field);
+      if (exists != cmd.operation_arguments.end()) {
+        return convert_if<T>(exists->second.getValue())();
+      }
+    }
+    std::stringstream ss;
+    ss << "Invalid Field. Could not find " << field << " in " << ref_.getLabel();
+    throw PayloadParseException(ss.str());
+  }
+
+  template<typename T>
+  inline T getAs(const std::string &field, const T &fallback) {
+    for (const auto &cmd : ref_.getContent()) {
+      auto exists = cmd.operation_arguments.find(field);
+      if (exists != cmd.operation_arguments.end()) {
+        return convert_if<T>(exists->second.getValue())();
+      }
+    }
+    return fallback;
+  }
+
+  size_t getSize() const {
+    return ref_.getNestedPayloads().size();
+  }
+
+  /**
+   * Make these explicitly public.
+   */
+
+  PayloadParser(const PayloadParser &p) = delete;
+  const PayloadParser &operator=(const PayloadParser &p) = delete;
+  PayloadParser(PayloadParser &&parser) = default;
+
+ private:
+
+  PayloadParser(const C2Payload &payload)
+      : ref_(payload) {
+  }
+
+  const C2Payload &ref_;
+
+  std::vector<std::string> fields_;
+
+  std::string component_to_get_;
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_PAYLOADPARSER_H_ */
diff --git a/extensions/mqtt/protocol/PayloadSerializer.h b/libminifi/include/c2/PayloadSerializer.h
similarity index 95%
rename from extensions/mqtt/protocol/PayloadSerializer.h
rename to libminifi/include/c2/PayloadSerializer.h
index fbc5d38..61b7e73 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.h
+++ b/libminifi/include/c2/PayloadSerializer.h
@@ -27,7 +27,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace c2 {
-namespace mqtt {
 
 class PayloadSerializer {
  public:
@@ -37,6 +36,11 @@ class PayloadSerializer {
    */
   static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr<io::BaseStream> stream) {
     auto base_type = value.getValue();
+    if (!base_type) {
+      uint8_t type = 0;
+      stream->write(&type, 1);
+      return;
+    }
     uint8_t type = 0x00;
     if (auto sub_type = std::dynamic_pointer_cast<state::response::IntValue>(base_type)) {
       type = 1;
@@ -64,13 +68,13 @@ class PayloadSerializer {
       stream->writeUTF(str);
     }
   }
-  static void serialize(uint8_t op, const C2Payload &payload, std::shared_ptr<io::BaseStream> stream) {
+  static void serialize(uint16_t op, const C2Payload &payload, std::shared_ptr<io::BaseStream> stream) {
     uint8_t st;
     uint32_t size = payload.getNestedPayloads().size();
     stream->write(size);
     for (auto nested_payload : payload.getNestedPayloads()) {
       op = opToInt(nested_payload.getOperation());
-      stream->write(&op, 1);
+      stream->write(op);
       stream->write(&st, 1);
       stream->writeUTF(nested_payload.getLabel());
       stream->writeUTF(nested_payload.getIdentifier());
@@ -126,11 +130,13 @@ class PayloadSerializer {
     }
     return op;
   }
-  static std::shared_ptr<io::BaseStream> serialize(const C2Payload &payload) {
+  static std::shared_ptr<io::BaseStream> serialize(uint16_t version, const C2Payload &payload) {
     std::shared_ptr<io::BaseStream> stream = std::make_shared<io::BaseStream>();
-    uint8_t op, st = 0;
+    uint16_t op = 0;
+    uint8_t st = 0;
     op = opToInt(payload.getOperation());
-    stream->write(&op, 1);
+    stream->write(version);
+    stream->write(op);
     if (payload.getStatus().getState() == state::UpdateState::NESTED) {
       st = 1;
       stream->write(&st, 1);
@@ -242,11 +248,12 @@ class PayloadSerializer {
     io::BaseStream stream(&dataStream);
 
     uint8_t op, st = 0;
-    ;
+    uint16_t version = 0;
 
     std::string identifier, label;
     // read op
     stream.read(op);
+    stream.read(version);
     stream.read(st);
     stream.readUTF(label);
     stream.readUTF(identifier);
@@ -308,7 +315,6 @@ class PayloadSerializer {
   virtual ~PayloadSerializer();
 };
 
-} /* namespace mqtt */
 } /* namespace c2 */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
index 251622c..6d848ec 100644
--- a/libminifi/include/c2/protocols/RESTProtocol.h
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -18,6 +18,16 @@
 #ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
 #define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
 
+
+
+#include <stdexcept>
+
+
+#ifdef RAPIDJSON_ASSERT
+#undef RAPIDJSON_ASSERT
+#endif
+#define RAPIDJSON_ASSERT(x) if(!(x)) throw std::logic_error("rapidjson exception"); //NOLINT
+
 #include "rapidjson/document.h"
 #include "rapidjson/writer.h"
 #include "rapidjson/stringbuffer.h"
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 1b6b073..2e1f79e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -410,7 +410,7 @@ void FlowController::initializeC2() {
   }
 
   std::string identifier_str;
-  if (!configuration_->get("nifi.c2.agent.identifier", identifier_str) || identifier_str.empty()) {
+  if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", identifier_str) || identifier_str.empty()) {
     // set to the flow controller's identifier
     identifier_str = uuidStr_;
   }
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index ea5802f..6362902 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -73,8 +73,16 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
           do {
             const C2Payload payload(std::move(requests.back()));
             requests.pop_back();
-            C2Payload && response = protocol_.load()->consumePayload(payload);
-            enqueue_c2_server_response(std::move(response));
+            try {
+              C2Payload && response = protocol_.load()->consumePayload(payload);
+              enqueue_c2_server_response(std::move(response));
+            }
+            catch(const std::exception &e) {
+              logger_->log_error("Exception occurred while consuming payload. error: %s", e.what());
+            }
+            catch(...) {
+              logger_->log_error("Unknonwn exception occurred while consuming payload.");
+            }
           }while(requests.size() > 0 && ++count < max_c2_responses);
         }
         request_mutex.unlock();
@@ -82,7 +90,15 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
 
       if ( time_since > heart_beat_period_ ) {
         last_run_ = now;
-        performHeartBeat();
+        try {
+          performHeartBeat();
+        }
+        catch(const std::exception &e) {
+          logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
+        }
+        catch(...) {
+          logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
+        }
       }
 
       checkTriggers();
@@ -416,8 +432,6 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
       break;
     case Operation::UPDATE: {
       handle_update(resp);
-      C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-      enqueue_c2_response(std::move(response));
     }
       break;
 
@@ -603,7 +617,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
       unlink(file_path.c_str());
       // if we can apply the update, we will acknowledge it and then backup the configuration file.
       if (update_sink_->applyUpdate(urlStr, raw_data_str)) {
-        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
 
         if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
@@ -647,8 +661,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
         }
       } else {
         logger_->log_debug("update failed.");
-        std::cout << raw_data_str << std::endl;
-        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
       }
       // send
@@ -658,7 +671,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
       if (update_text != resp.operation_arguments.end()) {
         if (update_sink_->applyUpdate(url->second.to_string(), update_text->second.to_string()) != 0 && persist != resp.operation_arguments.end()
             && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
-          C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+          C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true);
           enqueue_c2_response(std::move(response));
           // update nifi.flow.configuration.file=./conf/config.yml
           std::string config_file;
@@ -683,7 +696,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
             writer.close();
           }
         } else {
-          C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+          C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, false, true);
           enqueue_c2_response(std::move(response));
         }
       }
@@ -714,17 +727,19 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
 
     if (resp.operation_arguments.size() > 0)
       configure(running_c2_configuration);
-    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true);
     enqueue_c2_response(std::move(response));
   } else if (resp.name == "agent") {
     // we are upgrading the agent. therefore we must be given a location
     auto location = resp.operation_arguments.find("location");
     auto isPartialStr = resp.operation_arguments.find("partial");
+
     bool partial_update = false;
     if (isPartialStr != std::end(resp.operation_arguments)) {
       partial_update = utils::StringUtils::equalsIgnoreCase(isPartialStr->second.to_string(), "true");
     }
     if (location != resp.operation_arguments.end()) {
+      logger_->log_trace("Update agent with location %s", location->second.to_string());
       // we will not have a raw payload
       C2Payload payload(Operation::TRANSFER, false, true);
 
@@ -734,13 +749,16 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
 
       std::string file_path = std::string(raw_data.data(), raw_data.size());
 
+      logger_->log_trace("Update requested with file %s", file_path);
+
       // acknowledge the transfer. For a transfer, the response identifier should be the checksum of the
       // file transferred.
-      C2Payload transfer_response(Operation::ACKNOWLEDGE, response.getIdentifier(), false, true);
+      C2Payload transfer_response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, response.getIdentifier(), false, true);
 
       protocol_.load()->consumePayload(std::move(transfer_response));
 
       if (allow_updates_) {
+        logger_->log_trace("Update allowed from file %s", file_path);
         if (partial_update && !bin_location_.empty()) {
           utils::file::DiffUtils::apply_binary_diff(bin_location_.c_str(), file_path.c_str(), update_location_.c_str());
         } else {
@@ -750,8 +768,16 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
         logger_->log_trace("removing file %s", file_path);
         unlink(file_path.c_str());
         update_agent();
+      } else {
+        logger_->log_trace("Update disallowed from file %s", file_path);
       }
+
+    } else {
+      logger_->log_trace("No location present");
     }
+  } else {
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::NOT_APPLIED, resp.ident, false, true);
+    enqueue_c2_response(std::move(response));
   }
 }
 
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
index 405de73..61487ee 100644
--- a/libminifi/src/c2/C2Payload.cpp
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -75,8 +75,8 @@ C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &other)
   return *this;
 }
 
-C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw)
-    : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
+C2Payload::C2Payload(Operation op, state::UpdateState state, const std::string &identifier, bool resp, bool isRaw)
+    : state::Update(state::UpdateStatus(state, 0)),
       op_(op),
       raw_(isRaw),
       ident_(identifier),
@@ -85,6 +85,10 @@ C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw
       is_collapsible_(true) {
 }
 
+C2Payload::C2Payload(Operation op, const std::string &identifier, bool resp, bool isRaw)
+    : C2Payload(op, state::UpdateState::FULLY_APPLIED, identifier, resp, isRaw) {
+}
+
 C2Payload::C2Payload(Operation op, bool resp, bool isRaw)
     : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
       op_(op),
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/libminifi/src/c2/PayloadSerializer.cpp
similarity index 93%
rename from extensions/mqtt/protocol/PayloadSerializer.cpp
rename to libminifi/src/c2/PayloadSerializer.cpp
index 5f62227..107b7d1 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/libminifi/src/c2/PayloadSerializer.cpp
@@ -15,14 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace c2 {
-namespace mqtt {
 
 PayloadSerializer::PayloadSerializer() {
 }
@@ -30,7 +29,6 @@ PayloadSerializer::PayloadSerializer() {
 PayloadSerializer::~PayloadSerializer() {
 }
 
-} /* namespace mqtt */
 } /* namespace c2 */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 297b76d..d081510 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -51,7 +51,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const
         identifier = root["identifier"].GetString();
       }
       if (root["requested_operations"].Size() == 0 && root["requestedOperations"].Size() == 0)
-        return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
+        return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
 
       C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
 
@@ -129,7 +129,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const
   } catch (...) {
   }
 #endif
-  return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
+  return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
 }
 
 void setJsonStr(const std::string& key, const state::response::ValueNode& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {  // NOLINT
diff --git a/libminifi/test/coap-tests/CMakeLists.txt b/libminifi/test/coap-tests/CMakeLists.txt
new file mode 100644
index 0000000..4da8737
--- /dev/null
+++ b/libminifi/test/coap-tests/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB COAP_INTEGRATION_TESTS  "*.cpp")
+SET(COAP_TEST_COUNT 0)
+FOREACH(testfile ${COAP_INTEGRATION_TESTS})
+  	get_filename_component(testfilename "${testfile}" NAME_WE)
+  	add_executable("${testfilename}" "${testfile}")
+  	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/coap/")
+	createTests("${testfilename}")	
+  	if (APPLE)
+		target_link_libraries (${testfilename} -Wl,-all_load minifi-coap)
+	else ()
+	  	target_link_libraries (${testfilename} -Wl,--whole-archive minifi-coap -Wl,--no-whole-archive)
+	endif ()
+	MATH(EXPR COAP_TEST_COUNT "${COAP_TEST_COUNT}+1")
+	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${COAP_TEST_COUNT} COAP related test file(s)...")
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 4d7e303..95e14c4 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -35,7 +35,7 @@ class IntegrationBase {
 
   virtual ~IntegrationBase();
 
-  void run(std::string test_file_location);
+  virtual void run(std::string test_file_location);
 
   void setKeyDir(const std::string key_dir) {
     this->key_dir = key_dir;
@@ -123,7 +123,6 @@ void IntegrationBase::run(std::string test_file_location) {
 
   shutdownBeforeFlowController();
   controller->unload();
-
   runAssertions();
 
   cleanup();
diff --git a/libminifi/test/resources/CoapC2VerifyServe.yml b/libminifi/test/resources/CoapC2VerifyServe.yml
new file mode 100644
index 0000000..1be926a
--- /dev/null
+++ b/libminifi/test/resources/CoapC2VerifyServe.yml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: invoke
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.InvokeHTTP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          HTTP Method: GET
+          Remote URL: http://localhost:9888/geturl
+    - name: LogAttribute
+      id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: response
+      Properties:
+        Log Level: info
+        Log Payload: true
+
+Connections:
+    - name: TransferFilesToRPG
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: invoke
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+    - name: TransferFilesToRPG2
+      id: 2438e3c8-015a-1000-79ca-83af40ec1917
+      source name: LogAttribute
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
+      source relationship name: success
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Remote Processing Groups:
+    
diff --git a/libminifi/test/unit/PayloadParserTests.cpp b/libminifi/test/unit/PayloadParserTests.cpp
new file mode 100644
index 0000000..9ce6757
--- /dev/null
+++ b/libminifi/test/unit/PayloadParserTests.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+#include "c2/C2Payload.h"
+#include "c2/PayloadParser.h"
+#include "../TestBase.h"
+
+TEST_CASE("Test Valid Payload", "[tv1]") {
+  std::string ident = "identifier";
+  std::string cheese = "cheese";
+  std::string chips = "chips";
+  minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+  minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+  minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+  response.operation_arguments["type"] = "munster";
+  payload2.addContent(std::move(response));
+  payload.addPayload(std::move(payload2));
+  payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+  REQUIRE("munster" == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<std::string>("type"));
+}
+
+TEST_CASE("Test Invalid not found", "[tv2]") {
+  std::string ident = "identifier";
+  std::string cheese = "cheese";
+  std::string chips = "chips";
+  minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+  minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, cheese);
+  minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+  response.operation_arguments["typeS"] = "munster";
+  payload2.addContent(std::move(response));
+  payload.addPayload(std::move(payload2));
+  payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+  REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<std::string>("type"), minifi::c2::PayloadParseException);
+}
+
+
+TEST_CASE("Test Invalid coercion", "[tv3]") {
+  std::string ident = "identifier";
+  std::string cheese = "cheese";
+  std::string chips = "chips";
+  minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+  minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+  minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+  response.operation_arguments["type"] = "munster";
+  payload2.addContent(std::move(response));
+  payload.addPayload(std::move(payload2));
+  payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+  REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<uint64_t>("type"), minifi::c2::PayloadParseException);
+}
+
+TEST_CASE("Test Invalid not there", "[tv4]") {
+  std::string ident = "identifier";
+  std::string cheese = "cheese";
+  std::string chips = "chips";
+  minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+  minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+  minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+  response.operation_arguments["type"] = "munster";
+  payload2.addContent(std::move(response));
+  payload.addPayload(std::move(payload2));
+  payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+  REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("cheeses").getAs<uint64_t>("type"), minifi::c2::PayloadParseException);
+}
+
+TEST_CASE("Test typed conversions", "[tv5]") {
+  std::string ident = "identifier";
+  std::string cheese = "cheese";
+  std::string chips = "chips";
+  uint64_t size = 233;
+  bool isvalid = false;
+  minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+  minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+  minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+  response.operation_arguments["type"] = "munster";
+  response.operation_arguments["isvalid"] = isvalid;
+  response.operation_arguments["size"] = size;
+  payload2.addContent(std::move(response));
+  payload.addPayload(std::move(payload2));
+  payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+  REQUIRE("munster" == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<std::string>("type"));
+  REQUIRE(233 == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<uint64_t>("size"));
+  REQUIRE(false == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<bool>("isvalid"));
+}
+
+
+TEST_CASE("Test Invalid not there deep", "[tv6]") {
+  std::string ident = "identifier";
+  std::string cheese = "cheese";
+  std::string chips = "chips";
+  minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+  minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+  minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+  response.operation_arguments["type"] = "munster";
+  payload2.addContent(std::move(response));
+  payload.addPayload(std::move(payload2));
+  payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+  REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("chips").getAs<uint64_t>("type"), minifi::c2::PayloadParseException);
+}