You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/10 19:14:03 UTC
[2/4] nifi-minifi-cpp git commit: MINIFICPP-364: resolve issues with
test extension. This will cause issues with bootstrapping. Also resolve
linkage between civet and curl extensions
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/RESTSender.cpp b/extensions/http-curl/RESTSender.cpp
deleted file mode 100644
index 839c70b..0000000
--- a/extensions/http-curl/RESTSender.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "c2/protocols/RESTSender.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-RESTSender::RESTSender(std::string name, uuid_t uuid)
- : C2Protocol(name, uuid),
- logger_(logging::LoggerFactory<Connectable>::getLogger()) {
-}
-
-void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
- C2Protocol::initialize(controller, configure);
- // base URL when one is not specified.
- if (nullptr != configure) {
- configure->get("c2.rest.url", rest_uri_);
- configure->get("c2.rest.url.ack", ack_uri_);
- }
- logger_->log_info("Submitting to %s", rest_uri_);
-}
-C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
- std::string operation_request_str = getOperation(payload);
- std::string outputConfig;
- if (direction == Direction::TRANSMIT) {
- Json::Value json_payload;
- json_payload["operation"] = operation_request_str;
- if (payload.getIdentifier().length() > 0) {
- json_payload["operationid"] = payload.getIdentifier();
- }
- const std::vector<C2ContentResponse> &content = payload.getContent();
-
- for (const auto &payload_content : content) {
- Json::Value payload_content_values;
- bool use_sub_option = true;
- if (payload_content.op == payload.getOperation()) {
- for (auto content : payload_content.operation_arguments) {
- if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
- json_payload[payload_content.name] = content.second;
- use_sub_option = false;
- } else {
- payload_content_values[content.first] = content.second;
- }
- }
- }
- if (use_sub_option)
- json_payload[payload_content.name] = payload_content_values;
- }
-
- for (const auto &nested_payload : payload.getNestedPayloads()) {
- json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
- }
-
- Json::StyledWriter writer;
- outputConfig = writer.write(json_payload);
- }
-
- return sendPayload(url, direction, payload, outputConfig);
-}
-
-C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
- if (payload.getOperation() == ACKNOWLEDGE) {
- return consumePayload(ack_uri_, payload, direction, async);
- }
- return consumePayload(rest_uri_, payload, direction, async);
-}
-
-void RESTSender::update(const std::shared_ptr<Configure> &configure) {
- std::string url;
- configure->get("c2.rest.url", url);
- configure->get("c2.rest.url.ack", url);
-}
-
-const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
- utils::HTTPClient client(url, ssl_context_service_);
- client.setConnectionTimeout(2);
-
- std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
- std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
- if (direction == Direction::TRANSMIT) {
- input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
- callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
- input->write(outputConfig);
- callback->ptr = input.get();
- callback->pos = 0;
- client.set_request_method("POST");
- client.setUploadCallback(callback.get());
- } else {
- // we do not need to set the uplaod callback
- // since we are not uploading anything on a get
- client.set_request_method("GET");
- }
- client.setContentType("application/json");
- bool isOkay = client.submit();
- int64_t respCode = client.getResponseCode();
-
- if (isOkay && respCode) {
- if (payload.isRaw()) {
- C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
-
- response_payload.setRawData(client.getResponseBody());
- return response_payload;
- }
- return parseJsonResponse(payload, client.getResponseBody());
- } else {
- return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
- }
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/c2/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/c2/protocols/RESTSender.h b/extensions/http-curl/c2/protocols/RESTSender.h
deleted file mode 100644
index 450799c..0000000
--- a/extensions/http-curl/c2/protocols/RESTSender.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-
-#include "utils/ByteArrayCallback.h"
-#include "c2/C2Protocol.h"
-#include "c2/protocols/RESTProtocol.h"
-#include "c2/HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "../client/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTSender : public RESTProtocol, public C2Protocol {
- public:
-
- explicit RESTSender(std::string name, uuid_t uuid = nullptr);
-
- virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
-
- virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
-
- virtual void update(const std::shared_ptr<Configure> &configure) override;
-
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
-
- protected:
-
- virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
-
- std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-
- private:
- std::shared_ptr<logging::Logger> logger_;
- std::string rest_uri_;
- std::string ack_uri_;
-};
-
-REGISTER_RESOURCE(RESTSender);
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/client/HTTPClient.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 85000d9..ace479c 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -198,10 +198,6 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
CURLcode res;
-
-
-
-
CURL *http_session_;
std::string method_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
new file mode 100644
index 0000000..4c46516
--- /dev/null
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RESTReceiver.h"
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+int log_message(const struct mg_connection *conn, const char *message) {
+ puts(message);
+ return 1;
+}
+
+int ssl_protocol_en(void *ssl_context, void *user_data) {
+ return 0;
+}
+
+RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
+ : HeartBeatReporter(name, uuid),
+ logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
+}
+
+void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+ HeartBeatReporter::initialize(controller, configure);
+ logger_->log_debug("Initializing rest receiveer");
+ if (nullptr != configuration_) {
+ std::string listeningPort, rootUri, caCert;
+ configuration_->get("c2.rest.listener.port", listeningPort);
+ configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri);
+ configuration_->get("c2.rest.listener.cacert", caCert);
+
+ if (!listeningPort.empty() && !rootUri.empty()) {
+ handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
+ if (!caCert.empty()) {
+ listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert);
+ } else {
+ listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()));
+ }
+ }
+ }
+}
+int16_t RESTReceiver::heartbeat(const C2Payload &payload) {
+ std::string operation_request_str = getOperation(payload);
+ std::string outputConfig;
+ Json::Value json_payload;
+ json_payload["operation"] = operation_request_str;
+ if (payload.getIdentifier().length() > 0) {
+ json_payload["operationid"] = payload.getIdentifier();
+ }
+ const std::vector<C2ContentResponse> &content = payload.getContent();
+
+ for (const auto &payload_content : content) {
+ Json::Value payload_content_values;
+ bool use_sub_option = true;
+ if (payload_content.op == payload.getOperation()) {
+ for (auto content : payload_content.operation_arguments) {
+ if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
+ json_payload[payload_content.name] = content.second;
+ use_sub_option = false;
+ } else {
+ payload_content_values[content.first] = content.second;
+ }
+ }
+ }
+ if (use_sub_option)
+ json_payload[payload_content.name] = payload_content_values;
+ }
+
+ for (const auto &nested_payload : payload.getNestedPayloads()) {
+ json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
+ }
+
+ Json::StyledWriter writer;
+ outputConfig = writer.write(json_payload);
+ if (handler != nullptr) {
+ logger_->log_debug("Setting %s", outputConfig);
+ handler->setResponse(outputConfig);
+ }
+
+ return 0;
+}
+
+std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) {
+ struct mg_callbacks callback;
+
+ memset(&callback, 0, sizeof(callback));
+ callback.init_ssl = ssl_protocol_en;
+ std::string my_port = port;
+ my_port += "s";
+ callback.log_message = log_message;
+ const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
+ "ssl_verify_peer", "no", "num_threads", "1", 0 };
+
+ std::vector<std::string> cpp_options;
+ for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+ cpp_options.push_back(options[i]);
+ }
+ std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
+
+ server->addHandler(rooturi, handler);
+
+ return server;
+}
+
+std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) {
+ const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 };
+
+ std::vector<std::string> cpp_options;
+ for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+ cpp_options.push_back(options[i]);
+ }
+ std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
+
+ server->addHandler(rooturi, handler);
+
+ return server;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
new file mode 100644
index 0000000..4793ee3
--- /dev/null
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
+#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+#include "core/Resource.h"
+#include "c2/protocols/RESTProtocol.h"
+#include "CivetServer.h"
+#include "c2/C2Protocol.h"
+#include "controllers/SSLContextService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+int log_message(const struct mg_connection *conn, const char *message);
+
+int ssl_protocol_en(void *ssl_context, void *user_data);
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
+ public:
+ RESTReceiver(std::string name, uuid_t uuid = nullptr);
+
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+ virtual int16_t heartbeat(const C2Payload &heartbeat) override;
+
+ protected:
+
+ class ListeningProtocol : public CivetHandler {
+
+ public:
+ ListeningProtocol() {
+
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::string currentvalue;
+ {
+ std::lock_guard<std::mutex> lock(reponse_mutex_);
+ currentvalue = resp_;
+ }
+
+ std::stringstream output;
+ output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n";
+
+ mg_printf(conn, "%s", output.str().c_str());
+ mg_printf(conn, "%s", currentvalue.c_str());
+ return true;
+ }
+
+ void setResponse(std::string response) {
+ std::lock_guard<std::mutex> lock(reponse_mutex_);
+ resp_ = response;
+ }
+
+ protected:
+ std::mutex reponse_mutex_;
+ std::string resp_;
+
+ };
+
+ std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
+
+ std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
+
+ std::unique_ptr<CivetServer> listener;
+ std::unique_ptr<ListeningProtocol> handler;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(RESTReceiver);
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
new file mode 100644
index 0000000..ebf532a
--- /dev/null
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RESTSender.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+RESTSender::RESTSender(std::string name, uuid_t uuid)
+ : C2Protocol(name, uuid),
+ logger_(logging::LoggerFactory<Connectable>::getLogger()) {
+}
+
+void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+ C2Protocol::initialize(controller, configure);
+ // base URL when one is not specified.
+ if (nullptr != configure) {
+ configure->get("c2.rest.url", rest_uri_);
+ configure->get("c2.rest.url.ack", ack_uri_);
+ }
+ logger_->log_info("Submitting to %s", rest_uri_);
+}
+C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
+ std::string operation_request_str = getOperation(payload);
+ std::string outputConfig;
+ if (direction == Direction::TRANSMIT) {
+ Json::Value json_payload;
+ json_payload["operation"] = operation_request_str;
+ if (payload.getIdentifier().length() > 0) {
+ json_payload["operationid"] = payload.getIdentifier();
+ }
+ const std::vector<C2ContentResponse> &content = payload.getContent();
+
+ for (const auto &payload_content : content) {
+ Json::Value payload_content_values;
+ bool use_sub_option = true;
+ if (payload_content.op == payload.getOperation()) {
+ for (auto content : payload_content.operation_arguments) {
+ if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
+ json_payload[payload_content.name] = content.second;
+ use_sub_option = false;
+ } else {
+ payload_content_values[content.first] = content.second;
+ }
+ }
+ }
+ if (use_sub_option)
+ json_payload[payload_content.name] = payload_content_values;
+ }
+
+ for (const auto &nested_payload : payload.getNestedPayloads()) {
+ json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
+ }
+
+ Json::StyledWriter writer;
+ outputConfig = writer.write(json_payload);
+ }
+
+ return sendPayload(url, direction, payload, outputConfig);
+}
+
+C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
+ if (payload.getOperation() == ACKNOWLEDGE) {
+ return consumePayload(ack_uri_, payload, direction, async);
+ }
+ return consumePayload(rest_uri_, payload, direction, async);
+}
+
+void RESTSender::update(const std::shared_ptr<Configure> &configure) {
+ std::string url;
+ configure->get("c2.rest.url", url);
+ configure->get("c2.rest.url.ack", url);
+}
+
+const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+ utils::HTTPClient client(url, ssl_context_service_);
+ client.setConnectionTimeout(2);
+
+ std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
+ std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+ if (direction == Direction::TRANSMIT) {
+ input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+ callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+ input->write(outputConfig);
+ callback->ptr = input.get();
+ callback->pos = 0;
+ client.set_request_method("POST");
+ client.setUploadCallback(callback.get());
+ } else {
+ // we do not need to set the uplaod callback
+ // since we are not uploading anything on a get
+ client.set_request_method("GET");
+ }
+ client.setContentType("application/json");
+ bool isOkay = client.submit();
+ int64_t respCode = client.getResponseCode();
+
+ if (isOkay && respCode) {
+ if (payload.isRaw()) {
+ C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
+
+ response_payload.setRawData(client.getResponseBody());
+ return response_payload;
+ }
+ return parseJsonResponse(payload, client.getResponseBody());
+ } else {
+ return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+ }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
new file mode 100644
index 0000000..450799c
--- /dev/null
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+
+#include "utils/ByteArrayCallback.h"
+#include "c2/C2Protocol.h"
+#include "c2/protocols/RESTProtocol.h"
+#include "c2/HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "../client/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTSender : public RESTProtocol, public C2Protocol {
+ public:
+
+ explicit RESTSender(std::string name, uuid_t uuid = nullptr);
+
+ virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
+
+ virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
+
+ virtual void update(const std::shared_ptr<Configure> &configure) override;
+
+ virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+
+ protected:
+
+ virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
+
+ std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+ std::string rest_uri_;
+ std::string ack_uri_;
+};
+
+REGISTER_RESOURCE(RESTSender);
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp
new file mode 100644
index 0000000..934cf02
--- /dev/null
+++ b/extensions/http-curl/tests/C2NullConfiguration.cpp
@@ -0,0 +1,136 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "protocols/RESTReceiver.h"
+#include "protocols/RESTSender.h"
+#include "c2/C2Agent.h"
+#include "processors/LogAttribute.h"
+#include "HTTPIntegrationBase.h"
+
+class VerifyC2Server : public HTTPIntegrationBase {
+ public:
+ explicit VerifyC2Server(bool isSecure)
+ : isSecure(isSecure) {
+ char format[] = "/tmp/ssth.XXXXXX";
+ dir = testController.createTempDirectory(format);
+ }
+
+ void testSetup() {
+ LogTestController::getInstance().setDebug<utils::HTTPClient>();
+ LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+ LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+ LogTestController::getInstance().setDebug<processors::LogAttribute>();
+ LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+ std::fstream file;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+ }
+
+ void cleanup() {
+ unlink(ss.str().c_str());
+ }
+
+ void runAssertions() {
+ assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true);
+ assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true);
+ assert(LogTestController::getInstance().contains("Class is RESTSender") == true);
+ }
+
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+ assert(proc != nullptr);
+
+ std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+ assert(inv != nullptr);
+ std::string url = "";
+ inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+
+ std::string port, scheme, path;
+ parse_http_components(url, port, scheme, path);
+ configuration->set("c2.agent.protocol.class", "null");
+ configuration->set("c2.rest.url", "");
+ configuration->set("c2.rest.url.ack", "");
+ configuration->set("c2.agent.heartbeat.reporter.classes", "null");
+ configuration->set("c2.rest.listener.port", "null");
+ configuration->set("c2.agent.heartbeat.period", "null");
+ configuration->set("c2.rest.listener.heartbeat.rooturi", "null");
+ }
+
+ protected:
+ bool isSecure;
+ char *dir;
+ std::stringstream ss;
+ TestController testController;
+};
+
+int main(int argc, char **argv) {
+ std::string key_dir, test_file_location, url;
+ if (argc > 1) {
+ test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+
+ bool isSecure = false;
+ if (url.find("https") != std::string::npos) {
+ isSecure = true;
+ }
+
+ VerifyC2Server harness(isSecure);
+
+ harness.setKeyDir(key_dir);
+
+ harness.run(test_file_location);
+
+ return 0;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp
new file mode 100644
index 0000000..f21084b
--- /dev/null
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "protocols/RESTSender.h"
+
+void waitToVerifyProcessor() {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+static std::vector<std::string> responses;
+
+class ConfigHandler : public CivetHandler {
+ public:
+ ConfigHandler() {
+ calls_ = 0;
+ }
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ calls_++;
+ if (responses.size() > 0) {
+ std::string top_str = responses.back();
+ responses.pop_back();
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ top_str.length());
+ mg_printf(conn, "%s", top_str.c_str());
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::ifstream myfile(test_file_location_.c_str());
+
+ if (myfile.is_open()) {
+ std::stringstream buffer;
+ buffer << myfile.rdbuf();
+ std::string str = buffer.str();
+ myfile.close();
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ str.length());
+ mg_printf(conn, "%s", str.c_str());
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+ std::string test_file_location_;
+ std::atomic<size_t> calls_;
+};
+
+int main(int argc, char **argv) {
+ mg_init_library(0);
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+
+ const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
+ std::vector<std::string> cpp_options;
+ for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+ cpp_options.push_back(options[i]);
+ }
+
+ CivetServer server(cpp_options);
+ ConfigHandler h_ex;
+ server.addHandler("/update", h_ex);
+ std::string key_dir, test_file_location;
+ if (argc > 1) {
+ h_ex.test_file_location_ = test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+ std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"update\", "
+ "\"operationid\" : \"8675309\", "
+ "\"name\": \"configuration\""
+ "}]}";
+
+ responses.push_back(heartbeat_response);
+
+ std::ifstream myfile(test_file_location.c_str());
+
+ if (myfile.is_open()) {
+ std::stringstream buffer;
+ buffer << myfile.rdbuf();
+ std::string str = buffer.str();
+ myfile.close();
+ std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"update\", "
+ "\"operationid\" : \"8675309\", "
+ "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
+ responses.push_back(response);
+ }
+
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+
+ configuration->set("c2.rest.url", "http://localhost:9090/update");
+ configuration->set("c2.agent.heartbeat.period", "1000");
+ mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+ std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+ true);
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+ ptr.release();
+ auto start = std::chrono::system_clock::now();
+
+ controller->load();
+ controller->start();
+ waitToVerifyProcessor();
+
+ controller->waitUnload(60000);
+ auto then = std::chrono::system_clock::now();
+
+ auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
+ std::string logs = LogTestController::getInstance().log_output.str();
+ assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
+ LogTestController::getInstance().reset();
+ rmdir("./content_repository");
+ assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
new file mode 100644
index 0000000..adb2db1
--- /dev/null
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "c2/C2Agent.h"
+#include "protocols/RESTReceiver.h"
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "processors/LogAttribute.h"
+
+class Responder : public CivetHandler {
+ public:
+ explicit Responder(bool isSecure)
+ : isSecure(isSecure) {
+ }
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ std::string resp =
+ "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, "
+ "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}";
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ resp.length());
+ mg_printf(conn, "%s", resp.c_str());
+ return true;
+ }
+
+ protected:
+ bool isSecure;
+};
+
+class VerifyC2Heartbeat : public HTTPIntegrationBase {
+ public:
+ explicit VerifyC2Heartbeat(bool isSecure)
+ : isSecure(isSecure) {
+ char format[] = "/tmp/ssth.XXXXXX";
+ dir = testController.createTempDirectory(format);
+ }
+
+ void testSetup() {
+ LogTestController::getInstance().setDebug<utils::HTTPClient>();
+ LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+ LogTestController::getInstance().setDebug<LogTestController>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+ std::fstream file;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+ }
+
+ void cleanup() {
+ LogTestController::getInstance().reset();
+ unlink(ss.str().c_str());
+ }
+
+ void runAssertions() {
+ assert(LogTestController::getInstance().contains("Received Ack from Server") == true);
+
+ assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true);
+
+ assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
+ }
+
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+ assert(proc != nullptr);
+
+ std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+ assert(inv != nullptr);
+ std::string url = "";
+ inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+ configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat");
+ configuration->set("c2.agent.heartbeat.period", "1000");
+ configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat");
+ }
+
+ protected:
+ bool isSecure;
+ char *dir;
+ std::stringstream ss;
+ TestController testController;
+};
+
+int main(int argc, char **argv) {
+ std::string key_dir, test_file_location, url;
+ url = "http://localhost:8888/api/heartbeat";
+ if (argc > 1) {
+ test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+
+ bool isSecure = false;
+ if (url.find("https") != std::string::npos) {
+ isSecure = true;
+ }
+
+ VerifyC2Heartbeat harness(isSecure);
+
+ harness.setKeyDir(key_dir);
+
+ Responder responder(isSecure);
+
+ harness.setUrl(url, &responder);
+
+ harness.run(test_file_location);
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/C2VerifyServeResults.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp
new file mode 100644
index 0000000..fbbc8c8
--- /dev/null
+++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "processors/InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "CivetServer.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "controllers/SSLContextService.h"
+#include "TestServer.h"
+#include "c2/C2Agent.h"
+#include "protocols/RESTReceiver.h"
+#include "HTTPIntegrationBase.h"
+#include "processors/LogAttribute.h"
+
+class VerifyC2Server : public HTTPIntegrationBase {
+ public:
+ explicit VerifyC2Server(bool isSecure)
+ : isSecure(isSecure) {
+ char format[] = "/tmp/ssth.XXXXXX";
+ dir = testController.createTempDirectory(format);
+ }
+
+ void testSetup() {
+ LogTestController::getInstance().setDebug<utils::HTTPClient>();
+ LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
+ LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+ LogTestController::getInstance().setDebug<processors::LogAttribute>();
+ LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
+ std::fstream file;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+ }
+
+ void cleanup() {
+ unlink(ss.str().c_str());
+ }
+
+ void runAssertions() {
+ assert(LogTestController::getInstance().contains("Import offset 0") == true);
+
+ assert(LogTestController::getInstance().contains("Outputting success and response") == true);
+ }
+
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+ assert(proc != nullptr);
+
+ std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+ assert(inv != nullptr);
+ std::string url = "";
+ inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+
+ std::string port, scheme, path;
+ parse_http_components(url, port, scheme, path);
+ configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
+ configuration->set("c2.rest.listener.port", port);
+ configuration->set("c2.agent.heartbeat.period", "10");
+ configuration->set("c2.rest.listener.heartbeat.rooturi", path);
+ }
+
+ protected:
+ bool isSecure;
+ char *dir;
+ std::stringstream ss;
+ TestController testController;
+};
+
+int main(int argc, char **argv) {
+ std::string key_dir, test_file_location, url;
+ if (argc > 1) {
+ test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+
+ bool isSecure = false;
+ if (url.find("https") != std::string::npos) {
+ isSecure = true;
+ }
+
+ VerifyC2Server harness(isSecure);
+
+ harness.setKeyDir(key_dir);
+
+ harness.run(test_file_location);
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
new file mode 100644
index 0000000..8fd89e9
--- /dev/null
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB CURL_UNIT_TESTS "unit/*.cpp")
+file(GLOB CURL_INTEGRATION_TESTS "*.cpp")
+
+SET(CURL_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${CURL_UNIT_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../client/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../processors/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../protocols/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../sitetosite/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
+ target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+ createTests("${testfilename}")
+ target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ if (APPLE)
+ target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
+ else ()
+ target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
+ endif()
+ MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+FOREACH(testfile ${CURL_INTEGRATION_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../client/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../processors/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../protocols/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "../sitetosite/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
+ target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+ createTests("${testfilename}")
+ if (APPLE)
+ target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
+ else ()
+ target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
+ endif()
+ MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")
+
+add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
+add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
+add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/CivetStream.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
new file mode 100644
index 0000000..571b0ca
--- /dev/null
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
+
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <future>
+#include <vector>
+
+#include "io/BaseStream.h"
+#include "civetweb.h"
+#include "CivetServer.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+class CivetStream : public io::BaseStream {
+ public:
+ /**
+ * File Stream constructor that accepts an fstream shared pointer.
+ * It must already be initialized for read and write.
+ */
+ explicit CivetStream(struct mg_connection *conn)
+ : io::BaseStream(), conn(conn) {
+
+ }
+
+ virtual ~CivetStream() {
+ }
+ /**
+ * Skip to the specified offset.
+ * @param offset offset to which we will skip
+ */
+ void seek(uint64_t offset){
+
+ }
+
+ const uint64_t getSize() const {
+ return BaseStream::readBuffer;
+ }
+
+ // data stream extensions
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+ if (buf.capacity() < buflen) {
+ buf.resize(buflen);
+ }
+ int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+
+ if (ret < buflen) {
+ buf.resize(ret);
+ }
+ return ret;
+ }
+
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(uint8_t *buf, int buflen) {
+ return mg_read(conn,buf,buflen);
+ }
+
+ /**
+ * Write value to the stream using std::vector
+ * @param buf incoming buffer
+ * @param buflen buffer to write
+ *
+ */
+ virtual int writeData(std::vector<uint8_t> &buf, int buflen) {
+ return 0;
+ }
+
+ /**
+ * writes value to stream
+ * @param value value to write
+ * @param size size of value
+ */
+ virtual int writeData(uint8_t *value, int size) {
+ return 0;
+ }
+
+ protected:
+
+ /**
+ * Creates a vector and returns the vector using the provided
+ * type name.
+ * @param t incoming object
+ * @returns vector.
+ */
+ template<typename T>
+ inline std::vector<uint8_t> readBuffer(const T& t) {
+ std::vector<uint8_t> buf;
+ buf.resize(sizeof t);
+ readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+ return buf;
+ }
+
+ void reset();
+
+ //size_t pos;
+ struct mg_connection *conn;
+
+ private:
+
+ std::shared_ptr<logging::Logger> logger_;
+};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
new file mode 100644
index 0000000..612603a
--- /dev/null
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <thread>
+#include <type_traits>
+#include <vector>
+
+#include "core/controller/ControllerServiceMap.h"
+#include "core/controller/StandardControllerServiceNode.h"
+#include "core/controller/StandardControllerServiceProvider.h"
+#include "controllers/SSLContextService.h"
+#include "core/Core.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/ProcessGroup.h"
+#include "core/Resource.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/MockClasses.h"
+#include "unit/ProvenanceTestHelper.h"
+
+REGISTER_RESOURCE(MockControllerService);
+REGISTER_RESOURCE(MockProcessor);
+
+std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std::shared_ptr<core::controller::ControllerServiceProvider> provider, const std::string id) {
+ std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>();
+ std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id,
+ std::make_shared<minifi::Configure>());
+ return testNode;
+}
+
+void waitToVerifyProcessor() {
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+}
+
+int main(int argc, char **argv) {
+ std::string test_file_location;
+ std::string key_dir;
+
+ if (argc > 2) {
+ test_file_location = argv[1];
+ key_dir = argv[1];
+ }
+
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+
+ std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+ std::string client_cert = "cn.crt.pem";
+ std::string priv_key_file = "cn.ckey.pem";
+ std::string passphrase = "cn.pass";
+ std::string ca_cert = "nifi-cert.pem";
+ configuration->set(minifi::Configure::nifi_security_client_certificate, test_file_location);
+ configuration->set(minifi::Configure::nifi_security_client_private_key, priv_key_file);
+ configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase);
+ configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(configuration);
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+ content_repo,
+ DEFAULT_ROOT_GROUP_NAME,
+ true);
+
+ disabled = false;
+ std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+ ptr.release();
+
+ std::shared_ptr<core::controller::StandardControllerServiceProvider> provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg, std::make_shared<minifi::Configure>());
+ std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995");
+ assert(mockNode != nullptr);
+ mockNode->enable();
+ std::vector<std::shared_ptr<core::controller::ControllerServiceNode> > linkedNodes = mockNode->getLinkedControllerServices();
+ assert(linkedNodes.size() == 1);
+
+ std::shared_ptr<core::controller::ControllerServiceNode> notexistNode = pg->findControllerService("MockItLikeItsWrong");
+ assert(notexistNode == nullptr);
+
+ std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = nullptr;
+ std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = nullptr;
+ {
+ std::lock_guard<std::mutex> lock(control_mutex);
+ controller->load();
+ controller->start();
+ ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest");
+ ssl_client_cont->enable();
+ assert(ssl_client_cont != nullptr);
+ assert(ssl_client_cont->getControllerServiceImplementation() != nullptr);
+ ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation());
+ }
+ assert(ssl_client->getCACertificate().length() > 0);
+ // now let's disable one of the controller services.
+ std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
+ assert(cs_id != nullptr);
+ {
+ std::lock_guard<std::mutex> lock(control_mutex);
+ controller->disableControllerService(cs_id);
+ disabled = true;
+ waitToVerifyProcessor();
+ }
+ {
+ std::lock_guard<std::mutex> lock(control_mutex);
+ controller->enableControllerService(cs_id);
+ disabled = false;
+ waitToVerifyProcessor();
+ }
+ std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
+ assert(cs_id->enabled());
+{
+ std::lock_guard<std::mutex> lock(control_mutex);
+ controller->disableReferencingServices(mock_cont);
+ disabled = true;
+ waitToVerifyProcessor();
+ }
+ assert(cs_id->enabled() == false);
+{
+ std::lock_guard<std::mutex> lock(control_mutex);
+ controller->enableReferencingServices(mock_cont);
+ disabled = false;
+ waitToVerifyProcessor();
+ }
+ assert(cs_id->enabled() == true);
+
+ controller->waitUnload(60000);
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/GetFileNoData.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/GetFileNoData.cpp b/extensions/http-curl/tests/GetFileNoData.cpp
new file mode 100644
index 0000000..299d994
--- /dev/null
+++ b/extensions/http-curl/tests/GetFileNoData.cpp
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "protocols/RESTSender.h"
+
+void waitToVerifyProcessor() {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+static std::vector<std::string> responses;
+
+class ConfigHandler : public CivetHandler {
+ public:
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ if (responses.size() > 0) {
+ std::string top_str = responses.back();
+ responses.pop_back();
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ top_str.length());
+ mg_printf(conn, "%s", top_str.c_str());
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::ifstream myfile(test_file_location_.c_str());
+
+ if (myfile.is_open()) {
+ std::stringstream buffer;
+ buffer << myfile.rdbuf();
+ std::string str = buffer.str();
+ myfile.close();
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ str.length());
+ mg_printf(conn, "%s", str.c_str());
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+ std::string test_file_location_;
+};
+
+int main(int argc, char **argv) {
+ mg_init_library(0);
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+
+ const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
+ std::vector<std::string> cpp_options;
+ for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+ cpp_options.push_back(options[i]);
+ }
+
+ CivetServer server(cpp_options);
+ ConfigHandler h_ex;
+ server.addHandler("/update", h_ex);
+ std::string key_dir, test_file_location;
+ if (argc > 1) {
+ h_ex.test_file_location_ = test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+ std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"update\", "
+ "\"operationid\" : \"8675309\", "
+ "\"name\": \"configuration\""
+ "}]}";
+
+ responses.push_back(heartbeat_response);
+
+ std::ifstream myfile(test_file_location.c_str());
+
+ if (myfile.is_open()) {
+ std::stringstream buffer;
+ buffer << myfile.rdbuf();
+ std::string str = buffer.str();
+ myfile.close();
+ std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"update\", "
+ "\"operationid\" : \"8675309\", "
+ "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
+ responses.push_back(response);
+ }
+
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+ minifi::Configure>();
+
+ configuration->set("c2.rest.url",
+ "http://localhost:9090/update");
+ mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+ std::shared_ptr<core::Repository> test_repo =
+ std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
+ TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file,
+ test_file_location);
+
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
+ <minifi::io::StreamFactory>(configuration);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
+ <core::YamlConfiguration
+ >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory,
+ configuration,
+ test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+ <TestRepository>(test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller =
+ std::make_shared<minifi::FlowController
+ >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory,
+ configuration,
+ test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
+ test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
+ >(ptr.get());
+ ptr.release();
+
+ controller->load();
+ controller->start();
+ waitToVerifyProcessor();
+
+ controller->waitUnload(60000);
+ std::string logs = LogTestController::getInstance().log_output.str();
+ assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
+ LogTestController::getInstance().reset();
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HTTPHandlers.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
new file mode 100644
index 0000000..714090a
--- /dev/null
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -0,0 +1,320 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "civetweb.h"
+#include "CivetServer.h"
+#include "concurrentqueue.h"
+#include "CivetStream.h"
+#include "io/CRCStream.h"
+#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+static std::atomic<int> transaction_id;
+static std::atomic<int> transaction_id_output;
+
+class FlowObj {
+ public:
+ FlowObj()
+ : total_size(0) {
+
+ }
+ explicit FlowObj(const FlowObj &&other)
+ : total_size(std::move(other.total_size)),
+ attributes(std::move(other.attributes)),
+ data(std::move(other.data)) {
+
+ }
+ uint64_t total_size;
+ std::map<std::string, std::string> attributes;
+ std::vector<uint8_t> data;
+
+};
+
+class SiteToSiteLocationResponder : public CivetHandler {
+ public:
+ explicit SiteToSiteLocationResponder(bool isSecure)
+ : isSecure(isSecure) {
+ }
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::string site2site_rest_resp = "{"
+ "\"revision\": {"
+ "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+ "},"
+ "\"controller\": {"
+ "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+ "\"name\": \"NiFi Flow\","
+ "\"siteToSiteSecure\": ";
+ site2site_rest_resp += (isSecure ? "true" : "false");
+ site2site_rest_resp += "}}";
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ site2site_rest_resp.length());
+ mg_printf(conn, "%s", site2site_rest_resp.c_str());
+ return true;
+ }
+
+ protected:
+ bool isSecure;
+};
+
+class PeerResponder : public CivetHandler {
+ public:
+
+ explicit PeerResponder(const std::string base_url)
+ : base_url(base_url) {
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }";
+ std::stringstream headers;
+ headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
+ mg_printf(conn, "%s", headers.str().c_str());
+ mg_printf(conn, "%s", site2site_rest_resp.c_str());
+ return true;
+ }
+
+ protected:
+ std::string base_url;
+};
+
+class TransactionResponder : public CivetHandler {
+ public:
+
+ explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
+ : base_url(base_url),
+ wrong_uri(wrong_uri),
+ empty_transaction_uri(empty_transaction_uri),
+ input_port(input_port),
+ port_id(port_id),
+ flow_files_feed_(nullptr) {
+
+ if (input_port) {
+ transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96";
+ transaction_id_str += std::to_string(transaction_id.load());
+ transaction_id++;
+ } else {
+ transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95";
+ transaction_id_str += std::to_string(transaction_id_output.load());
+ transaction_id_output++;
+ }
+ }
+
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ std::string site2site_rest_resp = "";
+ std::stringstream headers;
+ headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nx-location-uri-intent: ";
+ if (wrong_uri)
+ headers << "ohstuff\r\n";
+ else
+ headers << "transaction-url\r\n";
+
+ std::string port_type;
+
+ if (input_port)
+ port_type = "input-ports";
+ else
+ port_type = "output-ports";
+ if (!empty_transaction_uri)
+ headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n";
+ headers << "Connection: close\r\n\r\n";
+ mg_printf(conn, "%s", headers.str().c_str());
+ mg_printf(conn, "%s", site2site_rest_resp.c_str());
+ return true;
+ }
+
+ void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+ flow_files_feed_ = feed;
+ }
+
+ std::string getTransactionId() {
+ return transaction_id_str;
+ }
+ protected:
+ std::string base_url;
+ std::string transaction_id_str;
+ bool wrong_uri;
+ bool empty_transaction_uri;
+ bool input_port;
+ std::string port_id;
+ moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+};
+
+class FlowFileResponder : public CivetHandler {
+ public:
+
+ explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum)
+ : wrong_uri(wrong_uri),
+ input_port(input_port),
+ invalid_checksum(invalid_checksum),
+ flow_files_feed_(nullptr) {
+ }
+
+ moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() {
+ return &flow_files_;
+ }
+
+ void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+ flow_files_feed_ = feed;
+ }
+
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ std::string site2site_rest_resp = "";
+ std::stringstream headers;
+
+ if (!wrong_uri) {
+ minifi::io::CivetStream civet_stream(conn);
+ minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream);
+ uint32_t num_attributes;
+ uint64_t total_size = 0;
+ total_size += stream.read(num_attributes);
+
+ auto flow = std::make_shared<FlowObj>();
+
+ for (int i = 0; i < num_attributes; i++) {
+ std::string name, value;
+ total_size += stream.readUTF(name, true);
+ total_size += stream.readUTF(value, true);
+ flow->attributes[name] = value;
+ }
+ uint64_t length;
+ total_size += stream.read(length);
+
+ total_size += length;
+ flow->data.resize(length);
+ flow->total_size = total_size;
+
+ assert(stream.readData(flow->data.data(), length) == length);
+
+ assert(flow->attributes["path"] == ".");
+ assert(!flow->attributes["uuid"].empty());
+ assert(!flow->attributes["filename"].empty());
+
+ if (!invalid_checksum) {
+ site2site_rest_resp = std::to_string(stream.getCRC());
+ flow_files_.enqueue(flow);
+ } else {
+ site2site_rest_resp = "Imawrongchecksumshortandstout";
+ }
+
+ headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
+ } else {
+ headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n";
+ }
+
+ mg_printf(conn, "%s", headers.str().c_str());
+ mg_printf(conn, "%s", site2site_rest_resp.c_str());
+ return true;
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+
+ if (flow_files_feed_->size_approx() > 0) {
+ std::shared_ptr<FlowObj> flow;
+ uint8_t buf[1];
+ std::vector<std::shared_ptr<FlowObj>> flows;
+ uint64_t total = 0;
+
+ while (flow_files_feed_->try_dequeue(flow)) {
+ flows.push_back(flow);
+ total += flow->total_size;
+ }
+ mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+ "Content-Length: %llu\r\n"
+ "Content-Type: application/octet-stream\r\n"
+ "Connection: close\r\n\r\n",
+ total);
+ minifi::io::BaseStream serializer;
+ minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer);
+ for (auto flow : flows) {
+ uint32_t num_attributes = flow->attributes.size();
+ stream.write(num_attributes);
+ for (auto entry : flow->attributes) {
+ stream.writeUTF(entry.first);
+ stream.writeUTF(entry.second);
+ }
+ uint64_t length = flow->data.size();
+ stream.write(length);
+ stream.writeData(flow->data.data(), length);
+ }
+ auto ret = mg_write(conn, serializer.getBuffer(), total);
+ } else {
+ std::cout << "Nothing to transfer feed" << std::endl;
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
+ "close\r\nContent-Length: 0\r\n");
+ mg_printf(conn, "Content-Type: text/plain\r\n\r\n");
+
+ }
+ return true;
+ }
+
+ void setFlowUrl(std::string flowUrl) {
+ base_url = flowUrl;
+ }
+
+ protected:
+ // base url
+ std::string base_url;
+ // set the wrong url
+ bool wrong_uri;
+ // we are running an input port
+ bool input_port;
+ // invalid checksum is returned.
+ bool invalid_checksum;
+ moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_;
+ moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+};
+
+class DeleteTransactionResponder : public CivetHandler {
+ public:
+
+ explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code)
+ : flow_files_feed_(nullptr),
+ base_url(base_url),
+ response_code(response_code) {
+ expected_resp_code_str = std::to_string(expected_resp_code);
+ }
+
+ explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed)
+ : flow_files_feed_(feed),
+ base_url(base_url),
+ response_code(response_code) {
+ }
+
+ bool handleDelete(CivetServer *server, struct mg_connection *conn) {
+
+ std::string site2site_rest_resp = "";
+ std::stringstream headers;
+ std::string resp;
+ CivetServer::getParam(conn, "responseCode", resp);
+ headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n";
+ headers << "Connection: close\r\n\r\n";
+ mg_printf(conn, "%s", headers.str().c_str());
+ mg_printf(conn, "%s", site2site_rest_resp.c_str());
+ return true;
+ }
+
+ void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+ flow_files_feed_ = feed;
+ }
+
+ protected:
+ moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+ std::string base_url;
+ std::string expected_resp_code_str;
+ std::string response_code;
+};
+
+#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */