You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2018/01/08 18:27:00 UTC
[3/5] nifi-minifi-cpp git commit: MINIFICPP-251 Move Civet
implementations to an extension.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTProtocol.cpp b/extensions/http-curl/protocols/RESTProtocol.cpp
deleted file mode 100644
index afbe3c9..0000000
--- a/extensions/http-curl/protocols/RESTProtocol.cpp
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RESTProtocol.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Translates the raw body of a C2 REST response into a C2Payload.
- *
- * The body must parse as JSON and its "operation" member must equal the
- * string form of the operation carried by the request payload. When the
- * response lists no "requested_operations" a READ_COMPLETE payload is
- * returned; otherwise a NESTED payload is returned that holds one nested
- * payload per requested operation.
- *
- * @param payload request payload that this response answers.
- * @param response raw response bytes; they need not be NUL terminated.
- * @return the parsed payload, or a READ_ERROR payload when the body does not
- * parse, names a different operation, or an exception is raised while reading it.
- */
-const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
-  Json::Reader reader;
-  Json::Value root;
-  try {
-    if (reader.parse(std::string(response.data(), response.size()), root)) {
-      const std::string requested_operation = getOperation(payload);
-
-      std::string identifier;
-      if (root.isMember("operationid")) {
-        identifier = root["operationid"].asString();
-      }
-      if (root["operation"].asString() == requested_operation) {
-        const Json::Value &requested_operations = root["requested_operations"];
-        if (requested_operations.size() == 0) {
-          // Returning the temporary directly lets the compiler elide the copy;
-          // wrapping it in std::move would prevent that.
-          return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
-        }
-        C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
-
-        new_payload.setIdentifier(identifier);
-
-        for (const Json::Value &request : requested_operations) {
-          const Operation newOp = stringToOperation(request["operation"].asString());
-          C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
-          C2ContentResponse new_command(newOp);
-          new_command.delay = 0;
-          new_command.required = true;
-          new_command.ttl = -1;
-          // set the identifier if one exists
-          if (request.isMember("operationid")) {
-            new_command.ident = request["operationid"].asString();
-            nested_payload.setIdentifier(new_command.ident);
-          }
-          new_command.name = request["name"].asString();
-
-          if (request.isMember("content") && request["content"].size() > 0) {
-            const Json::Value &arguments = request["content"];
-            for (const auto &name : arguments.getMemberNames()) {
-              new_command.operation_arguments[name] = arguments[name].asString();
-            }
-          }
-          nested_payload.addContent(std::move(new_command));
-          new_payload.addPayload(std::move(nested_payload));
-        }
-        // we have a response for this request
-        return new_payload;
-      }
-    }
-  } catch (...) {
-    // Deliberately best effort: any failure while reading the response is
-    // reported to the caller through the READ_ERROR payload below.
-  }
-  return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
-}
-
-/**
- * Builds the JSON representation of a payload, recursing into its nested payloads.
- *
- * Nested payloads are grouped by label. A label that occurs once is stored as
- * a single value (unwrapped when the serialized child itself contains a member
- * with that label); a label that occurs several times is stored as a JSON
- * array holding every serialized child, in the order they were encountered.
- * Content entries are then written next to the nested payloads.
- *
- * @param json_root not read by this implementation; kept for interface compatibility.
- * @param payload payload to serialize.
- * @return the JSON value describing the payload.
- */
-Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) {
-  Json::Value json_payload;
-  // Serialized nested payloads, grouped by their label.
-  std::map<std::string, std::vector<Json::Value>> children;
-  for (const auto &nested_payload : payload.getNestedPayloads()) {
-    children[nested_payload.getLabel()].push_back(serializeJsonPayload(json_payload, nested_payload));
-  }
-  for (const auto &child_vector : children) {
-    const std::string &label = child_vector.first;
-    const std::vector<Json::Value> &serialized = child_vector.second;
-    if (serialized.size() > 1) {
-      // Several children share this label: keep all of them in an array
-      // rather than letting each one overwrite the previous.
-      Json::Value children_json(Json::arrayValue);
-      for (const auto &child : serialized) {
-        children_json.append(child);
-      }
-      json_payload[label] = children_json;
-    } else if (serialized.size() == 1) {
-      const Json::Value &only_child = serialized.front();
-      if (only_child.isMember(label)) {
-        // Avoid a redundant { label: { label: ... } } nesting level.
-        json_payload[label] = only_child[label];
-      } else {
-        json_payload[label] = only_child;
-      }
-    }
-  }
-
-  for (const auto &payload_content : payload.getContent()) {
-    Json::Value payload_content_values;
-    bool use_sub_option = true;
-    if (payload_content.op == payload.getOperation()) {
-      for (const auto &argument : payload_content.operation_arguments) {
-        if (payload_content.operation_arguments.size() == 1 && payload_content.name == argument.first) {
-          // A single argument named after the content is written as a plain value.
-          json_payload[payload_content.name] = argument.second;
-          use_sub_option = false;
-        } else {
-          payload_content_values[argument.first] = argument.second;
-        }
-      }
-    }
-    if (use_sub_option)
-      json_payload[payload_content.name] = payload_content_values;
-  }
-  return json_payload;
-}
-
-/**
- * Gives the lowercase name of the operation carried by a payload.
- *
- * Operations that have no explicit mapping here are reported as "heartbeat".
- *
- * @param payload payload whose operation is translated.
- * @return the operation name as a string.
- */
-std::string RESTProtocol::getOperation(const C2Payload &payload) {
-  const auto operation = payload.getOperation();
-  if (operation == Operation::ACKNOWLEDGE) {
-    return "acknowledge";
-  }
-  if (operation == Operation::RESTART) {
-    return "restart";
-  }
-  if (operation == Operation::DESCRIBE) {
-    return "describe";
-  }
-  if (operation == Operation::STOP) {
-    return "stop";
-  }
-  if (operation == Operation::START) {
-    return "start";
-  }
-  if (operation == Operation::UPDATE) {
-    return "update";
-  }
-  // HEARTBEAT and every operation not listed above share the same name.
-  return "heartbeat";
-}
-
-/**
- * Maps an operation name to its Operation value, ignoring letter case.
- *
- * @param str operation name, e.g. "update" or "UPDATE".
- * @return the matching Operation; Operation::HEARTBEAT when the name is not recognised.
- */
-Operation RESTProtocol::stringToOperation(const std::string str) {
-  std::string op = str;
-  // tolower has undefined behaviour for negative char values, so every
-  // character is converted through unsigned char first.
-  std::transform(op.begin(), op.end(), op.begin(), [](unsigned char c) {
-    return static_cast<char>(::tolower(c));
-  });
-  if (op == "heartbeat") {
-    return Operation::HEARTBEAT;
-  } else if (op == "acknowledge") {
-    return Operation::ACKNOWLEDGE;
-  } else if (op == "update") {
-    return Operation::UPDATE;
-  } else if (op == "describe") {
-    return Operation::DESCRIBE;
-  } else if (op == "restart") {
-    return Operation::RESTART;
-  } else if (op == "clear") {
-    return Operation::CLEAR;
-  } else if (op == "stop") {
-    return Operation::STOP;
-  } else if (op == "start") {
-    return Operation::START;
-  }
-  // Unknown names are treated as a heartbeat.
-  return Operation::HEARTBEAT;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTProtocol.h b/extensions/http-curl/protocols/RESTProtocol.h
deleted file mode 100644
index 4767e77..0000000
--- a/extensions/http-curl/protocols/RESTProtocol.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
-#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-
-#include "utils/ByteArrayCallback.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "c2/HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "utils/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: Collects the JSON helpers used by the REST based
- * C2 classes: serializing a C2Payload into a Json::Value, parsing a JSON
- * response back into a C2Payload, and converting between Operation values and
- * their string names.
- *
- * All helpers are protected and virtual so that deriving classes can use or
- * replace them.
- */
-class RESTProtocol {
- public:
-  RESTProtocol() = default;
-
-  virtual ~RESTProtocol() = default;
-
- protected:
-  /**
-   * Serializes a payload, including its nested payloads, into a JSON value.
-   * @param json_root JSON value of the enclosing document.
-   * @param payload payload to serialize.
-   * @return JSON value describing the payload.
-   */
-  virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);
-
-  /**
-   * Parses the body of a response into a payload.
-   * @param payload request payload that the response answers.
-   * @param response raw bytes of the response body.
-   * @return payload describing the response.
-   */
-  virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);
-
-  /**
-   * @param payload payload whose operation is queried.
-   * @return string name of the payload's operation.
-   */
-  virtual std::string getOperation(const C2Payload &payload);
-
-  /**
-   * @param str operation name.
-   * @return Operation value corresponding to the name.
-   */
-  virtual Operation stringToOperation(const std::string str);
-};
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
deleted file mode 100644
index 4c46516..0000000
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RESTReceiver.h"
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Log callback matching the mg_callbacks::log_message signature: writes the
- * message to standard output.
- *
- * @param conn connection the message relates to; not used.
- * @param message NUL terminated text to print.
- * @return always 1.
- */
-int log_message(const struct mg_connection *conn, const char *message) {
-  const int handled = 1;
-  puts(message);
-  return handled;
-}
-
-/**
- * SSL initialisation callback matching the mg_callbacks::init_ssl signature.
- * It performs no work of its own.
- *
- * @param ssl_context SSL context supplied by the caller; not used.
- * @param user_data user data supplied by the caller; not used.
- * @return always 0.
- */
-int ssl_protocol_en(void *ssl_context, void *user_data) {
-  const int no_custom_setup = 0;
-  return no_custom_setup;
-}
-
-RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)  // Constructs a receiver from a name and a uuid.
- : HeartBeatReporter(name, uuid),  // name and uuid are forwarded unchanged to the HeartBeatReporter base
- logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {  // logger is obtained from the factory instantiated for this class
-}
-
-/**
- * Initializes the base reporter and, when configured, starts the listener.
- *
- * The listener is started only when both "c2.rest.listener.port" and
- * "c2.rest.listener.heartbeat.rooturi" are set. When "c2.rest.listener.cacert"
- * is set as well, the certificate-taking overload of start_webserver is used.
- *
- * @param controller controller service provider handed to the base class.
- * @param configure configuration handed to the base class.
- */
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
-  HeartBeatReporter::initialize(controller, configure);
-  logger_->log_debug("Initializing rest receiveer");
-  if (nullptr == configuration_) {
-    return;
-  }
-
-  std::string listeningPort;
-  std::string rootUri;
-  std::string caCert;
-  configuration_->get("c2.rest.listener.port", listeningPort);
-  configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri);
-  configuration_->get("c2.rest.listener.cacert", caCert);
-
-  if (listeningPort.empty() || rootUri.empty()) {
-    // Without a port and a root URI there is nothing to listen on.
-    return;
-  }
-
-  handler.reset(new ListeningProtocol());
-  // ListeningProtocol derives from CivetHandler, so the conversion is implicit.
-  CivetHandler *civet_handler = handler.get();
-  if (caCert.empty()) {
-    listener = start_webserver(listeningPort, rootUri, civet_handler);
-  } else {
-    listener = start_webserver(listeningPort, rootUri, civet_handler, caCert);
-  }
-}
-/**
- * Serializes a heartbeat payload to styled JSON and hands it to the listening
- * handler, when one exists, as the response it serves.
- *
- * @param payload heartbeat payload to publish.
- * @return always 0.
- */
-int16_t RESTReceiver::heartbeat(const C2Payload &payload) {
-  Json::Value json_payload;
-  json_payload["operation"] = getOperation(payload);
-  if (payload.getIdentifier().length() > 0) {
-    json_payload["operationid"] = payload.getIdentifier();
-  }
-
-  for (const auto &entry : payload.getContent()) {
-    const auto &arguments = entry.operation_arguments;
-    const bool same_operation = (entry.op == payload.getOperation());
-    if (same_operation && arguments.size() == 1 && arguments.begin()->first == entry.name) {
-      // A single argument named after the entry is written as a plain value.
-      json_payload[entry.name] = arguments.begin()->second;
-      continue;
-    }
-    Json::Value argument_values;
-    if (same_operation) {
-      for (const auto &argument : arguments) {
-        argument_values[argument.first] = argument.second;
-      }
-    }
-    json_payload[entry.name] = argument_values;
-  }
-
-  for (const auto &nested_payload : payload.getNestedPayloads()) {
-    json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
-  }
-
-  Json::StyledWriter writer;
-  const std::string outputConfig = writer.write(json_payload);
-  if (handler != nullptr) {
-    logger_->log_debug("Setting %s", outputConfig);
-    handler->setResponse(outputConfig);
-  }
-
-  return 0;
-}
-
-/**
- * Starts a CivetServer that listens with TLS and registers the handler.
- *
- * @param port listening port, without the SSL suffix.
- * @param rooturi URI under which the handler is registered.
- * @param handler handler that serves requests for rooturi.
- * @param ca_cert value passed to the server as its "ssl_certificate" option.
- * @return the running server.
- */
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) {
-  // NOTE(review): these callbacks are filled in but never handed to the
-  // CivetServer constructed below, so they currently have no effect - confirm
-  // whether they were meant to be passed to the constructor.
-  struct mg_callbacks callback;
-  memset(&callback, 0, sizeof(callback));
-  callback.init_ssl = ssl_protocol_en;
-  callback.log_message = log_message;
-
-  // civetweb treats a listening port as an SSL port only when it carries an
-  // 's' suffix, so the suffixed port is the one that must be configured.
-  const std::string ssl_port = port + "s";
-
-  std::vector<std::string> cpp_options = { "listening_ports", ssl_port, "ssl_certificate", ca_cert, "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
-      "ssl_verify_peer", "no", "num_threads", "1" };
-
-  std::unique_ptr<CivetServer> server(new CivetServer(cpp_options));
-
-  server->addHandler(rooturi, handler);
-
-  return server;
-}
-
-/**
- * Starts a CivetServer without TLS and registers the handler.
- *
- * @param port value passed to the server as its "listening_ports" option.
- * @param rooturi URI under which the handler is registered.
- * @param handler handler that serves requests for rooturi.
- * @return the running server.
- */
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) {
-  // Option names and values alternate, in the order CivetServer receives them.
-  std::vector<std::string> server_options;
-  server_options.push_back("document_root");
-  server_options.push_back(".");
-  server_options.push_back("listening_ports");
-  server_options.push_back(port.c_str());
-  server_options.push_back("num_threads");
-  server_options.push_back("1");
-
-  std::unique_ptr<CivetServer> web_server(new CivetServer(server_options));
-  web_server->addHandler(rooturi, handler);
-  return web_server;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
deleted file mode 100644
index b0de62a..0000000
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-
-#include "RESTSender.h"
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-#include "core/Resource.h"
-#include "RESTProtocol.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "controllers/SSLContextService.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message);
-
-int ssl_protocol_en(void *ssl_context, void *user_data);
-
-/**
- * Purpose and Justification: Heartbeat reporter that publishes the most recent
- * heartbeat over HTTP. Each heartbeat is serialized with the RESTProtocol
- * helpers and stored in a CivetHandler, which returns it to every GET request.
- */
-class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
- public:
-  RESTReceiver(std::string name, uuid_t uuid = nullptr);
-
-  void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
-  int16_t heartbeat(const C2Payload &heartbeat) override;
-
- protected:
-  /**
-   * Handler that answers GET requests with the response text most recently
-   * supplied through setResponse.
-   */
-  class ListeningProtocol : public CivetHandler {
-   public:
-    ListeningProtocol() = default;
-
-    /**
-     * Writes the stored response as a text/plain HTTP 200 reply.
-     * @param server server that received the request; not used.
-     * @param conn connection the reply is written to.
-     * @return always true.
-     */
-    bool handleGet(CivetServer *server, struct mg_connection *conn) {
-      // Copy the response under the lock so the reply is written without holding it.
-      std::string body;
-      {
-        std::lock_guard<std::mutex> guard(reponse_mutex_);
-        body = resp_;
-      }
-
-      std::stringstream headers;
-      headers << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << body.length() << "\r\nConnection: close\r\n\r\n";
-      const std::string header_text = headers.str();
-
-      mg_printf(conn, "%s", header_text.c_str());
-      mg_printf(conn, "%s", body.c_str());
-      return true;
-    }
-
-    /**
-     * Replaces the text returned by subsequent GET requests.
-     * @param response new response text.
-     */
-    void setResponse(std::string response) {
-      std::lock_guard<std::mutex> guard(reponse_mutex_);
-      resp_ = response;
-    }
-
-   protected:
-    // Guards resp_.
-    std::mutex reponse_mutex_;
-    // Text returned by handleGet.
-    std::string resp_;
-  };
-
-  /** Starts a server that is given ca_cert as its SSL certificate. */
-  std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
-
-  /** Starts a server without an SSL certificate. */
-  std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
-
-  // Server created by start_webserver; unset until initialize starts one.
-  std::unique_ptr<CivetServer> listener;
-  // Handler registered with the listener; unset until initialize creates it.
-  std::unique_ptr<ListeningProtocol> handler;
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-};
-
-REGISTER_RESOURCE(RESTReceiver);
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
deleted file mode 100644
index ebf532a..0000000
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RESTSender.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Constructs a sender from a name and a uuid, both forwarded to C2Protocol.
- *
- * @param name name given to the protocol instance.
- * @param uuid uuid given to the protocol instance.
- */
-RESTSender::RESTSender(std::string name, uuid_t uuid)
-    : C2Protocol(name, uuid),
-      // Use this class's own logger, as RESTReceiver does, so log output is
-      // attributed to RESTSender.
-      logger_(logging::LoggerFactory<RESTSender>::getLogger()) {
-}
-
-/**
- * Initializes the base protocol and reads the REST endpoints from the
- * configuration: "c2.rest.url" into rest_uri_ and "c2.rest.url.ack" into ack_uri_.
- *
- * @param controller controller service provider handed to the base class.
- * @param configure configuration to read from; may be null, in which case the
- * endpoints are left as they are.
- */
-void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
-  C2Protocol::initialize(controller, configure);
-  const bool have_configuration = (configure != nullptr);
-  if (have_configuration) {
-    // Endpoint for regular requests, then the endpoint for acknowledgements.
-    configure->get("c2.rest.url", rest_uri_);
-    configure->get("c2.rest.url.ack", ack_uri_);
-  }
-  logger_->log_info("Submitting to %s", rest_uri_);
-}
-/**
- * Sends a payload to the given URL and returns the response payload.
- *
- * For Direction::TRANSMIT the payload is first serialized to styled JSON; for
- * any other direction an empty body is passed on to sendPayload.
- *
- * @param url endpoint to contact.
- * @param payload payload to send.
- * @param direction Direction::TRANSMIT to serialize and send the payload.
- * @param async not used by this implementation.
- * @return the payload produced by sendPayload.
- */
-C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
-  const std::string operation_name = getOperation(payload);
-  std::string request_body;
-  if (direction == Direction::TRANSMIT) {
-    Json::Value json_payload;
-    json_payload["operation"] = operation_name;
-    if (payload.getIdentifier().length() > 0) {
-      json_payload["operationid"] = payload.getIdentifier();
-    }
-
-    for (const auto &entry : payload.getContent()) {
-      const auto &arguments = entry.operation_arguments;
-      const bool same_operation = (entry.op == payload.getOperation());
-      if (same_operation && arguments.size() == 1 && arguments.begin()->first == entry.name) {
-        // A single argument named after the entry is written as a plain value.
-        json_payload[entry.name] = arguments.begin()->second;
-        continue;
-      }
-      Json::Value argument_values;
-      if (same_operation) {
-        for (const auto &argument : arguments) {
-          argument_values[argument.first] = argument.second;
-        }
-      }
-      json_payload[entry.name] = argument_values;
-    }
-
-    for (const auto &nested_payload : payload.getNestedPayloads()) {
-      json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
-    }
-
-    Json::StyledWriter writer;
-    request_body = writer.write(json_payload);
-  }
-
-  return sendPayload(url, direction, payload, request_body);
-}
-
-/**
- * Sends a payload to the configured endpoint: ack_uri_ for ACKNOWLEDGE
- * payloads and rest_uri_ for everything else.
- *
- * @param payload payload to send.
- * @param direction forwarded to the URL-taking overload.
- * @param async forwarded to the URL-taking overload.
- * @return the payload produced by the URL-taking overload.
- */
-C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
-  const bool is_acknowledgement = (payload.getOperation() == ACKNOWLEDGE);
-  const std::string &target_uri = is_acknowledgement ? ack_uri_ : rest_uri_;
-  return consumePayload(target_uri, payload, direction, async);
-}
-
-/**
- * Re-reads the REST endpoints from the configuration, storing "c2.rest.url"
- * in rest_uri_ and "c2.rest.url.ack" in ack_uri_ exactly as initialize does.
- *
- * @param configure configuration to read from; a null pointer is ignored.
- */
-void RESTSender::update(const std::shared_ptr<Configure> &configure) {
-  if (nullptr == configure) {
-    return;
-  }
-  // Store the values in the members that consumePayload reads; reading them
-  // into a throwaway local would leave the sender on its old endpoints.
-  configure->get("c2.rest.url", rest_uri_);
-  configure->get("c2.rest.url.ack", ack_uri_);
-}
-
-/**
- * Performs the HTTP exchange for a payload and converts the reply into a payload.
- *
- * Direction::TRANSMIT issues a POST that uploads outputConfig; any other
- * direction issues a GET. A raw request payload receives the response body
- * unparsed; otherwise the body is parsed with parseJsonResponse.
- *
- * @param url endpoint to contact.
- * @param direction selects between POST (TRANSMIT) and GET.
- * @param payload request payload; supplies the operation and the raw flag.
- * @param outputConfig request body uploaded on a POST.
- * @return the response payload, or a READ_ERROR payload when the submission
- * fails or the response code is zero.
- */
-const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
-  utils::HTTPClient client(url, ssl_context_service_);
-  client.setConnectionTimeout(2);
-
-  // Declared at function scope because the client is only given raw pointers
-  // to these objects, which are still needed when submit() runs below.
-  std::unique_ptr<utils::ByteInputCallBack> upload_buffer;
-  std::unique_ptr<utils::HTTPUploadCallback> upload_state;
-  if (direction != Direction::TRANSMIT) {
-    // we do not need to set the upload callback
-    // since we are not uploading anything on a get
-    client.set_request_method("GET");
-  } else {
-    upload_buffer.reset(new utils::ByteInputCallBack());
-    upload_state.reset(new utils::HTTPUploadCallback);
-    upload_buffer->write(outputConfig);
-    upload_state->ptr = upload_buffer.get();
-    upload_state->pos = 0;
-    client.set_request_method("POST");
-    client.setUploadCallback(upload_state.get());
-  }
-  client.setContentType("application/json");
-  const bool submitted = client.submit();
-  const int64_t response_code = client.getResponseCode();
-
-  if (!submitted || response_code == 0) {
-    return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
-  }
-  if (!payload.isRaw()) {
-    return parseJsonResponse(payload, client.getResponseBody());
-  }
-
-  C2Payload raw_response(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
-  raw_response.setRawData(client.getResponseBody());
-  return raw_response;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
deleted file mode 100644
index e4c1e5e..0000000
--- a/extensions/http-curl/protocols/RESTSender.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-
-#include "utils/ByteArrayCallback.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "RESTProtocol.h"
-#include "c2/HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "../client/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: C2Protocol implementation that exchanges payloads
- * with a REST endpoint, using the RESTProtocol helpers for the JSON encoding.
- *
- * The external interfaces rely solely on consumePayload, which takes a
- * Direction. TRANSMIT performs a POST and any other direction performs a GET.
- */
-class RESTSender : public RESTProtocol, public C2Protocol {
- public:
-  explicit RESTSender(std::string name, uuid_t uuid = nullptr);
-
-  /** Sends the payload to the given URL and returns the response payload. */
-  C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
-
-  /** Sends the payload to the configured endpoint and returns the response payload. */
-  C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
-
-  /** Invoked with a configuration when it is to be re-read. */
-  void update(const std::shared_ptr<Configure> &configure) override;
-
-  /** Initializes the base protocol and reads the endpoint URLs from the configuration. */
-  void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
-
- protected:
-  /** Performs the HTTP request for a payload; outputConfig is the body uploaded on a POST. */
-  virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
-
-  // SSL context service handed to the HTTP client for each request.
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  // Endpoint used for all payloads except acknowledgements.
-  std::string rest_uri_;
-  // Endpoint used for ACKNOWLEDGE payloads.
-  std::string ack_uri_;
-};
-
-REGISTER_RESOURCE(RESTSender);
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/libarchive/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/libarchive/CMakeLists.txt b/extensions/libarchive/CMakeLists.txt
index 8a8d37f..1ed8c5b 100644
--- a/extensions/libarchive/CMakeLists.txt
+++ b/extensions/libarchive/CMakeLists.txt
@@ -20,7 +20,7 @@
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
-include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
find_package(LibArchive)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/rocksdb-repos/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt
index cb1bdb1..5d459e5 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -23,7 +23,7 @@ cmake_minimum_required(VERSION 2.6)
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
-include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/rocksdb/include ../../thirdparty/)
find_package(RocksDB)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/script/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/script/CMakeLists.txt b/extensions/script/CMakeLists.txt
index 1501c68..927d974 100644
--- a/extensions/script/CMakeLists.txt
+++ b/extensions/script/CMakeLists.txt
@@ -37,7 +37,7 @@ endif()
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
-include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/jsoncpp/include ../../thirdparty/)
file(GLOB SOURCES "*.cpp")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 6ede84f..a40f070 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -55,7 +55,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-reorder")
include_directories(../thirdparty/spdlog-20170710/include)
include_directories(../thirdparty/yaml-cpp-yaml-cpp-20171024/include)
-include_directories(${CIVET_THIRDPARTY_ROOT}/include)
include_directories(../thirdparty/jsoncpp/include)
include_directories(../thirdparty/concurrentqueue/)
include_directories(include)
@@ -73,7 +72,7 @@ file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
add_library(spdlog STATIC ${SPD_SOURCES})
add_library(core-minifi STATIC ${SOURCES})
add_dependencies(core-minifi jsoncpp_project)
-target_link_libraries(core-minifi ${UUID_LIBRARIES} ${JSONCPP_LIB} yaml-cpp )
+target_link_libraries(core-minifi ${UUID_LIBRARIES} ${JSONCPP_LIB} yaml-cpp dl)
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/c2/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
new file mode 100644
index 0000000..823b5a9
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+
+#include "utils/ByteArrayCallback.h"
+#include "c2/C2Protocol.h"
+#include "c2/HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "utils/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTProtocol {
+ public:
+  RESTProtocol() = default;
+
+  virtual ~RESTProtocol() = default;
+
+ protected:
+  /**
+   * Serializes the nested payloads and content entries of a payload into JSON.
+   * @param json_root JSON value supplied by the caller.
+   * @param payload payload to serialize.
+   * @return JSON value describing the payload.
+   */
+  virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);
+
+  /**
+   * Parses a raw response buffer as JSON and converts it into a payload.
+   * @param payload request payload the response belongs to.
+   * @param response raw bytes received in reply to the request.
+   * @return payload describing the outcome of reading the response.
+   */
+  virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);
+
+  /**
+   * @param payload payload whose operation is queried.
+   * @return textual name of the payload's operation.
+   */
+  virtual std::string getOperation(const C2Payload &payload);
+
+  /**
+   * @param str textual operation name.
+   * @return operation that corresponds to the given name.
+   */
+  virtual Operation stringToOperation(const std::string str);
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 6e644ef..eab7169 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -25,6 +25,7 @@
#include "core/controller/StandardControllerServiceProvider.h"
#include "provenance/Provenance.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
#include "core/Processor.h"
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h
deleted file mode 100644
index 1b58dcd..0000000
--- a/libminifi/include/processors/ListenHTTP.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * @file ListenHTTP.h
- * ListenHTTP class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __LISTEN_HTTP_H__
-#define __LISTEN_HTTP_H__
-
-#include <memory>
-#include <regex>
-
-#include <CivetServer.h>
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// ListenHTTP Class
-class ListenHTTP : public core::Processor {
- public:
-
- // Constructor
- /*!
- * Create a new processor
- */
- ListenHTTP(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid),
- logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
- }
- // Destructor
- virtual ~ListenHTTP();
- // Processor Name
- static constexpr char const* ProcessorName = "ListenHTTP";
- // Supported Properties
- static core::Property BasePath;
- static core::Property Port;
- static core::Property AuthorizedDNPattern;
- static core::Property SSLCertificate;
- static core::Property SSLCertificateAuthority;
- static core::Property SSLVerifyPeer;
- static core::Property SSLMinimumVersion;
- static core::Property HeadersAsAttributesRegex;
- // Supported Relationships
- static core::Relationship Success;
-
- void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
- void initialize();
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
-
- // HTTP request handler
- class Handler : public CivetHandler {
- public:
- Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);bool handlePost(
- CivetServer *server, struct mg_connection *conn);
-
- private:
- // Send HTTP 500 error response to client
- void sendErrorResponse(struct mg_connection *conn);
- // Logger
- std::shared_ptr<logging::Logger> logger_;
-
- std::regex _authDNRegex;
- std::regex _headersAsAttributesRegex;
- core::ProcessContext *_processContext;
- core::ProcessSessionFactory *_processSessionFactory;
- };
-
- // Write callback for transferring data from HTTP request to content repo
- class WriteCallback : public OutputStreamCallback {
- public:
- WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
- int64_t process(std::shared_ptr<io::BaseStream> stream);
-
- private:
- // Logger
- std::shared_ptr<logging::Logger> logger_;
-
- struct mg_connection *_conn;
- const struct mg_request_info *_reqInfo;
- };
-
- private:
- // Logger
- std::shared_ptr<logging::Logger> logger_;
-
- std::unique_ptr<CivetServer> _server;
- std::unique_ptr<Handler> _handler;
-};
-
-REGISTER_RESOURCE(ListenHTTP);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
new file mode 100644
index 0000000..946e3c6
--- /dev/null
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -0,0 +1,177 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/protocols/RESTProtocol.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Translates a raw response buffer into a C2Payload.
+ *
+ * The buffer is parsed as JSON. The document is only accepted when its
+ * "operation" member equals the textual operation of the request payload
+ * (see getOperation).
+ *
+ * @param payload request payload the response belongs to.
+ * @param response raw response bytes; they do not need to be null terminated.
+ * @return a READ_COMPLETE payload when the response lists no
+ *         "requested_operations"; a NESTED payload holding one nested payload
+ *         per requested operation otherwise; a READ_ERROR payload when the
+ *         buffer cannot be parsed, the operation does not match, or an
+ *         exception is raised while reading the document.
+ */
+const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
+  Json::Reader reader;
+  Json::Value root;
+  try {
+    if (reader.parse(std::string(response.data(), response.size()), root)) {
+      const std::string requested_operation = getOperation(payload);
+
+      std::string identifier;
+      if (root.isMember("operationid")) {
+        identifier = root["operationid"].asString();
+      }
+      if (root["operation"].asString() == requested_operation) {
+        if (root["requested_operations"].size() == 0) {
+          // Return the temporary directly: wrapping it in std::move would prevent copy elision.
+          return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
+        }
+        C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
+
+        new_payload.setIdentifier(identifier);
+
+        for (const Json::Value& request : root["requested_operations"]) {
+          Operation newOp = stringToOperation(request["operation"].asString());
+          C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
+          C2ContentResponse new_command(newOp);
+          new_command.delay = 0;
+          new_command.required = true;
+          new_command.ttl = -1;
+          // set the identifier if one exists
+          if (request.isMember("operationid")) {
+            new_command.ident = request["operationid"].asString();
+            nested_payload.setIdentifier(new_command.ident);
+          }
+          new_command.name = request["name"].asString();
+
+          // every member of "content" becomes a string argument of the command
+          if (request.isMember("content") && request["content"].size() > 0) {
+            for (const auto &name : request["content"].getMemberNames()) {
+              new_command.operation_arguments[name] = request["content"][name].asString();
+            }
+          }
+          nested_payload.addContent(std::move(new_command));
+          new_payload.addPayload(std::move(nested_payload));
+        }
+        // we have a response for this request
+        return new_payload;
+      }
+    }
+  } catch (...) {
+    // Deliberately best effort: whatever goes wrong while reading the document
+    // is reported to the caller through the READ_ERROR payload below.
+  }
+  return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+}
+
+/**
+ * Serializes a payload into a JSON value.
+ *
+ * Nested payloads are serialized recursively and grouped by their label. A
+ * label that occurs once is stored as a single value (unwrapped when the
+ * child itself only wraps a member of the same name); a label that occurs
+ * several times is stored as a JSON array holding every child, so that no
+ * sibling is lost. Content entries are then added under their name: either
+ * as a plain value, when the entry has exactly one argument named like the
+ * entry itself, or as an object of its arguments.
+ *
+ * @param json_root JSON value supplied by the caller; this implementation
+ *        does not read or modify it.
+ * @param payload payload to serialize.
+ * @return JSON value describing the payload.
+ */
+Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) {
+  // get the name from the content
+  Json::Value json_payload;
+  std::map<std::string, std::vector<Json::Value>> children;
+  for (const auto &nested_payload : payload.getNestedPayloads()) {
+    Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload);
+    children[nested_payload.getLabel()].push_back(child_payload);
+  }
+  for (const auto &child_vector : children) {
+    const std::string &label = child_vector.first;
+    const std::vector<Json::Value> &siblings = child_vector.second;
+    if (siblings.size() > 1) {
+      // several payloads share this label: emit all of them as an array
+      Json::Value children_json(Json::arrayValue);
+      for (const auto &child : siblings) {
+        children_json.append(child);
+      }
+      json_payload[label] = children_json;
+    } else if (siblings.size() == 1) {
+      const Json::Value &only_child = siblings.front();
+      if (only_child.isMember(label)) {
+        // avoid { label: { label: value } } by lifting the inner member
+        json_payload[label] = only_child[label];
+      } else {
+        json_payload[label] = only_child;
+      }
+    }
+  }
+
+  const std::vector<C2ContentResponse> &content = payload.getContent();
+  for (const auto &payload_content : content) {
+    Json::Value payload_content_values;
+    bool use_sub_option = true;
+    if (payload_content.op == payload.getOperation()) {
+      for (const auto &argument : payload_content.operation_arguments) {
+        if (payload_content.operation_arguments.size() == 1 && payload_content.name == argument.first) {
+          json_payload[payload_content.name] = argument.second;
+          use_sub_option = false;
+        } else {
+          payload_content_values[argument.first] = argument.second;
+        }
+      }
+    }
+    if (use_sub_option)
+      json_payload[payload_content.name] = payload_content_values;
+  }
+  return json_payload;
+}
+
+/**
+ * Maps the operation of a payload to its lowercase textual name.
+ *
+ * @param payload payload whose operation is queried.
+ * @return "acknowledge", "restart", "describe", "stop", "start" or "update"
+ *         for the matching operation; "heartbeat" for HEARTBEAT and for every
+ *         operation that has no dedicated case here.
+ */
+std::string RESTProtocol::getOperation(const C2Payload &payload) {
+  const char *operation_name = "heartbeat";
+  switch (payload.getOperation()) {
+    case Operation::ACKNOWLEDGE:
+      operation_name = "acknowledge";
+      break;
+    case Operation::RESTART:
+      operation_name = "restart";
+      break;
+    case Operation::DESCRIBE:
+      operation_name = "describe";
+      break;
+    case Operation::STOP:
+      operation_name = "stop";
+      break;
+    case Operation::START:
+      operation_name = "start";
+      break;
+    case Operation::UPDATE:
+      operation_name = "update";
+      break;
+    case Operation::HEARTBEAT:
+    default:
+      // keep the "heartbeat" fallback
+      break;
+  }
+  return operation_name;
+}
+
+/**
+ * Maps a textual operation name to the corresponding Operation.
+ *
+ * The comparison ignores ASCII case.
+ *
+ * @param str textual operation name.
+ * @return the matching operation for "heartbeat", "acknowledge", "update",
+ *         "describe", "restart", "clear", "stop" and "start";
+ *         Operation::HEARTBEAT for any other input.
+ */
+Operation RESTProtocol::stringToOperation(const std::string str) {
+  std::string op = str;
+  // tolower is undefined for negative char values, so every byte is passed
+  // through unsigned char before the call.
+  std::transform(op.begin(), op.end(), op.begin(), [](unsigned char c) {
+    return static_cast<char>(::tolower(c));
+  });
+  if (op == "heartbeat") {
+    return Operation::HEARTBEAT;
+  } else if (op == "acknowledge") {
+    return Operation::ACKNOWLEDGE;
+  } else if (op == "update") {
+    return Operation::UPDATE;
+  } else if (op == "describe") {
+    return Operation::DESCRIBE;
+  } else if (op == "restart") {
+    return Operation::RESTART;
+  } else if (op == "clear") {
+    return Operation::CLEAR;
+  } else if (op == "stop") {
+    return Operation::STOP;
+  } else if (op == "start") {
+    return Operation::START;
+  }
+  return Operation::HEARTBEAT;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
deleted file mode 100644
index 62f8194..0000000
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * @file ListenHTTP.cpp
-
- * ListenHTTP class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/ListenHTTP.h"
-#include <uuid/uuid.h>
-#include <CivetServer.h>
-#include <stdio.h>
-#include <sstream>
-#include <utility>
-#include <memory>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <set>
-#include <vector>
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessSessionFactory.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
-core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
-core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
- " connections. If the Pattern does not match the DN, the connection will be refused.",
- ".*");
-core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
-core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
-core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
-core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
- " should be passed along as FlowFile attributes",
- "");
-
-core::Relationship ListenHTTP::Success("success", "All files are routed to success");
-
-void ListenHTTP::initialize() {
- logger_->log_info("Initializing ListenHTTP");
-
- // Set the supported properties
- std::set<core::Property> properties;
- properties.insert(BasePath);
- properties.insert(Port);
- properties.insert(AuthorizedDNPattern);
- properties.insert(SSLCertificate);
- properties.insert(SSLCertificateAuthority);
- properties.insert(SSLVerifyPeer);
- properties.insert(SSLMinimumVersion);
- properties.insert(HeadersAsAttributesRegex);
- setSupportedProperties(properties);
- // Set the supported relationships
- std::set<core::Relationship> relationships;
- relationships.insert(Success);
- setSupportedRelationships(relationships);
-}
-
-void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- std::string basePath;
-
- if (!context->getProperty(BasePath.getName(), basePath)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
- basePath = BasePath.getValue();
- }
-
- basePath.insert(0, "/");
-
- std::string listeningPort;
-
- if (!context->getProperty(Port.getName(), listeningPort)) {
- logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
- return;
- }
-
- std::string authDNPattern;
-
- if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
- }
-
- std::string sslCertFile;
-
- if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
- }
-
- // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
- std::string sslCertAuthorityFile;
- std::string sslVerifyPeer;
- std::string sslMinVer;
-
- if (!sslCertFile.empty()) {
- if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
- }
-
- if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
- if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
- logger_->log_info("ListenHTTP will not verify peers");
- } else {
- logger_->log_info("ListenHTTP will verify peers");
- }
- } else {
- logger_->log_info("ListenHTTP will not verify peers");
- }
-
- if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
- logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
- }
- }
-
- std::string headersAsAttributesPattern;
-
- if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
- }
-
- auto numThreads = getMaxConcurrentTasks();
-
- logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
-
- // Initialize web server
- std::vector<std::string> options;
- options.push_back("enable_keep_alive");
- options.push_back("yes");
- options.push_back("keep_alive_timeout_ms");
- options.push_back("15000");
- options.push_back("num_threads");
- options.push_back(std::to_string(numThreads));
-
- if (sslCertFile.empty()) {
- options.push_back("listening_ports");
- options.push_back(listeningPort);
- } else {
- listeningPort += "s";
- options.push_back("listening_ports");
- options.push_back(listeningPort);
-
- options.push_back("ssl_certificate");
- options.push_back(sslCertFile);
-
- if (!sslCertAuthorityFile.empty()) {
- options.push_back("ssl_ca_file");
- options.push_back(sslCertAuthorityFile);
- }
-
- if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
- options.push_back("ssl_verify_peer");
- options.push_back("no");
- } else {
- options.push_back("ssl_verify_peer");
- options.push_back("yes");
- }
-
- if (sslMinVer.compare("SSL2") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(0));
- } else if (sslMinVer.compare("SSL3") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(1));
- } else if (sslMinVer.compare("TLS1.0") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(2));
- } else if (sslMinVer.compare("TLS1.1") == 0) {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(3));
- } else {
- options.push_back("ssl_protocol_version");
- options.push_back(std::to_string(4));
- }
- }
-
- _server.reset(new CivetServer(options));
- _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
- _server->addHandler(basePath, _handler.get());
-}
-
-ListenHTTP::~ListenHTTP() {
-}
-
-void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
-
- // Do nothing if there are no incoming files
- if (!flowFile) {
- return;
- }
-}
-
-ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
- : _authDNRegex(std::move(authDNPattern)),
- _headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
- logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
- _processContext = context;
- _processSessionFactory = sessionFactory;
-}
-
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
-}
-
-bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
- auto req_info = mg_get_request_info(conn);
- logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length);
-
- // If this is a two-way TLS connection, authorize the peer against the configured pattern
- if (req_info->is_ssl && req_info->client_cert != nullptr) {
- if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) {
- mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
- logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
- return true;
- }
- }
-
- // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
- mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
-
- auto session = _processSessionFactory->createSession();
- ListenHTTP::WriteCallback callback(conn, req_info);
- auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-
- if (!flowFile) {
- sendErrorResponse(conn);
- return true;
- }
-
- try {
- session->write(flowFile, &callback);
-
- // Add filename from "filename" header value (and pattern headers)
- for (int i = 0; i < req_info->num_headers; i++) {
- auto header = &req_info->http_headers[i];
-
- if (strcmp("filename", header->name) == 0) {
- if (!flowFile->updateAttribute("filename", header->value)) {
- flowFile->addAttribute("filename", header->value);
- }
- } else if (std::regex_match(header->name, _headersAsAttributesRegex)) {
- if (!flowFile->updateAttribute(header->name, header->value)) {
- flowFile->addAttribute(header->name, header->value);
- }
- }
- }
-
- session->transfer(flowFile, Success);
- session->commit();
- } catch (std::exception &exception) {
- logger_->log_debug("ListenHTTP Caught Exception %s", exception.what());
- sendErrorResponse(conn);
- session->rollback();
- throw;
- } catch (...) {
- logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
- sendErrorResponse(conn);
- session->rollback();
- throw;
- }
-
- mg_printf(conn, "HTTP/1.1 200 OK\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
-
- return true;
-}
-
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
- : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
- _conn = conn;
- _reqInfo = reqInfo;
-}
-
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
- int64_t rlen;
- int64_t nlen = 0;
- int64_t tlen = _reqInfo->content_length;
- uint8_t buf[16384];
-
- // if we have no content length we should call mg_read until
- // there is no data left from the stream to be HTTP/1.1 compliant
- while (tlen == -1 || nlen < tlen) {
- rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
-
- if (rlen > (int64_t)sizeof(buf)) {
- rlen = (int64_t)sizeof(buf);
- }
-
- // Read a buffer of data from client
- rlen = mg_read(_conn, &buf[0], (size_t) rlen);
-
- if (rlen <= 0) {
- break;
- }
-
- // Transfer buffer data to the output stream
- stream->write(&buf[0], rlen);
-
- nlen += rlen;
- }
-
- return nlen;
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/TestServer.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h
deleted file mode 100644
index 06f996c..0000000
--- a/libminifi/test/TestServer.h
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_TEST_TESTSERVER_H_
-#define LIBMINIFI_TEST_TESTSERVER_H_
-#include <regex.h>
-#include <string>
-#include <iostream>
-#include "civetweb.h"
-#include "CivetServer.h"
-
-
-/* Server context handle */
-static std::string resp_str;
-
-void init_webserver() {
- mg_init_library(0);
-}
-
-
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
- const char *options[] = { "listening_ports", port.c_str(), "error_log_file",
- "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list",
- "ALL", "ssl_verify_peer", "no", 0 };
-
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
- CivetServer *server = new CivetServer(cpp_options);
-
- server->addHandler(rooturi, handler);
-
- return server;
-
-}
-
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
- const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 };
-
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
- CivetServer *server = new CivetServer(cpp_options);
-
- server->addHandler(rooturi, handler);
-
- return server;
-
-}
-
-bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
- regex_t regex;
-
- const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
-
- int ret = regcomp(&regex, regexstr, REG_EXTENDED);
- if (ret) {
- return false;
- }
-
- size_t potentialGroups = regex.re_nsub + 1;
- regmatch_t groups[potentialGroups];
- if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) {
- for (int i = 0; i < potentialGroups; i++) {
- if (groups[i].rm_so == -1)
- break;
-
- std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
- switch (i) {
- case 1:
- scheme = str;
- break;
- case 3:
- port = str;
- break;
- case 4:
- path = str;
- break;
- default:
- break;
- }
- }
- }
- if (path.empty() || scheme.empty() || port.empty())
- return false;
-
- regfree(&regex);
-
- return true;
-
-}
-
-static void stop_webserver(CivetServer *server) {
- if (server != nullptr)
- delete server;
-
- /* Un-initialize the library */
- mg_exit_library();
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp
deleted file mode 100644
index cbb1831..0000000
--- a/libminifi/test/curl-tests/C2NullConfiguration.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../TestServer.h"
-#include "c2/C2Agent.h"
-#include "protocols/RESTReceiver.h"
-#include "../integration/IntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class VerifyC2Server : public IntegrationBase {
- public:
- explicit VerifyC2Server(bool isSecure)
- : isSecure(isSecure) {
- char format[] = "/tmp/ssth.XXXXXX";
- dir = testController.createTempDirectory(format);
- }
-
- void testSetup() {
- LogTestController::getInstance().setDebug<utils::HTTPClient>();
- LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
- LogTestController::getInstance().setDebug<processors::LogAttribute>();
- LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
- std::fstream file;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
- }
-
- void cleanup() {
- unlink(ss.str().c_str());
- }
-
- void runAssertions() {
- assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true);
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true);
- assert(LogTestController::getInstance().contains("Class is RESTSender") == true);
- }
-
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
- std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
- assert(proc != nullptr);
-
- std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
- assert(inv != nullptr);
- std::string url = "";
- inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
-
- std::string port, scheme, path;
- parse_http_components(url, port, scheme, path);
- configuration->set("c2.agent.protocol.class", "null");
- configuration->set("c2.rest.url", "");
- configuration->set("c2.rest.url.ack", "");
- configuration->set("c2.agent.heartbeat.reporter.classes", "null");
- configuration->set("c2.rest.listener.port", "null");
- configuration->set("c2.agent.heartbeat.period", "null");
- configuration->set("c2.rest.listener.heartbeat.rooturi", "null");
- }
-
- protected:
- bool isSecure;
- char *dir;
- std::stringstream ss;
- TestController testController;
-};
-
-int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
-
- VerifyC2Server harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
-
- return 0;
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2UpdateTest.cpp b/libminifi/test/curl-tests/C2UpdateTest.cpp
deleted file mode 100644
index 1a9fe5b..0000000
--- a/libminifi/test/curl-tests/C2UpdateTest.cpp
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "RESTSender.h"
-
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-static std::vector<std::string> responses;
-
-class ConfigHandler : public CivetHandler {
- public:
- ConfigHandler() {
- calls_ = 0;
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- calls_++;
- if (responses.size() > 0) {
- std::string top_str = responses.back();
- responses.pop_back();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- top_str.length());
- mg_printf(conn, "%s", top_str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::ifstream myfile(test_file_location_.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
- mg_printf(conn, "%s", str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
- std::string test_file_location_;
- std::atomic<size_t> calls_;
-};
-
-int main(int argc, char **argv) {
- mg_init_library(0);
- LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-
- const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
-
- CivetServer server(cpp_options);
- ConfigHandler h_ex;
- server.addHandler("/update", h_ex);
- std::string key_dir, test_file_location;
- if (argc > 1) {
- h_ex.test_file_location_ = test_file_location = argv[1];
- key_dir = argv[2];
- }
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\""
- "}]}";
-
- responses.push_back(heartbeat_response);
-
- std::ifstream myfile(test_file_location.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
- responses.push_back(response);
- }
-
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
- configuration->set("c2.rest.url", "http://localhost:9090/update");
- configuration->set("c2.agent.heartbeat.period", "1000");
- mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
- true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
- auto start = std::chrono::system_clock::now();
-
- controller->load();
- controller->start();
- waitToVerifyProcessor();
-
- controller->waitUnload(60000);
- auto then = std::chrono::system_clock::now();
-
- auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
- std::string logs = LogTestController::getInstance().log_output.str();
- assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
- LogTestController::getInstance().reset();
- rmdir("./content_repository");
- assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
-
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp b/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp
deleted file mode 100644
index 69c9a3b..0000000
--- a/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../TestServer.h"
-#include "c2/C2Agent.h"
-#include "RESTReceiver.h"
-#include "../integration/IntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class Responder : public CivetHandler {
- public:
- explicit Responder(bool isSecure)
- : isSecure(isSecure) {
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- std::string resp =
- "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, "
- "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- resp.length());
- mg_printf(conn, "%s", resp.c_str());
- return true;
- }
-
- protected:
- bool isSecure;
-};
-
-class VerifyC2Heartbeat : public IntegrationBase {
- public:
- explicit VerifyC2Heartbeat(bool isSecure)
- : isSecure(isSecure) {
- char format[] = "/tmp/ssth.XXXXXX";
- dir = testController.createTempDirectory(format);
- }
-
- void testSetup() {
- LogTestController::getInstance().setDebug<utils::HTTPClient>();
- LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
- LogTestController::getInstance().setDebug<LogTestController>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
- std::fstream file;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
- }
-
- void cleanup() {
- LogTestController::getInstance().reset();
- unlink(ss.str().c_str());
- }
-
- void runAssertions() {
- assert(LogTestController::getInstance().contains("Received Ack from Server") == true);
-
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true);
-
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
- }
-
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
- std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
- assert(proc != nullptr);
-
- std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
- assert(inv != nullptr);
- std::string url = "";
- inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
- configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat");
- configuration->set("c2.agent.heartbeat.period", "1000");
- configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat");
- }
-
- protected:
- bool isSecure;
- char *dir;
- std::stringstream ss;
- TestController testController;
-};
-
-int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- url = "http://localhost:8888/api/heartbeat";
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
-
- VerifyC2Heartbeat harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- Responder responder(isSecure);
-
- harness.setUrl(url, &responder);
-
- harness.run(test_file_location);
-
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2VerifyServeResults.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2VerifyServeResults.cpp b/libminifi/test/curl-tests/C2VerifyServeResults.cpp
deleted file mode 100644
index 961fec0..0000000
--- a/libminifi/test/curl-tests/C2VerifyServeResults.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../TestServer.h"
-#include "c2/C2Agent.h"
-#include "RESTReceiver.h"
-#include "../integration/IntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class VerifyC2Server : public IntegrationBase {
- public:
- explicit VerifyC2Server(bool isSecure)
- : isSecure(isSecure) {
- char format[] = "/tmp/ssth.XXXXXX";
- dir = testController.createTempDirectory(format);
- }
-
- void testSetup() {
- LogTestController::getInstance().setDebug<utils::HTTPClient>();
- LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
- LogTestController::getInstance().setDebug<processors::LogAttribute>();
- LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
- std::fstream file;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
- }
-
- void cleanup() {
- unlink(ss.str().c_str());
- }
-
- void runAssertions() {
- assert(LogTestController::getInstance().contains("Import offset 0") == true);
-
- assert(LogTestController::getInstance().contains("Outputting success and response") == true);
- }
-
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
- std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
- assert(proc != nullptr);
-
- std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
- assert(inv != nullptr);
- std::string url = "";
- inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
-
- std::string port, scheme, path;
- parse_http_components(url, port, scheme, path);
- configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
- configuration->set("c2.rest.listener.port", port);
- configuration->set("c2.agent.heartbeat.period", "10");
- configuration->set("c2.rest.listener.heartbeat.rooturi", path);
- }
-
- protected:
- bool isSecure;
- char *dir;
- std::stringstream ss;
- TestController testController;
-};
-
-int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
-
- VerifyC2Server harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
-
- return 0;
-}