You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2018/01/08 18:27:00 UTC

[3/5] nifi-minifi-cpp git commit: MINIFICPP-251 Move Civet implementations to an extension.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTProtocol.cpp b/extensions/http-curl/protocols/RESTProtocol.cpp
deleted file mode 100644
index afbe3c9..0000000
--- a/extensions/http-curl/protocols/RESTProtocol.cpp
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RESTProtocol.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
-  Json::Reader reader;
-  Json::Value root;
-  try {
-    if (reader.parse(std::string(response.data(), response.size()), root)) {
-      std::string requested_operation = getOperation(payload);
-
-      std::string identifier;
-      if (root.isMember("operationid")) {
-        identifier = root["operationid"].asString();
-      }
-      if (root["operation"].asString() == requested_operation) {
-        if (root["requested_operations"].size() == 0) {
-          return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
-        }
-        C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
-
-        new_payload.setIdentifier(identifier);
-
-        for (const Json::Value& request : root["requested_operations"]) {
-          Operation newOp = stringToOperation(request["operation"].asString());
-          C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
-          C2ContentResponse new_command(newOp);
-          new_command.delay = 0;
-          new_command.required = true;
-          new_command.ttl = -1;
-          // set the identifier if one exists
-          if (request.isMember("operationid")) {
-            new_command.ident = request["operationid"].asString();
-            nested_payload.setIdentifier(new_command.ident);
-          }
-          new_command.name = request["name"].asString();
-
-          if (request.isMember("content") && request["content"].size() > 0) {
-            for (const auto &name : request["content"].getMemberNames()) {
-              new_command.operation_arguments[name] = request["content"][name].asString();
-            }
-          }
-          nested_payload.addContent(std::move(new_command));
-          new_payload.addPayload(std::move(nested_payload));
-        }
-        // we have a response for this request
-        return new_payload;
-      }
-    }
-  } catch (...) {
-  }
-  return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true));
-}
-
-Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) {
-  // get the name from the content
-  Json::Value json_payload;
-  std::map<std::string, std::vector<Json::Value>> children;
-  for (const auto &nested_payload : payload.getNestedPayloads()) {
-    Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload);
-    children[nested_payload.getLabel()].push_back(child_payload);
-  }
-  for (auto child_vector : children) {
-    if (child_vector.second.size() > 1) {
-      Json::Value children_json(Json::arrayValue);
-      for (auto child : child_vector.second) {
-        json_payload[child_vector.first] = child;
-      }
-    } else {
-      if (child_vector.second.size() == 1) {
-        if (child_vector.second.at(0).isMember(child_vector.first)) {
-          json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first];
-        } else {
-          json_payload[child_vector.first] = child_vector.second.at(0);
-        }
-      }
-    }
-  }
-
-  const std::vector<C2ContentResponse> &content = payload.getContent();
-  for (const auto &payload_content : content) {
-    Json::Value payload_content_values;
-    bool use_sub_option = true;
-    if (payload_content.op == payload.getOperation()) {
-      for (auto content : payload_content.operation_arguments) {
-        if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
-          json_payload[payload_content.name] = content.second;
-          use_sub_option = false;
-        } else {
-          payload_content_values[content.first] = content.second;
-        }
-      }
-    }
-    if (use_sub_option)
-      json_payload[payload_content.name] = payload_content_values;
-  }
-  return json_payload;
-}
-
-std::string RESTProtocol::getOperation(const C2Payload &payload) {
-  switch (payload.getOperation()) {
-    case Operation::ACKNOWLEDGE:
-      return "acknowledge";
-    case Operation::HEARTBEAT:
-      return "heartbeat";
-    case Operation::RESTART:
-      return "restart";
-    case Operation::DESCRIBE:
-      return "describe";
-    case Operation::STOP:
-      return "stop";
-    case Operation::START:
-      return "start";
-    case Operation::UPDATE:
-      return "update";
-    default:
-      return "heartbeat";
-  }
-}
-
-Operation RESTProtocol::stringToOperation(const std::string str) {
-  std::string op = str;
-  std::transform(str.begin(), str.end(), op.begin(), ::tolower);
-  if (op == "heartbeat") {
-    return Operation::HEARTBEAT;
-  } else if (op == "acknowledge") {
-    return Operation::ACKNOWLEDGE;
-  } else if (op == "update") {
-    return Operation::UPDATE;
-  } else if (op == "describe") {
-    return Operation::DESCRIBE;
-  } else if (op == "restart") {
-    return Operation::RESTART;
-  } else if (op == "clear") {
-    return Operation::CLEAR;
-  } else if (op == "stop") {
-    return Operation::STOP;
-  } else if (op == "start") {
-    return Operation::START;
-  }
-  return Operation::HEARTBEAT;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTProtocol.h b/extensions/http-curl/protocols/RESTProtocol.h
deleted file mode 100644
index 4767e77..0000000
--- a/extensions/http-curl/protocols/RESTProtocol.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
-#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-
-#include "utils/ByteArrayCallback.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "c2/HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "utils/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTProtocol {
- public:
-  RESTProtocol() {
-
-  }
-
-  virtual ~RESTProtocol() {
-
-  }
-
- protected:
-
-  virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);
-
-  virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);
-
-  virtual std::string getOperation(const C2Payload &payload);
-
-  virtual Operation stringToOperation(const std::string str);
-
-};
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTOPERATIONS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
deleted file mode 100644
index 4c46516..0000000
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RESTReceiver.h"
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message) {
-  puts(message);
-  return 1;
-}
-
-int ssl_protocol_en(void *ssl_context, void *user_data) {
-  return 0;
-}
-
-RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
-    : HeartBeatReporter(name, uuid),
-      logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
-}
-
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
-  HeartBeatReporter::initialize(controller, configure);
-  logger_->log_debug("Initializing rest receiveer");
-  if (nullptr != configuration_) {
-    std::string listeningPort, rootUri, caCert;
-    configuration_->get("c2.rest.listener.port", listeningPort);
-    configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri);
-    configuration_->get("c2.rest.listener.cacert", caCert);
-
-    if (!listeningPort.empty() && !rootUri.empty()) {
-      handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol());
-      if (!caCert.empty()) {
-        listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert);
-      } else {
-        listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()));
-      }
-    }
-  }
-}
-int16_t RESTReceiver::heartbeat(const C2Payload &payload) {
-  std::string operation_request_str = getOperation(payload);
-  std::string outputConfig;
-  Json::Value json_payload;
-  json_payload["operation"] = operation_request_str;
-  if (payload.getIdentifier().length() > 0) {
-    json_payload["operationid"] = payload.getIdentifier();
-  }
-  const std::vector<C2ContentResponse> &content = payload.getContent();
-
-  for (const auto &payload_content : content) {
-    Json::Value payload_content_values;
-    bool use_sub_option = true;
-    if (payload_content.op == payload.getOperation()) {
-      for (auto content : payload_content.operation_arguments) {
-        if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
-          json_payload[payload_content.name] = content.second;
-          use_sub_option = false;
-        } else {
-          payload_content_values[content.first] = content.second;
-        }
-      }
-    }
-    if (use_sub_option)
-      json_payload[payload_content.name] = payload_content_values;
-  }
-
-  for (const auto &nested_payload : payload.getNestedPayloads()) {
-    json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
-  }
-
-  Json::StyledWriter writer;
-  outputConfig = writer.write(json_payload);
-  if (handler != nullptr) {
-    logger_->log_debug("Setting %s", outputConfig);
-    handler->setResponse(outputConfig);
-  }
-
-  return 0;
-}
-
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) {
-  struct mg_callbacks callback;
-
-  memset(&callback, 0, sizeof(callback));
-  callback.init_ssl = ssl_protocol_en;
-  std::string my_port = port;
-  my_port += "s";
-  callback.log_message = log_message;
-  const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL",
-      "ssl_verify_peer", "no", "num_threads", "1", 0 };
-
-  std::vector<std::string> cpp_options;
-  for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-  std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
-
-  server->addHandler(rooturi, handler);
-
-  return server;
-}
-
-std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) {
-  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 };
-
-  std::vector<std::string> cpp_options;
-  for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-  std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options));
-
-  server->addHandler(rooturi, handler);
-
-  return server;
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
deleted file mode 100644
index b0de62a..0000000
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_
-
-#include "RESTSender.h"
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-#include "core/Resource.h"
-#include "RESTProtocol.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "controllers/SSLContextService.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-int log_message(const struct mg_connection *conn, const char *message);
-
-int ssl_protocol_en(void *ssl_context, void *user_data);
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
- public:
-  RESTReceiver(std::string name, uuid_t uuid = nullptr);
-
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
-  virtual int16_t heartbeat(const C2Payload &heartbeat) override;
-
- protected:
-
-  class ListeningProtocol : public CivetHandler {
-
-   public:
-    ListeningProtocol() {
-
-    }
-
-    bool handleGet(CivetServer *server, struct mg_connection *conn) {
-      std::string currentvalue;
-      {
-        std::lock_guard<std::mutex> lock(reponse_mutex_);
-        currentvalue = resp_;
-      }
-
-      std::stringstream output;
-      output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n";
-
-      mg_printf(conn, "%s", output.str().c_str());
-      mg_printf(conn, "%s", currentvalue.c_str());
-      return true;
-    }
-
-    void setResponse(std::string response) {
-      std::lock_guard<std::mutex> lock(reponse_mutex_);
-      resp_ = response;
-    }
-
-   protected:
-    std::mutex reponse_mutex_;
-    std::string resp_;
-
-  };
-
-  std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert);
-
-  std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler);
-
-  std::unique_ptr<CivetServer> listener;
-  std::unique_ptr<ListeningProtocol> handler;
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-};
-
-REGISTER_RESOURCE(RESTReceiver);
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
deleted file mode 100644
index ebf532a..0000000
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RESTSender.h"
-
-#include <algorithm>
-#include <memory>
-#include <utility>
-#include <map>
-#include <string>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-RESTSender::RESTSender(std::string name, uuid_t uuid)
-    : C2Protocol(name, uuid),
-      logger_(logging::LoggerFactory<Connectable>::getLogger()) {
-}
-
-void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
-  C2Protocol::initialize(controller, configure);
-  // base URL when one is not specified.
-  if (nullptr != configure) {
-    configure->get("c2.rest.url", rest_uri_);
-    configure->get("c2.rest.url.ack", ack_uri_);
-  }
-  logger_->log_info("Submitting to %s", rest_uri_);
-}
-C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
-  std::string operation_request_str = getOperation(payload);
-  std::string outputConfig;
-  if (direction == Direction::TRANSMIT) {
-    Json::Value json_payload;
-    json_payload["operation"] = operation_request_str;
-    if (payload.getIdentifier().length() > 0) {
-      json_payload["operationid"] = payload.getIdentifier();
-    }
-    const std::vector<C2ContentResponse> &content = payload.getContent();
-
-    for (const auto &payload_content : content) {
-      Json::Value payload_content_values;
-      bool use_sub_option = true;
-      if (payload_content.op == payload.getOperation()) {
-        for (auto content : payload_content.operation_arguments) {
-          if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
-            json_payload[payload_content.name] = content.second;
-            use_sub_option = false;
-          } else {
-            payload_content_values[content.first] = content.second;
-          }
-        }
-      }
-      if (use_sub_option)
-        json_payload[payload_content.name] = payload_content_values;
-    }
-
-    for (const auto &nested_payload : payload.getNestedPayloads()) {
-      json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
-    }
-
-    Json::StyledWriter writer;
-    outputConfig = writer.write(json_payload);
-  }
-
-  return sendPayload(url, direction, payload, outputConfig);
-}
-
-C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
-  if (payload.getOperation() == ACKNOWLEDGE) {
-    return consumePayload(ack_uri_, payload, direction, async);
-  }
-  return consumePayload(rest_uri_, payload, direction, async);
-}
-
-void RESTSender::update(const std::shared_ptr<Configure> &configure) {
-  std::string url;
-  configure->get("c2.rest.url", url);
-  configure->get("c2.rest.url.ack", url);
-}
-
-const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
-  utils::HTTPClient client(url, ssl_context_service_);
-  client.setConnectionTimeout(2);
-
-  std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
-  std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
-  if (direction == Direction::TRANSMIT) {
-    input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
-    callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
-    input->write(outputConfig);
-    callback->ptr = input.get();
-    callback->pos = 0;
-    client.set_request_method("POST");
-    client.setUploadCallback(callback.get());
-  } else {
-    // we do not need to set the uplaod callback
-    // since we are not uploading anything on a get
-    client.set_request_method("GET");
-  }
-  client.setContentType("application/json");
-  bool isOkay = client.submit();
-  int64_t respCode = client.getResponseCode();
-
-  if (isOkay && respCode) {
-    if (payload.isRaw()) {
-      C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
-
-      response_payload.setRawData(client.getResponseBody());
-      return response_payload;
-    }
-    return parseJsonResponse(payload, client.getResponseBody());
-  } else {
-    return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
-  }
-}
-
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
deleted file mode 100644
index e4c1e5e..0000000
--- a/extensions/http-curl/protocols/RESTSender.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
-
-#include "json/json.h"
-#include "json/writer.h"
-#include <string>
-#include <mutex>
-
-#include "utils/ByteArrayCallback.h"
-#include "CivetServer.h"
-#include "c2/C2Protocol.h"
-#include "RESTProtocol.h"
-#include "c2/HeartBeatReporter.h"
-#include "controllers/SSLContextService.h"
-#include "../client/HTTPClient.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
-
-/**
- * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
- *
- * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
- * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
- * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
- *
- */
-class RESTSender : public RESTProtocol, public C2Protocol {
- public:
-
-  explicit RESTSender(std::string name, uuid_t uuid = nullptr);
-
-  virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
-
-  virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
-
-  virtual void update(const std::shared_ptr<Configure> &configure) override;
-
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
-
- protected:
-
-  virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
-
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  std::string rest_uri_;
-  std::string ack_uri_;
-};
-
-REGISTER_RESOURCE(RESTSender);
-
-} /* namesapce c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_RESTPROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/libarchive/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/libarchive/CMakeLists.txt b/extensions/libarchive/CMakeLists.txt
index 8a8d37f..1ed8c5b 100644
--- a/extensions/libarchive/CMakeLists.txt
+++ b/extensions/libarchive/CMakeLists.txt
@@ -20,7 +20,7 @@
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include  ../../thirdparty/) 
+include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/jsoncpp/include  ../../thirdparty/) 
 
 find_package(LibArchive)
 	

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/rocksdb-repos/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt
index cb1bdb1..5d459e5 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -23,7 +23,7 @@ cmake_minimum_required(VERSION 2.6)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include ../../thirdparty/) 
+include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/rocksdb/include  ../../thirdparty/) 
 
 find_package(RocksDB)
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/script/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/script/CMakeLists.txt b/extensions/script/CMakeLists.txt
index 1501c68..927d974 100644
--- a/extensions/script/CMakeLists.txt
+++ b/extensions/script/CMakeLists.txt
@@ -37,7 +37,7 @@ endif()
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/jsoncpp/include  ../../thirdparty/) 
+include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/jsoncpp/include  ../../thirdparty/) 
 
 file(GLOB SOURCES  "*.cpp")
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 6ede84f..a40f070 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -55,7 +55,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-reorder")
 include_directories(../thirdparty/spdlog-20170710/include)
 include_directories(../thirdparty/yaml-cpp-yaml-cpp-20171024/include)
 
-include_directories(${CIVET_THIRDPARTY_ROOT}/include)
 include_directories(../thirdparty/jsoncpp/include)
 include_directories(../thirdparty/concurrentqueue/)
 include_directories(include)
@@ -73,7 +72,7 @@ file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*")
 add_library(spdlog STATIC ${SPD_SOURCES})
 add_library(core-minifi STATIC ${SOURCES})
 add_dependencies(core-minifi jsoncpp_project)
-target_link_libraries(core-minifi ${UUID_LIBRARIES} ${JSONCPP_LIB} yaml-cpp )
+target_link_libraries(core-minifi ${UUID_LIBRARIES} ${JSONCPP_LIB} yaml-cpp dl)
 
 find_package(ZLIB REQUIRED)
 include_directories(${ZLIB_INCLUDE_DIRS})

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/c2/protocols/RESTProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h
new file mode 100644
index 0000000..823b5a9
--- /dev/null
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTPROTOCOL_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <mutex>
+
+#include "utils/ByteArrayCallback.h"
+#include "c2/C2Protocol.h"
+#include "c2/HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "utils/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTProtocol {
+ public:
+  RESTProtocol() {
+
+  }
+
+  virtual ~RESTProtocol() {
+
+  }
+
+ protected:
+
+  virtual Json::Value serializeJsonPayload(Json::Value &json_root, const C2Payload &payload);
+
+  virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response);
+
+  virtual std::string getOperation(const C2Payload &payload);
+
+  virtual Operation stringToOperation(const std::string str);
+
+};
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_PROTOCOLS_RESTOPERATIONS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 6e644ef..eab7169 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -25,6 +25,7 @@
 #include "core/controller/StandardControllerServiceProvider.h"
 #include "provenance/Provenance.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
 #include "core/Processor.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h
deleted file mode 100644
index 1b58dcd..0000000
--- a/libminifi/include/processors/ListenHTTP.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * @file ListenHTTP.h
- * ListenHTTP class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __LISTEN_HTTP_H__
-#define __LISTEN_HTTP_H__
-
-#include <memory>
-#include <regex>
-
-#include <CivetServer.h>
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// ListenHTTP Class
-class ListenHTTP : public core::Processor {
- public:
-
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  ListenHTTP(std::string name, uuid_t uuid = NULL)
-      : Processor(name, uuid),
-        logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
-  }
-  // Destructor
-  virtual ~ListenHTTP();
-  // Processor Name
-  static constexpr char const* ProcessorName = "ListenHTTP";
-  // Supported Properties
-  static core::Property BasePath;
-  static core::Property Port;
-  static core::Property AuthorizedDNPattern;
-  static core::Property SSLCertificate;
-  static core::Property SSLCertificateAuthority;
-  static core::Property SSLVerifyPeer;
-  static core::Property SSLMinimumVersion;
-  static core::Property HeadersAsAttributesRegex;
-  // Supported Relationships
-  static core::Relationship Success;
-
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
-  void initialize();
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
-
-  // HTTP request handler
-  class Handler : public CivetHandler {
-   public:
-    Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);bool handlePost(
-        CivetServer *server, struct mg_connection *conn);
-
-   private:
-    // Send HTTP 500 error response to client
-    void sendErrorResponse(struct mg_connection *conn);
-    // Logger
-    std::shared_ptr<logging::Logger> logger_;
-
-    std::regex _authDNRegex;
-    std::regex _headersAsAttributesRegex;
-    core::ProcessContext *_processContext;
-    core::ProcessSessionFactory *_processSessionFactory;
-  };
-
-  // Write callback for transferring data from HTTP request to content repo
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
-    int64_t process(std::shared_ptr<io::BaseStream> stream);
-
-   private:
-    // Logger
-    std::shared_ptr<logging::Logger> logger_;
-
-    struct mg_connection *_conn;
-    const struct mg_request_info *_reqInfo;
-  };
-
- private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
-
-  std::unique_ptr<CivetServer> _server;
-  std::unique_ptr<Handler> _handler;
-};
-
-REGISTER_RESOURCE(ListenHTTP);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
new file mode 100644
index 0000000..946e3c6
--- /dev/null
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -0,0 +1,177 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/protocols/RESTProtocol.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
+  Json::Reader reader;
+  Json::Value root;
+  try {
+    if (reader.parse(std::string(response.data(), response.size()), root)) {
+      std::string requested_operation = getOperation(payload);
+
+      std::string identifier;
+      if (root.isMember("operationid")) {
+        identifier = root["operationid"].asString();
+      }
+      if (root["operation"].asString() == requested_operation) {
+        if (root["requested_operations"].size() == 0) {
+          return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
+        }
+        C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
+
+        new_payload.setIdentifier(identifier);
+
+        for (const Json::Value& request : root["requested_operations"]) {
+          Operation newOp = stringToOperation(request["operation"].asString());
+          C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
+          C2ContentResponse new_command(newOp);
+          new_command.delay = 0;
+          new_command.required = true;
+          new_command.ttl = -1;
+          // set the identifier if one exists
+          if (request.isMember("operationid")) {
+            new_command.ident = request["operationid"].asString();
+            nested_payload.setIdentifier(new_command.ident);
+          }
+          new_command.name = request["name"].asString();
+
+          if (request.isMember("content") && request["content"].size() > 0) {
+            for (const auto &name : request["content"].getMemberNames()) {
+              new_command.operation_arguments[name] = request["content"][name].asString();
+            }
+          }
+          nested_payload.addContent(std::move(new_command));
+          new_payload.addPayload(std::move(nested_payload));
+        }
+        // we have a response for this request
+        return new_payload;
+      }
+    }
+  } catch (...) {
+  }
+  return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true));
+}
+
+Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) {
+  // get the name from the content
+  Json::Value json_payload;
+  std::map<std::string, std::vector<Json::Value>> children;
+  for (const auto &nested_payload : payload.getNestedPayloads()) {
+    Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload);
+    children[nested_payload.getLabel()].push_back(child_payload);
+  }
+  for (auto child_vector : children) {
+    if (child_vector.second.size() > 1) {
+      Json::Value children_json(Json::arrayValue);
+      for (auto child : child_vector.second) {
+        json_payload[child_vector.first] = child;
+      }
+    } else {
+      if (child_vector.second.size() == 1) {
+        if (child_vector.second.at(0).isMember(child_vector.first)) {
+          json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first];
+        } else {
+          json_payload[child_vector.first] = child_vector.second.at(0);
+        }
+      }
+    }
+  }
+
+  const std::vector<C2ContentResponse> &content = payload.getContent();
+  for (const auto &payload_content : content) {
+    Json::Value payload_content_values;
+    bool use_sub_option = true;
+    if (payload_content.op == payload.getOperation()) {
+      for (auto content : payload_content.operation_arguments) {
+        if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) {
+          json_payload[payload_content.name] = content.second;
+          use_sub_option = false;
+        } else {
+          payload_content_values[content.first] = content.second;
+        }
+      }
+    }
+    if (use_sub_option)
+      json_payload[payload_content.name] = payload_content_values;
+  }
+  return json_payload;
+}
+
+std::string RESTProtocol::getOperation(const C2Payload &payload) {
+  switch (payload.getOperation()) {
+    case Operation::ACKNOWLEDGE:
+      return "acknowledge";
+    case Operation::HEARTBEAT:
+      return "heartbeat";
+    case Operation::RESTART:
+      return "restart";
+    case Operation::DESCRIBE:
+      return "describe";
+    case Operation::STOP:
+      return "stop";
+    case Operation::START:
+      return "start";
+    case Operation::UPDATE:
+      return "update";
+    default:
+      return "heartbeat";
+  }
+}
+
+Operation RESTProtocol::stringToOperation(const std::string str) {
+  std::string op = str;
+  std::transform(str.begin(), str.end(), op.begin(), ::tolower);
+  if (op == "heartbeat") {
+    return Operation::HEARTBEAT;
+  } else if (op == "acknowledge") {
+    return Operation::ACKNOWLEDGE;
+  } else if (op == "update") {
+    return Operation::UPDATE;
+  } else if (op == "describe") {
+    return Operation::DESCRIBE;
+  } else if (op == "restart") {
+    return Operation::RESTART;
+  } else if (op == "clear") {
+    return Operation::CLEAR;
+  } else if (op == "stop") {
+    return Operation::STOP;
+  } else if (op == "start") {
+    return Operation::START;
+  }
+  return Operation::HEARTBEAT;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
deleted file mode 100644
index 62f8194..0000000
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * @file ListenHTTP.cpp
-
- * ListenHTTP class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "processors/ListenHTTP.h"
-#include <uuid/uuid.h>
-#include <CivetServer.h>
-#include <stdio.h>
-#include <sstream>
-#include <utility>
-#include <memory>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <set>
-#include <vector>
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessSessionFactory.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
-core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
-core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
-                                               " connections. If the Pattern does not match the DN, the connection will be refused.",
-                                               ".*");
-core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
-core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
-core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
-core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
-                                                    " should be passed along as FlowFile attributes",
-                                                    "");
-
-core::Relationship ListenHTTP::Success("success", "All files are routed to success");
-
-void ListenHTTP::initialize() {
-  logger_->log_info("Initializing ListenHTTP");
-
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BasePath);
-  properties.insert(Port);
-  properties.insert(AuthorizedDNPattern);
-  properties.insert(SSLCertificate);
-  properties.insert(SSLCertificateAuthority);
-  properties.insert(SSLVerifyPeer);
-  properties.insert(SSLMinimumVersion);
-  properties.insert(HeadersAsAttributesRegex);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
-}
-
-void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  std::string basePath;
-
-  if (!context->getProperty(BasePath.getName(), basePath)) {
-    logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
-    basePath = BasePath.getValue();
-  }
-
-  basePath.insert(0, "/");
-
-  std::string listeningPort;
-
-  if (!context->getProperty(Port.getName(), listeningPort)) {
-    logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
-    return;
-  }
-
-  std::string authDNPattern;
-
-  if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
-    logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
-  }
-
-  std::string sslCertFile;
-
-  if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
-    logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
-  }
-
-  // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
-  std::string sslCertAuthorityFile;
-  std::string sslVerifyPeer;
-  std::string sslMinVer;
-
-  if (!sslCertFile.empty()) {
-    if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
-      logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
-    }
-
-    if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
-      if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
-        logger_->log_info("ListenHTTP will not verify peers");
-      } else {
-        logger_->log_info("ListenHTTP will verify peers");
-      }
-    } else {
-      logger_->log_info("ListenHTTP will not verify peers");
-    }
-
-    if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
-      logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
-    }
-  }
-
-  std::string headersAsAttributesPattern;
-
-  if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
-    logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
-  }
-
-  auto numThreads = getMaxConcurrentTasks();
-
-  logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
-
-  // Initialize web server
-  std::vector<std::string> options;
-  options.push_back("enable_keep_alive");
-  options.push_back("yes");
-  options.push_back("keep_alive_timeout_ms");
-  options.push_back("15000");
-  options.push_back("num_threads");
-  options.push_back(std::to_string(numThreads));
-
-  if (sslCertFile.empty()) {
-    options.push_back("listening_ports");
-    options.push_back(listeningPort);
-  } else {
-    listeningPort += "s";
-    options.push_back("listening_ports");
-    options.push_back(listeningPort);
-
-    options.push_back("ssl_certificate");
-    options.push_back(sslCertFile);
-
-    if (!sslCertAuthorityFile.empty()) {
-      options.push_back("ssl_ca_file");
-      options.push_back(sslCertAuthorityFile);
-    }
-
-    if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
-      options.push_back("ssl_verify_peer");
-      options.push_back("no");
-    } else {
-      options.push_back("ssl_verify_peer");
-      options.push_back("yes");
-    }
-
-    if (sslMinVer.compare("SSL2") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(0));
-    } else if (sslMinVer.compare("SSL3") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(1));
-    } else if (sslMinVer.compare("TLS1.0") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(2));
-    } else if (sslMinVer.compare("TLS1.1") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(3));
-    } else {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(4));
-    }
-  }
-
-  _server.reset(new CivetServer(options));
-  _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
-  _server->addHandler(basePath, _handler.get());
-}
-
-ListenHTTP::~ListenHTTP() {
-}
-
-void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
-
-  // Do nothing if there are no incoming files
-  if (!flowFile) {
-    return;
-  }
-}
-
-ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
-    : _authDNRegex(std::move(authDNPattern)),
-      _headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
-      logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
-  _processContext = context;
-  _processSessionFactory = sessionFactory;
-}
-
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
-  mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
-            "Content-Type: text/html\r\n"
-            "Content-Length: 0\r\n\r\n");
-}
-
-bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
-  auto req_info = mg_get_request_info(conn);
-  logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length);
-
-  // If this is a two-way TLS connection, authorize the peer against the configured pattern
-  if (req_info->is_ssl && req_info->client_cert != nullptr) {
-    if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) {
-      mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
-                "Content-Type: text/html\r\n"
-                "Content-Length: 0\r\n\r\n");
-      logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
-      return true;
-    }
-  }
-
-  // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
-  mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
-
-  auto session = _processSessionFactory->createSession();
-  ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-
-  if (!flowFile) {
-    sendErrorResponse(conn);
-    return true;
-  }
-
-  try {
-    session->write(flowFile, &callback);
-
-    // Add filename from "filename" header value (and pattern headers)
-    for (int i = 0; i < req_info->num_headers; i++) {
-      auto header = &req_info->http_headers[i];
-
-      if (strcmp("filename", header->name) == 0) {
-        if (!flowFile->updateAttribute("filename", header->value)) {
-          flowFile->addAttribute("filename", header->value);
-        }
-      } else if (std::regex_match(header->name, _headersAsAttributesRegex)) {
-        if (!flowFile->updateAttribute(header->name, header->value)) {
-          flowFile->addAttribute(header->name, header->value);
-        }
-      }
-    }
-
-    session->transfer(flowFile, Success);
-    session->commit();
-  } catch (std::exception &exception) {
-    logger_->log_debug("ListenHTTP Caught Exception %s", exception.what());
-    sendErrorResponse(conn);
-    session->rollback();
-    throw;
-  } catch (...) {
-    logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
-    sendErrorResponse(conn);
-    session->rollback();
-    throw;
-  }
-
-  mg_printf(conn, "HTTP/1.1 200 OK\r\n"
-            "Content-Type: text/html\r\n"
-            "Content-Length: 0\r\n\r\n");
-
-  return true;
-}
-
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
-    : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
-  _conn = conn;
-  _reqInfo = reqInfo;
-}
-
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
-  int64_t rlen;
-  int64_t nlen = 0;
-  int64_t tlen = _reqInfo->content_length;
-  uint8_t buf[16384];
-
-  // if we have no content length we should call mg_read until
-  // there is no data left from the stream to be HTTP/1.1 compliant
-  while (tlen == -1 || nlen < tlen) {
-    rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
-
-    if (rlen > (int64_t)sizeof(buf)) {
-      rlen = (int64_t)sizeof(buf);
-    }
-
-    // Read a buffer of data from client
-    rlen = mg_read(_conn, &buf[0], (size_t) rlen);
-
-    if (rlen <= 0) {
-      break;
-    }
-
-    // Transfer buffer data to the output stream
-    stream->write(&buf[0], rlen);
-
-    nlen += rlen;
-  }
-
-  return nlen;
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/TestServer.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h
deleted file mode 100644
index 06f996c..0000000
--- a/libminifi/test/TestServer.h
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_TEST_TESTSERVER_H_
-#define LIBMINIFI_TEST_TESTSERVER_H_
-#include <regex.h>
-#include <string>
-#include <iostream>
-#include "civetweb.h"
-#include "CivetServer.h"
-
-
-/* Server context handle */
-static std::string resp_str;
-
-void init_webserver() {
-  mg_init_library(0);
-}
-
-
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
-  const char *options[] = { "listening_ports", port.c_str(), "error_log_file",
-      "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list",
-      "ALL", "ssl_verify_peer", "no", 0 };
-
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-  CivetServer *server = new CivetServer(cpp_options);
-
-  server->addHandler(rooturi, handler);
-
-  return server;
-
-}
-
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
-  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 };
-
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-  CivetServer *server = new CivetServer(cpp_options);
-
-  server->addHandler(rooturi, handler);
-
-  return server;
-
-}
-
-bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
-  regex_t regex;
-
-  const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
-
-  int ret = regcomp(&regex, regexstr, REG_EXTENDED);
-  if (ret) {
-    return false;
-  }
-
-  size_t potentialGroups = regex.re_nsub + 1;
-  regmatch_t groups[potentialGroups];
-  if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) {
-    for (int i = 0; i < potentialGroups; i++) {
-      if (groups[i].rm_so == -1)
-        break;
-
-      std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
-      switch (i) {
-        case 1:
-          scheme = str;
-          break;
-        case 3:
-          port = str;
-          break;
-        case 4:
-          path = str;
-          break;
-        default:
-          break;
-      }
-    }
-  }
-  if (path.empty() || scheme.empty() || port.empty())
-    return false;
-
-  regfree(&regex);
-
-  return true;
-
-}
-
-static void stop_webserver(CivetServer *server) {
-  if (server != nullptr)
-    delete server;
-
-  /* Un-initialize the library */
-  mg_exit_library();
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp
deleted file mode 100644
index cbb1831..0000000
--- a/libminifi/test/curl-tests/C2NullConfiguration.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../TestServer.h"
-#include "c2/C2Agent.h"
-#include "protocols/RESTReceiver.h"
-#include "../integration/IntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class VerifyC2Server : public IntegrationBase {
- public:
-  explicit VerifyC2Server(bool isSecure)
-      : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
-  }
-
-  void testSetup() {
-    LogTestController::getInstance().setDebug<utils::HTTPClient>();
-    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
-    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
-      LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-    LogTestController::getInstance().setDebug<processors::LogAttribute>();
-    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-  }
-
-  void cleanup() {
-    unlink(ss.str().c_str());
-  }
-
-  void runAssertions() {
-    assert(LogTestController::getInstance().contains("C2Agent] [info] Class is null") == true);
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true);
-    assert(LogTestController::getInstance().contains("Class is RESTSender") == true);
-  }
-
-  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
-
-    std::string port, scheme, path;
-    parse_http_components(url, port, scheme, path);
-    configuration->set("c2.agent.protocol.class", "null");
-    configuration->set("c2.rest.url", "");
-    configuration->set("c2.rest.url.ack", "");
-    configuration->set("c2.agent.heartbeat.reporter.classes", "null");
-    configuration->set("c2.rest.listener.port", "null");
-    configuration->set("c2.agent.heartbeat.period", "null");
-    configuration->set("c2.rest.listener.heartbeat.rooturi", "null");
-  }
-
- protected:
-  bool isSecure;
-  char *dir;
-  std::stringstream ss;
-  TestController testController;
-};
-
-int main(int argc, char **argv) {
-  std::string key_dir, test_file_location, url;
-  if (argc > 1) {
-    test_file_location = argv[1];
-    key_dir = argv[2];
-  }
-
-  bool isSecure = false;
-  if (url.find("https") != std::string::npos) {
-    isSecure = true;
-  }
-
-  VerifyC2Server harness(isSecure);
-
-  harness.setKeyDir(key_dir);
-
-  harness.run(test_file_location);
-
-  return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2UpdateTest.cpp b/libminifi/test/curl-tests/C2UpdateTest.cpp
deleted file mode 100644
index 1a9fe5b..0000000
--- a/libminifi/test/curl-tests/C2UpdateTest.cpp
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "RESTSender.h"
-
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-static std::vector<std::string> responses;
-
-class ConfigHandler : public CivetHandler {
- public:
-  ConfigHandler() {
-    calls_ = 0;
-  }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    calls_++;
-    if (responses.size() > 0) {
-      std::string top_str = responses.back();
-      responses.pop_back();
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                top_str.length());
-      mg_printf(conn, "%s", top_str.c_str());
-    } else {
-      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
-    }
-
-    return true;
-  }
-
-  bool handleGet(CivetServer *server, struct mg_connection *conn) {
-    std::ifstream myfile(test_file_location_.c_str());
-
-    if (myfile.is_open()) {
-      std::stringstream buffer;
-      buffer << myfile.rdbuf();
-      std::string str = buffer.str();
-      myfile.close();
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                str.length());
-      mg_printf(conn, "%s", str.c_str());
-    } else {
-      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
-    }
-
-    return true;
-  }
-  std::string test_file_location_;
-  std::atomic<size_t> calls_;
-};
-
-int main(int argc, char **argv) {
-  mg_init_library(0);
-  LogTestController::getInstance().setInfo<minifi::FlowController>();
-  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
-  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
-  LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-
-  const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-
-  CivetServer server(cpp_options);
-  ConfigHandler h_ex;
-  server.addHandler("/update", h_ex);
-  std::string key_dir, test_file_location;
-  if (argc > 1) {
-    h_ex.test_file_location_ = test_file_location = argv[1];
-    key_dir = argv[2];
-  }
-  std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
-      "\"operation\" : \"update\", "
-      "\"operationid\" : \"8675309\", "
-      "\"name\": \"configuration\""
-      "}]}";
-
-  responses.push_back(heartbeat_response);
-
-  std::ifstream myfile(test_file_location.c_str());
-
-  if (myfile.is_open()) {
-    std::stringstream buffer;
-    buffer << myfile.rdbuf();
-    std::string str = buffer.str();
-    myfile.close();
-    std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
-        "\"operation\" : \"update\", "
-        "\"operationid\" : \"8675309\", "
-        "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
-    responses.push_back(response);
-  }
-
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
-  configuration->set("c2.rest.url", "http://localhost:9090/update");
-  configuration->set("c2.agent.heartbeat.period", "1000");
-  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
-  true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
-  auto start = std::chrono::system_clock::now();
-
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
-
-  controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
-
-  auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
-  std::string logs = LogTestController::getInstance().log_output.str();
-  assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
-  LogTestController::getInstance().reset();
-  rmdir("./content_repository");
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp b/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp
deleted file mode 100644
index 69c9a3b..0000000
--- a/libminifi/test/curl-tests/C2VerifyHeartbeatAndStop.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../TestServer.h"
-#include "c2/C2Agent.h"
-#include "RESTReceiver.h"
-#include "../integration/IntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class Responder : public CivetHandler {
- public:
-  explicit Responder(bool isSecure)
-      : isSecure(isSecure) {
-  }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    std::string resp =
-        "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
-        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
-    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-              resp.length());
-    mg_printf(conn, "%s", resp.c_str());
-    return true;
-  }
-
- protected:
-  bool isSecure;
-};
-
-class VerifyC2Heartbeat : public IntegrationBase {
- public:
-  explicit VerifyC2Heartbeat(bool isSecure)
-      : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
-  }
-
-  void testSetup() {
-    LogTestController::getInstance().setDebug<utils::HTTPClient>();
-    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
-    LogTestController::getInstance().setDebug<LogTestController>();
-    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
-    LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
-    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-  }
-
-  void cleanup() {
-    LogTestController::getInstance().reset();
-    unlink(ss.str().c_str());
-  }
-
-  void runAssertions() {
-    assert(LogTestController::getInstance().contains("Received Ack from Server") == true);
-
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true);
-
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
-  }
-
-  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
-    configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat");
-    configuration->set("c2.agent.heartbeat.period", "1000");
-    configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat");
-  }
-
- protected:
-  bool isSecure;
-  char *dir;
-  std::stringstream ss;
-  TestController testController;
-};
-
-int main(int argc, char **argv) {
-  std::string key_dir, test_file_location, url;
-  url = "http://localhost:8888/api/heartbeat";
-  if (argc > 1) {
-    test_file_location = argv[1];
-    key_dir = argv[2];
-  }
-
-  bool isSecure = false;
-  if (url.find("https") != std::string::npos) {
-    isSecure = true;
-  }
-
-  VerifyC2Heartbeat harness(isSecure);
-
-  harness.setKeyDir(key_dir);
-
-  Responder responder(isSecure);
-
-  harness.setUrl(url, &responder);
-
-  harness.run(test_file_location);
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/C2VerifyServeResults.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2VerifyServeResults.cpp b/libminifi/test/curl-tests/C2VerifyServeResults.cpp
deleted file mode 100644
index 961fec0..0000000
--- a/libminifi/test/curl-tests/C2VerifyServeResults.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "../TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "../include/core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "../unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../TestServer.h"
-#include "c2/C2Agent.h"
-#include "RESTReceiver.h"
-#include "../integration/IntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class VerifyC2Server : public IntegrationBase {
- public:
-  explicit VerifyC2Server(bool isSecure)
-      : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
-  }
-
-  void testSetup() {
-    LogTestController::getInstance().setDebug<utils::HTTPClient>();
-    LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
-    LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
-      LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-    LogTestController::getInstance().setDebug<processors::LogAttribute>();
-    LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-  }
-
-  void cleanup() {
-    unlink(ss.str().c_str());
-  }
-
-  void runAssertions() {
-    assert(LogTestController::getInstance().contains("Import offset 0") == true);
-
-    assert(LogTestController::getInstance().contains("Outputting success and response") == true);
-  }
-
-  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
-
-    std::string port, scheme, path;
-    parse_http_components(url, port, scheme, path);
-    configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
-    configuration->set("c2.rest.listener.port", port);
-    configuration->set("c2.agent.heartbeat.period", "10");
-    configuration->set("c2.rest.listener.heartbeat.rooturi", path);
-  }
-
- protected:
-  bool isSecure;
-  char *dir;
-  std::stringstream ss;
-  TestController testController;
-};
-
-int main(int argc, char **argv) {
-  std::string key_dir, test_file_location, url;
-  if (argc > 1) {
-    test_file_location = argv[1];
-    key_dir = argv[2];
-  }
-
-  bool isSecure = false;
-  if (url.find("https") != std::string::npos) {
-    isSecure = true;
-  }
-
-  VerifyC2Server harness(isSecure);
-
-  harness.setKeyDir(key_dir);
-
-  harness.run(test_file_location);
-
-  return 0;
-}