You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2019/02/17 21:49:24 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-558: Initial
provisioning for CoAP C2
This is an automated email from the ASF dual-hosted git repository.
aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 9c99cea MINIFICPP-558: Initial provisioning for CoAP C2
9c99cea is described below
commit 9c99cea2381b0bea19a6fa46bc11f53e8f87dc67
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Tue Oct 23 11:51:19 2018 -0400
MINIFICPP-558: Initial provisioning for CoAP C2
This closes #447.
Signed-off-by: Aldrin Piri <al...@apache.org>
---
C2.md | 25 +-
CMakeLists.txt | 7 +-
CMakeSettings.json | 13 +-
appveyor.yml | 2 +-
bootstrap.sh | 8 +-
bstrp_functions.sh | 6 +-
centos.sh | 2 +
darwin.sh | 4 +
debian.sh | 2 +
docker/test/integration/minifi/test/__init__.py | 1 -
extensions/bustache/CMakeLists.txt | 4 +-
extensions/coap/CMakeLists.txt | 93 +++++++
.../PayloadSerializer.cpp => coap/COAPLoader.cpp} | 24 +-
extensions/coap/COAPLoader.h | 81 ++++++
.../coap/controllerservice/CoapConnector.cpp | 99 +++++++
extensions/coap/controllerservice/CoapConnector.h | 138 ++++++++++
extensions/coap/controllerservice/CoapMessaging.h | 113 ++++++++
extensions/coap/controllerservice/CoapResponse.h | 114 ++++++++
extensions/coap/nanofi/coap_connection.c | 126 +++++++++
extensions/coap/nanofi/coap_connection.h | 68 +++++
extensions/coap/nanofi/coap_functions.c | 208 ++++++++++++++
extensions/coap/nanofi/coap_functions.h | 126 +++++++++
.../nanofi/coap_message.c} | 25 +-
.../nanofi/coap_message.h} | 49 ++--
extensions/coap/nanofi/coap_server.c | 76 ++++++
extensions/coap/nanofi/coap_server.h | 83 ++++++
extensions/coap/protocols/CoapC2Protocol.cpp | 300 +++++++++++++++++++++
extensions/coap/protocols/CoapC2Protocol.h | 149 ++++++++++
.../server/CoapServer.cpp} | 21 +-
extensions/coap/server/CoapServer.h | 245 +++++++++++++++++
extensions/coap/tests/CMakeLists.txt | 55 ++++
.../tests/CoapC2VerifyHeartbeat.cpp} | 92 ++++++-
.../tests/CoapIntegrationBase.h} | 51 +++-
extensions/gps/CMakeLists.txt | 4 +-
extensions/http-curl/client/HTTPClient.cpp | 13 +-
extensions/http-curl/protocols/RESTSender.cpp | 6 +-
extensions/http-curl/protocols/RESTSender.h | 10 +-
extensions/http-curl/tests/C2NullConfiguration.cpp | 2 +-
extensions/http-curl/tests/C2UpdateAgentTest.cpp | 4 +-
.../http-curl/tests/C2VerifyHeartbeatAndStop.cpp | 2 +-
.../http-curl/tests/C2VerifyServeResults.cpp | 2 +-
extensions/http-curl/tests/HTTPIntegrationBase.h | 10 +-
extensions/http-curl/tests/HTTPSiteToSiteTests.cpp | 4 +-
.../http-curl/tests/HttpPostIntegrationTest.cpp | 2 +-
extensions/http-curl/tests/SiteToSiteRestTest.cpp | 2 +-
extensions/mqtt/processors/ConvertBase.cpp | 2 +-
extensions/mqtt/processors/ConvertHeartBeat.cpp | 4 +-
extensions/mqtt/processors/ConvertJSONAck.cpp | 4 +-
extensions/mqtt/protocol/MQTTC2Protocol.cpp | 4 +-
extensions/mqtt/protocol/MQTTC2Protocol.h | 2 +-
fedora.sh | 3 +
libminifi/include/c2/C2Payload.h | 8 +-
libminifi/include/c2/C2Protocol.h | 2 +-
libminifi/include/c2/PayloadParser.h | 188 +++++++++++++
.../include/c2}/PayloadSerializer.h | 22 +-
libminifi/include/c2/protocols/RESTProtocol.h | 10 +
libminifi/src/FlowController.cpp | 2 +-
libminifi/src/c2/C2Agent.cpp | 50 +++-
libminifi/src/c2/C2Payload.cpp | 8 +-
.../src/c2}/PayloadSerializer.cpp | 4 +-
libminifi/src/c2/protocols/RESTProtocol.cpp | 4 +-
libminifi/test/coap-tests/CMakeLists.txt | 35 +++
libminifi/test/integration/IntegrationBase.h | 3 +-
libminifi/test/resources/CoapC2VerifyServe.yml | 73 +++++
libminifi/test/unit/PayloadParserTests.cpp | 115 ++++++++
65 files changed, 2858 insertions(+), 156 deletions(-)
diff --git a/C2.md b/C2.md
index c6c1572..aa7e322 100644
--- a/C2.md
+++ b/C2.md
@@ -141,9 +141,28 @@ configuration produces the following JSON:
### Protocols
The default protocol is a RESTFul service; however, there is an MQTT protocol with a translation to use the
-RESTFul C2 server. This is useful for cases where an MQTT C2 server isn't available, or enclave partioning
-requires a single ingress/egress through a gateway. In these classes of devices, MQTT can be used as the intermediate
-or RESTSender can be used for C2 operations.
+RESTFul C2 server and a CoAP Protocol implementation. The CoAP protocol requires that COAP be enabled either
+through the bootstrap or the cmake flag -DENABLE_COAP=ON .
+
+Once configured, COAP uses a controller service within the flow OR minifi properties entries: nifi.c2.agent.coap.host and nifi.c2.agent.coap.port.
+Note that with CoAP, the payload will be significantly smaller, paring down metrics that are sent in each heartbeat. This will be useful for
+constrained environments.
+
+ nifi.c2.agent.coap.host=hostname
+ nifi.c2.agent.coap.port=<port number>
+
+
+If you wish to use the Controller service you may add a controller service named CoapConnectorService with the properties in the example config
+below. Note that Max Queue Size is the only non-required property:
+
+ Controller Services:
+ - id: 94491a38-015a-1000-0000-000000000001
+ name: coapservice
+ class: CoapConnectorService
+ Properties:
+ Remote Server: server
+ Remote Port: port
+ Max Queue Size: 1000
As defined, above, MQTTC2Protocol can be used for the agent protocol class. If you wish to communicate with a RESTFul C2 server,
you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAack, and ConvertUpdate classes on an agent to perform the translation.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a500992..a00d4c1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -27,7 +27,7 @@ option(SKIP_TESTS "Skips building all tests." OFF)
option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON)
option(USE_SYSTEM_OPENSSL "Instructs the build system to search for and use an SSL library available in the host system" ON)
option(OPENSSL_OFF "Disables OpenSSL" OFF)
-option(ENABLE_OPS "Enable Operations Tools" ON)
+option(ENABLE_OPS "Enable Operations/zlib Tools" ON)
option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF)
option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON)
option(BUILD_SHARED_LIBS "Build yaml cpp shared lib" OFF)
@@ -419,6 +419,11 @@ if (ENABLE_ALL OR ENABLE_GPS)
createExtension(GPS-EXTENSION "GPS EXTENSIONS" "Enables LibGPS Functionality and the GetGPS processor." "extensions/gps" "${TEST_DIR}/gps-tests")
endif()
+option(ENABLE_COAP "Enables the CoAP extension." OFF)
+if (ENABLE_ALL OR ENABLE_COAP STREQUAL "ON")
+ createExtension(COAP-EXTENSION "COAP EXTENSIONS" "Enables LibCOAP Functionality." "extensions/coap" "extensions/coap/tests/")
+endif()
+
if (WIN32)
option(ENABLE_WEL "Enables the suite of Windows Event Log extensions." OFF)
if (ENABLE_ALL OR ENABLE_WEL)
diff --git a/CMakeSettings.json b/CMakeSettings.json
index e58a362..486eff5 100644
--- a/CMakeSettings.json
+++ b/CMakeSettings.json
@@ -44,6 +44,10 @@
"value": "FALSE"
},
{
+ "name": "ENABLE_COAP",
+ "value": "OFF"
+ },
+ {
"name": "ENABLE_WEL",
"value": "TRUE"
},
@@ -75,7 +79,6 @@
"name": "SKIP_TESTS",
"value": "TRUE"
}
-
],
"ctestCommandArgs": ""
},
@@ -139,6 +142,14 @@
"value": "OFF"
},
{
+ "name": "FORCE_WINDOWS",
+ "value": "ON"
+ },
+ {
+ "name": "ENABLE_COAP",
+ "value": "OFF"
+ },
+ {
"name": "DISABLE_LIBARCHIVE",
"value": "TRUE"
},
diff --git a/appveyor.yml b/appveyor.yml
index 0f1e3af..82e6316 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -22,7 +22,7 @@ build_script:
- cd C:\projects\nifi-minifi-cpp
- mkdir build & exit 0
- cd build
- - cmake -g"Ninja" -DWIN32=WIN32 -DOPENSSL_OFF=ON -DUSE_SYSTEM_ZLIB=OFF -DFORCE_WINDOWS=ON -DUSE_SYSTEM_CURL=OFF -DUSE_SYSTEM_UUID=OFF -DDISABLE_ROCKSDB=ON -DDISABLE_CURL=ON -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DDISABLE_EXPRESSION_LANGUAGE=ON -DENABLE_WEL=TRUE -DSKIP_TESTS=ON ..
+ - cmake -g"Ninja" -DWIN32=WIN32 -DOPENSSL_OFF=ON -DUSE_SYSTEM_ZLIB=OFF -DFORCE_WINDOWS=ON -DUSE_SYSTEM_CURL=OFF -DUSE_SYSTEM_UUID=OFF -DDISABLE_ROCKSDB=ON -DDISABLE_CURL=ON -DDISABLE_LIBARCHIVE=ON -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DDISABLE_EXPRESSION_LANGUAGE=ON -DENABLE_COAP=OFF -DENABLE_WEL=TRUE -DSKIP_TESTS=ON ..
set msbuild_platform=x64
- msbuild nifi-minifi-cpp.sln
diff --git a/bootstrap.sh b/bootstrap.sh
index 064d87a..2acd303 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -271,6 +271,10 @@ add_disabled_option MQTT_ENABLED ${FALSE} "ENABLE_MQTT"
add_disabled_option PYTHON_ENABLED ${FALSE} "ENABLE_PYTHON"
add_dependency PYTHON_ENABLED "python"
+add_disabled_option COAP_ENABLED ${FALSE} "ENABLE_COAP"
+add_dependency COAP_ENABLED "automake"
+add_dependency COAP_ENABLED "libtool"
+
TESTS_DISABLED=${FALSE}
add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
@@ -377,7 +381,7 @@ build_cmake_command(){
fi
done
if [ "$FOUND" = "1" ]; then
- CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=TRUE"
+ CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=ON"
fi
else
FOUND=""
@@ -393,7 +397,7 @@ build_cmake_command(){
done
fi
if [ "$FOUND" = "1" ]; then
- CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=TRUE"
+ CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -D${FOUND_VALUE}=ON"
fi
fi
done
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 32bcde1..0938ed0 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -257,6 +257,7 @@ show_supported_features() {
echo "L. MQTT Support ................$(print_feature_status MQTT_ENABLED)"
echo "M. SQLite Support ..............$(print_feature_status SQLITE_ENABLED)"
echo "N. Python Support ..............$(print_feature_status PYTHON_ENABLED)"
+ echo "O. COAP Support ................$(print_feature_status COAP_ENABLED)"
echo "****************************************"
echo " Build Options."
echo "****************************************"
@@ -273,7 +274,7 @@ show_supported_features() {
read_feature_options(){
local choice
- read -p "Enter choice [ A - N ] " choice
+ read -p "Enter choice [ A - P or 1-2 ] " choice
choice=$(echo ${choice} | tr '[:upper:]' '[:lower:]')
case $choice in
a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -290,6 +291,7 @@ read_feature_options(){
l) ToggleFeature MQTT_ENABLED ;;
m) ToggleFeature SQLLITE_ENABLED ;;
n) ToggleFeature PYTHON_ENABLED ;;
+ o) ToggleFeature COAP_ENABLED ;;
1) ToggleFeature TESTS_DISABLED ;;
2) EnableAllFeatures ;;
p) FEATURES_SELECTED="true" ;;
@@ -298,7 +300,7 @@ read_feature_options(){
fi
;;
q) exit 0;;
- *) echo -e "${RED}Please enter an option A-L...${NO_COLOR}" && sleep 2
+ *) echo -e "${RED}Please enter an option A-P or 1-2...${NO_COLOR}" && sleep 2
esac
}
diff --git a/centos.sh b/centos.sh
index f99d51a..06c77d6 100644
--- a/centos.sh
+++ b/centos.sh
@@ -137,6 +137,8 @@ build_deps(){
install_bison
elif [ "$FOUND_VALUE" = "flex" ]; then
INSTALLED+=("flex")
+ elif [ "$FOUND_VALUE" = "automake" ]; then
+ INSTALLED+=("automake")
elif [ "$FOUND_VALUE" = "python" ]; then
INSTALLED+=("python34-devel")
elif [ "$FOUND_VALUE" = "lua" ]; then
diff --git a/darwin.sh b/darwin.sh
index e02e1eb..d7d574a 100644
--- a/darwin.sh
+++ b/darwin.sh
@@ -102,6 +102,10 @@ build_deps(){
INSTALLED+=("boost")
elif [ "$FOUND_VALUE" = "lua" ]; then
INSTALLED+=("lua")
+ elif [ "$FOUND_VALUE" = "libtool" ]; then
+ INSTALLED+=("libtool")
+ elif [ "$FOUND_VALUE" = "automake" ]; then
+ INSTALLED+=("automake")
elif [ "$FOUND_VALUE" = "gpsd" ]; then
INSTALLED+=("gpsd")
elif [ "$FOUND_VALUE" = "libarchive" ]; then
diff --git a/debian.sh b/debian.sh
index 8a8ae25..ba1ec7d 100644
--- a/debian.sh
+++ b/debian.sh
@@ -69,6 +69,8 @@ build_deps(){
INSTALLED+=("libpython3-dev")
elif [ "$FOUND_VALUE" = "lua" ]; then
INSTALLED+=("liblua5.1-0-dev")
+ elif [ "$FOUND_VALUE" = "automake" ]; then
+ INSTALLED+=("automake")
elif [ "$FOUND_VALUE" = "gpsd" ]; then
INSTALLED+=("libgps-dev")
elif [ "$FOUND_VALUE" = "libarchive" ]; then
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 3687a0f..d25e538 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -228,7 +228,6 @@ class SingleFileOutputValidator(OutputValidator):
return False
-
class SegfaultValidator(OutputValidator):
"""
Validate that a file was received.
diff --git a/extensions/bustache/CMakeLists.txt b/extensions/bustache/CMakeLists.txt
index 4ae25a0..2efb586 100644
--- a/extensions/bustache/CMakeLists.txt
+++ b/extensions/bustache/CMakeLists.txt
@@ -17,9 +17,9 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+cmake_minimum_required(VERSION 2.6)
find_package(Boost COMPONENTS system filesystem iostreams REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
diff --git a/extensions/coap/CMakeLists.txt b/extensions/coap/CMakeLists.txt
new file mode 100644
index 0000000..3f38328
--- /dev/null
+++ b/extensions/coap/CMakeLists.txt
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+include_directories(protocols nanofi controllerservice server)
+include_directories(../http-curl/)
+
+file(GLOB CSOURCES "nanofi/*.c")
+file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "processors/*.cpp" "controllerservice/*.cpp" "server/*.cpp" )
+
+add_library(nanofi-coap-c STATIC ${CSOURCES})
+add_library(minifi-coap STATIC ${SOURCES})
+set_property(TARGET minifi-coap PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+if(CMAKE_THREAD_LIBS_INIT)
+ target_link_libraries(minifi-coap "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+ set(BASE_DIR "${CMAKE_CURRENT_BINARY_DIR}/extensions/coap")
+ # path of the static libcoap archive built by the external project (currently identical on APPLE and elsewhere)
+ if (APPLE)
+ set(BYPRODUCT "${BASE_DIR}/extensions/coap/thirdparty/libcoap-src/.libs/libcoap-2.a")
+ else()
+ set(BYPRODUCT "${BASE_DIR}/extensions/coap/thirdparty/libcoap-src/.libs/libcoap-2.a")
+ endif()
+ set(DIR "${BASE_DIR}/extensions/coap/thirdparty/libcoap-src")
+ ExternalProject_Add(
+ coap-external
+ GIT_REPOSITORY "https://github.com/obgm/libcoap.git"
+ GIT_TAG "00486a4f46e0278dd24a8ff3411416ff420cde29"
+ PREFIX "${BASE_DIR}/extensions/coap/thirdparty/libcoap"
+ BUILD_IN_SOURCE true
+ SOURCE_DIR "${DIR}"
+ BUILD_COMMAND make
+ CMAKE_COMMAND ""
+ UPDATE_COMMAND ""
+ BUILD_BYPRODUCTS ${BYPRODUCT}
+ INSTALL_COMMAND cmake -E echo "Skipping install step."
+ CONFIGURE_COMMAND ""
+ PATCH_COMMAND ./autogen.sh && ./configure --disable-examples --disable-tests --disable-documentation
+ STEP_TARGETS build
+ EXCLUDE_FROM_ALL TRUE
+ )
+ add_definitions("-DWITH_POSIX=1")
+
+ add_library(coaplib STATIC IMPORTED)
+ set_target_properties(coaplib PROPERTIES IMPORTED_LOCATION "${BYPRODUCT}")
+ add_dependencies(coaplib coap-external)
+ set(COAP_FOUND "YES" CACHE STRING "" FORCE)
+ set(COAP_INCLUDE_DIRS "${DIR}/include" CACHE STRING "" FORCE)
+ set(COAP_LIBRARIES coaplib CACHE STRING "" FORCE)
+ set(COAP_LIBRARY coaplib CACHE STRING "" FORCE)
+ set(COAP_LIBRARY coaplib CACHE STRING "" FORCE)
+target_link_libraries(minifi-coap ${CMAKE_DL_LIBS})
+
+include_directories(${COAP_INCLUDE_DIRS})
+
+target_link_libraries (nanofi-coap-c ${COAP_LIBRARIES})
+target_link_libraries (nanofi-coap-c ${HTTP-CURL})
+target_link_libraries (minifi-coap nanofi-coap-c ${COAP_LIBRARIES})
+if (WIN32)
+ set_target_properties(minifi-coap PROPERTIES
+ LINK_FLAGS "/WHOLEARCHIVE"
+ )
+elseif (APPLE)
+ set_target_properties(minifi-coap PROPERTIES
+ LINK_FLAGS "-Wl,-all_load"
+ )
+else ()
+ set_target_properties(minifi-coap PROPERTIES
+ LINK_FLAGS "-Wl,--whole-archive"
+ )
+endif ()
+
+SET (COAP-EXTENSION minifi-coap PARENT_SCOPE)
+register_extension(minifi-coap)
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/COAPLoader.cpp
similarity index 69%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/COAPLoader.cpp
index 5f62227..ea49cd6 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/COAPLoader.cpp
@@ -15,24 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "PayloadSerializer.h"
+#include "core/FlowConfiguration.h"
+#include "COAPLoader.h"
+
+bool COAPObjectFactory::added = core::FlowConfiguration::add_static_func("createCOAPFactory");
+extern "C" {
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-namespace mqtt {
-PayloadSerializer::PayloadSerializer() {
-}
-PayloadSerializer::~PayloadSerializer() {
+void *createCOAPFactory(void) {
+ return new COAPObjectFactory();
}
-} /* namespace mqtt */
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}
diff --git a/extensions/coap/COAPLoader.h b/extensions/coap/COAPLoader.h
new file mode 100644
index 0000000..ce6383f
--- /dev/null
+++ b/extensions/coap/COAPLoader.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAPLOADER_H_
+#define EXTENSIONS_COAPLOADER_H_
+
+#ifdef WIN32
+#pragma comment(lib, "wldap32.lib" )
+#pragma comment(lib, "crypt32.lib" )
+#pragma comment(lib, "Ws2_32.lib")
+#endif
+
+#include "core/ClassLoader.h"
+#include "utils/StringUtils.h"
+#include "protocols/CoapC2Protocol.h"
+
+/**
+ * Object factory / class loader for the CoAP extension.
+ * REGISTER_RESOURCE could add these classes to the default class loader instead,
+ * but a dedicated factory ensures this one is used specifically for CoAP.
+ */
+class COAPObjectFactory : public core::ObjectFactory {
+ public:
+  COAPObjectFactory() {
+
+  }
+
+  /**
+   * Name of this factory.
+   * @return the fixed string "COAPObjectFactory"
+   */
+  virtual std::string getName() override{
+    return "COAPObjectFactory";
+  }
+
+  virtual std::string getClassName() override{
+    return "COAPObjectFactory";
+  }
+  /**
+   * Names of the classes this factory can provide object factories for.
+   * @return vector holding "CoapProtocol" and "CoapConnectorService", in that order
+   */
+  virtual std::vector<std::string> getClassNames() override{
+    return std::vector<std::string>{
+        "CoapProtocol",
+        "CoapConnectorService"
+    };
+  }
+
+  virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override{
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "CoapProtocol")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::coap::c2::CoapProtocol>());
+    }
+    if (utils::StringUtils::equalsIgnoreCase(class_name, "CoapConnectorService")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::coap::controllers::CoapConnectorService>());
+    }
+    return nullptr;  // any other class name is not served by this factory
+  }
+
+  static bool added;
+
+};
+
+extern "C" {
+ DLL_EXPORT void *createCOAPFactory(void);
+}
+#endif /* EXTENSIONS_COAPLOADER_H_ */
diff --git a/extensions/coap/controllerservice/CoapConnector.cpp b/extensions/coap/controllerservice/CoapConnector.cpp
new file mode 100644
index 0000000..1d152ff
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapConnector.cpp
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CoapConnector.h"
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "CoapConnector.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+
+// Definitions of the static Property members declared in CoapConnectorService:
+// "Remote Server" (required), "Remote Port" (required, default 8181) and
+// "Max Queue Size" (optional, default 1000).
+
+core::Property CoapConnectorService::RemoteServer(core::PropertyBuilder::createProperty("Remote Server")->withDescription("Remote CoAP server")->isRequired(true)->build());
+core::Property CoapConnectorService::Port(
+ core::PropertyBuilder::createProperty("Remote Port")->withDescription("Remote CoAP server port")->withDefaultValue<uint64_t>(8181)->isRequired(true)->build());
+core::Property CoapConnectorService::MaxQueueSize(
+ core::PropertyBuilder::createProperty("Max Queue Size")->withDescription("Max queue size for received data ")->withDefaultValue<uint64_t>(1000)->isRequired(false)->build());
+
+void CoapConnectorService::initialize() {
+  // One-time setup. The flag is tested only while holding the mutex: an unlocked
+  // check would let two concurrent callers both pass it and initialize twice.
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  if (initialized_)
+    return;
+
+  CoapMessaging::getInstance();  // creates the messaging singleton on first use
+
+  ControllerService::initialize();
+  initializeProperties();
+
+  initialized_ = true;
+}
+
+void CoapConnectorService::onEnable() {
+  std::string port_text;
+  // Preferred source: non-empty "Remote Server" / "Remote Port" values set on this service.
+  const bool from_properties = getProperty(RemoteServer.getName(), host_) && !host_.empty() && getProperty(Port.getName(), port_text) && !port_text.empty();
+  // Otherwise we aren't being used in the context of a single controller service, so fall back
+  // to the nifi.c2.agent.coap.host / nifi.c2.agent.coap.port configuration entries.
+  const bool port_available = from_properties
+      || (configuration_->get("nifi.c2.agent.coap.host", host_) && configuration_->get("nifi.c2.agent.coap.port", port_text));
+  if (port_available) {
+    core::Property::StringToInt(port_text, port_);
+  }  // otherwise port_ keeps its previous value
+}
+
+CoapResponse CoapConnectorService::sendPayload(uint8_t type, const std::string &endpoint, const CoapMessage *message) {
+  // The two way communication with the C layer uses CoapMessage; callers of this
+  // C++ ControllerService are handed a CoapResponse instead.
+  auto outbound = create_connection(type, host_.c_str(), endpoint.c_str(), port_, message);
+  send_pdu(outbound);
+  CoapResponse reply = CoapMessaging::getInstance().pop(outbound->ctx);  // read ctx before the PDU is freed
+  free_pdu(outbound);
+  return reply;
+}
+
+void CoapConnectorService::initializeProperties() {
+  // Advertise the three properties this service understands: the remote
+  // server, the remote port and the optional bound on queued received data.
+  std::set<core::Property> advertised{RemoteServer, Port, MaxQueueSize};
+
+  setSupportedProperties(advertised);
+}
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/coap/controllerservice/CoapConnector.h b/extensions/coap/controllerservice/CoapConnector.h
new file mode 100644
index 0000000..1570b90
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapConnector.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_
+
+#include "core/logging/LoggerConfiguration.h"
+#include "core/controller/ControllerService.h"
+
+#include "CoapResponse.h"
+#include "CoapMessaging.h"
+#include "coap_functions.h"
+#include "coap_connection.h"
+#include "coap_message.h"
+#include <memory>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+
+/**
+ * Purpose and Justification: Controller services function as a layerable way to provide
+ * services to internal services. While a controller service is generally configured from the flow,
+ * we want to follow the open closed principle and provide CoAP services to other components.
+ */
+class CoapConnectorService : public core::controller::ControllerService {
+ public:
+
+ /**
+ * Constructors for the controller service. Each sets port_ to 0, clears initialized_ and calls initialize().
+ */
+ explicit CoapConnectorService(const std::string &name, const std::string &id)
+ : ControllerService(name, id),
+ port_(0),
+ initialized_(false),
+ logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) {
+ initialize();
+ }
+
+ explicit CoapConnectorService(const std::string &name, utils::Identifier uuid = utils::Identifier())
+ : ControllerService(name, uuid),
+ port_(0),
+ initialized_(false),
+ logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) {
+ initialize();
+ }
+
+ explicit CoapConnectorService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+ : ControllerService(name),
+ port_(0),
+ initialized_(false),
+ logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) {
+ setConfiguration(configuration);  // stored before initialize() runs
+ initialize();
+ }
+
+ /**
+ * Properties supported by this service (defined in CoapConnector.cpp).
+ */
+ static core::Property RemoteServer;
+ static core::Property Port;
+ static core::Property MaxQueueSize;
+
+ virtual void initialize();
+
+ void yield() {
+
+ }
+
+ bool isRunning() {
+ return getState() == core::controller::ControllerServiceState::ENABLED;
+ }
+
+ bool isWorkAvailable() {
+ return false;
+ }
+
+ virtual void onEnable();
+
+ /**
+ * Sends the message to the endpoint on the configured host and port and returns the response stored for it.
+ * @param type type of payload to endpoint interaction ( GET, POST, PUT, DELETE ).
+ * @param endpoint is the connecting endpoint on the server
+ * @param message is the CoapMessage to be sent
+ * @return CoapResponse popped from CoapMessaging for the connection's context;
+ * CoapMessaging::pop yields CoapResponse(-1) when no response was stored.
+ */
+ CoapResponse sendPayload(uint8_t type, const std::string &endpoint, const CoapMessage * const message);
+
+ protected:
+
+ void initializeProperties();
+
+ // connector mutex; not referenced by this header or by CoapConnector.cpp.
+ std::mutex connector_mutex_;
+
+ // initialization mutex; held by initialize() while it sets the service up.
+ std::mutex initialization_mutex_;
+
+ std::atomic<bool> initialized_;  // set to true at the end of initialize()
+
+ private:
+
+ // host connecting to.
+ std::string host_;
+ // port connecting to
+ unsigned int port_;
+
+ std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_ */
diff --git a/extensions/coap/controllerservice/CoapMessaging.h b/extensions/coap/controllerservice/CoapMessaging.h
new file mode 100644
index 0000000..281e5a7
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapMessaging.h
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_CONTROLLERSERVICE_COAPMESSAGING_H_
+#define EXTENSIONS_COAP_CONTROLLERSERVICE_COAPMESSAGING_H_
+
+#include "CoapResponse.h"
+#include "coap_functions.h"
+#include "coap_connection.h"
+#include "coap_message.h"
+#include <memory>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+
+class CoapMessaging {
+ public:
+  static CoapMessaging &getInstance() {
+    static CoapMessaging instance;  // function-local static: created on first call
+    return instance;
+  }
+
+  /**
+   * Reports whether a response is currently stored for the given context.
+   */
+  bool hasResponse(coap_context_t *ctx) const {
+    std::lock_guard<std::mutex> guard(connector_mutex_);
+    return messages_.count(ctx) != 0;
+  }
+  /**
+   * Removes and returns the response stored for ctx; yields CoapResponse(-1) when none is stored.
+   */
+  CoapResponse pop(const coap_context_t * const ctx) {
+    CoapResponse result(-1);
+    std::lock_guard<std::mutex> guard(connector_mutex_);
+    // the map is keyed by non-const pointer, hence the cast
+    const auto entry = messages_.find(const_cast<coap_context_t*>(ctx));
+    if (entry != messages_.end()) {
+      result = std::move(entry->second);
+      messages_.erase(entry);
+    }
+    return result;
+  }
+ protected:
+
+  /**
+   * Callback for errors reported for a context: stores CoapResponse(code) for ctx.
+   */
+  static void receiveError(void *receiver_context, coap_context_t *ctx, unsigned int code) {
+    auto *self = static_cast<CoapMessaging*>(receiver_context);
+    self->enqueueResponse(ctx, CoapResponse(code));
+  }
+
+  /**
+   * Callback for messages received for a context: wraps msg in a CoapResponse and stores it for ctx.
+   */
+  static void receiveMessage(void *receiver_context, coap_context_t *ctx, CoapMessage * const msg) {
+    auto *self = static_cast<CoapMessaging*>(receiver_context);
+    self->enqueueResponse(ctx, CoapResponse(msg));
+  }
+
+  void enqueueResponse(coap_context_t *ctx, CoapResponse &&msg) {
+    std::lock_guard<std::mutex> guard(connector_mutex_);
+    // a response already stored for ctx is left in place
+    messages_.emplace(ctx, std::move(msg));
+  }
+
+ private:
+  /**
+   * Private constructor since this is intended to be a singleton; hands `this`
+   * and the two static callbacks to init_coap_api.
+   */
+  CoapMessaging() {
+    callback_pointers callbacks;
+    callbacks.data_received = receiveMessage;
+    callbacks.received_error = receiveError;
+    init_coap_api(this, &callbacks);
+  }
+  // guards messages_; mutable since it's locked within the const hasResponse().
+  mutable std::mutex connector_mutex_;
+  // map of responses keyed by context. Only a single response is kept per context
+  // at any given time.
+  std::unordered_map<coap_context_t*, CoapResponse> messages_;
+
+};
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_COAP_CONTROLLERSERVICE_COAPMESSAGING_H_ */
diff --git a/extensions/coap/controllerservice/CoapResponse.h b/extensions/coap/controllerservice/CoapResponse.h
new file mode 100644
index 0000000..a8c9e33
--- /dev/null
+++ b/extensions/coap/controllerservice/CoapResponse.h
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_COAPRESPONSE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_COAPRESPONSE_H_
+
+#include "coap_message.h"
+
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace controllers {
+/**
+ * Purpose and Justification: CoapResponse is the internal response format handed to consumers of
+ * this controller service. It carries a response code and an owned copy of the payload bytes.
+ */
+class CoapResponse {
+ public:
+
+  /**
+   * Creates a CoapResponse from a CoapMessage. The payload bytes are copied into a buffer
+   * owned by this object, then the CoapMessage struct is released with free_coap_message.
+   * A null message produces an empty response with code 0.
+   * @param msg message to convert; must not be used by the caller afterwards.
+   */
+  explicit CoapResponse(CoapMessage * const msg)
+      : code_(msg != nullptr ? msg->code_ : 0),
+        size_((msg != nullptr && msg->data_ != nullptr) ? msg->size_ : 0),
+        data_(nullptr) {
+    // array form of unique_ptr so that the buffer is released with delete[]
+    data_.reset(new uint8_t[size_]);
+    if (size_ > 0) {
+      memcpy(data_.get(), msg->data_, size_);
+    }
+    if (msg != nullptr) {
+      free_coap_message(msg);
+    }
+  }
+
+  /**
+   * Creates a payload-less response that only carries a code.
+   * @param code response or error code.
+   */
+  explicit CoapResponse(uint32_t code)
+      : code_(code),
+        size_(0),
+        data_(nullptr) {
+  }
+
+  CoapResponse(const CoapResponse &other) = delete;
+
+  CoapResponse(CoapResponse &&other) = default;
+
+  ~CoapResponse() {
+  }
+
+  /**
+   * Retrieve the size of the coap response.
+   * @return number of payload bytes recorded at construction.
+   */
+  size_t getSize() const {
+    return size_;
+  }
+
+  /**
+   * Returns a const pointer to the payload.
+   * @return data pointer; null for code-only responses or after takeOwnership.
+   */
+  const uint8_t * const getData() const {
+    return data_.get();
+  }
+
+  /**
+   * Returns the response code.
+   * @return code supplied at construction.
+   */
+  uint32_t getCode() const {
+    return code_;
+  }
+
+  /**
+   * Ease of use function to hand the payload buffer to the caller.
+   * The buffer was allocated with new[]; the caller becomes responsible for delete[].
+   * @param data receives the payload pointer.
+   * @param size receives the payload size.
+   */
+  void takeOwnership(uint8_t **data, size_t &size) {
+    size = size_;
+    *data = data_.release();
+  }
+
+  CoapResponse &operator=(const CoapResponse &other) = delete;
+  CoapResponse &operator=(CoapResponse &&other) = default;
+ private:
+  uint32_t code_;
+  size_t size_;
+  // declared after size_ so that size_ is initialised first
+  std::unique_ptr<uint8_t[]> data_;
+};
+
+} /* namespace controllers */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_COAPRESPONSE_H_ */
diff --git a/extensions/coap/nanofi/coap_connection.c b/extensions/coap/nanofi/coap_connection.c
new file mode 100644
index 0000000..c49c3e2
--- /dev/null
+++ b/extensions/coap/nanofi/coap_connection.c
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "coap_connection.h"
+
+
+/**
+ * Releases whatever a partially constructed CoapPDU already owns, then the struct itself.
+ */
+static void discard_partial_connection(CoapPDU *pdu) {
+  if (pdu->optlist != NULL) {
+    coap_delete_optlist(pdu->optlist);
+  }
+  if (pdu->session != NULL) {
+    coap_session_release(pdu->session);
+  }
+  if (pdu->ctx != NULL) {
+    coap_free_context(pdu->ctx);
+  }
+  free(pdu);
+}
+
+/**
+ * Resolves the server, opens a client session, builds a confirmable request that carries
+ * the message payload and hands it to coap_send.
+ * @param type request code placed into the PDU
+ * @param server server name
+ * @param endpoint URI path on the server
+ * @param port CoAP port
+ * @param message payload to send
+ * @return CoapPDU to be released with free_pdu, or NULL when any step fails. Nothing is leaked on failure.
+ */
+CoapPDU *create_connection(uint8_t type, const char * const server, const char * const endpoint, int port, const CoapMessage * const message) {
+  if (server == NULL || endpoint == NULL || message == NULL) {
+    return NULL;
+  }
+
+  CoapPDU *pdu = (CoapPDU*) malloc(sizeof(CoapPDU));
+  if (pdu == NULL) {
+    return NULL;
+  }
+
+  // initialise every owned pointer up front so the failure paths can release safely
+  pdu->ctx = NULL;
+  pdu->session = NULL;
+  pdu->optlist = NULL;
+
+  coap_uri_t uri;
+  uri.host.s = (uint8_t*) server;  // may be a loss in resolution, but hostnames should be char *
+  uri.host.length = strlen(server);
+  uri.path.s = (uint8_t*) endpoint;  // ^ same as above for paths.
+  uri.path.length = strlen(endpoint);
+  uri.port = port;
+  uri.scheme = COAP_URI_SCHEME_COAP;
+
+  int res = resolve_address(&uri.host, &pdu->dst_addr.addr.sa);
+  if (res <= 0) {
+    discard_partial_connection(pdu);
+    return NULL;
+  }
+
+  pdu->dst_addr.size = res;
+  pdu->dst_addr.addr.sin.sin_port = htons(uri.port);
+
+  // bind the local side to the wildcard address of the resolved family, on an ephemeral port
+  const char *bind_node = NULL;
+  switch (pdu->dst_addr.addr.sa.sa_family) {
+    case AF_INET:
+      bind_node = "0.0.0.0";
+      break;
+    case AF_INET6:
+      bind_node = "::";
+      break;
+    default:
+      break;
+  }
+
+  // an unsupported family has no session to work with, so it is a failure as well
+  if (bind_node == NULL || create_session(&pdu->ctx, &pdu->session, bind_node, "0", &pdu->dst_addr) != 0) {
+    discard_partial_connection(pdu);
+    return NULL;
+  }
+
+  // we want to register handlers in the event that an error occurs or nack is returned
+  // from the library
+  coap_register_event_handler(pdu->ctx, coap_event);
+  coap_register_nack_handler(pdu->ctx, no_acknowledgement);
+
+  coap_context_set_keepalive(pdu->ctx, 1);
+
+  coap_str_const_t pld;
+  pld.length = message->size_;
+  pld.s = message->data_;
+
+  coap_register_option(pdu->ctx, COAP_OPTION_BLOCK2);
+
+  // set the response handler
+  coap_register_response_handler(pdu->ctx, response_handler);
+
+  pdu->session->max_retransmit = 1;
+
+  // add the URI option to the options list
+  if (!coap_insert_optlist(&pdu->optlist, coap_new_optlist(COAP_OPTION_URI_PATH, uri.path.length, uri.path.s))) {
+    discard_partial_connection(pdu);
+    return NULL;
+  }
+
+  // next, create the PDU.
+  coap_pdu_t *request = create_request(pdu->ctx, pdu->session, &pdu->optlist, type, &pld);
+  if (request == NULL) {
+    discard_partial_connection(pdu);
+    return NULL;
+  }
+
+  // send the PDU using the session.
+  coap_send(pdu->session, request);
+  return pdu;
+}
+
+/**
+ * Drives the CoAP context until libcoap reports that it can exit, i.e. the request
+ * has been answered or its retransmissions are exhausted.
+ * @param pdu connection created by create_connection
+ * @return 0 if the last processing step succeeded, -1 on a NULL argument or processing error
+ */
+int8_t send_pdu(const CoapPDU * const pdu) {
+  if (pdu == NULL || pdu->ctx == NULL) {
+    return -1;
+  }
+
+  const unsigned int wait_ms = 200;
+  int run_result;
+  // the first pass attempts the initial send; later passes cover re-transmission.
+  // Stop as soon as a pass fails instead of spinning on a broken context.
+  do {
+    run_result = coap_run_once(pdu->ctx, wait_ms);
+  } while (run_result >= 0 && !coap_can_exit(pdu->ctx));
+
+  return run_result < 0 ? -1 : 0;
+}
+
+/**
+ * Releases the option list, session and context held by the pdu, then the pdu itself.
+ * @param pdu connection to release
+ * @return 0 when something was released, -1 when pdu is NULL
+ */
+int8_t free_pdu(CoapPDU * pdu) {
+  if (pdu != NULL) {
+    coap_delete_optlist(pdu->optlist);
+    coap_session_release(pdu->session);
+    coap_free_context(pdu->ctx);
+    free(pdu);
+    return 0;
+  }
+  return -1;
+}
+
diff --git a/extensions/coap/nanofi/coap_connection.h b/extensions/coap/nanofi/coap_connection.h
new file mode 100644
index 0000000..6a762d2
--- /dev/null
+++ b/extensions/coap/nanofi/coap_connection.h
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_
+#define EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <coap2/coap.h>
+#include <netdb.h>
+#include "coap_message.h"
+#include "coap_functions.h"
+
+
+
+// Bundles everything create_connection sets up for a single outgoing request.
+// Released as a unit by free_pdu.
+typedef struct {
+ // context filled in by create_session
+ struct coap_context_t* ctx;
+ // client session filled in by create_session
+ struct coap_session_t* session;
+ // server address written by resolve_address; the port is set by create_connection
+ coap_address_t dst_addr;
+ // NOTE(review): not assigned by create_connection — confirm whether it is still needed
+ coap_address_t src_addr;
+ // options attached to the request (create_connection stores the URI path here)
+ coap_optlist_t *optlist;
+} CoapPDU;
+
+
+/**
+ * Creates a connection to the server
+ * @param type connection type
+ * @param server server name
+ * @param endpoint endpoint to connect to
+ * @param port CoAP port
+ * @param message message to send
+ * @return CoAPPDU object.
+ */
+CoapPDU *create_connection(uint8_t type, const char * const server, const char * const endpoint, int port, const CoapMessage * const message);
+
+/**
+ * Sends the pdu
+ * @param pdu to send
+ * @return result 0 if success, failure otherwise
+ */
+int8_t send_pdu(const CoapPDU *const pdu);
+/**
+ * Frees the pdu
+ * @param pdu to free
+ * @return result 0 if success, failure otherwise
+ */
+
+int8_t free_pdu(CoapPDU *pdu);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_CONNECTION_H_ */
diff --git a/extensions/coap/nanofi/coap_functions.c b/extensions/coap/nanofi/coap_functions.c
new file mode 100644
index 0000000..c66cf3d
--- /dev/null
+++ b/extensions/coap/nanofi/coap_functions.c
@@ -0,0 +1,208 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "coap_functions.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Initialize the API access. Not thread safe.
+ * Remembers the receiver object and copies the callback table that the
+ * handlers in this file invoke.
+ */
+void init_coap_api(void *rcvr, callback_pointers *ptrs) {
+  receiver = rcvr;
+  // callback_pointers holds exactly the two function pointers, so a struct copy covers both
+  global_ptrs = *ptrs;
+}
+
+/**
+ * Creates a context and a UDP client session bound to the local node/port and
+ * targeting dst_addr.
+ * @param ctx receives the new context; NULL on failure
+ * @param session receives the new session; NULL on failure
+ * @param node numeric local address to bind to
+ * @param port numeric local port to bind to
+ * @param dst_addr destination address
+ * @return 0 on success, -1 on bad arguments or when the local address cannot be parsed,
+ *         -2 when no usable context/session could be created
+ */
+int create_session(coap_context_t **ctx, coap_session_t **session, const char *node, const char *port, coap_address_t *dst_addr) {
+  const coap_proto_t proto = COAP_PROTO_UDP;
+  struct addrinfo hints;
+  struct addrinfo *result = NULL;
+  struct addrinfo *rp;
+
+  if (ctx == NULL || session == NULL) {
+    return -1;
+  }
+  *ctx = NULL;
+  *session = NULL;
+
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_UNSPEC;  // ipv4 or ipv6
+  hints.ai_socktype = COAP_PROTO_RELIABLE(proto) ? SOCK_STREAM : SOCK_DGRAM;
+  hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST | AI_NUMERICSERV | AI_ALL;
+
+  if (getaddrinfo(node, port, &hints, &result) != 0) {
+    return -1;
+  }
+
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    coap_address_t addr;
+
+    if (rp->ai_addrlen > sizeof(addr.addr)) {
+      continue;
+    }
+    coap_address_init(&addr);
+    addr.size = rp->ai_addrlen;
+    memcpy(&addr.addr, rp->ai_addr, rp->ai_addrlen);
+
+    *ctx = coap_new_context(NULL);
+    if (*ctx == NULL) {
+      continue;
+    }
+
+    *session = coap_new_client_session(*ctx, &addr, dst_addr, proto);
+    if (*session != NULL) {
+      freeaddrinfo(result);
+      return 0;
+    }
+
+    // do not leak the context when the session could not be opened on this address
+    coap_free_context(*ctx);
+    *ctx = NULL;
+  }
+
+  freeaddrinfo(result);
+  return -2;
+}
+
+/**
+ * Creates a context with a UDP endpoint listening on node/port.
+ * @param ctx receives the new context; NULL on failure
+ * @param node numeric address to listen on
+ * @param port port to listen on
+ * @return 0 on success, -1 on bad arguments or when the address cannot be parsed,
+ *         -2 when no usable context/endpoint could be created
+ */
+int create_endpoint_context(coap_context_t **ctx, const char *node, const char *port) {
+  struct addrinfo hints;
+  struct addrinfo *result = NULL;
+  struct addrinfo *rp;
+
+  if (ctx == NULL) {
+    return -1;
+  }
+  *ctx = NULL;
+
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_UNSPEC;  // ipv4 or ipv6
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST ;
+
+  if (getaddrinfo(node, port, &hints, &result) != 0) {
+    return -1;
+  }
+
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    coap_address_t addr;
+
+    if (rp->ai_addrlen > sizeof(addr.addr)) {
+      continue;
+    }
+    coap_address_init(&addr);
+    addr.size = rp->ai_addrlen;
+    memcpy(&addr.addr, rp->ai_addr, rp->ai_addrlen);
+
+    *ctx = coap_new_context(NULL);
+    if (*ctx == NULL) {
+      continue;
+    }
+
+    if (coap_new_endpoint(*ctx, &addr, COAP_PROTO_UDP) != NULL) {
+      freeaddrinfo(result);
+      return 0;
+    }
+
+    // do not leak the context when the endpoint could not be created on this address
+    coap_free_context(*ctx);
+    *ctx = NULL;
+  }
+
+  freeaddrinfo(result);
+  return -2;
+}
+
+/**
+ * Builds a confirmable request PDU for the session.
+ * @param ctx coap context (not used by this function)
+ * @param session coap session the PDU is allocated for
+ * @param optlist option list to attach; may be NULL
+ * @param code coap request code
+ * @param ptr payload to attach
+ * @return the new PDU, or NULL when allocation fails or the options/payload cannot be added
+ */
+struct coap_pdu_t *create_request(struct coap_context_t *ctx, struct coap_session_t *session, coap_optlist_t **optlist, unsigned char code, coap_str_const_t *ptr) {
+  coap_pdu_t *pdu;
+  (void) ctx;
+
+  if (session == NULL || ptr == NULL) {
+    return NULL;
+  }
+
+  if (!(pdu = coap_new_pdu(session)))
+    return NULL;
+
+  pdu->type = COAP_MESSAGE_CON;
+  pdu->tid = coap_new_message_id(session);
+  pdu->code = code;
+
+  // a request that silently lost its options or payload must not be sent
+  if (optlist && !coap_add_optlist_pdu(pdu, optlist)) {
+    coap_delete_pdu(pdu);
+    return NULL;
+  }
+
+  if (!coap_add_data(pdu, ptr->length, ptr->s)) {
+    coap_delete_pdu(pdu);
+    return NULL;
+  }
+  return pdu;
+}
+
+int coap_event(struct coap_context_t *ctx, coap_event_t event, struct coap_session_t *session) {
+ if (event == COAP_EVENT_SESSION_FAILED && global_ptrs.received_error) {
+ global_ptrs.received_error(receiver, ctx, -1);
+ }
+ return 0;
+}
+
+void no_acknowledgement(struct coap_context_t *ctx, coap_session_t *session, coap_pdu_t *sent, coap_nack_reason_t reason, const coap_tid_t id) {
+ if (global_ptrs.received_error) {
+ global_ptrs.received_error(receiver, ctx, -1);
+ }
+}
+
+// Response hook. Responses carrying a BLOCK1 option are only reported on stdout.
+// Otherwise, class 2 responses and COAP_RESPONSE_400 go to the data callback as a
+// CoapMessage; any other code goes to the error callback. Nothing is dispatched
+// when no data callback is registered.
+// NOTE(review): this checks COAP_OPTION_BLOCK1 while create_connection registers
+// COAP_OPTION_BLOCK2 — confirm which one is intended.
+void response_handler(struct coap_context_t *ctx, struct coap_session_t *session, coap_pdu_t *sent, coap_pdu_t *received, const coap_tid_t id) {
+  coap_opt_iterator_t opt_iter;
+
+  if (coap_check_option(received, COAP_OPTION_BLOCK1, &opt_iter) != NULL) {
+    printf("Block option not currently supported");
+    return;
+  }
+
+  if (global_ptrs.data_received == NULL) {
+    return;
+  }
+
+  const int deliverable = COAP_RESPONSE_CLASS(received->code) == 2 || received->code == COAP_RESPONSE_400;
+  if (deliverable) {
+    CoapMessage * const msg = create_coap_message(received);
+    global_ptrs.data_received(receiver, ctx, msg);
+    return;
+  }
+
+  if (global_ptrs.received_error != NULL) {
+    global_ptrs.received_error(receiver, ctx, received->code);
+  }
+}
+
+/**
+ * Resolves the server name and copies the first IPv4/IPv6 socket address into dst.
+ * An empty name resolves 127.0.0.1.
+ * @param server server name (length-delimited, need not be NUL terminated)
+ * @param dst destination buffer; must be large enough for the resolved address
+ * @return length of the address copied into dst, or -1 on bad arguments, an over-long
+ *         name, a lookup failure, or when no IPv4/IPv6 address was found
+ */
+int resolve_address(const struct coap_str_const_t *server, struct sockaddr *dst) {
+  struct addrinfo *res = NULL;
+  struct addrinfo *ainfo;
+  struct addrinfo hints;
+  // local buffer (not static) keeps the function reentrant
+  char addrstr[256];
+  int len = -1;
+
+  if (server == NULL || dst == NULL) {
+    return -1;
+  }
+  // leave room for the terminating NUL
+  if (server->length >= sizeof(addrstr)) {
+    return -1;
+  }
+
+  memset(addrstr, 0, sizeof(addrstr));
+  if (server->length)
+    memcpy(addrstr, server->s, server->length);
+  else
+    memcpy(addrstr, "127.0.0.1", 9);
+
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_family = AF_UNSPEC;
+
+  // getaddrinfo error codes are not negative on every platform, so normalise to -1
+  if (getaddrinfo(addrstr, NULL, &hints, &res) != 0) {
+    return -1;
+  }
+
+  for (ainfo = res; ainfo != NULL; ainfo = ainfo->ai_next) {
+    if (ainfo->ai_family == AF_INET || ainfo->ai_family == AF_INET6) {
+      len = (int) ainfo->ai_addrlen;
+      memcpy(dst, ainfo->ai_addr, ainfo->ai_addrlen);
+      break;
+    }
+  }
+
+  freeaddrinfo(res);
+  return len;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
diff --git a/extensions/coap/nanofi/coap_functions.h b/extensions/coap/nanofi/coap_functions.h
new file mode 100644
index 0000000..c86362d
--- /dev/null
+++ b/extensions/coap/nanofi/coap_functions.h
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_FUNCTIONS_H_
+#define EXTENSIONS_COAP_NANOFI_COAP_FUNCTIONS_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+typedef unsigned char method_t;
+
+#include "coap2/coap.h"
+#include "coap2/uri.h"
+#include "coap2/address.h"
+#include <stdio.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <netdb.h>
+#include "coap_message.h"
+
+
+// Callback table installed through init_coap_api.
+typedef struct {
+ // invoked with a CoapMessage built from a received response
+ void (*data_received)(void *receiver_context, struct coap_context_t *ctx, CoapMessage *const);
+ // invoked with an error or response code for the given context
+ void (*received_error)(void *receiver_context, struct coap_context_t *ctx, unsigned int code);
+} callback_pointers;
+
+// defines the context specific data for the data receiver
+// NOTE(review): these are declared static in a header, so every translation unit that
+// includes it gets its own copy; only the copy in the file defining init_coap_api is set.
+static void *receiver;
+static callback_pointers global_ptrs;
+
+/**
+ * Initialize the API access. Not thread safe.
+ */
+void init_coap_api(void *rcvr, callback_pointers *ptrs);
+
+/**
+ * Creates a CoAP session. Provide it double pointer to the context and session to instantiate those structs.
+ * @param ctx coap context
+ * @param session coap session
+ * @param node device node name
+ * @param dst_addr destination address
+ * @return 0 if success -1 otherwise.
+ */
+int create_session(coap_context_t **ctx, coap_session_t **session, const char *node, const char *port, coap_address_t *dst_addr);
+
+/**
+ * Creates an endpoint context
+ * @param ctx pointer to a context
+ * @param node device node name
+ * @param port device port
+ */
+int create_endpoint_context(coap_context_t **ctx, const char *node, const char *port);
+
+/**
+ * Creates a request object with an already allocated context and session. The option list is sent in as an array ptr.
+ * @param ctx coap context
+ * @param session coap session
+ * @param optlist option list array
+ * @param code coap request code
+ * @param ptr ptr to the coap payload
+ * @returns pointer to a newly formed PDU ( protocol data unit )
+ */
+struct coap_pdu_t *create_request(struct coap_context_t *ctx, struct coap_session_t *session, coap_optlist_t **optlist, unsigned char code, coap_str_const_t *ptr);
+
+/**
+ * Function can be used to receive coap events
+ * @param ctx context that performed event
+ * @param event coap event launched
+ * @param session session that performed event.
+ * @return 0 as a success ( events in this library aren't noteworthy to PDU )
+ */
+int coap_event(struct coap_context_t *ctx, coap_event_t event, struct coap_session_t *session);
+
+/**
+ * Function can be used when a noack is received from the library
+ * @param ctx context that performed event
+ * @param session session that performed event.
+ * @param sent pdu that was sent
+ * @param reason reason PDU was not acked
+ * @param event coap event launched
+ * @param id id of ack
+ */
+void no_acknowledgement(struct coap_context_t *ctx, coap_session_t *session, coap_pdu_t *sent, coap_nack_reason_t reason, const coap_tid_t id);
+
+/**
+ * Response handler is the function launched when data is received
+ * @param ctx context
+ * @param session coap session
+ * @param sent PDU that was sent
+ * @param received PDU that was received
+ * @param id id of ack
+ */
+void response_handler(struct coap_context_t *ctx, struct coap_session_t *session, coap_pdu_t *sent, coap_pdu_t *received, const coap_tid_t id);
+
+/**
+ * Resolves the destination address of the server and places that into dst
+ * @param server server to connect to
+ * @param dst destination pointer
+ * @return size of the address copied into dst on success; an error value otherwise (callers treat negative results as failure)
+ */
+int resolve_address(const struct coap_str_const_t *server, struct sockaddr *dst);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_FUNCTIONS_H_ */
+
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/nanofi/coap_message.c
similarity index 69%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/nanofi/coap_message.c
index 5f62227..403bf5f 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/nanofi/coap_message.c
@@ -15,24 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "PayloadSerializer.h"
+#include "coap_message.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-namespace mqtt {
-
-PayloadSerializer::PayloadSerializer() {
+/**
+ * Allocates a CoapMessage describing the payload and code of the given PDU.
+ * Only the struct is allocated; data_ is whatever coap_get_data reports
+ * (presumably a pointer into the PDU rather than a copy — confirm against libcoap).
+ * @return the new message, or NULL when pdu is NULL or allocation fails
+ */
+CoapMessage * const create_coap_message(const coap_pdu_t * const pdu) {
+  if (pdu == NULL) {
+    return NULL;
+  }
+  CoapMessage *message = (CoapMessage*) malloc(sizeof(CoapMessage));
+  if (message == NULL) {
+    return NULL;
+  }
+  coap_get_data(pdu, &message->size_, &message->data_);
+  message->code_ = pdu->code;
+  return message;
}
-PayloadSerializer::~PayloadSerializer() {
+void free_coap_message(CoapMessage *msg) {
+  // releases the struct only; the payload pointer it carries is left untouched
+  if (msg != NULL) {
+    free(msg);
+  }
}
-
-} /* namespace mqtt */
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/nanofi/coap_message.h
similarity index 50%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/nanofi/coap_message.h
index 5f62227..f2c4486 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/nanofi/coap_message.h
@@ -15,24 +15,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "PayloadSerializer.h"
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_MESSAGE_H
+#define EXTENSIONS_COAP_NANOFI_COAP_MESSAGE_H
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-namespace mqtt {
-PayloadSerializer::PayloadSerializer() {
-}
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <coap2/coap.h>
+#include <stdint.h>
+#include <stdio.h>
+
+
+
+/**
+ * Plain description of a CoAP payload together with its code.
+ * CoAP-2 in libcoap uses uint8_t * while the first version uses a different type, so we will have to cast
+ * the data. We have to keep this in mind with the API that we use.
+ */
+typedef struct {
+ // request/response code of the message
+ uint32_t code_;
+ // number of payload bytes referenced by data_
+ size_t size_;
+ // payload bytes
+ uint8_t *data_;
+} CoapMessage;
+
+/**
+ * Creates a new CoapMessage from the given PDU; release it with free_coap_message.
+ */
+CoapMessage * const create_coap_message(const coap_pdu_t * const pdu);
+/**
+ * Frees a CoapMessage created by create_coap_message.
+ */
+void free_coap_message(CoapMessage *msg);
-PayloadSerializer::~PayloadSerializer() {
+#ifdef __cplusplus
}
+#endif
-} /* namespace mqtt */
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_MESSAGE_H */
diff --git a/extensions/coap/nanofi/coap_server.c b/extensions/coap/nanofi/coap_server.c
new file mode 100644
index 0000000..cd94bef
--- /dev/null
+++ b/extensions/coap/nanofi/coap_server.c
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "coap_server.h"
+#include "coap_functions.h"
+/**
+ * Creates a CoAP server context listening on the given host and port.
+ * @param server_hostname address to listen on
+ * @param port port to listen on
+ * @return newly allocated CoapServerContext to be released with free_server,
+ *         or NULL when allocation or endpoint creation fails
+ */
+CoapServerContext * const create_server(const char *const server_hostname, const char * const port){
+  CoapServerContext *server = (CoapServerContext*) malloc(sizeof(CoapServerContext));
+  if (server == NULL) {
+    return NULL;
+  }
+  memset(server, 0x00, sizeof(CoapServerContext));
+
+  if (create_endpoint_context(&server->ctx, server_hostname, port) != 0) {
+    // release here and report failure; the caller must never see the freed pointer
+    if (server->ctx != NULL) {
+      coap_free_context(server->ctx);
+    }
+    free(server);
+    return NULL;
+  }
+
+  return server;
+}
+
+/**
+ * Creates a resource for resource_path on the server, registers handler for method
+ * and adds the resource to the server context.
+ * @param server server the endpoint belongs to
+ * @param resource_path URI path of the resource; may be NULL
+ * @param method CoAP method the handler serves
+ * @param handler handler invoked for that method
+ * @return newly allocated CoapEndpoint to be released with free_endpoint, or NULL when
+ *         the server is unusable or an allocation fails
+ */
+CoapEndpoint *const create_endpoint(CoapServerContext * const server, const char * const resource_path, uint8_t method, coap_method_handler_t handler){
+  if (server == NULL || server->ctx == NULL) {
+    return NULL;
+  }
+
+  CoapEndpoint *endpoint = (CoapEndpoint*) malloc(sizeof(CoapEndpoint));
+  if (endpoint == NULL) {
+    return NULL;
+  }
+  memset(endpoint, 0x00, sizeof(CoapEndpoint));
+  endpoint->server = server;
+
+  int8_t flags = COAP_RESOURCE_FLAGS_NOTIFY_CON;
+  coap_str_const_t *path = NULL;
+  if (NULL != resource_path){
+    path = coap_new_str_const((const uint8_t *) resource_path, strlen(resource_path));
+  }
+
+  endpoint->resource = coap_resource_init(path, flags);
+  if (endpoint->resource == NULL) {
+    if (path){
+      coap_delete_str_const(path);
+    }
+    free(endpoint);
+    return NULL;
+  }
+  coap_add_attr(endpoint->resource, coap_make_str_const("title"), coap_make_str_const("\"Internal Clock\""), 0);
+
+  // register outside of assert(): with NDEBUG the asserted expression is never evaluated
+  const int8_t registered = add_endpoint(endpoint, method, handler);
+  assert(registered == 0);
+  (void) registered;
+
+  coap_add_resource(server->ctx, endpoint->resource);
+  if (path){
+    coap_delete_str_const(path);
+  }
+  return endpoint;
+}
+
+// Registers handler for method on the endpoint's resource.
+// Returns 0 on success and -1 when the endpoint or the handler is NULL.
+int8_t add_endpoint(CoapEndpoint * const endpoint, uint8_t method, coap_method_handler_t handler){
+  if (endpoint != NULL && handler != NULL) {
+    coap_register_handler(endpoint->resource, method, handler);
+    return 0;
+  }
+  return -1;
+}
+
+
+/**
+ * Frees the CoapEndpoint struct. The resource and server it points to are not touched here.
+ */
+void free_endpoint(CoapEndpoint * const endpoint){
+  if (endpoint == NULL) {
+    return;
+  }
+  free((void*) endpoint);
+}
+// Deletes the server's resources and context (when it has one), then frees the struct.
+// A NULL server is ignored.
+void free_server(CoapServerContext * const server){
+  if (server == NULL) {
+    return;
+  }
+  // a server whose context was never created must not hand NULL to libcoap
+  if (server->ctx != NULL) {
+    coap_delete_all_resources( server->ctx );
+    coap_free_context( server->ctx );
+  }
+  free(server);
+}
diff --git a/extensions/coap/nanofi/coap_server.h b/extensions/coap/nanofi/coap_server.h
new file mode 100644
index 0000000..f4292a7
--- /dev/null
+++ b/extensions/coap/nanofi/coap_server.h
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAP_NANOFI_COAP_SERVER_H
+#define EXTENSIONS_COAP_NANOFI_COAP_SERVER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <coap2/coap.h>
+
+#include <stdint.h>
+#include <stdio.h>
+
+/**
+ * State of a CoAP server created by create_server and released by free_server.
+ */
+typedef struct {
+ // context filled in by create_endpoint_context
+ struct coap_context_t* ctx;
+ // NOTE(review): zeroed by create_server and not written afterwards in coap_server.c — confirm use
+ coap_address_t src_addr;
+ // NOTE(review): zeroed by create_server and not written afterwards in coap_server.c — confirm use
+ coap_optlist_t *optlist;
+} CoapServerContext;
+
+
+// A resource registered on a server; created by create_endpoint.
+typedef struct {
+ // server the resource was added to
+ CoapServerContext *server;
+ // resource returned by coap_resource_init
+ coap_resource_t *resource;
+} CoapEndpoint;
+
+
+/**
+ * Create a new CoAPServer using the host name provided
+ * @param server_hostname hostname
+ * @param port port requested
+ * @param title title of base resource
+ * @return CoAPServer structure.
+ */
+CoapServerContext * const create_server(const char *const server_hostname, const char * const port);
+
+/**
+ * Creates an endpoint for the provided service context
+ */
+CoapEndpoint * const create_endpoint(CoapServerContext * const, const char * const resource_path, uint8_t method, coap_method_handler_t handler);
+
+/**
+ * Adds an endpoint to the provided CoapEndpoint structure
+ * @param endpoint endpoint we are adding the handler to
+ * @method method we're adding for CoAP messages
+ * @param handler handler we're using for the endpoint.
+ */
+int8_t add_endpoint(CoapEndpoint * const endpoint, uint8_t method, coap_method_handler_t handler);
+
+
+/**
+ * Frees a CoapEndpoint created by create_endpoint.
+ */
+void free_endpoint(CoapEndpoint * const);
+/**
+ * Frees the CoAP Server context.
+ */
+void free_server(CoapServerContext * const);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* EXTENSIONS_COAP_NANOFI_COAP_SERVER_H */
diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp b/extensions/coap/protocols/CoapC2Protocol.cpp
new file mode 100644
index 0000000..1eb8f63
--- /dev/null
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -0,0 +1,300 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CoapC2Protocol.h"
+#include "c2/PayloadSerializer.h"
+#include "c2/PayloadParser.h"
+#include "coap_functions.h"
+#include "coap_message.h"
+#include "io/BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace c2 {
+
+uint8_t CoapProtocol::REGISTRATION_MSG[8] = { 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72 };  // ASCII bytes of the word "register"
+
+CoapProtocol::CoapProtocol(const std::string &name, const utils::Identifier &uuid)
+ : RESTSender(name, uuid),
+ require_registration_(false),  // only set to true by serialize() when the server answers with the registration message
+ logger_(logging::LoggerFactory<CoapProtocol>::getLogger()) {
+}
+
+CoapProtocol::~CoapProtocol() {  // nothing is released explicitly here
+}
+
+/**
+ * Initializes the underlying REST sender and resolves the CoAP connector service.
+ *
+ * When "nifi.c2.coap.connector.service" is configured, the named controller service is looked up
+ * through the supplied provider. Otherwise a default CoapConnectorService is created and enabled.
+ * If the lookup cannot be performed or yields nothing, coap_service_ stays null and an error is
+ * logged; serialize() reports READ_ERROR for a null service.
+ *
+ * @param controller controller service provider used for the lookup
+ * @param configure agent configuration
+ */
+void CoapProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  RESTSender::initialize(controller, configure);
+  if (configure->get("nifi.c2.coap.connector.service", controller_service_name_)) {
+    if (nullptr == controller) {
+      // without a provider the configured service cannot be resolved; avoid dereferencing null
+      logger_->log_error("No controller service provider available to look up CoAP connector %s", controller_service_name_);
+      coap_service_ = nullptr;
+      return;
+    }
+    auto service = controller->getControllerService(controller_service_name_);
+    if (nullptr == service) {
+      logger_->log_error("Could not find CoAP connector service %s", controller_service_name_);
+    }
+    coap_service_ = std::static_pointer_cast<coap::controllers::CoapConnectorService>(service);
+  } else {
+    logger_->log_info("No CoAP connector configured, so using default service");
+    coap_service_ = std::make_shared<coap::controllers::CoapConnectorService>("cs", configure);
+    coap_service_->onEnable();
+  }
+}
+
+minifi::c2::C2Payload CoapProtocol::consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) {  // URL-based requests are delegated to the REST sender
+ return RESTSender::consumePayload(url, payload, direction, false);  // the async argument is ignored: false is always forwarded
+}
+
+/**
+ * Serializes an acknowledgement: the payload identifier as a UTF string followed by a single
+ * status byte derived from the payload's update state.
+ *
+ * @param stream stream that receives the serialized acknowledgement
+ * @param payload payload whose identifier and state are written
+ * @return always 0
+ */
+int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
+  stream->writeUTF(payload.getIdentifier());
+
+  // 0 = applied / complete, 1 = not or partially applied, 2 = read error, 3 = set error
+  uint8_t ackCode = 0;
+  switch (payload.getStatus().getState()) {
+    case state::UpdateState::NOT_APPLIED:
+    case state::UpdateState::PARTIALLY_APPLIED:
+      ackCode = 1;
+      break;
+    case state::UpdateState::READ_ERROR:
+      ackCode = 2;
+      break;
+    case state::UpdateState::SET_ERROR:
+      ackCode = 3;
+      break;
+    case state::UpdateState::NESTED:
+    case state::UpdateState::INITIATE:
+    case state::UpdateState::FULLY_APPLIED:
+    case state::UpdateState::READ_COMPLETE:
+      // these states keep the default code of 0
+      break;
+  }
+  stream->write(&ackCode, 1);
+  return 0;
+}
+
+int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {  // writes device id, agent id and optional flow information; returns 0 on success, -1 on failure
+ bool byte;  // flag written to the stream: true when flow information follows, false otherwise
+ uint16_t size = 0;  // element count written before the component and queue lists
+
+ logger_->log_trace("Writing heartbeat");
+
+ try {
+ const std::string deviceIdent = minifi::c2::PayloadParser::getInstance(payload).in("deviceInfo").getAs<std::string>("identifier");
+
+ const std::string agentIdent = minifi::c2::PayloadParser::getInstance(payload).in("agentInfo").getAs<std::string>("identifier");
+
+ stream->writeUTF(deviceIdent, false);
+
+ logger_->log_trace("Writing heartbeat with device Ident %s and agent Ident %s", deviceIdent, agentIdent);
+
+ if (agentIdent.empty()) {  // an agent identifier is mandatory; note the device identifier has already been written at this point
+ return -1;
+ }
+ stream->writeUTF(agentIdent, false);
+
+ try {  // flow information is optional: a parse failure in this inner block is logged and the false flag is written
+ auto flowInfoParser = minifi::c2::PayloadParser::getInstance(payload).in("flowInfo");
+ auto componentParser = flowInfoParser.in("components");
+ auto queueParser = flowInfoParser.in("queues");
+ auto vfsParser = flowInfoParser.in("versionedFlowSnapshotURI");
+ byte = true;
+ stream->write(byte);
+ size = componentParser.getSize();
+ stream->write(size);
+
+ componentParser.foreach([this, stream](const minifi::c2::C2Payload &component) {  // per component: label, then running flag
+ auto myParser = minifi::c2::PayloadParser::getInstance(component);
+ bool running = false;
+ stream->writeUTF(component.getLabel());
+ try {
+ running = myParser.getAs<bool>("running");
+ }
+ catch(const minifi::c2::PayloadParseException &e) {
+ logger_->log_error("Could not find running in components");
+ }
+ stream->write(running);  // false is written when the field could not be parsed
+ });
+ size = queueParser.getSize();
+ stream->write(size);
+ queueParser.foreach([this, stream](const minifi::c2::C2Payload &component) {  // per queue: label, then four 64-bit counters
+ auto myParser = minifi::c2::PayloadParser::getInstance(component);
+ stream->writeUTF(component.getLabel());
+ uint64_t datasize = 0, datasizemax = 0, qsize = 0, sizemax = 0;
+ try {
+ datasize = myParser.getAs<uint64_t>("dataSize");
+ datasizemax = myParser.getAs<uint64_t>("dataSizeMax");
+ qsize = myParser.getAs<uint64_t>("size");
+ sizemax = myParser.getAs<uint64_t>("sizeMax");
+ }
+ catch(const minifi::c2::PayloadParseException &e) {
+ logger_->log_error("Could not find queue sizes");
+ }
+ stream->write(datasize);  // counters parsed before a failure keep their values, the remaining ones stay 0
+ stream->write(datasizemax);
+ stream->write(qsize);
+ stream->write(sizemax);
+ });
+
+ auto bucketId = vfsParser.getAs<std::string>("bucketId");
+ auto flowId = vfsParser.getAs<std::string>("flowId");
+
+ stream->writeUTF(bucketId);
+ stream->writeUTF(flowId);
+
+ } catch (const minifi::c2::PayloadParseException &pe) {
+ logger_->log_error("Parser exception occurred, but is ignorable, reason %s", pe.what());
+ // okay to ignore
+ // NOTE(review): if the exception is thrown after the true flag or list data was already written,
+ // the false flag below is appended behind that partial data -- confirm the receiver tolerates this
+ byte = false;
+ stream->write(byte);
+ }
+ } catch (const minifi::c2::PayloadParseException &e) {  // missing device or agent information is fatal for the heartbeat
+ logger_->log_error("Parser exception occurred, reason %s", e.what());
+ return -1;
+ }
+ return 0;
+}
+
+/**
+ * Maps the numeric operation code used on the wire to a C2 operation.
+ * Codes outside 0..7 map to ACKNOWLEDGE.
+ *
+ * @param type numeric operation code
+ * @return the matching operation
+ */
+minifi::c2::Operation CoapProtocol::getOperation(int type) const {
+  // the position in this table is the wire code
+  static const minifi::c2::Operation kOperationByCode[] = {
+      minifi::c2::ACKNOWLEDGE,
+      minifi::c2::HEARTBEAT,
+      minifi::c2::CLEAR,
+      minifi::c2::DESCRIBE,
+      minifi::c2::RESTART,
+      minifi::c2::START,
+      minifi::c2::UPDATE,
+      minifi::c2::STOP };
+  const int knownCodes = static_cast<int>(sizeof(kOperationByCode) / sizeof(kOperationByCode[0]));
+
+  if (type < 0 || type >= knownCodes) {
+    return minifi::c2::ACKNOWLEDGE;
+  }
+  return kOperationByCode[type];
+}
+
+/**
+ * Serializes a C2 payload into the binary CoAP format, posts it through the CoAP connector
+ * service and converts the binary response into a C2Payload.
+ *
+ * Request layout: version (uint16), payload type (1 byte: 0 acknowledge, 1 heartbeat), then the
+ * body written by writeAcknowledgement() or writeHeartbeat().
+ * Response layout: version (uint16), operation count (uint16), then per operation a type byte,
+ * an id string, an operand string, an argument count (uint16) and that many key/value strings.
+ *
+ * When registration was requested by an earlier response, this call performs the registration
+ * through the REST sender instead of sending a CoAP message.
+ *
+ * @param payload payload to send; only ACKNOWLEDGE and HEARTBEAT operations are supported
+ * @return a NESTED payload with one nested payload per received operation, a READ_COMPLETE
+ *         payload after a registration attempt, or a READ_ERROR payload on any failure
+ */
+minifi::c2::C2Payload CoapProtocol::serialize(const minifi::c2::C2Payload &payload) {
+  if (nullptr == coap_service_) {
+    // return an error if we don't have a coap service
+    logger_->log_error("CoAP service requested without any configured hostname or port");
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+  }
+
+  if (require_registration_) {
+    logger_->log_debug("Server requested agent registration, so attempting");
+    auto response = minifi::c2::RESTSender::consumePayload(rest_uri_, payload, minifi::c2::TRANSMIT, false);
+    if (response.getStatus().getState() == state::UpdateState::READ_ERROR) {
+      // require_registration_ stays set, so registration is attempted again on the next call
+      logger_->log_trace("Could not register");
+      return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+    } else {
+      logger_->log_trace("Registered agent.");
+    }
+    require_registration_ = false;
+
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+  }
+
+  uint16_t version = 0;
+  uint8_t payload_type = 0;
+  uint16_t size = 0;
+  io::BaseStream stream;
+
+  stream.write(version);
+  std::string endpoint = "heartbeat";
+  switch (payload.getOperation()) {
+    case minifi::c2::ACKNOWLEDGE:
+      endpoint = "acknowledge";
+      payload_type = 0;
+      stream.write(&payload_type, 1);
+      if (writeAcknowledgement(&stream, payload) != 0) {
+        return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+      }
+      break;
+    case minifi::c2::HEARTBEAT:
+      payload_type = 1;
+      stream.write(&payload_type, 1);
+      if (writeHeartbeat(&stream, payload) != 0) {
+        logger_->log_error("Could not write heartbeat");
+        return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+      }
+      break;
+    default:
+      logger_->log_error("Could not identify operation");
+      return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+  }
+
+  size_t bsize = stream.getSize();
+
+  // the message only borrows the stream's buffer; stream must outlive the send call
+  CoapMessage msg;
+  msg.data_ = const_cast<uint8_t *>(stream.getBuffer());
+  msg.size_ = bsize;
+
+  coap::controllers::CoapResponse message = coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, &msg);
+
+  if (isRegistrationMessage(message)) {
+    // registration happens on the next call; this call falls through to the error return below
+    require_registration_ = true;
+  } else if (message.getSize() > 0) {
+    io::DataStream byteStream(message.getData(), message.getSize());
+    io::BaseStream responseStream(&byteStream);
+    // a response too short to hold the header is an error, not an empty acknowledgement
+    REQUIRE_VALID(responseStream.read(version))
+    REQUIRE_VALID(responseStream.read(size))
+    logger_->log_trace("Received ack. version %d. number of operations %d", version, size);
+    minifi::c2::C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
+    for (int i = 0; i < size; i++) {
+      uint8_t operationType;
+      uint16_t argsize = 0;
+      std::string operand, id;
+      REQUIRE_SIZE_IF(1, responseStream.read(operationType))
+      REQUIRE_VALID(responseStream.readUTF(id, false))
+      REQUIRE_VALID(responseStream.readUTF(operand, false))
+
+      logger_->log_trace("Received op %d, with id %s and operand %s", operationType, id, operand);
+      auto newOp = getOperation(operationType);
+      minifi::c2::C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
+      nested_payload.setIdentifier(id);
+      minifi::c2::C2ContentResponse new_command(newOp);
+      new_command.delay = 0;
+      new_command.required = true;
+      new_command.ttl = -1;
+      new_command.name = operand;
+      new_command.ident = id;
+      // NOTE(review): the argument count is read unchecked, so an operation that ends right after
+      // the operand appears to be accepted as having no arguments -- confirm this is intended
+      responseStream.read(argsize);
+      for (int j = 0; j < argsize; j++) {
+        std::string key, value;
+        REQUIRE_VALID(responseStream.readUTF(key))
+        REQUIRE_VALID(responseStream.readUTF(value))
+        new_command.operation_arguments[key] = value;
+      }
+
+      nested_payload.addContent(std::move(new_command));
+      new_payload.addPayload(std::move(nested_payload));
+    }
+    return new_payload;
+  }
+
+  return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+}
+
+} /* namespace c2 */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
new file mode 100644
index 0000000..0fe9395
--- /dev/null
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_COAPPROTOCOL_H_
+#define EXTENSIONS_COAPPROTOCOL_H_
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "c2/C2Protocol.h"
+#include "io/BaseStream.h"
+#include "agent/agent_version.h"
+#include "CoapConnector.h"
+
+#include "coap2/coap.h"
+#include "coap2/uri.h"
+#include "coap2/address.h"
+#include <stdio.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include "protocols/RESTSender.h"
+
+#undef RAPIDJSON_ASSERT
+#define RAPIDJSON_ASSERT(x) if(!(x)) throw std::logic_error("rapidjson exception"); //NOLINT
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+namespace c2 {
+
+/**
+ * Returns a READ_ERROR payload from the enclosing function when the expression evaluates to -1.
+ * Expects a variable named payload to be in scope. The argument is parenthesized so that
+ * compound expressions are compared as a whole.
+ */
+#define REQUIRE_VALID(x) \
+  if (-1 == (x)) { \
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); \
+  }
+
+/**
+ * Returns a READ_ERROR payload from the enclosing function when x does not evaluate to the
+ * expected size y. Expects a variable named payload to be in scope.
+ */
+#define REQUIRE_SIZE_IF(y,x) \
+  if ((y) != (x)) { \
+    return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); \
+  }
+
+/**
+ * CoAP is the Constrained Application Protocol, which defines a specialized web transfer protocol that can be
+ * used on devices with constrained resources.
+ *
+ * This C2 protocol implementation extends RESTSender: payloads consumed without a URL are
+ * serialized to a binary format and sent through a CoapConnectorService, while URL-based
+ * requests are forwarded to the REST sender.
+ */
+class CoapProtocol : public minifi::c2::RESTSender {
+ public:
+ explicit CoapProtocol(const std::string &name, const utils::Identifier &uuid = utils::Identifier());
+
+ virtual ~CoapProtocol();
+
+ /**
+ * Consume the payload.
+ * @param url to evaluate.
+ * @param payload payload to consume.
+ * @param direction direction of operation.
+ * @param async requested asynchronous mode.
+ */
+ virtual minifi::c2::C2Payload consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) override;
+
+ virtual minifi::c2::C2Payload consumePayload(const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) override {  // direction and async are not used by this overload
+ return serialize(payload);
+ }
+
+ virtual void update(const std::shared_ptr<Configure> &configure) override {
+ // no op.
+ }
+
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+
+ // Supported Properties
+
+ protected:
+
+ bool isRegistrationMessage(controllers::CoapResponse &response) {  // true when the server asks the agent to register
+ if (LIKELY(response.getSize() != 8)) {  // the registration message is exactly 8 bytes long
+ return false;
+ }
+ return response.getCode() == COAP_RESPONSE_400 && !memcmp(response.getData(), REGISTRATION_MSG, response.getSize());  // response code 400 with a body equal to REGISTRATION_MSG
+ }
+
+ /**
+ * Returns the operation for the translated integer
+ * @param type input type
+ * @return Operation
+ */
+ minifi::c2::Operation getOperation(int type) const;
+
+ /**
+ * Writes a heartbeat to the provided BaseStream ptr.
+ * @param stream BaseStream
+ * @param payload payload to serialize
+ * @return 0 on success, a non-zero value on failure
+ */
+ int writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload);
+
+ /**
+ * Writes an acknowledgement to the provided BaseStream ptr.
+ * @param stream BaseStream
+ * @param payload payload to serialize
+ * @return 0 on success, a non-zero value on failure
+ */
+ int writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload);
+
+ minifi::c2::C2Payload serialize(const minifi::c2::C2Payload &payload);  // serializes the payload, sends it over CoAP and parses the response
+
+ std::shared_ptr<coap::controllers::CoapConnectorService> coap_service_;  // connector used to send CoAP messages; resolved in initialize()
+
+ std::mutex protocol_mutex_;  // not referenced by any member function shown in this header
+
+ bool require_registration_;  // when true, the next serialize() call registers the agent instead of sending a CoAP message
+
+ std::string controller_service_name_;  // value of the nifi.c2.coap.connector.service property, if set
+
+ private:
+
+ static uint8_t REGISTRATION_MSG[8];  // 8-byte body that marks a registration request from the server
+
+ std::shared_ptr<logging::Logger> logger_;
+
+};
+} /* namespace c2 */
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* EXTENSIONS_COAPPROTOCOL_H_ */
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/extensions/coap/server/CoapServer.cpp
similarity index 75%
copy from extensions/mqtt/protocol/PayloadSerializer.cpp
copy to extensions/coap/server/CoapServer.cpp
index 5f62227..d8cbd7a 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/extensions/coap/server/CoapServer.cpp
@@ -15,23 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "PayloadSerializer.h"
+#include "CoapServer.h"
+#include <coap2/utlist.h>
+#include <coap2/coap.h>
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
-namespace c2 {
-namespace mqtt {
+namespace coap {
-PayloadSerializer::PayloadSerializer() {
-}
-PayloadSerializer::~PayloadSerializer() {
+std::map<coap_resource_t*, std::function<CoapResponse(CoapQuery)>> CoapServer::functions_;
+/**
+ * Stops the worker loop, waits for it to finish and frees the server context.
+ */
+CoapServer::~CoapServer() {
+  running_ = false;
+  // start() may never have been called, in which case the future has no shared state to wait on
+  if (future.valid()) {
+    future.get();
+  }
+  if (server_) {
+    free_server(server_);
+  }
}
-} /* namespace mqtt */
-} /* namespace c2 */
+
+} /* namespace coap */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
diff --git a/extensions/coap/server/CoapServer.h b/extensions/coap/server/CoapServer.h
new file mode 100644
index 0000000..e929925
--- /dev/null
+++ b/extensions/coap/server/CoapServer.h
@@ -0,0 +1,245 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EXTENSIONS_COAP_SERVER_COAPSERVER_H_
+#define EXTENSIONS_COAP_SERVER_COAPSERVER_H_
+#include "core/Connectable.h"
+#include "coap_server.h"
+#include "coap_message.h"
+#include <coap2/coap.h>
+#include <functional>
+#include <thread>
+#include <future>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace coap {
+
+enum METHOD {  // request methods accepted by CoapServer::add_endpoint
+ GET,  // registered as COAP_REQUEST_GET
+ POST,  // registered as COAP_REQUEST_POST
+ PUT,  // registered as COAP_REQUEST_PUT
+ DELETE  // registered as COAP_REQUEST_DELETE
+};
+
+/**
+ * CoapQuery is the request message sent by the client
+ *
+ * It stores a query string together with an owned CoapMessage that is released through
+ * free_coap_message. Instances can be moved but not copied. No accessors for the stored
+ * values are declared in this class.
+ */
+class CoapQuery {
+ public:
+ CoapQuery(const std::string &query, std::unique_ptr<CoapMessage, decltype(&free_coap_message)> message)
+ : query_(query),
+ message_(std::move(message)) {
+
+ }
+ virtual ~CoapQuery() {
+ }
+ CoapQuery(const CoapQuery &qry) = delete;
+ CoapQuery(CoapQuery &&qry) = default;
+
+ private:
+ std::string query_;
+ std::unique_ptr<CoapMessage, decltype(&free_coap_message)> message_;
+};
+
+/**
+ * Coap Response that is generated by the CoapServer to the library
+ *
+ * Holds a response code and an owned byte buffer of the given size. The buffer is handed over
+ * as std::unique_ptr<uint8_t> but is expected to have been allocated with new[], so it is
+ * stored in an array unique_ptr and released with delete[]. A null buffer with size 0 is valid.
+ * Instances can be moved but not copied.
+ */
+class CoapResponse {
+  friend class CoapQuery;
+ public:
+  /**
+   * @param code response code
+   * @param data buffer allocated with new[] (or null); ownership is taken over
+   * @param size number of bytes in data
+   */
+  CoapResponse(int code, std::unique_ptr<uint8_t> data, size_t size)
+      : code_(code),
+        data_(data.release()),  // re-own as an array so that delete[] matches the new[] allocation
+        size_(size) {
+  }
+
+  int getCode() const {
+    return code_;
+  }
+
+  size_t getSize() const {
+    return size_;
+  }
+
+  /**
+   * @return pointer to the owned buffer, or null when no data was supplied
+   */
+  uint8_t * const getData() const {
+    return data_.get();
+  }
+
+  CoapResponse(const CoapResponse &qry) = delete;
+  CoapResponse(CoapResponse &&qry) = default;
+  CoapResponse &operator=(CoapResponse &&qry) = default;
+ private:
+  int code_;
+  std::unique_ptr<uint8_t[]> data_;
+  size_t size_;
+};
+
+/**
+ * Wrapper for the coap server functionality using an async callback to perform
+ * custom operations. Intended to be used for testing, but may provide capabilities
+ * elsewhere.
+ *
+ * Endpoints are registered with add_endpoint() and served by a background task launched from
+ * start(). User handlers live in a static map keyed by libcoap resource, because the callback
+ * handed to the C layer is a static function.
+ */
+class CoapServer : public core::Connectable {
+ public:
+  explicit CoapServer(const std::string &name, const utils::Identifier &uuid)
+      : core::Connectable(name, uuid),
+        running_(false),
+        server_(nullptr),
+        port_(0) {
+    //TODO: this allows this class to be instantiated via the the class loader
+    //need to define this capability in the future.
+  }
+
+  /**
+   * Creates the server context for the given host and port. The server does not process
+   * requests until start() is called.
+   */
+  CoapServer(const std::string &hostname, uint16_t port)
+      : core::Connectable(hostname),
+        running_(false),
+        hostname_(hostname),
+        server_(nullptr),
+        port_(port) {
+    coap_startup();
+    auto port_str = std::to_string(port_);
+    //coap_set_log_level(coap_log_t::LOG_DEBUG);
+    server_ = create_server(hostname_.c_str(), port_str.c_str());
+  }
+
+  virtual ~CoapServer();
+
+  /**
+   * Launches the background task that drives the libcoap event loop until the server is
+   * destroyed or the loop reports an error. Does nothing when no server context exists or
+   * when the loop was already started.
+   */
+  void start() {
+    if (nullptr == server_) {
+      return;
+    }
+    if (running_.exchange(true)) {
+      // already started; replacing the future would block on the loop that is still running
+      return;
+    }
+
+    future = std::async(std::launch::async, [this]() -> uint64_t {
+      while (running_) {
+        int res = coap_run_once(server_->ctx, 100);
+        if (res < 0) {
+          break;
+        }
+        coap_check_notify(server_->ctx);
+      }
+      return 0;
+    });
+  }
+
+  /**
+   * Registers a handler for the given path and method.
+   * When the path already has an endpoint, only the additional method is registered on it and
+   * requests keep being dispatched to the functor registered first; the new functor is not stored.
+   *
+   * @param path resource path
+   * @param method request method to accept
+   * @param functor user function producing the response
+   */
+  void add_endpoint(const std::string &path, METHOD method, std::function<CoapResponse(CoapQuery)> functor) {
+    const unsigned char mthd = to_coap_method(method);
+    auto current_endpoint = endpoints_.find(path);
+    if (current_endpoint != endpoints_.end()) {
+      ::add_endpoint(current_endpoint->second, mthd, handle_response_with_passthrough);
+      return;
+    }
+    CoapEndpoint * const endpoint = create_endpoint(server_, path.c_str(), mthd, handle_response_with_passthrough);
+    if (nullptr == endpoint) {
+      // nothing to register when the endpoint could not be created
+      return;
+    }
+    functions_.insert(std::make_pair(endpoint->resource, functor));
+    endpoints_.insert(std::make_pair(path, endpoint));
+  }
+
+  /**
+   * Registers a handler without a resource path; the endpoint is stored under the empty path.
+   *
+   * @param method request method to accept
+   * @param functor user function producing the response
+   */
+  void add_endpoint(METHOD method, std::function<CoapResponse(CoapQuery)> functor) {
+    const unsigned char mthd = to_coap_method(method);
+    CoapEndpoint * const endpoint = create_endpoint(server_, NULL, mthd, handle_response_with_passthrough);
+    if (nullptr == endpoint) {
+      // nothing to register when the endpoint could not be created
+      return;
+    }
+    functions_.insert(std::make_pair(endpoint->resource, functor));
+    endpoints_.insert(std::make_pair("", endpoint));
+  }
+
+  /**
+   * Determines if we are connected and operating
+   */
+  virtual bool isRunning() {
+    return running_.load();
+  }
+
+  /**
+   * Block until work is available on any input connection, or the given duration elapses
+   * @param timeoutMs timeout in milliseconds
+   */
+  void waitForWork(uint64_t timeoutMs);
+
+  virtual void yield() {
+
+  }
+
+  /**
+   * Determines if work is available by this connectable
+   * @return boolean if work is available.
+   */
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+ protected:
+
+  /**
+   * Translates the METHOD enumeration into the libcoap request code.
+   * @return the libcoap request code, or 0 for an unknown value
+   */
+  static unsigned char to_coap_method(METHOD method) {
+    switch (method) {
+      case GET:
+        return COAP_REQUEST_GET;
+      case POST:
+        return COAP_REQUEST_POST;
+      case PUT:
+        return COAP_REQUEST_PUT;
+      case DELETE:
+        return COAP_REQUEST_DELETE;
+    }
+    return 0;
+  }
+
+  /**
+   * libcoap handler shared by all endpoints: looks up the user function registered for the
+   * resource, invokes it with the request and sends its result back as a new PDU.
+   * Requests for resources without a registered function are ignored.
+   */
+  static void handle_response_with_passthrough(coap_context_t *ctx, struct coap_resource_t *resource, coap_session_t *session, coap_pdu_t *request, coap_binary_t *token, coap_string_t *query,
+                                               coap_pdu_t *response) {
+    auto fx = functions_.find(resource);
+    if (fx == functions_.end()) {
+      return;
+    }
+    auto message = create_coap_message(request);
+    CoapQuery qry("", std::unique_ptr<CoapMessage, decltype(&free_coap_message)>(message, free_coap_message));
+    // call the UDF
+    auto udfResponse = fx->second(std::move(qry));
+    response = coap_pdu_init(COAP_MESSAGE_CON, COAP_RESPONSE_CODE(udfResponse.getCode()), coap_new_message_id(session), udfResponse.getSize() + 1);
+    if (nullptr == response) {
+      // the PDU could not be allocated, so there is nothing to fill in or send
+      printf("error while returning response");
+      return;
+    }
+    coap_add_data(response, udfResponse.getSize(), udfResponse.getData());
+    if (coap_send(session, response) == COAP_INVALID_TID) {
+      printf("error while returning response");
+    }
+  }
+
+  std::future<uint64_t> future;
+  std::atomic<bool> running_;
+  std::string hostname_;
+  CoapServerContext *server_;
+  static std::map<coap_resource_t*, std::function<CoapResponse(CoapQuery)>> functions_;
+  std::map<std::string, CoapEndpoint*> endpoints_;
+  uint16_t port_;
+};
+
+} /* namespace coap */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_COAP_SERVER_COAPSERVER_H_ */
diff --git a/extensions/coap/tests/CMakeLists.txt b/extensions/coap/tests/CMakeLists.txt
new file mode 100644
index 0000000..df97e82
--- /dev/null
+++ b/extensions/coap/tests/CMakeLists.txt
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB CURL_UNIT_TESTS "unit/*.cpp")  # not referenced again in this file
+file(GLOB CURL_INTEGRATION_TESTS "*.cpp")  # every .cpp in this directory becomes one test executable
+
+SET(CURL_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${CURL_INTEGRATION_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)  # executable name is the file name without extension
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} BEFORE PRIVATE ${COAP_INCLUDE_DIRS})
+ target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../server")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../nanofi")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../protocols")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../controllerservice")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/client/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/processors/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/protocols/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../../http-curl/sitetosite/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
+ target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+ createTests("${testfilename}")
+ if (APPLE)  # the extension archives are linked in whole on both branches, using the platform's linker flag
+ target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-coap minifi-civet-extensions)
+ else ()
+ target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-coap minifi-civet-extensions -Wl,--no-whole-archive)
+ endif()
+ MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${CURL_INT_TEST_COUNT} CoAP integration test file(s)...")
+
+add_test(NAME CoapC2VerifyHeartbeat COMMAND CoapC2VerifyHeartbeat "${TEST_RESOURCES}/CoapC2VerifyServe.yml" "${TEST_RESOURCES}/" "http://localhost:9888/geturl")  # arguments: flow configuration file, resource directory, URL
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
similarity index 55%
copy from extensions/http-curl/tests/C2VerifyServeResults.cpp
copy to extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
index 1d3e03f..e363195 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
@@ -45,11 +45,14 @@
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
#include "controllers/SSLContextService.h"
-#include "TestServer.h"
#include "c2/C2Agent.h"
#include "protocols/RESTReceiver.h"
-#include "HTTPIntegrationBase.h"
+#include "CoapIntegrationBase.h"
#include "processors/LogAttribute.h"
+#include "CoapC2Protocol.h"
+#include "CoapServer.h"
+#include "io/BaseStream.h"
+#include "concurrentqueue.h"
class Responder : public CivetHandler {
public:
@@ -70,9 +73,9 @@ class Responder : public CivetHandler {
bool isSecure;
};
-class VerifyC2Server : public HTTPIntegrationBase {
+class VerifyCoAPServer : public CoapIntegrationBase {
public:
- explicit VerifyC2Server(bool isSecure)
+ explicit VerifyCoAPServer(bool isSecure)
: isSecure(isSecure) {
char format[] = "/tmp/ssth.XXXXXX";
dir = testController.createTempDirectory(format);
@@ -80,11 +83,12 @@ class VerifyC2Server : public HTTPIntegrationBase {
void testSetup() {
LogTestController::getInstance().setDebug<utils::HTTPClient>();
- LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+ LogTestController::getInstance().setOff<processors::InvokeHTTP>();
LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
- LogTestController::getInstance().setDebug<processors::LogAttribute>();
- LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setTrace<minifi::coap::c2::CoapProtocol>();
+ LogTestController::getInstance().setOff<processors::LogAttribute>();
+ LogTestController::getInstance().setOff<minifi::core::ProcessSession>();
std::fstream file;
ss << dir << "/" << "tstFile.ext";
file.open(ss.str(), std::ios::out);
@@ -97,10 +101,12 @@ class VerifyC2Server : public HTTPIntegrationBase {
}
void runAssertions() {
- assert(LogTestController::getInstance().contains("Import offset 0") == true);
-
- assert(LogTestController::getInstance().contains("Outputting success and response") == true);
+ std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+ assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 1") == true);
+ assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 0") == true);
+ assert(LogTestController::getInstance().contains("Received error event from protocol") == true);
+ assert(LogTestController::getInstance().contains("Received op 1, with id id and operand operand") == true);
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
@@ -114,16 +120,76 @@ class VerifyC2Server : public HTTPIntegrationBase {
inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
std::string port, scheme, path;
+
parse_http_components(url, port, scheme, path);
+ auto new_port_str = std::to_string(std::stoi(port) + 2);
+
+ server = std::unique_ptr<minifi::coap::CoapServer>(new minifi::coap::CoapServer("127.0.0.1", std::stoi(port) + 2));
+
+ server->add_endpoint(minifi::coap::METHOD::POST, [](minifi::coap::CoapQuery)->minifi::coap::CoapResponse {
+ minifi::coap::CoapResponse response(205,0x00,0);
+ return response;
+
+ });
+
+ {
+ // response with version 3 that announces 1 operation but carries no complete operation body
+ uint8_t *data = new uint8_t[5] { 0x00, 0x03, 0x00, 0x01, 0x00 };
+ minifi::coap::CoapResponse response(205, std::unique_ptr<uint8_t>(data), 5);
+ responses.enqueue(std::move(response));
+ }
+
+ {
+ // valid response: version 3, 0 operations
+ uint8_t *data = new uint8_t[5] { 0x00, 0x03, 0x00, 0x00, 0x00 };
+ minifi::coap::CoapResponse response(205, std::unique_ptr<uint8_t>(data), 5);
+ responses.enqueue(std::move(response));
+ }
+
+ {
+ // should result in valid operation
+ minifi::io::BaseStream stream;
+ uint16_t version=0, size=1;
+ uint8_t operation = 1;
+ stream.write(version);
+ stream.write(size);
+ stream.write(&operation,1);
+ stream.writeUTF("id");
+ stream.writeUTF("operand");
+
+ uint8_t *data = new uint8_t[ stream.getSize() ];
+ memcpy(data,stream.getBuffer(), stream.getSize() );
+ minifi::coap::CoapResponse response(205, std::unique_ptr<uint8_t>(data), stream.getSize());
+ responses.enqueue(std::move(response));
+ }
+
+ server->add_endpoint("heartbeat", minifi::coap::METHOD::POST, [&](minifi::coap::CoapQuery)-> minifi::coap::CoapResponse {
+ if (responses.size_approx() > 0) {
+ minifi::coap::CoapResponse resp(500,0,0);;
+ responses.try_dequeue(resp);
+ return resp;
+ }
+ else {
+ minifi::coap::CoapResponse response(500,0,0);
+ return response;
+ }
+
+ });
+
+ server->start();
configuration->set("c2.enable", "true");
configuration->set("c2.agent.class", "test");
- configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
- configuration->set("c2.rest.listener.port", port);
+ configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation,RepositoryMetrics");
+ configuration->set("nifi.c2.agent.protocol.class", "CoapProtocol");
+ configuration->set("nifi.c2.agent.coap.host", "127.0.0.1");
+ configuration->set("nifi.c2.agent.coap.port", new_port_str);
configuration->set("c2.agent.heartbeat.period", "10");
configuration->set("c2.rest.listener.heartbeat.rooturi", path);
}
protected:
+ moodycamel::ConcurrentQueue<minifi::coap::CoapResponse> responses;
+ std::unique_ptr<minifi::coap::CoapServer> server;
bool isSecure;
char *dir;
std::stringstream ss;
@@ -142,7 +208,7 @@ int main(int argc, char **argv) {
isSecure = true;
}
- VerifyC2Server harness(isSecure);
+ VerifyCoAPServer harness(isSecure);
harness.setKeyDir(key_dir);
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
similarity index 50%
copy from extensions/http-curl/tests/HTTPIntegrationBase.h
copy to extensions/coap/tests/CoapIntegrationBase.h
index 1bc4c72..85af8c6 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -32,30 +32,69 @@ int ssl_enable(void *ssl_context, void *user_data) {
return 0;
}
-class HTTPIntegrationBase : public IntegrationBase {
+class CoapIntegrationBase : public IntegrationBase {
public:
- HTTPIntegrationBase(uint64_t waitTime = 60000)
+ CoapIntegrationBase(uint64_t waitTime = 60000)
: IntegrationBase(waitTime),
server(nullptr) {
}
void setUrl(std::string url, CivetHandler *handler);
- virtual ~HTTPIntegrationBase();
+ virtual ~CoapIntegrationBase();
- void shutdownBeforeFlowController() {
+ void shutdownBeforeFlowController() override {
stop_webserver(server);
}
+ virtual void run(std::string test_file_location) override {
+ testSetup();
+
+ std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(configuration);
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+
+ queryRootProcessGroup(pg);
+
+ ptr.release();
+
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+ true);
+
+ controller->load();
+ controller->start();
+ waitToVerifyProcessor();
+
+ shutdownBeforeFlowController();
+ controller->waitUnload(wait_time_);
+ runAssertions();
+
+ cleanup();
+ }
+
protected:
CivetServer *server;
};
-HTTPIntegrationBase::~HTTPIntegrationBase() {
+CoapIntegrationBase::~CoapIntegrationBase() {
}
-void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
+void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
parse_http_components(url, port, scheme, path);
struct mg_callbacks callback;
diff --git a/extensions/gps/CMakeLists.txt b/extensions/gps/CMakeLists.txt
index 8d6dd35..8a67e8d 100644
--- a/extensions/gps/CMakeLists.txt
+++ b/extensions/gps/CMakeLists.txt
@@ -17,9 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
find_package(LibGPS REQUIRED)
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index d607664..3940918 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -250,7 +250,7 @@ bool HTTPClient::submit() {
curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type_str_);
if (res != CURLE_OK) {
- logger_->log_error("curl_easy_perform() failed %s\n", curl_easy_strerror(res));
+ logger_->log_error("curl_easy_perform() failed %s on %s\n", curl_easy_strerror(res), url_);
return false;
}
@@ -301,11 +301,11 @@ const char *HTTPClient::getContentType() {
}
const std::vector<char> &HTTPClient::getResponseBody() {
- if (response_body_.size() == 0){
- if (callback && callback->ptr){
+ if (response_body_.size() == 0) {
+ if (callback && callback->ptr) {
response_body_ = callback->ptr->to_string();
- }else{
- response_body_ = read_callback_.to_string();
+ } else {
+ response_body_ = read_callback_.to_string();
}
}
return response_body_;
@@ -384,10 +384,9 @@ bool HTTPClient::isSecure(const std::string &url) {
}
void HTTPClient::setInterface(const std::string &ifc) {
- curl_easy_setopt(http_session_, CURLOPT_INTERFACE, ifc.c_str());
+ curl_easy_setopt(http_session_, CURLOPT_INTERFACE, ifc.c_str());
}
-
} /* namespace utils */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 9b4ce5e..11d53e2 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -36,7 +36,7 @@ namespace nifi {
namespace minifi {
namespace c2 {
-RESTSender::RESTSender(std::string name, utils::Identifier uuid)
+RESTSender::RESTSender(const std::string &name, const utils::Identifier &uuid)
: C2Protocol(name, uuid),
logger_(logging::LoggerFactory<Connectable>::getLogger()) {
}
@@ -83,6 +83,9 @@ void RESTSender::update(const std::shared_ptr<Configure> &configure) {
}
const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+ if (url.empty()){
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+ }
utils::HTTPClient client(url, ssl_context_service_);
client.setConnectionTimeout(2);
std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
@@ -117,6 +120,7 @@ const C2Payload RESTSender::sendPayload(const std::string url, const Direction d
}
bool isOkay = client.submit();
int64_t respCode = client.getResponseCode();
+ auto rs = client.getResponseBody();
if (isOkay && respCode) {
if (payload.isRaw()) {
C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 4873ae3..ccb7b03 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -34,6 +34,9 @@ namespace nifi {
namespace minifi {
namespace c2 {
+#undef RAPIDJSON_ASSERT
+#define RAPIDJSON_ASSERT(x) if(!(x)) throw std::logic_error("rapidjson exception"); //NOLINT
+
/**
* Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
*
@@ -45,7 +48,7 @@ namespace c2 {
class RESTSender : public RESTProtocol, public C2Protocol {
public:
- explicit RESTSender(std::string name, utils::Identifier uuid = utils::Identifier());
+ explicit RESTSender(const std::string &name, const utils::Identifier &uuid = utils::Identifier());
virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
@@ -61,10 +64,11 @@ class RESTSender : public RESTProtocol, public C2Protocol {
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
- private:
- std::shared_ptr<logging::Logger> logger_;
std::string rest_uri_;
std::string ack_uri_;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
};
REGISTER_RESOURCE(RESTSender, "Encapsulates the restful protocol that is built upon C2Protocol.");
diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp
index 02474d1..4850844 100644
--- a/extensions/http-curl/tests/C2NullConfiguration.cpp
+++ b/extensions/http-curl/tests/C2NullConfiguration.cpp
@@ -52,7 +52,7 @@
#include "processors/LogAttribute.h"
#include "HTTPIntegrationBase.h"
-class VerifyC2Server : public HTTPIntegrationBase {
+class VerifyC2Server : public CoapIntegrationBase {
public:
explicit VerifyC2Server(bool isSecure)
: isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index d1e83aa..049f506 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -98,7 +98,7 @@ class ConfigHandler : public CivetHandler {
int main(int argc, char **argv) {
mg_init_library(0);
LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+ LogTestController::getInstance().setTrace<minifi::utils::HTTPClient>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
@@ -142,6 +142,7 @@ int main(int argc, char **argv) {
configuration->set("c2.enable", "true");
configuration->set("c2.agent.class", "test");
+ configuration->set("c2.agent.update.allow","true");
configuration->set("c2.rest.url", "http://localhost:7072/update");
configuration->set("c2.agent.heartbeat.period", "1000");
mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
@@ -178,6 +179,7 @@ int main(int argc, char **argv) {
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
std::string logs = LogTestController::getInstance().log_output.str();
assert(logs.find("removing file") != std::string::npos);
+ assert(logs.find("May not have command processor") != std::string::npos);
LogTestController::getInstance().reset();
rmdir("./content_repository");
assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 60738e4..d299851 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -117,7 +117,7 @@ class Responder : public CivetHandler {
bool isSecure;
};
-class VerifyC2Heartbeat : public HTTPIntegrationBase {
+class VerifyC2Heartbeat : public CoapIntegrationBase {
public:
explicit VerifyC2Heartbeat(bool isSecure)
: isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp
index 1d3e03f..601a5a2 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp
@@ -70,7 +70,7 @@ class Responder : public CivetHandler {
bool isSecure;
};
-class VerifyC2Server : public HTTPIntegrationBase {
+class VerifyC2Server : public CoapIntegrationBase {
public:
explicit VerifyC2Server(bool isSecure)
: isSecure(isSecure) {
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 1bc4c72..85cae26 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -32,16 +32,16 @@ int ssl_enable(void *ssl_context, void *user_data) {
return 0;
}
-class HTTPIntegrationBase : public IntegrationBase {
+class CoapIntegrationBase : public IntegrationBase {
public:
- HTTPIntegrationBase(uint64_t waitTime = 60000)
+ CoapIntegrationBase(uint64_t waitTime = 60000)
: IntegrationBase(waitTime),
server(nullptr) {
}
void setUrl(std::string url, CivetHandler *handler);
- virtual ~HTTPIntegrationBase();
+ virtual ~CoapIntegrationBase();
void shutdownBeforeFlowController() {
stop_webserver(server);
@@ -51,11 +51,11 @@ class HTTPIntegrationBase : public IntegrationBase {
CivetServer *server;
};
-HTTPIntegrationBase::~HTTPIntegrationBase() {
+CoapIntegrationBase::~CoapIntegrationBase() {
}
-void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
+void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
parse_http_components(url, port, scheme, path);
struct mg_callbacks callback;
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index b908208..de7113b 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -50,10 +50,10 @@
#include "HTTPHandlers.h"
#include "client/HTTPStream.h"
-class SiteToSiteTestHarness : public HTTPIntegrationBase {
+class SiteToSiteTestHarness : public CoapIntegrationBase {
public:
explicit SiteToSiteTestHarness(bool isSecure)
- : HTTPIntegrationBase(2000), isSecure(isSecure) {
+ : CoapIntegrationBase(2000), isSecure(isSecure) {
char format[] = "/tmp/ssth.XXXXXX";
dir = testController.createTempDirectory(format);
}
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index 554db26..9b149f9 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -50,7 +50,7 @@
#include "../tests/TestServer.h"
#include "HTTPIntegrationBase.h"
-class HttpTestHarness : public HTTPIntegrationBase {
+class HttpTestHarness : public CoapIntegrationBase {
public:
HttpTestHarness() {
char format[] = "/tmp/ssth.XXXXXX";
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 4252564..63721e1 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -77,7 +77,7 @@ class Responder : public CivetHandler {
bool isSecure;
};
-class SiteToSiteTestHarness : public HTTPIntegrationBase {
+class SiteToSiteTestHarness : public CoapIntegrationBase {
public:
explicit SiteToSiteTestHarness(bool isSecure)
: isSecure(isSecure) {
diff --git a/extensions/mqtt/processors/ConvertBase.cpp b/extensions/mqtt/processors/ConvertBase.cpp
index 10571d4..02a8d3d 100644
--- a/extensions/mqtt/processors/ConvertBase.cpp
+++ b/extensions/mqtt/processors/ConvertBase.cpp
@@ -26,7 +26,7 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "ConvertBase.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
#include "utils/ByteArrayCallback.h"
namespace org {
namespace apache {
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp
index cf5574f..a240d16 100644
--- a/extensions/mqtt/processors/ConvertHeartBeat.cpp
+++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp
@@ -26,7 +26,7 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "ConvertHeartBeat.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
#include "utils/ByteArrayCallback.h"
namespace org {
namespace apache {
@@ -49,7 +49,7 @@ void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> &co
// while we have heartbeats we can continue to loop.
while (mqtt_service_->get(100, listening_topic, heartbeat)) {
if (heartbeat.size() > 0) {
- c2::C2Payload payload = c2::mqtt::PayloadSerializer::deserialize(heartbeat);
+ c2::C2Payload payload = c2::PayloadSerializer::deserialize(heartbeat);
auto serialized = serializeJsonRootPayload(payload);
logger_->log_debug("Converted JSON output %s", serialized);
minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1);
diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp b/extensions/mqtt/processors/ConvertJSONAck.cpp
index 6ea3eaa..a8f978f 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.cpp
+++ b/extensions/mqtt/processors/ConvertJSONAck.cpp
@@ -27,7 +27,7 @@
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
#include "utils/ByteArrayCallback.h"
namespace org {
namespace apache {
@@ -98,7 +98,7 @@ void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
std::string str(callback.buffer_.data(),callback.buffer_.size());
auto payload = parseJsonResponse(response_payload, callback.buffer_);
- auto stream = c2::mqtt::PayloadSerializer::serialize(payload);
+ auto stream = c2::PayloadSerializer::serialize(1,payload);
mqtt_service_->send(topic, stream->getBuffer(), stream->getSize());
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
index 7590617..7a5df3d 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -86,12 +86,12 @@ C2Payload MQTTC2Protocol::serialize(const C2Payload &payload) {
std::lock_guard<std::mutex> lock(input_mutex_);
- auto stream = c2::mqtt::PayloadSerializer::serialize(payload);
+ auto stream = c2::PayloadSerializer::serialize(0x00, payload);
auto transmit_id = mqtt_service_->send(heartbeat_topic_, stream->getBuffer(), stream->getSize());
std::vector<uint8_t> response;
if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) {
- return c2::mqtt::PayloadSerializer::deserialize(response);
+ return c2::PayloadSerializer::deserialize(response);
}
return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h b/extensions/mqtt/protocol/MQTTC2Protocol.h
index 45c7938..47e04a6 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -30,7 +30,7 @@
#include "c2/C2Protocol.h"
#include "io/BaseStream.h"
#include "agent/agent_version.h"
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
namespace org {
namespace apache {
diff --git a/fedora.sh b/fedora.sh
index 0bf7d4b..db45485 100644
--- a/fedora.sh
+++ b/fedora.sh
@@ -68,6 +68,9 @@ build_deps(){
INSTALLED+=("python3-devel")
elif [ "$FOUND_VALUE" = "lua" ]; then
INSTALLED+=("lua-devel")
+ elif [ "$FOUND_VALUE" = "automake" ]; then
+ INSTALLED+=("autoconf")
+ INSTALLED+=("automake")
elif [ "$FOUND_VALUE" = "gpsd" ]; then
INSTALLED+=("gpsd-devel")
elif [ "$FOUND_VALUE" = "libarchive" ]; then
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index f804c2b..974750a 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -44,6 +44,10 @@ enum Operation {
TRANSFER
};
+#define PAYLOAD_NO_STATUS 0
+#define PAYLOAD_SUCCESS 1
+#define PAYLOAD_FAILURE 2
+
enum Direction {
TRANSMIT,
RECEIVE
@@ -108,7 +112,9 @@ class C2Payload : public state::Update {
}
- C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false);
+ C2Payload(Operation op, const std::string &identifier, bool resp = false, bool isRaw = false);
+
+ C2Payload(Operation op, state::UpdateState state,const std::string &identifier, bool resp = false, bool isRaw = false);
C2Payload(Operation op, bool resp = false, bool isRaw = false);
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
index 71cdd17..57e36b9 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -34,7 +34,7 @@ namespace c2 {
class C2Protocol : public core::Connectable {
public:
- C2Protocol(std::string name, utils::Identifier &uuid)
+ C2Protocol(const std::string &name, const utils::Identifier &uuid)
: core::Connectable(name, uuid),
running_(true) {
diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h
new file mode 100644
index 0000000..a64eb09
--- /dev/null
+++ b/libminifi/include/c2/PayloadParser.h
@@ -0,0 +1,188 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PAYLOADPARSER_H_
+#define LIBMINIFI_INCLUDE_C2_PAYLOADPARSER_H_
+
+#include "C2Payload.h"
+#include "core/state/Value.h"
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+class PayloadParseException : public std::runtime_error {
+ public:
+ PayloadParseException(const std::string &msg)
+ : std::runtime_error(msg) {
+ }
+
+};
+
+template<typename T, typename C>
+class convert_if_base {
+ protected:
+ const std::shared_ptr<state::response::Value> node_;
+ explicit convert_if_base(const std::shared_ptr<state::response::Value> &node)
+ : node_(node) {
+ }
+ public:
+ T operator()() const {
+ if (auto sub_type = std::dynamic_pointer_cast<C>(node_))
+ return sub_type->getValue();
+ throw PayloadParseException("No known type");
+ }
+};
+
+template<typename T>
+struct convert_if {
+ explicit convert_if(const std::shared_ptr<state::response::Value> &node) {
+ }
+
+
+ std::string operator()() const {
+ throw PayloadParseException("No known type");
+ }
+};
+
+template<>
+struct convert_if<std::string> : public convert_if_base<std::string, state::response::Value> {
+ explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+ : convert_if_base(node) {
+ }
+
+ std::string operator()() const {
+ return node_->getStringValue();
+ }
+};
+
+template<>
+struct convert_if<uint64_t> : public convert_if_base<uint64_t, state::response::UInt64Value> {
+ explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+ : convert_if_base(node) {
+ }
+};
+
+template<>
+struct convert_if<int64_t> : public convert_if_base<int64_t, state::response::Int64Value> {
+ explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+ : convert_if_base(node) {
+ }
+};
+
+template<>
+struct convert_if<int> : public convert_if_base<int, state::response::IntValue> {
+ explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+ : convert_if_base(node) {
+ }
+};
+
+template<>
+struct convert_if<bool> : public convert_if_base<bool, state::response::BoolValue> {
+ explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+ : convert_if_base(node) {
+ }
+};
+
+/**
+ * Defines a fluent parser that uses Exception management for flow control.
+ *
+ * Note that this isn't functionally complete.
+ */
+class PayloadParser {
+
+ public:
+
+ static PayloadParser getInstance(const C2Payload &payload) {
+ return PayloadParser(payload);
+ }
+
+ inline PayloadParser in(const std::string &payload) {
+ for (const auto &pl : ref_.getNestedPayloads()) {
+ if (pl.getLabel() == payload || pl.getIdentifier() == payload) {
+ return PayloadParser(pl);
+ }
+ }
+ throw PayloadParseException("Invalid payload. Could not find " + payload);
+ }
+
+ template<typename Functor>
+ inline void foreach(Functor f) {
+ for (const auto &component : ref_.getNestedPayloads()) {
+ f(component);
+ }
+ }
+
+ template<typename T>
+ inline T getAs(const std::string &field) {
+ for (const auto &cmd : ref_.getContent()) {
+ auto exists = cmd.operation_arguments.find(field);
+ if (exists != cmd.operation_arguments.end()) {
+ return convert_if<T>(exists->second.getValue())();
+ }
+ }
+ std::stringstream ss;
+ ss << "Invalid Field. Could not find " << field << " in " << ref_.getLabel();
+ throw PayloadParseException(ss.str());
+ }
+
+ template<typename T>
+ inline T getAs(const std::string &field, const T &fallback) {
+ for (const auto &cmd : ref_.getContent()) {
+ auto exists = cmd.operation_arguments.find(field);
+ if (exists != cmd.operation_arguments.end()) {
+ return convert_if<T>(exists->second.getValue())();
+ }
+ }
+ return fallback;
+ }
+
+ size_t getSize() const {
+ return ref_.getNestedPayloads().size();
+ }
+
+ /**
+ * Make these explicitly public: copying is deleted, move construction is defaulted.
+ */
+
+ PayloadParser(const PayloadParser &p) = delete;
+ const PayloadParser &operator=(const PayloadParser &p) = delete;
+ PayloadParser(PayloadParser &&parser) = default;
+
+ private:
+
+ PayloadParser(const C2Payload &payload)
+ : ref_(payload) {
+ }
+
+ const C2Payload &ref_;
+
+ std::vector<std::string> fields_;
+
+ std::string component_to_get_;
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_PAYLOADPARSER_H_ */
diff --git a/extensions/mqtt/protocol/PayloadSerializer.h b/libminifi/include/c2/PayloadSerializer.h
similarity index 95%
rename from extensions/mqtt/protocol/PayloadSerializer.h
rename to libminifi/include/c2/PayloadSerializer.h
index fbc5d38..61b7e73 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.h
+++ b/libminifi/include/c2/PayloadSerializer.h
@@ -27,7 +27,6 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace c2 {
-namespace mqtt {
class PayloadSerializer {
public:
@@ -37,6 +36,11 @@ class PayloadSerializer {
*/
static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr<io::BaseStream> stream) {
auto base_type = value.getValue();
+ if (!base_type) {
+ uint8_t type = 0;
+ stream->write(&type, 1);
+ return;
+ }
uint8_t type = 0x00;
if (auto sub_type = std::dynamic_pointer_cast<state::response::IntValue>(base_type)) {
type = 1;
@@ -64,13 +68,13 @@ class PayloadSerializer {
stream->writeUTF(str);
}
}
- static void serialize(uint8_t op, const C2Payload &payload, std::shared_ptr<io::BaseStream> stream) {
+ static void serialize(uint16_t op, const C2Payload &payload, std::shared_ptr<io::BaseStream> stream) {
uint8_t st;
uint32_t size = payload.getNestedPayloads().size();
stream->write(size);
for (auto nested_payload : payload.getNestedPayloads()) {
op = opToInt(nested_payload.getOperation());
- stream->write(&op, 1);
+ stream->write(op);
stream->write(&st, 1);
stream->writeUTF(nested_payload.getLabel());
stream->writeUTF(nested_payload.getIdentifier());
@@ -126,11 +130,13 @@ class PayloadSerializer {
}
return op;
}
- static std::shared_ptr<io::BaseStream> serialize(const C2Payload &payload) {
+ static std::shared_ptr<io::BaseStream> serialize(uint16_t version, const C2Payload &payload) {
std::shared_ptr<io::BaseStream> stream = std::make_shared<io::BaseStream>();
- uint8_t op, st = 0;
+ uint16_t op = 0;
+ uint8_t st = 0;
op = opToInt(payload.getOperation());
- stream->write(&op, 1);
+ stream->write(version);
+ stream->write(op);
if (payload.getStatus().getState() == state::UpdateState::NESTED) {
st = 1;
stream->write(&st, 1);
@@ -242,11 +248,12 @@ class PayloadSerializer {
io::BaseStream stream(&dataStream);
uint8_t op, st = 0;
- ;
+ uint16_t version = 0;
std::string identifier, label;
// read op
stream.read(op);
+ stream.read(version);
stream.read(st);
stream.readUTF(label);
stream.readUTF(identifier);
@@ -308,7 +315,6 @@ class PayloadSerializer {
virtual ~PayloadSerializer();
};
-} /* namespace mqtt */
} /* namespace c2 */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
index 251622c..6d848ec 100644
--- a/libminifi/include/c2/protocols/RESTProtocol.h
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -18,6 +18,16 @@
#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+
+
+#include <stdexcept>
+
+
+#ifdef RAPIDJSON_ASSERT
+#undef RAPIDJSON_ASSERT
+#endif
+#define RAPIDJSON_ASSERT(x) if(!(x)) throw std::logic_error("rapidjson exception"); //NOLINT
+
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 1b6b073..2e1f79e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -410,7 +410,7 @@ void FlowController::initializeC2() {
}
std::string identifier_str;
- if (!configuration_->get("nifi.c2.agent.identifier", identifier_str) || identifier_str.empty()) {
+ if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", identifier_str) || identifier_str.empty()) {
// set to the flow controller's identifier
identifier_str = uuidStr_;
}
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index ea5802f..6362902 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -73,8 +73,16 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
do {
const C2Payload payload(std::move(requests.back()));
requests.pop_back();
- C2Payload && response = protocol_.load()->consumePayload(payload);
- enqueue_c2_server_response(std::move(response));
+ try {
+ C2Payload && response = protocol_.load()->consumePayload(payload);
+ enqueue_c2_server_response(std::move(response));
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while consuming payload. error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while consuming payload.");
+ }
}while(requests.size() > 0 && ++count < max_c2_responses);
}
request_mutex.unlock();
@@ -82,7 +90,15 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
if ( time_since > heart_beat_period_ ) {
last_run_ = now;
- performHeartBeat();
+ try {
+ performHeartBeat();
+ }
+ catch(const std::exception &e) {
+ logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
+ }
+ catch(...) {
+ logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
+ }
}
checkTriggers();
@@ -416,8 +432,6 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
break;
case Operation::UPDATE: {
handle_update(resp);
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
- enqueue_c2_response(std::move(response));
}
break;
@@ -603,7 +617,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
unlink(file_path.c_str());
// if we can apply the update, we will acknowledge it and then backup the configuration file.
if (update_sink_->applyUpdate(urlStr, raw_data_str)) {
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true);
enqueue_c2_response(std::move(response));
if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
@@ -647,8 +661,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
}
} else {
logger_->log_debug("update failed.");
- std::cout << raw_data_str << std::endl;
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, false, true);
enqueue_c2_response(std::move(response));
}
// send
@@ -658,7 +671,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
if (update_text != resp.operation_arguments.end()) {
if (update_sink_->applyUpdate(url->second.to_string(), update_text->second.to_string()) != 0 && persist != resp.operation_arguments.end()
&& utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true);
enqueue_c2_response(std::move(response));
// update nifi.flow.configuration.file=./conf/config.yml
std::string config_file;
@@ -683,7 +696,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
writer.close();
}
} else {
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, false, true);
enqueue_c2_response(std::move(response));
}
}
@@ -714,17 +727,19 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
if (resp.operation_arguments.size() > 0)
configure(running_c2_configuration);
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true);
enqueue_c2_response(std::move(response));
} else if (resp.name == "agent") {
// we are upgrading the agent; therefore we must be given a location
auto location = resp.operation_arguments.find("location");
auto isPartialStr = resp.operation_arguments.find("partial");
+
bool partial_update = false;
if (isPartialStr != std::end(resp.operation_arguments)) {
partial_update = utils::StringUtils::equalsIgnoreCase(isPartialStr->second.to_string(), "true");
}
if (location != resp.operation_arguments.end()) {
+ logger_->log_trace("Update agent with location %s", location->second.to_string());
// we will not have a raw payload
C2Payload payload(Operation::TRANSFER, false, true);
@@ -734,13 +749,16 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
std::string file_path = std::string(raw_data.data(), raw_data.size());
+ logger_->log_trace("Update requested with file %s", file_path);
+
// acknowledge the transfer. For a transfer, the response identifier should be the checksum of the
// file transferred.
- C2Payload transfer_response(Operation::ACKNOWLEDGE, response.getIdentifier(), false, true);
+ C2Payload transfer_response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, response.getIdentifier(), false, true);
protocol_.load()->consumePayload(std::move(transfer_response));
if (allow_updates_) {
+ logger_->log_trace("Update allowed from file %s", file_path);
if (partial_update && !bin_location_.empty()) {
utils::file::DiffUtils::apply_binary_diff(bin_location_.c_str(), file_path.c_str(), update_location_.c_str());
} else {
@@ -750,8 +768,16 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
logger_->log_trace("removing file %s", file_path);
unlink(file_path.c_str());
update_agent();
+ } else {
+ logger_->log_trace("Update disallowed from file %s", file_path);
}
+
+ } else {
+ logger_->log_trace("No location present");
}
+ } else {
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::NOT_APPLIED, resp.ident, false, true);
+ enqueue_c2_response(std::move(response));
}
}
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
index 405de73..61487ee 100644
--- a/libminifi/src/c2/C2Payload.cpp
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -75,8 +75,8 @@ C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &other)
return *this;
}
-C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw)
- : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
+C2Payload::C2Payload(Operation op, state::UpdateState state, const std::string &identifier, bool resp, bool isRaw)
+ : state::Update(state::UpdateStatus(state, 0)),
op_(op),
raw_(isRaw),
ident_(identifier),
@@ -85,6 +85,10 @@ C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw
is_collapsible_(true) {
}
+C2Payload::C2Payload(Operation op, const std::string &identifier, bool resp, bool isRaw)
+ : C2Payload(op, state::UpdateState::FULLY_APPLIED, identifier, resp, isRaw) {
+}
+
C2Payload::C2Payload(Operation op, bool resp, bool isRaw)
: state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
op_(op),
diff --git a/extensions/mqtt/protocol/PayloadSerializer.cpp b/libminifi/src/c2/PayloadSerializer.cpp
similarity index 93%
rename from extensions/mqtt/protocol/PayloadSerializer.cpp
rename to libminifi/src/c2/PayloadSerializer.cpp
index 5f62227..107b7d1 100644
--- a/extensions/mqtt/protocol/PayloadSerializer.cpp
+++ b/libminifi/src/c2/PayloadSerializer.cpp
@@ -15,14 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "PayloadSerializer.h"
+#include "c2/PayloadSerializer.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace c2 {
-namespace mqtt {
PayloadSerializer::PayloadSerializer() {
}
@@ -30,7 +29,6 @@ PayloadSerializer::PayloadSerializer() {
PayloadSerializer::~PayloadSerializer() {
}
-} /* namespace mqtt */
} /* namespace c2 */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 297b76d..d081510 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -51,7 +51,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const
identifier = root["identifier"].GetString();
}
if (root["requested_operations"].Size() == 0 && root["requestedOperations"].Size() == 0)
- return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
@@ -129,7 +129,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const
} catch (...) {
}
#endif
- return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
}
void setJsonStr(const std::string& key, const state::response::ValueNode& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT
diff --git a/libminifi/test/coap-tests/CMakeLists.txt b/libminifi/test/coap-tests/CMakeLists.txt
new file mode 100644
index 0000000..4da8737
--- /dev/null
+++ b/libminifi/test/coap-tests/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB COAP_INTEGRATION_TESTS "*.cpp")
+SET(COAP_TEST_COUNT 0)
+FOREACH(testfile ${COAP_INTEGRATION_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/coap/")
+ createTests("${testfilename}")
+ if (APPLE)
+ target_link_libraries (${testfilename} -Wl,-all_load minifi-coap)
+ else ()
+ target_link_libraries (${testfilename} -Wl,--whole-archive minifi-coap -Wl,--no-whole-archive)
+ endif ()
+ MATH(EXPR COAP_TEST_COUNT "${COAP_TEST_COUNT}+1")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${COAP_TEST_COUNT} COAP related test file(s)...")
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 4d7e303..95e14c4 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -35,7 +35,7 @@ class IntegrationBase {
virtual ~IntegrationBase();
- void run(std::string test_file_location);
+ virtual void run(std::string test_file_location);
void setKeyDir(const std::string key_dir) {
this->key_dir = key_dir;
@@ -123,7 +123,6 @@ void IntegrationBase::run(std::string test_file_location) {
shutdownBeforeFlowController();
controller->unload();
-
runAssertions();
cleanup();
diff --git a/libminifi/test/resources/CoapC2VerifyServe.yml b/libminifi/test/resources/CoapC2VerifyServe.yml
new file mode 100644
index 0000000..1be926a
--- /dev/null
+++ b/libminifi/test/resources/CoapC2VerifyServe.yml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+ - name: invoke
+ id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ class: org.apache.nifi.processors.standard.InvokeHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ HTTP Method: GET
+ Remote URL: http://localhost:9888/geturl
+ - name: LogAttribute
+ id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: response
+ Properties:
+ Log Level: info
+ Log Payload: true
+
+Connections:
+ - name: TransferFilesToRPG
+ id: 2438e3c8-015a-1000-79ca-83af40ec1997
+ source name: invoke
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ source relationship name: success
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ - name: TransferFilesToRPG2
+ id: 2438e3c8-015a-1000-79ca-83af40ec1917
+ source name: LogAttribute
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ source relationship name: success
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+
+Remote Processing Groups:
+
diff --git a/libminifi/test/unit/PayloadParserTests.cpp b/libminifi/test/unit/PayloadParserTests.cpp
new file mode 100644
index 0000000..9ce6757
--- /dev/null
+++ b/libminifi/test/unit/PayloadParserTests.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+#include "c2/C2Payload.h"
+#include "c2/PayloadParser.h"
+#include "../TestBase.h"
+
+TEST_CASE("Test Valid Payload", "[tv1]") {
+ std::string ident = "identifier";
+ std::string cheese = "cheese";
+ std::string chips = "chips";
+ minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+ minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+ minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+ response.operation_arguments["type"] = "munster";
+ payload2.addContent(std::move(response));
+ payload.addPayload(std::move(payload2));
+ payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+ REQUIRE("munster" == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<std::string>("type"));
+}
+
+TEST_CASE("Test Invalid not found", "[tv2]") {
+ std::string ident = "identifier";
+ std::string cheese = "cheese";
+ std::string chips = "chips";
+ minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+ minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, cheese);
+ minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+ response.operation_arguments["typeS"] = "munster";
+ payload2.addContent(std::move(response));
+ payload.addPayload(std::move(payload2));
+ payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+ REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<std::string>("type"), minifi::c2::PayloadParseException);
+}
+
+
+TEST_CASE("Test Invalid coercion", "[tv3]") {
+ std::string ident = "identifier";
+ std::string cheese = "cheese";
+ std::string chips = "chips";
+ minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+ minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+ minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+ response.operation_arguments["type"] = "munster";
+ payload2.addContent(std::move(response));
+ payload.addPayload(std::move(payload2));
+ payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+ REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<uint64_t>("type"), minifi::c2::PayloadParseException);
+}
+
+TEST_CASE("Test Invalid not there", "[tv4]") {
+ std::string ident = "identifier";
+ std::string cheese = "cheese";
+ std::string chips = "chips";
+ minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+ minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+ minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+ response.operation_arguments["type"] = "munster";
+ payload2.addContent(std::move(response));
+ payload.addPayload(std::move(payload2));
+ payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+ REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("cheeses").getAs<uint64_t>("type"), minifi::c2::PayloadParseException);
+}
+
+TEST_CASE("Test typed conversions", "[tv5]") {
+ std::string ident = "identifier";
+ std::string cheese = "cheese";
+ std::string chips = "chips";
+ uint64_t size = 233;
+ bool isvalid = false;
+ minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+ minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+ minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+ response.operation_arguments["type"] = "munster";
+ response.operation_arguments["isvalid"] = isvalid;
+ response.operation_arguments["size"] = size;
+ payload2.addContent(std::move(response));
+ payload.addPayload(std::move(payload2));
+ payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+ REQUIRE("munster" == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<std::string>("type"));
+ REQUIRE(233 == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<uint64_t>("size"));
+ REQUIRE(false == minifi::c2::PayloadParser::getInstance(payload).in("cheese").getAs<bool>("isvalid"));
+}
+
+
+TEST_CASE("Test Invalid not there deep", "[tv6]") {
+ std::string ident = "identifier";
+ std::string cheese = "cheese";
+ std::string chips = "chips";
+ minifi::c2::C2Payload payload(minifi::c2::Operation::ACKNOWLEDGE, ident);
+ minifi::c2::C2Payload payload2(minifi::c2::Operation::ACKNOWLEDGE, minifi::state::UpdateState::FULLY_APPLIED, cheese);
+ minifi::c2::C2ContentResponse response(minifi::c2::Operation::ACKNOWLEDGE);
+ response.operation_arguments["type"] = "munster";
+ payload2.addContent(std::move(response));
+ payload.addPayload(std::move(payload2));
+ payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::ACKNOWLEDGE, chips));
+ REQUIRE_THROWS_AS(minifi::c2::PayloadParser::getInstance(payload).in("chips").getAs<uint64_t>("type"), minifi::c2::PayloadParseException);
+}