You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/10 19:14:04 UTC
[3/4] nifi-minifi-cpp git commit: MINIFICPP-364: resolve issues with
test extension. This will cause issues with bootstrapping. Also resolve
linkage between civet and curl extensions
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h b/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h
deleted file mode 100644
index d188df3..0000000
--- a/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "civetweb.h"
-#include "CivetServer.h"
-#include "concurrentqueue.h"
-#include "io/CivetStream.h"
-#include "io/CRCStream.h"
-#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
-#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
-static std::atomic<int> transaction_id;
-static std::atomic<int> transaction_id_output;
-
-class FlowObj {
- public:
- FlowObj()
- : total_size(0) {
-
- }
- explicit FlowObj(const FlowObj &&other)
- : total_size(std::move(other.total_size)),
- attributes(std::move(other.attributes)),
- data(std::move(other.data)) {
-
- }
- uint64_t total_size;
- std::map<std::string, std::string> attributes;
- std::vector<uint8_t> data;
-
-};
-
-class SiteToSiteLocationResponder : public CivetHandler {
- public:
- explicit SiteToSiteLocationResponder(bool isSecure)
- : isSecure(isSecure) {
- }
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::string site2site_rest_resp = "{"
- "\"revision\": {"
- "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
- "},"
- "\"controller\": {"
- "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
- "\"name\": \"NiFi Flow\","
- "\"siteToSiteSecure\": ";
- site2site_rest_resp += (isSecure ? "true" : "false");
- site2site_rest_resp += "}}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- site2site_rest_resp.length());
- mg_printf(conn, "%s", site2site_rest_resp.c_str());
- return true;
- }
-
- protected:
- bool isSecure;
-};
-
-class PeerResponder : public CivetHandler {
- public:
-
- explicit PeerResponder(const std::string base_url)
- : base_url(base_url) {
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }";
- std::stringstream headers;
- headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
- mg_printf(conn, "%s", headers.str().c_str());
- mg_printf(conn, "%s", site2site_rest_resp.c_str());
- return true;
- }
-
- protected:
- std::string base_url;
-};
-
-class TransactionResponder : public CivetHandler {
- public:
-
- explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
- : base_url(base_url),
- wrong_uri(wrong_uri),
- empty_transaction_uri(empty_transaction_uri),
- input_port(input_port),
- port_id(port_id),
- flow_files_feed_(nullptr) {
-
- if (input_port) {
- transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96";
- transaction_id_str += std::to_string(transaction_id.load());
- transaction_id++;
- } else {
- transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95";
- transaction_id_str += std::to_string(transaction_id_output.load());
- transaction_id_output++;
- }
- }
-
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- std::string site2site_rest_resp = "";
- std::stringstream headers;
- headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nx-location-uri-intent: ";
- if (wrong_uri)
- headers << "ohstuff\r\n";
- else
- headers << "transaction-url\r\n";
-
- std::string port_type;
-
- if (input_port)
- port_type = "input-ports";
- else
- port_type = "output-ports";
- if (!empty_transaction_uri)
- headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n";
- headers << "Connection: close\r\n\r\n";
- mg_printf(conn, "%s", headers.str().c_str());
- mg_printf(conn, "%s", site2site_rest_resp.c_str());
- return true;
- }
-
- void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
- flow_files_feed_ = feed;
- }
-
- std::string getTransactionId() {
- return transaction_id_str;
- }
- protected:
- std::string base_url;
- std::string transaction_id_str;
- bool wrong_uri;
- bool empty_transaction_uri;
- bool input_port;
- std::string port_id;
- moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
-};
-
-class FlowFileResponder : public CivetHandler {
- public:
-
- explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum)
- : wrong_uri(wrong_uri),
- input_port(input_port),
- invalid_checksum(invalid_checksum),
- flow_files_feed_(nullptr) {
- }
-
- moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() {
- return &flow_files_;
- }
-
- void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
- flow_files_feed_ = feed;
- }
-
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- std::string site2site_rest_resp = "";
- std::stringstream headers;
-
- if (!wrong_uri) {
- minifi::io::CivetStream civet_stream(conn);
- minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream);
- uint32_t num_attributes;
- uint64_t total_size = 0;
- total_size += stream.read(num_attributes);
-
- auto flow = std::make_shared<FlowObj>();
-
- for (int i = 0; i < num_attributes; i++) {
- std::string name, value;
- total_size += stream.readUTF(name, true);
- total_size += stream.readUTF(value, true);
- flow->attributes[name] = value;
- }
- uint64_t length;
- total_size += stream.read(length);
-
- total_size += length;
- flow->data.resize(length);
- flow->total_size = total_size;
-
- assert(stream.readData(flow->data.data(), length) == length);
-
- assert(flow->attributes["path"] == ".");
- assert(!flow->attributes["uuid"].empty());
- assert(!flow->attributes["filename"].empty());
-
- if (!invalid_checksum) {
- site2site_rest_resp = std::to_string(stream.getCRC());
- flow_files_.enqueue(flow);
- } else {
- site2site_rest_resp = "Imawrongchecksumshortandstout";
- }
-
- headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
- } else {
- headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n";
- }
-
- mg_printf(conn, "%s", headers.str().c_str());
- mg_printf(conn, "%s", site2site_rest_resp.c_str());
- return true;
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
-
- if (flow_files_feed_->size_approx() > 0) {
- std::shared_ptr<FlowObj> flow;
- uint8_t buf[1];
- std::vector<std::shared_ptr<FlowObj>> flows;
- uint64_t total = 0;
-
- while (flow_files_feed_->try_dequeue(flow)) {
- flows.push_back(flow);
- total += flow->total_size;
- }
- mg_printf(conn, "HTTP/1.1 200 OK\r\n"
- "Content-Length: %llu\r\n"
- "Content-Type: application/octet-stream\r\n"
- "Connection: close\r\n\r\n",
- total);
- minifi::io::BaseStream serializer;
- minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer);
- for (auto flow : flows) {
- uint32_t num_attributes = flow->attributes.size();
- stream.write(num_attributes);
- for (auto entry : flow->attributes) {
- stream.writeUTF(entry.first);
- stream.writeUTF(entry.second);
- }
- uint64_t length = flow->data.size();
- stream.write(length);
- stream.writeData(flow->data.data(), length);
- }
- auto ret = mg_write(conn, serializer.getBuffer(), total);
- } else {
- std::cout << "Nothing to transfer feed" << std::endl;
- mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
- "close\r\nContent-Length: 0\r\n");
- mg_printf(conn, "Content-Type: text/plain\r\n\r\n");
-
- }
- return true;
- }
-
- void setFlowUrl(std::string flowUrl) {
- base_url = flowUrl;
- }
-
- protected:
- // base url
- std::string base_url;
- // set the wrong url
- bool wrong_uri;
- // we are running an input port
- bool input_port;
- // invalid checksum is returned.
- bool invalid_checksum;
- moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_;
- moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
-};
-
-class DeleteTransactionResponder : public CivetHandler {
- public:
-
- explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code)
- : flow_files_feed_(nullptr),
- base_url(base_url),
- response_code(response_code) {
- expected_resp_code_str = std::to_string(expected_resp_code);
- }
-
- explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed)
- : flow_files_feed_(feed),
- base_url(base_url),
- response_code(response_code) {
- }
-
- bool handleDelete(CivetServer *server, struct mg_connection *conn) {
-
- std::string site2site_rest_resp = "";
- std::stringstream headers;
- std::string resp;
- CivetServer::getParam(conn, "responseCode", resp);
- headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n";
- headers << "Connection: close\r\n\r\n";
- mg_printf(conn, "%s", headers.str().c_str());
- mg_printf(conn, "%s", site2site_rest_resp.c_str());
- return true;
- }
-
- void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
- flow_files_feed_ = feed;
- }
-
- protected:
- moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
- std::string base_url;
- std::string expected_resp_code_str;
- std::string response_code;
-};
-
-#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civet_curl_tests/unit/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/unit/CMakeLists.txt b/extensions/civet_curl_tests/unit/CMakeLists.txt
deleted file mode 100644
index b645da1..0000000
--- a/extensions/civet_curl_tests/unit/CMakeLists.txt
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB CURL_UNIT_TESTS "unit/*.cpp")
-file(GLOB CURL_INTEGRATION_TESTS "*.cpp")
-
-SET(CURL_INT_TEST_COUNT 0)
-
-FOREACH(testfile ${CURL_UNIT_TESTS})
- get_filename_component(testfilename "${testfile}" NAME_WE)
- add_executable("${testfilename}" "${testfile}")
- target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/")
- target_link_libraries(${testfilename} ${CURL_LIBRARIES} )
- createTests("${testfilename}")
- target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
- if (APPLE)
- target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl )
- else ()
- target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
- endif()
- MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
-ENDFOREACH()
-
-FOREACH(testfile ${CURL_INTEGRATION_TESTS})
- get_filename_component(testfilename "${testfile}" NAME_WE)
- add_executable("${testfilename}" "${testfile}")
- target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/")
- target_link_libraries(${testfilename} ${CURL_LIBRARIES} )
- createTests("${testfilename}")
- if (APPLE)
- target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl )
- else ()
- target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
- endif()
- MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
-ENDFOREACH()
-
-message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")
-
-add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
-add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
-add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
-add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
-add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
-add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
-add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp b/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp
deleted file mode 100644
index 81d2714..0000000
--- a/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <uuid/uuid.h>
-#include <fstream>
-#include <map>
-#include <memory>
-#include <utility>
-#include <string>
-#include <set>
-#include "FlowController.h"
-#include "io/BaseStream.h"
-#include "TestBase.h"
-#include "processors/GetFile.h"
-#include "core/Core.h"
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "processors/ListenHTTP.h"
-#include "core/FlowFile.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "processors/InvokeHTTP.h"
-#include "processors/ListenHTTP.h"
-#include "processors/LogAttribute.h"
-
-TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
- TestController testController;
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- char format[] = "/tmp/gt.XXXXXX";
- char *dir = testController.createTempDirectory(format);
-
- std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
-
- std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- uuid_t processoruuid;
- REQUIRE(true == listenhttp->getUUID(processoruuid));
-
- uuid_t invokehttp_uuid;
- REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
-
- std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- gcConnection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
- laConnection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
-
- connection2->setRelationship(core::Relationship("No Retry", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(listenhttp);
-
- connection2->setSourceUUID(invokehttp_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(invokehttp_uuid);
-
- listenhttp->addConnection(connection);
- invokehttp->addConnection(connection);
- invokehttp->addConnection(connection2);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
- std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
- context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
-
- context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
- context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest");
- auto session = std::make_shared<core::ProcessSession>(context);
- auto session2 = std::make_shared<core::ProcessSession>(context2);
-
- REQUIRE(listenhttp->getName() == "listenhttp");
-
- std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
-
- std::shared_ptr<core::FlowFile> record;
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onSchedule(context, factory);
- listenhttp->onTrigger(context, session);
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
- invokehttp->onSchedule(context2, factory2);
- invokehttp->onTrigger(context2, session2);
-
- provenance::ProvenanceReporter *reporter = session->getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session->get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- listenhttp->incrementActiveTasks();
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onTrigger(context, session);
-
- reporter = session->getProvenanceReporter();
-
- records = reporter->getEvents();
- session->commit();
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- invokehttp->onTrigger(context2, session2);
-
- session2->commit();
- records = reporter->getEvents();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
- }
- std::shared_ptr<core::FlowFile> ffr = session2->get();
- REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
- LogTestController::getInstance().reset();
-}
-
-class CallBack : public minifi::OutputStreamCallback {
- public:
- CallBack() {
- }
- virtual ~CallBack() {
- }
- virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
- // leaving the typo for posterity sake
- std::string st = "we're gnna write some test stuff";
- return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length());
- }
-};
-
-TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- char format[] = "/tmp/gt.XXXXXX";
- char *dir = testController.createTempDirectory(format);
-
- std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
-
- std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- uuid_t processoruuid;
- REQUIRE(true == listenhttp->getUUID(processoruuid));
-
- uuid_t invokehttp_uuid;
- REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-
- std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- gcConnection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
- laConnection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
-
- connection2->setRelationship(core::Relationship("No Retry", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(listenhttp);
-
- connection->setSourceUUID(invokehttp_uuid);
- connection->setDestinationUUID(processoruuid);
-
- connection2->setSourceUUID(processoruuid);
- connection2->setSourceUUID(processoruuid);
-
- listenhttp->addConnection(connection);
- invokehttp->addConnection(connection);
- invokehttp->addConnection(connection2);
-
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
- std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680");
- context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
-
- context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
- context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest");
- auto session = std::make_shared<core::ProcessSession>(context);
- auto session2 = std::make_shared<core::ProcessSession>(context2);
-
- REQUIRE(listenhttp->getName() == "listenhttp");
-
- std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
-
- std::shared_ptr<core::FlowFile> record;
-
- CallBack callback;
-
- std::map<std::string, std::string> attributes;
- attributes["testy"] = "test";
- std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes);
- session2->write(flow, &callback);
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
- invokehttp->onSchedule(context2, factory2);
- invokehttp->onTrigger(context2, session2);
-
- listenhttp->incrementActiveTasks();
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onSchedule(context, factory);
- listenhttp->onTrigger(context, session);
-
- provenance::ProvenanceReporter *reporter = session->getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session->get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- listenhttp->incrementActiveTasks();
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onTrigger(context, session);
-
- reporter = session->getProvenanceReporter();
-
- records = reporter->getEvents();
- session->commit();
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- invokehttp->onTrigger(context2, session2);
-
- session2->commit();
- records = reporter->getEvents();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
- }
- std::shared_ptr<core::FlowFile> ffr = session2->get();
- REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
- LogTestController::getInstance().reset();
-}
-
-TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>();
- LogTestController::getInstance().setInfo<core::Processor>();
-
- std::shared_ptr<TestPlan> plan = testController.createPlan();
- std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
- std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
-
- REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685"));
- REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest"));
-
- REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST"));
- REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
- plan->reset();
- testController.runSession(plan, true);
-
- std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
- std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- plan->reset();
- testController.runSession(plan, true);
-
- records = plan->getProvenanceRecords();
- record = plan->getCurrentFlowFile();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == processor->getName());
- }
- std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
- REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
- LogTestController::getInstance().reset();
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/civetweb/CMakeLists.txt b/extensions/civetweb/CMakeLists.txt
index d31baad..470a231 100644
--- a/extensions/civetweb/CMakeLists.txt
+++ b/extensions/civetweb/CMakeLists.txt
@@ -35,7 +35,7 @@ add_subdirectory(${CIVET_THIRDPARTY_ROOT}
${CIVET_BINARY_ROOT}
EXCLUDE_FROM_ALL)
-file(GLOB SOURCES "*.cpp")
+file(GLOB SOURCES "processors/*.cpp")
add_library(minifi-civet-extensions STATIC ${SOURCES})
set_property(TARGET minifi-civet-extensions PROPERTY POSITION_INDEPENDENT_CODE ON)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/CivetLoader.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/CivetLoader.cpp b/extensions/civetweb/CivetLoader.cpp
deleted file mode 100644
index c593bb7..0000000
--- a/extensions/civetweb/CivetLoader.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "CivetLoader.h"
-#include "core/FlowConfiguration.h"
-
-bool CivetFactory::added = core::FlowConfiguration::add_static_func("createCivetFactory");
-
-extern "C" {
-
-void *createCivetFactory(void) {
- return new CivetFactory();
-}
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/ListenHTTP.cpp b/extensions/civetweb/ListenHTTP.cpp
deleted file mode 100644
index 62f8194..0000000
--- a/extensions/civetweb/ListenHTTP.cpp
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * @file ListenHTTP.cpp
-
- * ListenHTTP class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/ListenHTTP.h"
-#include <uuid/uuid.h>
-#include <CivetServer.h>
-#include <stdio.h>
-#include <sstream>
-#include <utility>
-#include <memory>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <set>
-#include <vector>
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessSessionFactory.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
-core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
-core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
- " connections. If the Pattern does not match the DN, the connection will be refused.",
- ".*");
-core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
-core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
-core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
-core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
- " should be passed along as FlowFile attributes",
- "");
-
-core::Relationship ListenHTTP::Success("success", "All files are routed to success");
-
-void ListenHTTP::initialize() {
- logger_->log_info("Initializing ListenHTTP");
-
- // Set the supported properties
- std::set<core::Property> properties;
- properties.insert(BasePath);
- properties.insert(Port);
- properties.insert(AuthorizedDNPattern);
- properties.insert(SSLCertificate);
- properties.insert(SSLCertificateAuthority);
- properties.insert(SSLVerifyPeer);
- properties.insert(SSLMinimumVersion);
- properties.insert(HeadersAsAttributesRegex);
- setSupportedProperties(properties);
- // Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- setSupportedRelationships(relationships);
-}
-
-void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- std::string basePath;
-
- if (!context->getProperty(BasePath.getName(), basePath)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
- basePath = BasePath.getValue();
- }
-
- basePath.insert(0, "/");
-
- std::string listeningPort;
-
- if (!context->getProperty(Port.getName(), listeningPort)) {
- logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
- return;
- }
-
- std::string authDNPattern;
-
- if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
- }
-
- std::string sslCertFile;
-
- if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
- }
-
- // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
- std::string sslCertAuthorityFile;
- std::string sslVerifyPeer;
- std::string sslMinVer;
-
- if (!sslCertFile.empty()) {
- if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
- }
-
- if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
- if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
- logger_->log_info("ListenHTTP will not verify peers");
- } else {
- logger_->log_info("ListenHTTP will verify peers");
- }
- } else {
- logger_->log_info("ListenHTTP will not verify peers");
- }
-
- if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
- logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
- }
- }
-
- std::string headersAsAttributesPattern;
-
- if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
- }
-
- auto numThreads = getMaxConcurrentTasks();
-
- logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
-
- // Initialize web server
- std::vector<std::string> options;
- options.push_back("enable_keep_alive");
- options.push_back("yes");
- options.push_back("keep_alive_timeout_ms");
- options.push_back("15000");
- options.push_back("num_threads");
- options.push_back(std::to_string(numThreads));
-
- if (sslCertFile.empty()) {
- options.push_back("listening_ports");
- options.push_back(listeningPort);
- } else {
- listeningPort += "s";
- options.push_back("listening_ports");
- options.push_back(listeningPort);
-
- options.push_back("ssl_certificate");
- options.push_back(sslCertFile);
-
- if (!sslCertAuthorityFile.empty()) {
- options.push_back("ssl_ca_file");
- options.push_back(sslCertAuthorityFile);
- }
-
- if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
- options.push_back("ssl_verify_peer");
- options.push_back("no");
- } else {
- options.push_back("ssl_verify_peer");
- options.push_back("yes");
- }
-
- if (sslMinVer.compare("SSL2") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(0));
- } else if (sslMinVer.compare("SSL3") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(1));
- } else if (sslMinVer.compare("TLS1.0") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(2));
- } else if (sslMinVer.compare("TLS1.1") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(3));
- } else {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(4));
- }
- }
-
- _server.reset(new CivetServer(options));
- _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
- _server->addHandler(basePath, _handler.get());
-}
-
-ListenHTTP::~ListenHTTP() {
-}
-
-void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
-
- // Do nothing if there are no incoming files
- if (!flowFile) {
- return;
- }
-}
-
-ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
- : _authDNRegex(std::move(authDNPattern)),
- _headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
- logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
- _processContext = context;
- _processSessionFactory = sessionFactory;
-}
-
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
-}
-
-bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
- auto req_info = mg_get_request_info(conn);
- logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length);
-
- // If this is a two-way TLS connection, authorize the peer against the configured pattern
- if (req_info->is_ssl && req_info->client_cert != nullptr) {
- if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) {
- mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
- logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
- return true;
- }
- }
-
- // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
- mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
-
- auto session = _processSessionFactory->createSession();
- ListenHTTP::WriteCallback callback(conn, req_info);
- auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-
- if (!flowFile) {
- sendErrorResponse(conn);
- return true;
- }
-
- try {
- session->write(flowFile, &callback);
-
- // Add filename from "filename" header value (and pattern headers)
- for (int i = 0; i < req_info->num_headers; i++) {
- auto header = &req_info->http_headers[i];
-
- if (strcmp("filename", header->name) == 0) {
- if (!flowFile->updateAttribute("filename", header->value)) {
- flowFile->addAttribute("filename", header->value);
- }
- } else if (std::regex_match(header->name, _headersAsAttributesRegex)) {
- if (!flowFile->updateAttribute(header->name, header->value)) {
- flowFile->addAttribute(header->name, header->value);
- }
- }
- }
-
- session->transfer(flowFile, Success);
- session->commit();
- } catch (std::exception &exception) {
- logger_->log_debug("ListenHTTP Caught Exception %s", exception.what());
- sendErrorResponse(conn);
- session->rollback();
- throw;
- } catch (...) {
- logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
- sendErrorResponse(conn);
- session->rollback();
- throw;
- }
-
- mg_printf(conn, "HTTP/1.1 200 OK\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
-
- return true;
-}
-
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
- : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
- _conn = conn;
- _reqInfo = reqInfo;
-}
-
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
- int64_t rlen;
- int64_t nlen = 0;
- int64_t tlen = _reqInfo->content_length;
- uint8_t buf[16384];
-
- // if we have no content length we should call mg_read until
- // there is no data left from the stream to be HTTP/1.1 compliant
- while (tlen == -1 || nlen < tlen) {
- rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
-
- if (rlen > (int64_t)sizeof(buf)) {
- rlen = (int64_t)sizeof(buf);
- }
-
- // Read a buffer of data from client
- rlen = mg_read(_conn, &buf[0], (size_t) rlen);
-
- if (rlen <= 0) {
- break;
- }
-
- // Transfer buffer data to the output stream
- stream->write(&buf[0], rlen);
-
- nlen += rlen;
- }
-
- return nlen;
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/RESTReceiver.cpp b/extensions/civetweb/RESTReceiver.cpp
deleted file mode 100644
index 1f015ad..0000000
--- a/extensions/civetweb/RESTReceiver.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "c2/protocols/RESTReceiver.h"
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message) {
- puts(message);
- return 1;
-}
-
-int ssl_protocol_en(void *ssl_context, void *user_data) {
- return 0;
-}
-
-RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
- : HeartBeatReporter(name, uuid),
- logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
-}
-
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
- HeartBeatReporter::initialize(controller, configure);
- logger_->log_debug("Initializing rest receiveer");
- if (nullptr != configuration_) {
- std::string listeningPort, rootUri, caCert;
- configuration_->get("c2.rest.listener.port", listeningPort);
- configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri);
- configuration_->get("c2.rest.listener.cacert", caCert);
-
- if (!listeningPort.empty() && !rootUri.empty()) {
- handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
- if (!caCert.empty()) {
- listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert);
- } else {
- listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()));
- }
- }
- }
-}
-int16_t RESTReceiver::heartbeat(const C2Payload &payload) {
- std::string operation_request_str = getOperation(payload);
- std::string outputConfig;
- Json::Value json_payload;
- json_payload["operation"] = operation_request_str;
- if (payload.getIdentifier().length() > 0) {
- json_payload["operationid"] = payload.getIdentifier();
- }
- const std::vector<C2ContentResponse> &content = payload.getContent();
-
- for (const auto &payload_content : content) {
- Json::Value payload_content_values;
- bool use_sub_option = true;
- if (payload_content.op == payload.getOperation()) {
- for (auto content : payload_content.operation_arguments) {
- if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
- json_payload[payload_content.name] = content.second;
- use_sub_option = false;
- } else {
- payload_content_values[content.first] = content.second;
- }
- }
- }
- if (use_sub_option)
- json_payload[payload_content.name] = payload_content_values;
- }
-
- for (const auto &nested_payload : payload.getNestedPayloads()) {
- json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
- }
-
- Json::StyledWriter writer;
- outputConfig = writer.write(json_payload);
- if (handler != nullptr) {
- logger_->log_debug("Setting %s", outputConfig);
- handler->setResponse(outputConfig);
- }
-
- return 0;
-}
-
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) {
- struct mg_callbacks callback;
-
- memset(&callback, 0, sizeof(callback));
- callback.init_ssl = ssl_protocol_en;
- std::string my_port = port;
- my_port += "s";
- callback.log_message = log_message;
- const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
- "ssl_verify_peer", "no", "num_threads", "1", 0 };
-
- std::vector<std::string> cpp_options;
- for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
- std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
-
- server->addHandler(rooturi, handler);
-
- return server;
-}
-
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) {
- const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 };
-
- std::vector<std::string> cpp_options;
- for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
- std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
-
- server->addHandler(rooturi, handler);
-
- return server;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/CivetLoader.h
----------------------------------------------------------------------
diff --git a/extensions/civetweb/include/CivetLoader.h b/extensions/civetweb/include/CivetLoader.h
deleted file mode 100644
index f571b5d..0000000
--- a/extensions/civetweb/include/CivetLoader.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef EXTENSION_CIVETLOADER_H
-#define EXTENSION_CIVETLOADER_H
-
-#include "core/ClassLoader.h"
-#include "c2/protocols/RESTReceiver.h"
-#include "processors/ListenHTTP.h"
-
-class __attribute__((visibility("default"))) CivetFactory : public core::ObjectFactory {
- public:
- CivetFactory() {
-
- }
-
- /**
- * Gets the name of the object.
- * @return class name of processor
- */
- virtual std::string getName() {
- return "CivetFactory";
- }
-
- virtual std::string getClassName() {
- return "CivetFactory";
- }
- /**
- * Gets the class name for the object
- * @return class name for the processor.
- */
- virtual std::vector<std::string> getClassNames() {
- std::vector<std::string> class_names;
- class_names.push_back("RESTReceiver");
- class_names.push_back("ListenHTTP");
- return class_names;
- }
-
- virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
- if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTReceiver")) {
- return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>());
- } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListenHTTP")) {
- return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::ListenHTTP>());
- } else {
- return nullptr;
- }
- }
-
- static bool added;
-
-};
-
-extern "C" {
-void *createCivetFactory(void);
-}
-#endif /* EXTENSION_CIVETLOADER_H */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/c2/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/civetweb/include/c2/protocols/RESTReceiver.h b/extensions/civetweb/include/c2/protocols/RESTReceiver.h
deleted file mode 100644
index 4793ee3..0000000
--- a/extensions/civetweb/include/c2/protocols/RESTReceiver.h
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-#include "core/Resource.h"
-#include "c2/protocols/RESTProtocol.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "controllers/SSLContextService.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message);
-
-int ssl_protocol_en(void *ssl_context, void *user_data);
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
- public:
- RESTReceiver(std::string name, uuid_t uuid = nullptr);
-
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
- virtual int16_t heartbeat(const C2Payload &heartbeat) override;
-
- protected:
-
- class ListeningProtocol : public CivetHandler {
-
- public:
- ListeningProtocol() {
-
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::string currentvalue;
- {
- std::lock_guard<std::mutex> lock(reponse_mutex_);
- currentvalue = resp_;
- }
-
- std::stringstream output;
- output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n";
-
- mg_printf(conn, "%s", output.str().c_str());
- mg_printf(conn, "%s", currentvalue.c_str());
- return true;
- }
-
- void setResponse(std::string response) {
- std::lock_guard<std::mutex> lock(reponse_mutex_);
- resp_ = response;
- }
-
- protected:
- std::mutex reponse_mutex_;
- std::string resp_;
-
- };
-
- std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
-
- std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
-
- std::unique_ptr<CivetServer> listener;
- std::unique_ptr<ListeningProtocol> handler;
-
- private:
- std::shared_ptr<logging::Logger> logger_;
-};
-
-REGISTER_RESOURCE(RESTReceiver);
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/io/CivetStream.h
----------------------------------------------------------------------
diff --git a/extensions/civetweb/include/io/CivetStream.h b/extensions/civetweb/include/io/CivetStream.h
deleted file mode 100644
index 571b0ca..0000000
--- a/extensions/civetweb/include/io/CivetStream.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
-#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
-
-#include <memory>
-#include <thread>
-#include <mutex>
-#include <future>
-#include <vector>
-
-#include "io/BaseStream.h"
-#include "civetweb.h"
-#include "CivetServer.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
-
-class CivetStream : public io::BaseStream {
- public:
- /**
- * File Stream constructor that accepts an fstream shared pointer.
- * It must already be initialized for read and write.
- */
- explicit CivetStream(struct mg_connection *conn)
- : io::BaseStream(), conn(conn) {
-
- }
-
- virtual ~CivetStream() {
- }
- /**
- * Skip to the specified offset.
- * @param offset offset to which we will skip
- */
- void seek(uint64_t offset){
-
- }
-
- const uint64_t getSize() const {
- return BaseStream::readBuffer;
- }
-
- // data stream extensions
- /**
- * Reads data and places it into buf
- * @param buf buffer in which we extract data
- * @param buflen
- */
- virtual int readData(std::vector<uint8_t> &buf, int buflen) {
- if (buf.capacity() < buflen) {
- buf.resize(buflen);
- }
- int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
-
- if (ret < buflen) {
- buf.resize(ret);
- }
- return ret;
- }
-
- /**
- * Reads data and places it into buf
- * @param buf buffer in which we extract data
- * @param buflen
- */
- virtual int readData(uint8_t *buf, int buflen) {
- return mg_read(conn,buf,buflen);
- }
-
- /**
- * Write value to the stream using std::vector
- * @param buf incoming buffer
- * @param buflen buffer to write
- *
- */
- virtual int writeData(std::vector<uint8_t> &buf, int buflen) {
- return 0;
- }
-
- /**
- * writes value to stream
- * @param value value to write
- * @param size size of value
- */
- virtual int writeData(uint8_t *value, int size) {
- return 0;
- }
-
- protected:
-
- /**
- * Creates a vector and returns the vector using the provided
- * type name.
- * @param t incoming object
- * @returns vector.
- */
- template<typename T>
- inline std::vector<uint8_t> readBuffer(const T& t) {
- std::vector<uint8_t> buf;
- buf.resize(sizeof t);
- readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
- return buf;
- }
-
- void reset();
-
- //size_t pos;
- struct mg_connection *conn;
-
- private:
-
- std::shared_ptr<logging::Logger> logger_;
-};
-} /* namespace io */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/extensions/civetweb/include/processors/ListenHTTP.h b/extensions/civetweb/include/processors/ListenHTTP.h
deleted file mode 100644
index 1b58dcd..0000000
--- a/extensions/civetweb/include/processors/ListenHTTP.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * @file ListenHTTP.h
- * ListenHTTP class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __LISTEN_HTTP_H__
-#define __LISTEN_HTTP_H__
-
-#include <memory>
-#include <regex>
-
-#include <CivetServer.h>
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// ListenHTTP Class
-class ListenHTTP : public core::Processor {
- public:
-
- // Constructor
- /*!
- * Create a new processor
- */
- ListenHTTP(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid),
- logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
- }
- // Destructor
- virtual ~ListenHTTP();
- // Processor Name
- static constexpr char const* ProcessorName = "ListenHTTP";
- // Supported Properties
- static core::Property BasePath;
- static core::Property Port;
- static core::Property AuthorizedDNPattern;
- static core::Property SSLCertificate;
- static core::Property SSLCertificateAuthority;
- static core::Property SSLVerifyPeer;
- static core::Property SSLMinimumVersion;
- static core::Property HeadersAsAttributesRegex;
- // Supported Relationships
- static core::Relationship Success;
-
- void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
- void initialize();
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
-
- // HTTP request handler
- class Handler : public CivetHandler {
- public:
- Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);bool handlePost(
- CivetServer *server, struct mg_connection *conn);
-
- private:
- // Send HTTP 500 error response to client
- void sendErrorResponse(struct mg_connection *conn);
- // Logger
- std::shared_ptr<logging::Logger> logger_;
-
- std::regex _authDNRegex;
- std::regex _headersAsAttributesRegex;
- core::ProcessContext *_processContext;
- core::ProcessSessionFactory *_processSessionFactory;
- };
-
- // Write callback for transferring data from HTTP request to content repo
- class WriteCallback : public OutputStreamCallback {
- public:
- WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
- int64_t process(std::shared_ptr<io::BaseStream> stream);
-
- private:
- // Logger
- std::shared_ptr<logging::Logger> logger_;
-
- struct mg_connection *_conn;
- const struct mg_request_info *_reqInfo;
- };
-
- private:
- // Logger
- std::shared_ptr<logging::Logger> logger_;
-
- std::unique_ptr<CivetServer> _server;
- std::unique_ptr<Handler> _handler;
-};
-
-REGISTER_RESOURCE(ListenHTTP);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
new file mode 100644
index 0000000..73ade40
--- /dev/null
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -0,0 +1,333 @@
+/**
+ * @file ListenHTTP.cpp
+
+ * ListenHTTP class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenHTTP.h"
+#include <uuid/uuid.h>
+#include <CivetServer.h>
+#include <stdio.h>
+#include <sstream>
+#include <utility>
+#include <memory>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <set>
+#include <vector>
+#include "utils/TimeUtil.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
+core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
+core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
+ " connections. If the Pattern does not match the DN, the connection will be refused.",
+ ".*");
+core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
+core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
+core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
+core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
+core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
+ " should be passed along as FlowFile attributes",
+ "");
+
+core::Relationship ListenHTTP::Success("success", "All files are routed to success");
+
+void ListenHTTP::initialize() {
+ logger_->log_info("Initializing ListenHTTP");
+
+ // Set the supported properties
+ std::set<core::Property> properties;
+ properties.insert(BasePath);
+ properties.insert(Port);
+ properties.insert(AuthorizedDNPattern);
+ properties.insert(SSLCertificate);
+ properties.insert(SSLCertificateAuthority);
+ properties.insert(SSLVerifyPeer);
+ properties.insert(SSLMinimumVersion);
+ properties.insert(HeadersAsAttributesRegex);
+ setSupportedProperties(properties);
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+}
+
+void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+ std::string basePath;
+
+ if (!context->getProperty(BasePath.getName(), basePath)) {
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
+ basePath = BasePath.getValue();
+ }
+
+ basePath.insert(0, "/");
+
+ std::string listeningPort;
+
+ if (!context->getProperty(Port.getName(), listeningPort)) {
+ logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
+ return;
+ }
+
+ std::string authDNPattern;
+
+ if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
+ }
+
+ std::string sslCertFile;
+
+ if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
+ }
+
+ // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
+ std::string sslCertAuthorityFile;
+ std::string sslVerifyPeer;
+ std::string sslMinVer;
+
+ if (!sslCertFile.empty()) {
+ if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
+ }
+
+ if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
+ if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
+ logger_->log_info("ListenHTTP will not verify peers");
+ } else {
+ logger_->log_info("ListenHTTP will verify peers");
+ }
+ } else {
+ logger_->log_info("ListenHTTP will not verify peers");
+ }
+
+ if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
+ logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
+ }
+ }
+
+ std::string headersAsAttributesPattern;
+
+ if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
+ }
+
+ auto numThreads = getMaxConcurrentTasks();
+
+ logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
+
+ // Initialize web server
+ std::vector<std::string> options;
+ options.push_back("enable_keep_alive");
+ options.push_back("yes");
+ options.push_back("keep_alive_timeout_ms");
+ options.push_back("15000");
+ options.push_back("num_threads");
+ options.push_back(std::to_string(numThreads));
+
+ if (sslCertFile.empty()) {
+ options.push_back("listening_ports");
+ options.push_back(listeningPort);
+ } else {
+ listeningPort += "s";
+ options.push_back("listening_ports");
+ options.push_back(listeningPort);
+
+ options.push_back("ssl_certificate");
+ options.push_back(sslCertFile);
+
+ if (!sslCertAuthorityFile.empty()) {
+ options.push_back("ssl_ca_file");
+ options.push_back(sslCertAuthorityFile);
+ }
+
+ if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
+ options.push_back("ssl_verify_peer");
+ options.push_back("no");
+ } else {
+ options.push_back("ssl_verify_peer");
+ options.push_back("yes");
+ }
+
+ if (sslMinVer.compare("SSL2") == 0) {
+ options.push_back("ssl_protocol_version");
+ options.push_back(std::to_string(0));
+ } else if (sslMinVer.compare("SSL3") == 0) {
+ options.push_back("ssl_protocol_version");
+ options.push_back(std::to_string(1));
+ } else if (sslMinVer.compare("TLS1.0") == 0) {
+ options.push_back("ssl_protocol_version");
+ options.push_back(std::to_string(2));
+ } else if (sslMinVer.compare("TLS1.1") == 0) {
+ options.push_back("ssl_protocol_version");
+ options.push_back(std::to_string(3));
+ } else {
+ options.push_back("ssl_protocol_version");
+ options.push_back(std::to_string(4));
+ }
+ }
+
+ _server.reset(new CivetServer(options));
+ _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
+ _server->addHandler(basePath, _handler.get());
+}
+
+ListenHTTP::~ListenHTTP() {
+}
+
+void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+ // Do nothing if there are no incoming files
+ if (!flowFile) {
+ return;
+ }
+}
+
+ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
+ : _authDNRegex(std::move(authDNPattern)),
+ _headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
+ logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
+ _processContext = context;
+ _processSessionFactory = sessionFactory;
+}
+
+void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
+ "Content-Type: text/html\r\n"
+ "Content-Length: 0\r\n\r\n");
+}
+
+bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
+ auto req_info = mg_get_request_info(conn);
+ logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length);
+
+ // If this is a two-way TLS connection, authorize the peer against the configured pattern
+ if (req_info->is_ssl && req_info->client_cert != nullptr) {
+ if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) {
+ mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
+ "Content-Type: text/html\r\n"
+ "Content-Length: 0\r\n\r\n");
+ logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
+ return true;
+ }
+ }
+
+ // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
+ mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
+
+ auto session = _processSessionFactory->createSession();
+ ListenHTTP::WriteCallback callback(conn, req_info);
+ auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+
+ if (!flowFile) {
+ sendErrorResponse(conn);
+ return true;
+ }
+
+ try {
+ session->write(flowFile, &callback);
+
+ // Add filename from "filename" header value (and pattern headers)
+ for (int i = 0; i < req_info->num_headers; i++) {
+ auto header = &req_info->http_headers[i];
+
+ if (strcmp("filename", header->name) == 0) {
+ if (!flowFile->updateAttribute("filename", header->value)) {
+ flowFile->addAttribute("filename", header->value);
+ }
+ } else if (std::regex_match(header->name, _headersAsAttributesRegex)) {
+ if (!flowFile->updateAttribute(header->name, header->value)) {
+ flowFile->addAttribute(header->name, header->value);
+ }
+ }
+ }
+
+ session->transfer(flowFile, Success);
+ session->commit();
+ } catch (std::exception &exception) {
+ logger_->log_debug("ListenHTTP Caught Exception %s", exception.what());
+ sendErrorResponse(conn);
+ session->rollback();
+ throw;
+ } catch (...) {
+ logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
+ sendErrorResponse(conn);
+ session->rollback();
+ throw;
+ }
+
+ mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+ "Content-Type: text/html\r\n"
+ "Content-Length: 0\r\n\r\n");
+
+ return true;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
+ : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
+ _conn = conn;
+ _reqInfo = reqInfo;
+}
+
+int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t rlen;
+ int64_t nlen = 0;
+ int64_t tlen = _reqInfo->content_length;
+ uint8_t buf[16384];
+
+ // if we have no content length we should call mg_read until
+ // there is no data left from the stream to be HTTP/1.1 compliant
+ while (tlen == -1 || nlen < tlen) {
+ rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
+
+ if (rlen > (int64_t)sizeof(buf)) {
+ rlen = (int64_t)sizeof(buf);
+ }
+
+ // Read a buffer of data from client
+ rlen = mg_read(_conn, &buf[0], (size_t) rlen);
+
+ if (rlen <= 0) {
+ break;
+ }
+
+ // Transfer buffer data to the output stream
+ stream->write(&buf[0], rlen);
+
+ nlen += rlen;
+ }
+
+ return nlen;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/civetweb/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
new file mode 100644
index 0000000..5199d19
--- /dev/null
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -0,0 +1,122 @@
+/**
+ * @file ListenHTTP.h
+ * ListenHTTP class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_HTTP_H__
+#define __LISTEN_HTTP_H__
+
+#include <memory>
+#include <regex>
+
+#include <CivetServer.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// ListenHTTP Class
+class ListenHTTP : public core::Processor {
+ public:
+
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ ListenHTTP(std::string name, uuid_t uuid = NULL)
+ : Processor(name, uuid),
+ logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
+ }
+ // Destructor
+ virtual ~ListenHTTP();
+ // Processor Name
+ static constexpr char const* ProcessorName = "ListenHTTP";
+ // Supported Properties
+ static core::Property BasePath;
+ static core::Property Port;
+ static core::Property AuthorizedDNPattern;
+ static core::Property SSLCertificate;
+ static core::Property SSLCertificateAuthority;
+ static core::Property SSLVerifyPeer;
+ static core::Property SSLMinimumVersion;
+ static core::Property HeadersAsAttributesRegex;
+ // Supported Relationships
+ static core::Relationship Success;
+
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ void initialize();
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+
+ // HTTP request handler
+ class Handler : public CivetHandler {
+ public:
+ Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);
+ bool handlePost(CivetServer *server, struct mg_connection *conn);
+
+ private:
+ // Send HTTP 500 error response to client
+ void sendErrorResponse(struct mg_connection *conn);
+
+ std::regex _authDNRegex;
+ std::regex _headersAsAttributesRegex;
+ core::ProcessContext *_processContext;
+ core::ProcessSessionFactory *_processSessionFactory;
+
+ // Logger
+ std::shared_ptr<logging::Logger> logger_;
+ };
+
+ // Write callback for transferring data from HTTP request to content repo
+ class WriteCallback : public OutputStreamCallback {
+ public:
+ WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
+ int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+ private:
+ // Logger
+ std::shared_ptr<logging::Logger> logger_;
+
+ struct mg_connection *_conn;
+ const struct mg_request_info *_reqInfo;
+ };
+
+ private:
+ // Logger
+ std::shared_ptr<logging::Logger> logger_;
+
+ std::unique_ptr<CivetServer> _server;
+ std::unique_ptr<Handler> _handler;
+};
+
+REGISTER_RESOURCE(ListenHTTP);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt
index b855e8e..dfa4259 100644
--- a/extensions/http-curl/CMakeLists.txt
+++ b/extensions/http-curl/CMakeLists.txt
@@ -22,7 +22,7 @@ find_package(CURL REQUIRED)
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
-include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT} ../../thirdparty/jsoncpp/include ../../thirdparty/)
include_directories(protocols client processors sitetosite)
file(GLOB SOURCES "*.cpp" "protocols/*.cpp" "client/*.cpp" "processors/*.cpp" "sitetosite/*.cpp")
@@ -49,6 +49,7 @@ find_package(UUID REQUIRED)
find_package(OpenSSL REQUIRED)
include_directories(${OPENSSL_INCLUDE_DIR})
target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} ${OPENSSL_LIBRARIES})
+target_link_libraries(minifi-http-curl minifi-civet-extensions)
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/HTTPCurlLoader.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HTTPCurlLoader.h b/extensions/http-curl/HTTPCurlLoader.h
index 9a6e821..ec90e99 100644
--- a/extensions/http-curl/HTTPCurlLoader.h
+++ b/extensions/http-curl/HTTPCurlLoader.h
@@ -19,7 +19,7 @@
#define EXTENSIONS_HTTPCURLLOADER_H_
#include "c2/protocols/RESTProtocol.h"
-#include "c2/protocols/RESTSender.h"
+#include "protocols/RESTSender.h"
#include "processors/InvokeHTTP.h"
#include "client/HTTPClient.h"
#include "core/ClassLoader.h"