You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/01 20:27:16 UTC

[2/6] nifi-minifi-cpp git commit: MINIFICPP-60: Add initial implementation of Site to Site changes.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/sitetosite/SiteToSiteClient.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
new file mode 100644
index 0000000..e3a4719
--- /dev/null
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -0,0 +1,773 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/SiteToSiteClient.h"
+#include <map>
+#include <string>
+#include <memory>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+// Sends the textual name of a request type to the peer.
+// Returns the result of peer_->writeUTF(), or -1 when the type is not below MAX_REQUEST_TYPE.
+int SiteToSiteClient::writeRequestType(RequestType type) {
+  // Only values below MAX_REQUEST_TYPE are used to index RequestTypeStr.
+  if (type < MAX_REQUEST_TYPE) {
+    return peer_->writeUTF(RequestTypeStr[type]);
+  }
+
+  return -1;
+}
+
+// Reads a request type name from the peer and maps it back to its enum value.
+// Returns the readUTF() result when the name is recognised (or when the read yields <= 0),
+// and -1 when the received name matches no entry between NEGOTIATE_FLOWFILE_CODEC and SHUTDOWN.
+int SiteToSiteClient::readRequestType(RequestType &type) {
+  std::string receivedName;
+  const int bytesRead = peer_->readUTF(receivedName);
+  if (bytesRead <= 0) {
+    return bytesRead;
+  }
+
+  int candidate = NEGOTIATE_FLOWFILE_CODEC;
+  while (candidate <= SHUTDOWN) {
+    if (RequestTypeStr[candidate] == receivedName) {
+      type = (RequestType) candidate;
+      return bytesRead;
+    }
+    candidate++;
+  }
+
+  // Nothing in the table matched what the peer sent.
+  return -1;
+}
+
+// Reads one response frame from the peer: two fixed marker bytes, one respond-code byte and,
+// for codes whose context has hasDescription set, a UTF message.
+// Returns 3 plus the message length on success, -1 on a bad marker, an unknown code or a
+// failed message read, and the raw read result when reading the code byte yields <= 0.
+int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
+  uint8_t markerOne;
+  int status = peer_->read(markerOne);
+  if (status <= 0 || markerOne != CODE_SEQUENCE_VALUE_1) {
+    return -1;
+  }
+
+  uint8_t markerTwo;
+  status = peer_->read(markerTwo);
+  if (status <= 0 || markerTwo != CODE_SEQUENCE_VALUE_2) {
+    return -1;
+  }
+
+  uint8_t rawCode;
+  status = peer_->read(rawCode);
+  if (status <= 0) {
+    return status;
+  }
+  code = (RespondCode) rawCode;
+
+  RespondCodeContext *codeContext = getRespondCodeContext(code);
+  if (codeContext == NULL) {
+    // The byte does not correspond to any known respond code.
+    return -1;
+  }
+
+  if (codeContext->hasDescription) {
+    status = peer_->readUTF(message);
+    if (status <= 0) {
+      return -1;
+    }
+  }
+
+  // Three header bytes plus whatever description accompanied them.
+  return 3 + message.size();
+}
+
+// Removes a transaction from known_transactions_, logging its UUID first.
+// IDs that are not in the map are ignored.
+void SiteToSiteClient::deleteTransaction(std::string transactionID) {
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return;
+  }
+
+  // Hold our own reference while logging and erasing.
+  std::shared_ptr<Transaction> doomed = entry->second;
+  logger_->log_info("Site2Site delete transaction %s", doomed->getUUIDStr().c_str());
+  known_transactions_.erase(transactionID);
+}
+
+// Writes one response frame to the peer: the two marker bytes, the respond code and, for
+// codes whose context has hasDescription set, the message as UTF.
+// Returns 3 for a code without description, 3 plus the writeUTF() result when the message
+// was written, the writeUTF() result itself when it is <= 0, and -1 for an unknown code or
+// a short header write.
+int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
+  RespondCodeContext *codeContext = getRespondCodeContext(code);
+  if (codeContext == NULL) {
+    // The code is not one we know how to send.
+    return -1;
+  }
+
+  uint8_t header[3];
+  header[0] = CODE_SEQUENCE_VALUE_1;
+  header[1] = CODE_SEQUENCE_VALUE_2;
+  header[2] = (uint8_t) code;
+
+  int written = peer_->write(header, 3);
+  if (written != 3) {
+    return -1;
+  }
+
+  if (!codeContext->hasDescription) {
+    return 3;
+  }
+
+  written = peer_->writeUTF(message);
+  if (written <= 0) {
+    return written;
+  }
+  return (3 + written);
+}
+
+// Shuts the connection down: sends SHUTDOWN when the peer state is at least ESTABLISHED,
+// forgets every known transaction, closes the peer and resets the state to IDLE.
+void SiteToSiteClient::tearDown() {
+  const bool announceShutdown = peer_state_ >= ESTABLISHED;
+  if (announceShutdown) {
+    logger_->log_info("Site2Site Protocol tearDown");
+    // The result is not inspected; the peer is closed below either way.
+    writeRequestType(SHUTDOWN);
+  }
+
+  known_transactions_.clear();
+  peer_->Close();
+  peer_state_ = IDLE;
+}
+
+// Sends flow files taken from the session to the peer inside a single SEND transaction.
+// Flow files are sent until the session has none left or the elapsed time exceeds
+// _batchSendNanos; the transaction is then confirmed and completed.
+// Returns false when the session has no flow file or bootstrap() fails, true on success.
+// Throws Exception(SITE2SITE_EXCEPTION, ...) when the handshake, transaction creation, a
+// send, the confirm or the complete step fails; before rethrowing, the transaction is
+// deleted, the context is yielded and the connection is torn down.
+bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+  if (!flow) {
+    return false;
+  }
+
+  if (peer_state_ != READY && !bootstrap()) {
+    return false;
+  }
+
+  if (peer_state_ != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+  }
+
+  // Open the transaction that all flow files of this batch will travel in.
+  std::string transactionID;
+  std::shared_ptr<Transaction> transaction = NULL;
+  transaction = createTransaction(transactionID, SEND);
+  if (transaction == NULL) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+  }
+
+  // Cleanup shared by both exception handlers below.
+  auto abandonTransaction = [&]() {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+  };
+
+  uint64_t batchStartNanos = getTimeNano();
+
+  try {
+    for (;;) {
+      uint64_t sendStartMillis = getTimeMillis();
+      std::string payload;
+      DataPacket packet(getLogger(), transaction, flow->getAttributes(), payload);
+
+      int16_t outcome = send(transactionID, &packet, flow, session);
+      if (outcome == -1) {
+        throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+      }
+
+      logger_->log_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str());
+      if (outcome == 0) {
+        // Only a successful send is reported to provenance.
+        uint64_t sendEndMillis = getTimeMillis();
+        std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
+        std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
+        session->getProvenanceReporter()->send(flow, transitUri, details, sendEndMillis - sendStartMillis, false);
+      }
+      session->remove(flow);
+
+      // Stop once the batch time budget is used up.
+      uint64_t elapsedNanos = getTimeNano() - batchStartNanos;
+      if (elapsedNanos > _batchSendNanos) {
+        break;
+      }
+
+      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+      if (!flow) {
+        break;
+      }
+    }
+
+    if (!confirm(transactionID)) {
+      std::stringstream failure;
+      failure << "Confirm Failed for " << transactionID;
+      throw Exception(SITE2SITE_EXCEPTION, failure.str().c_str());
+    }
+    if (!complete(transactionID)) {
+      std::stringstream failure;
+      failure << "Complete Failed for " << transactionID;
+      throw Exception(SITE2SITE_EXCEPTION, failure.str().c_str());
+    }
+    logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes);
+  } catch (std::exception &exception) {
+    abandonTransaction();
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    abandonTransaction();
+    logger_->log_debug("Caught Exception during SiteToSiteClient::transferFlowFiles");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+
+  return true;
+}
+
+// Runs the confirmation step of a known transaction.
+//
+// RECEIVE direction: a transaction still in TRANSACTION_STARTED with no data available is
+// confirmed immediately. Otherwise, once data has been exchanged and the peer signalled the
+// end of data, our CRC is sent as CONFIRM_TRANSACTION and the peer's answer decides the result.
+// SEND direction: FINISH_TRANSACTION is sent, the peer's CONFIRM_TRANSACTION (carrying its
+// CRC) is awaited and, for protocol versions above 3, compared with ours before answering
+// with CONFIRM_TRANSACTION or BAD_CHECKSUM.
+//
+// Returns true only when the transaction reached TRANSACTION_CONFIRMED. Returns false when
+// the peer is not READY, the ID is unknown, the state is wrong, or any write/read fails.
+bool SiteToSiteClient::confirm(std::string transactionID) {
+  int ret;
+  std::shared_ptr<Transaction> transaction = nullptr;
+
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return false;
+  }
+
+  auto it = this->known_transactions_.find(transactionID);
+
+  if (it == known_transactions_.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && transaction->getDirection() == RECEIVE) {
+    transaction->_state = TRANSACTION_CONFIRMED;
+    return true;
+  }
+
+  if (transaction->getState() != DATA_EXCHANGED)
+    return false;
+
+  if (transaction->getDirection() == RECEIVE) {
+    if (transaction->isDataAvailable())
+      return false;
+    // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+    // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+    // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+    // session and then when we send the response back to the peer, the peer may have timed out and may not
+    // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+    // Critical Section involved in this transaction so that rather than the Critical Section being the
+    // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+    int64_t crcValue = transaction->getCRC();
+    std::string crc = std::to_string(crcValue);
+    // Log the exact string that goes on the wire; the CRC is 64-bit and does not fit "%d".
+    logger_->log_info("Site2Site Send confirm with CRC %s to transaction %s", crc.c_str(), transactionID.c_str());
+    ret = writeResponse(transaction, CONFIRM_TRANSACTION, crc);
+    if (ret <= 0)
+      return false;
+    RespondCode code;
+    std::string message;
+    // Keep the read result: without it `code` would be inspected uninitialized on failure.
+    ret = readResponse(transaction, code, message);
+    if (ret <= 0)
+      return false;
+
+    if (code == CONFIRM_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str());
+      transaction->_state = TRANSACTION_CONFIRMED;
+      return true;
+    } else if (code == BAD_CHECKSUM) {
+      logger_->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str());
+      return false;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+  } else {
+    logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.c_str());
+    ret = writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION");
+    if (ret <= 0)
+      return false;
+    RespondCode code;
+    std::string message;
+    // A failed read leaves `code` unset, so bail out before looking at it.
+    ret = readResponse(transaction, code, message);
+    if (ret <= 0)
+      return false;
+
+    // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+    if (code == CONFIRM_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
+      if (this->_currentVersion > 3) {
+        int64_t crcValue = transaction->getCRC();
+        std::string crc = std::to_string(crcValue);
+        if (message == crc) {
+          logger_->log_info("Site2Site transaction %s CRC matched", transactionID.c_str());
+          ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+          if (ret <= 0)
+            return false;
+          transaction->_state = TRANSACTION_CONFIRMED;
+          return true;
+        } else {
+          logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
+          // Best effort: the transaction fails whether or not the peer receives this.
+          writeResponse(transaction, BAD_CHECKSUM, "BAD_CHECKSUM");
+          return false;
+        }
+      }
+      // Protocol versions up to 3 do not compare checksums.
+      ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+      if (ret <= 0)
+        return false;
+      transaction->_state = TRANSACTION_CONFIRMED;
+      return true;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+  }
+}
+
+// Cancels a known transaction: sends CANCEL_TRANSACTION to the peer, marks the transaction
+// TRANSACTION_CANCELED and tears the connection down.
+// Does nothing when the peer is not READY, the ID is unknown, or the transaction is already
+// canceled, completed or in error.
+void SiteToSiteClient::cancel(std::string transactionID) {
+  if (peer_state_ != READY) {
+    return;
+  }
+
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return;
+  }
+  // Own copy: tearDown() below clears known_transactions_.
+  std::shared_ptr<Transaction> transaction = entry->second;
+
+  const bool alreadyCanceled = transaction->getState() == TRANSACTION_CANCELED;
+  if (alreadyCanceled || transaction->getState() == TRANSACTION_COMPLETED || transaction->getState() == TRANSACTION_ERROR) {
+    return;
+  }
+
+  writeResponse(transaction, CANCEL_TRANSACTION, "Cancel");
+  transaction->_state = TRANSACTION_CANCELED;
+
+  tearDown();
+}
+
+// Marks a known transaction as TRANSACTION_ERROR and tears the connection down.
+// Unknown IDs are ignored.
+void SiteToSiteClient::error(std::string transactionID) {
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return;
+  }
+
+  // Own copy: tearDown() below clears known_transactions_.
+  std::shared_ptr<Transaction> transaction = entry->second;
+  transaction->_state = TRANSACTION_ERROR;
+  tearDown();
+}
+
+// Complete the transaction
+// Finishes a known transaction.
+// RECEIVE direction: when something was transferred, TRANSACTION_FINISHED is sent to the peer.
+// Other direction: the peer's TRANSACTION_FINISHED response is awaited.
+// Returns true when the transaction was moved to TRANSACTION_COMPLETED; false when the peer
+// is not READY, the ID is unknown, transfers happened without a prior confirm, or the
+// exchange with the peer fails or yields another respond code.
+bool SiteToSiteClient::complete(std::string transactionID) {
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+  if (peer_state_ != READY) {
+    return false;
+  }
+
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return false;
+  }
+  std::shared_ptr<Transaction> transaction = entry->second;
+
+  // Data that was exchanged must have been confirmed first.
+  if (transaction->total_transfers_ > 0 && transaction->getState() != TRANSACTION_CONFIRMED) {
+    return false;
+  }
+
+  if (transaction->getDirection() != RECEIVE) {
+    // The peer tells us when it is done with what we sent.
+    RespondCode code;
+    std::string message;
+    int status = readResponse(transaction, code, message);
+    if (status <= 0) {
+      return false;
+    }
+
+    if (code != TRANSACTION_FINISHED) {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+
+    logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str());
+    transaction->_state = TRANSACTION_COMPLETED;
+    return true;
+  }
+
+  // Receiving side: the peer only needs to hear from us if anything was transferred.
+  if (transaction->current_transfers_ != 0) {
+    logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str());
+    int status = this->writeResponse(transaction, TRANSACTION_FINISHED, "Finished");
+    if (status <= 0) {
+      return false;
+    }
+  }
+
+  transaction->_state = TRANSACTION_COMPLETED;
+  return true;
+}
+
+// Sends one data packet over a known SEND transaction.
+// The attribute count, each attribute key/value pair and the content length are written to
+// the transaction stream. Content comes from the flow file (read through the session) when
+// one is given, otherwise from packet->payload_ when that is non-empty.
+// For every packet after the first, CONTINUE_TRANSACTION is written beforehand.
+//
+// Returns 0 on success, -1 when the peer is not READY, the transaction is unknown or in the
+// wrong state/direction, or a write fails, and -2 when the flow file's resource claim does
+// not exist or the number of bytes read differs from the flow file size.
+int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, const std::shared_ptr<FlowFileRecord> &flowFile, const std::shared_ptr<core::ProcessSession> &session) {
+  int ret;
+  std::shared_ptr<Transaction> transaction = nullptr;
+
+  if (flowFile) {
+    // A flow file may carry no claim at all (handled as the "no claim" case further down),
+    // so the claim must be checked for null before asking whether it exists.
+    const auto claim = flowFile->getResourceClaim();
+    if (claim != nullptr && !claim->exists()) {
+      logger_->log_info("Claim %s does not exist for FlowFile %s", claim->getContentFullPath(), flowFile->getUUIDStr());
+      return -2;
+    }
+  }
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return -1;
+  }
+  auto it = this->known_transactions_.find(transactionID);
+
+  if (it == known_transactions_.end()) {
+    return -1;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+    return -1;
+  }
+
+  if (transaction->getDirection() != SEND) {
+    logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+    return -1;
+  }
+
+  if (transaction->current_transfers_ > 0) {
+    // Tell the peer another packet follows the previous one.
+    ret = writeResponse(transaction, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
+    if (ret <= 0) {
+      return -1;
+    }
+  }
+  // start to write the packet: attribute count first
+  uint32_t numAttributes = packet->_attributes.size();
+  ret = transaction->getStream().write(numAttributes);
+  if (ret != 4) {
+    return -1;
+  }
+
+  for (auto &attribute : packet->_attributes) {
+    ret = transaction->getStream().writeUTF(attribute.first, true);
+
+    if (ret <= 0) {
+      return -1;
+    }
+    ret = transaction->getStream().writeUTF(attribute.second, true);
+    if (ret <= 0) {
+      return -1;
+    }
+    logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), attribute.first.c_str(), attribute.second.c_str());
+  }
+
+  uint64_t len = 0;
+  if (flowFile) {
+    len = flowFile->getSize();
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      logger_->log_info("ret != 8");
+      return -1;
+    }
+    if (flowFile->getSize() > 0) {
+      // The callback receives the flow file content via session->read().
+      sitetosite::ReadCallback callback(packet);
+      session->read(flowFile, &callback);
+      if (flowFile->getSize() != packet->_size) {
+        logger_->log_info("MisMatched sizes %d %d", flowFile->getSize(), packet->_size);
+        return -2;
+      }
+    }
+    if (packet->payload_.length() == 0 && len == 0) {
+      if (flowFile->getResourceClaim() == nullptr)
+        logger_->log_debug("no claim");
+      else
+        logger_->log_debug("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath());
+    }
+  } else if (packet->payload_.length() > 0) {
+    len = packet->payload_.length();
+
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      return -1;
+    }
+
+    ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len);
+    if (ret != len) {
+      logger_->log_info("ret != len");
+      return -1;
+    }
+    packet->_size += len;
+  }
+
+  transaction->current_transfers_++;
+  transaction->total_transfers_++;
+  transaction->_state = DATA_EXCHANGED;
+  transaction->_bytes += len;
+  logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes);
+
+  return 0;
+}
+
+// Receives the header of the next data packet of a known RECEIVE transaction.
+// For every packet after the first, the peer's response is read first: CONTINUE_TRANSACTION
+// means another packet follows, FINISH_TRANSACTION ends the data.
+// The attribute count, the attributes (stored into packet->_attributes) and the content
+// length (stored into packet->_size) are then read from the transaction stream.
+//
+// eof is set to true when no further data is available (including a zero content length).
+// Returns true on success or end of data, false when the peer is not READY, the transaction
+// is unknown or in the wrong state/direction, a read fails, the attribute count exceeds
+// MAX_NUM_ATTRIBUTES, or the peer answers with an unexpected code.
+bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bool &eof) {
+  int ret;
+  std::shared_ptr<Transaction> transaction = nullptr;
+
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return false;
+  }
+
+  auto it = this->known_transactions_.find(transactionID);
+
+  if (it == known_transactions_.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+    return false;
+  }
+
+  if (transaction->getDirection() != RECEIVE) {
+    logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+    return false;
+  }
+
+  if (!transaction->isDataAvailable()) {
+    eof = true;
+    return true;
+  }
+
+  if (transaction->current_transfers_ > 0) {
+    // if we already has transfer before, check to see whether another one is available
+    RespondCode code;
+    std::string message;
+
+    ret = readResponse(transaction, code, message);
+
+    if (ret <= 0) {
+      return false;
+    }
+    if (code == CONTINUE_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str());
+      transaction->_dataAvailable = true;
+    } else if (code == FINISH_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str());
+      transaction->_dataAvailable = false;
+      eof = true;
+      return true;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+  }
+
+  if (!transaction->isDataAvailable()) {
+    eof = true;
+    return true;
+  }
+
+  // start to read the packet
+  uint32_t numAttributes;
+  ret = transaction->getStream().read(numAttributes);
+  if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
+    return false;
+  }
+
+  // read the attributes
+  // The format asks for an unsigned long long, so widen the 32-bit count to match it.
+  logger_->log_info("Site2Site transaction %s receives attribute key %llu", transactionID.c_str(), static_cast<unsigned long long>(numAttributes));
+  for (unsigned int i = 0; i < numAttributes; i++) {
+    std::string key;
+    std::string value;
+    ret = transaction->getStream().readUTF(key, true);
+    if (ret <= 0) {
+      return false;
+    }
+    ret = transaction->getStream().readUTF(value, true);
+    if (ret <= 0) {
+      return false;
+    }
+    packet->_attributes[key] = value;
+    logger_->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str());
+  }
+
+  uint64_t len;
+  ret = transaction->getStream().read(len);
+  if (ret <= 0) {
+    return false;
+  }
+
+  packet->_size = len;
+  if (len > 0) {
+    transaction->current_transfers_++;
+    transaction->total_transfers_++;
+  } else {
+    // A zero content length is treated as the end of the data.
+    logger_->log_info("Site2Site transaction %s receives attribute ?", transactionID.c_str());
+    transaction->_dataAvailable = false;
+    eof = true;
+    return true;
+  }
+  transaction->_state = DATA_EXCHANGED;
+  transaction->_bytes += len;
+  logger_->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes);
+
+  return true;
+}
+
+// Receives flow files from the peer inside a single RECEIVE transaction.
+// Each received packet becomes a new flow file in the session: its attributes are copied,
+// its content is written through the session, a provenance receive event is reported and
+// the flow file is transferred to a default-constructed relationship.
+// The transaction is then confirmed (if anything was received) and completed; when nothing
+// was received the context is yielded.
+// Returns false when bootstrap() fails, true on success.
+// Throws Exception(SITE2SITE_EXCEPTION, ...) on handshake, transaction, receive, flow file
+// creation, size, confirm or complete failures; before rethrowing, the transaction is
+// deleted, the context is yielded and the connection is torn down.
+bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  uint64_t bytes = 0;
+  int transfers = 0;
+
+  if (peer_state_ != READY && !bootstrap()) {
+    return false;
+  }
+
+  if (peer_state_ != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+  }
+
+  // Open the transaction everything in this call is received through.
+  std::string transactionID;
+  std::shared_ptr<Transaction> transaction = NULL;
+  transaction = createTransaction(transactionID, RECEIVE);
+  if (transaction == NULL) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+  }
+
+  // Cleanup shared by both exception handlers below.
+  auto abandonTransaction = [&]() {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+  };
+
+  try {
+    for (;;) {
+      std::map<std::string, std::string> noAttributes;
+      uint64_t receiveStartMillis = getTimeMillis();
+      std::string payload;
+      DataPacket packet(getLogger(), transaction, noAttributes, payload);
+      bool eof = false;
+
+      if (!receive(transactionID, &packet, eof)) {
+        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
+      }
+      if (eof) {
+        // The peer has nothing more for this transaction.
+        break;
+      }
+
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      if (!flowFile) {
+        throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
+      }
+
+      // Copy every attribute; the UUID attribute identifies the source flow file.
+      std::string sourceIdentifier;
+      for (auto &attribute : packet._attributes) {
+        if (attribute.first == FlowAttributeKey(UUID))
+          sourceIdentifier = attribute.second;
+        flowFile->addAttribute(attribute.first, attribute.second);
+      }
+
+      if (packet._size > 0) {
+        sitetosite::WriteCallback callback(&packet);
+        session->write(flowFile, &callback);
+        if (flowFile->getSize() != packet._size) {
+          throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
+        }
+      }
+
+      core::Relationship relation;  // undefined relationship
+      uint64_t receiveEndMillis = getTimeMillis();
+      std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
+      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
+      session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, receiveEndMillis - receiveStartMillis);
+      session->transfer(flowFile, relation);
+
+      // Account for the flow record just received.
+      bytes += packet._size;
+      transfers++;
+    }
+
+    if (transfers > 0 && !confirm(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
+    }
+    if (!complete(transactionID)) {
+      std::stringstream failure;
+      failure << "Complete Transaction " << transactionID << " Failed";
+      throw Exception(SITE2SITE_EXCEPTION, failure.str().c_str());
+    }
+    logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes);
+    // Nothing arrived this time, so back off before the next attempt.
+    if (transfers == 0)
+      context->yield();
+  } catch (std::exception &exception) {
+    abandonTransaction();
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    abandonTransaction();
+    logger_->log_debug("Caught Exception during RawSiteToSiteClient::receiveFlowFiles");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+
+  return true;
+}
+} /* namespace sitetosite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/utils/ByteArrayCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
new file mode 100644
index 0000000..84edfde
--- /dev/null
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -0,0 +1,155 @@
+/**
+ * @file ByteArrayCallback.cpp
+ * ByteOutputCallback class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/ByteArrayCallback.h"
+#include <vector>
+#include <utility>
+#include <string>
+#include <memory>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// Stream callback entry point. Rewinds the stream and, when it reports a non-zero size,
+// drains that many bytes from this callback's queue into a scratch buffer and then reads
+// the stream's data into the same buffer.
+// Returns the stream size when it is non-zero, otherwise the number of bytes currently queued.
+int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  stream->seek(0);
+  if (stream->getSize() > 0) {
+    // std::vector releases array storage correctly; a unique_ptr<char> holding the result
+    // of new char[] would free it with delete instead of delete[].
+    std::vector<char> buffer(stream->getSize());
+    readFully(buffer.data(), stream->getSize());
+    stream->readData(reinterpret_cast<uint8_t*>(buffer.data()), stream->getSize());
+    return stream->getSize();
+  }
+  return size_.load();
+}
+
+// Drains the bytes currently queued and returns them as a vector.
+const std::vector<char> ByteOutputCallback::to_string() {
+  // Sample the size once: a writer may change size_ between two loads, and the buffer
+  // must be exactly as large as the read that is requested from it.
+  const size_t pending = size_.load();
+  std::vector<char> buffer(pending);
+  readFully(buffer.data(), pending);
+  return buffer;
+}
+
+void ByteOutputCallback::close() {  // Marks the callback as no longer alive and wakes all waiters.
+  is_alive_ = false;  // the wait predicates in write() and read_current_str() test !is_alive_
+  spinner_.notify_all();  // wake every thread blocked in spinner_.wait()
+}
+
+// Returns the number of bytes currently accounted for in size_.
+size_t ByteOutputCallback::getSize() {
+  return size_.load();
+}
+
+// Probes vector_lock_ without blocking.
+// Returns true when the lock could not be acquired, false when it could (it is released
+// again immediately).
+bool ByteOutputCallback::waitingOps() {
+  const bool acquired = vector_lock_.try_lock();
+  if (acquired) {
+    vector_lock_.unlock();
+  }
+  return !acquired;
+}
+
+// Appends `size` bytes starting at `data` to the queue as one chunk and notifies waiters.
+// When the queued size exceeds max_size_, the call first blocks until the size drops below
+// max_size_ or the callback is no longer alive.
+void ByteOutputCallback::write(char *data, size_t size) {
+  if (size_ > max_size_) {
+    std::unique_lock<std::recursive_mutex> waitLock(vector_lock_);
+    // Re-check under the lock before waiting.
+    if (size_ > max_size_) {
+      spinner_.wait(waitLock, [&] {
+        return size_ < max_size_ || !is_alive_;});
+    }
+    // If we are not alive, the write still goes ahead in case the data must not be lost.
+    // If nobody cares about it, the only cost was the locking and notification.
+  }
+
+  // The whole input is queued as a single chunk.
+  std::lock_guard<std::recursive_mutex> queueLock(vector_lock_);
+  vec.push(std::string(data, size));
+  size_ += size;
+  spinner_.notify_all();
+}
+
+// Copies up to `size` queued bytes into `buffer`; forwards to read_current_str() and
+// returns its result.
+size_t ByteOutputCallback::readFully(char *buffer, size_t size) {
+  const size_t copied = read_current_str(buffer, size);
+  return copied;
+}
+
+// Copies up to `size` bytes from the queued chunks into `buffer`, consuming them.
+// Chunks are taken from `vec` one at a time into current_str and read from current_str_pos.
+// When fewer bytes are queued than still requested, the call waits until enough bytes are
+// queued or the callback is no longer alive.
+// Returns the number of bytes copied. Note that 0 is returned when, after such a wait,
+// nothing is queued and the callback is no longer alive, even if earlier iterations of
+// this call already copied bytes.
+size_t ByteOutputCallback::read_current_str(char *buffer, size_t size) {
+  size_t amount_to_read = size;
+  size_t curr_buf_pos = 0;
+  do {
+    {
+      std::lock_guard<std::recursive_mutex> lock(vector_lock_);
+
+      if (current_str_pos < current_str.length()) {
+        const size_t str_remaining = current_str.length() - current_str_pos;
+        const size_t current_str_read = str_remaining > amount_to_read ? amount_to_read : str_remaining;
+        memcpy(buffer + curr_buf_pos, current_str.data() + current_str_pos, current_str_read);
+        curr_buf_pos += current_str_read;
+        amount_to_read -= current_str_read;
+        current_str_pos += current_str_read;
+        size_ -= current_str_read;
+        // Move on as soon as the current chunk is fully consumed. The position is compared
+        // directly because an unsigned "length - read <= 0" is only true on equality.
+        if (current_str_pos >= current_str.length()) {
+          preload_next_str();
+        }
+      } else {
+        preload_next_str();
+      }
+    }
+    if (size_ < amount_to_read) {
+      std::unique_lock<std::recursive_mutex> lock(vector_lock_);
+      if (size_ < amount_to_read) {
+        spinner_.wait(lock, [&] {
+          return size_ >= amount_to_read || !is_alive_;});
+      }
+
+      if (size_ == 0 && !is_alive_) {
+        return 0;
+      }
+      // Only fetch another chunk when the current one has nothing left. Preloading
+      // unconditionally here would throw away a chunk that was loaded but not yet read.
+      if (current_str_pos >= current_str.length()) {
+        preload_next_str();
+      }
+    }
+  } while (amount_to_read > 0 && (is_alive_ || size_ > 0) && current_str.length() > 0);
+
+  spinner_.notify_all();
+  return size - amount_to_read;
+}
+
+// Replaces current_str with the next queued chunk, or leaves it empty when the queue is empty.
+// current_str_pos is reset only when a chunk was actually loaded.
+void ByteOutputCallback::preload_next_str() {
+  // Drop whatever was loaded before.
+  current_str.clear();
+  if (vec.empty()) {
+    return;
+  }
+
+  current_str = std::move(vec.front());
+  current_str_pos = 0;
+  vec.pop();
+}
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index c18153c..d61bdac 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -144,7 +144,8 @@ void TestPlan::reset() {
   }
 }
 
-bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify) {
+
+bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
   if (!finalized) {
     finalize();
   }
@@ -164,7 +165,7 @@ bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
   if (verify != nullptr) {
-    verify(context.get(), current_session.get());
+    verify(context, current_session);
   } else {
     logger_->log_info("Running %s", processor->getName());
     processor->onTrigger(context, current_session);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 4eba0a8..c4bc7d7 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -115,7 +115,7 @@ class LogTestController {
   std::ostringstream log_output;
 
   std::shared_ptr<logging::Logger> logger_;
-   private:
+ private:
   class TestBootstrapLogger : public logging::Logger {
    public:
     TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
@@ -164,7 +164,7 @@ class TestPlan {
 
   void reset();
 
-  bool runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr);
+  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
 
   std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
 
@@ -184,7 +184,6 @@ class TestPlan {
 
  protected:
 
-
   std::shared_ptr<logging::Logger> logger_;
 
   void finalize();
@@ -230,8 +229,7 @@ class TestController {
     utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
   }
 
-  std::shared_ptr<TestPlan> createPlan()
-  {
+  std::shared_ptr<TestPlan> createPlan() {
     std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
 
@@ -242,16 +240,15 @@ class TestController {
     return std::make_shared<TestPlan>(content_repo, flow_repo, repo);
   }
 
-  void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr) {
 
-    while (plan->runNextProcessor(verify) && runToCompletion)
-    {
+  void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&)> verify =
+                      nullptr) {
+
+    while (plan->runNextProcessor(verify) && runToCompletion) {
 
     }
   }
 
-
-
   ~TestController() {
     for (auto dir : directories) {
       DIR *created_dir;
@@ -282,8 +279,6 @@ class TestController {
 
  protected:
 
-
-
   std::mutex test_mutex;
   //std::map<std::string,>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp
index de8d3b8..d163a21 100644
--- a/libminifi/test/curl-tests/C2NullConfiguration.cpp
+++ b/libminifi/test/curl-tests/C2NullConfiguration.cpp
@@ -92,11 +92,9 @@ class VerifyC2Server : public IntegrationBase {
     std::string url = "";
     inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
 
-    std::cout << "url is " << url << std::endl;
 
     std::string port, scheme, path;
     parse_http_components(url, port, scheme, path);
-    std::cout << "path is " << path << std::endl;
     configuration->set("c2.agent.protocol.class", "null");
     configuration->set("c2.rest.url", "");
     configuration->set("c2.rest.url.ack", "");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/C2VerifyServeResults.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2VerifyServeResults.cpp b/libminifi/test/curl-tests/C2VerifyServeResults.cpp
index 101fc59..625a5f2 100644
--- a/libminifi/test/curl-tests/C2VerifyServeResults.cpp
+++ b/libminifi/test/curl-tests/C2VerifyServeResults.cpp
@@ -92,11 +92,9 @@ class VerifyC2Server : public IntegrationBase {
     std::string url = "";
     inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
 
-    std::cout << "url is " << url << std::endl;
 
     std::string port, scheme, path;
     parse_http_components(url, port, scheme, path);
-    std::cout << "path is " << path << std::endl;
     configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
     configuration->set("c2.rest.listener.port", port);
     configuration->set("c2.agent.heartbeat.period", "10");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/CMakeLists.txt b/libminifi/test/curl-tests/CMakeLists.txt
index 37e8b25..e162c7c 100644
--- a/libminifi/test/curl-tests/CMakeLists.txt
+++ b/libminifi/test/curl-tests/CMakeLists.txt
@@ -29,6 +29,7 @@ FOREACH(testfile ${CURL_INTEGRATION_TESTS})
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/")
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/")
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/")
     target_link_libraries(${testfilename} ${CURL_LIBRARIES} )
     createTests("${testfilename}")
     if (APPLE)    
@@ -46,8 +47,8 @@ add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.y
 add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
 add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
 add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller")
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/controller")
 add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
 add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp b/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
index 7a4ee35..d34d65e 100644
--- a/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
+++ b/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
@@ -123,7 +123,6 @@ int main(int argc, char **argv) {
     ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation());
   }
   assert(ssl_client->getCACertificate().length() > 0);
-  std::cout << "Disabling ID" << std::endl;
   // now let's disable one of the controller services.
   std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
   assert(cs_id != nullptr);
@@ -133,7 +132,6 @@ int main(int argc, char **argv) {
     disabled = true;
     waitToVerifyProcessor();
   }
-  std::cout << "Disabled ID" << std::endl;
   {
     std::lock_guard<std::mutex> lock(control_mutex);
     controller->enableControllerService(cs_id);
@@ -141,7 +139,6 @@ int main(int argc, char **argv) {
     waitToVerifyProcessor();
   }
   std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
-  std::cout << "Disabling MockItLikeIts1995" << std::endl;
   assert(cs_id->enabled());
 {
     std::lock_guard<std::mutex> lock(control_mutex);
@@ -149,7 +146,6 @@ int main(int argc, char **argv) {
     disabled = true;
     waitToVerifyProcessor();
   }
-std::cout << "Disabled MockItLikeIts1995" << std::endl;
     assert(cs_id->enabled() == false);
 {
     std::lock_guard<std::mutex> lock(control_mutex);
@@ -157,7 +153,6 @@ std::cout << "Disabled MockItLikeIts1995" << std::endl;
     disabled = false;
     waitToVerifyProcessor();
   }
-std::cout << "Enabled ref for MockItLikeIts1995" << std::endl;
   assert(cs_id->enabled() == true);
 
   controller->waitUnload(60000);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp b/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp
new file mode 100644
index 0000000..775739d
--- /dev/null
+++ b/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "CivetServer.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "InvokeHTTP.h"
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "../TestServer.h"
+#include "../integration/IntegrationBase.h"
+#include "sitetositehttp/HTTPHandlers.h"
+#include "client/HTTPStream.h"
+
+/**
+ * Integration harness for the HTTP site-to-site tests. It creates a
+ * temporary directory, writes a small file into it during testSetup()
+ * and removes that file again in cleanup().
+ */
+class SiteToSiteTestHarness : public IntegrationBase {
+ public:
+  /**
+   * @param isSecure stored for later use by the harness
+   */
+  explicit SiteToSiteTestHarness(bool isSecure)
+      : isSecure(isSecure) {
+    // The template lives in a member rather than in a constructor local:
+    // createTempDirectory is assumed to follow mkdtemp semantics and hand
+    // back a pointer into the buffer it was given (TODO confirm), so the
+    // buffer has to stay alive for as long as dir is used.
+    dir = testController.createTempDirectory(dir_template);
+  }
+
+  /**
+   * Raises the log levels of the classes under test, creates the test
+   * file inside the temporary directory and sets the configuration
+   * values used by the test.
+   */
+  void testSetup() {
+    LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
+    LogTestController::getInstance().setDebug<minifi::sitetosite::HttpSiteToSiteClient>();
+    LogTestController::getInstance().setDebug<minifi::sitetosite::SiteToSiteClient>();
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+
+    // ss keeps the full path so that cleanup() can remove the file later.
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    configuration->set("nifi.c2.enable", "false");
+    configuration->set("nifi.remote.input.http.enabled", "true");
+    configuration->set("nifi.remote.input.socket.port", "8082");
+  }
+
+  /**
+   * Sleeps for three seconds.
+   */
+  virtual void waitToVerifyProcessor() {
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+  }
+
+  /**
+   * Removes the file created by testSetup().
+   */
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  /**
+   * Intentionally empty: the assertions are made by the caller after run().
+   */
+  void runAssertions() {
+  }
+
+ protected:
+  bool isSecure;
+  // template handed to createTempDirectory; 16 characters plus the terminator.
+  char dir_template[17] = "/tmp/ssth.XXXXXX";
+  // directory returned by createTempDirectory.
+  char *dir;
+  // holds the path of the file written by testSetup().
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Selects which failure, if any, the mock endpoints simulate during one
+ * run. Every flag starts out as false.
+ */
+struct test_profile {
+  // tests for a broken flow file url
+  bool flow_url_broken = false;
+  // transaction url will return incorrect information
+  bool transaction_url_broken = false;
+  // Location will be absent within the transaction response
+  bool empty_transaction_url = false;
+  // delete url is not supported.
+  bool no_delete = false;
+  // invalid checksum error
+  bool invalid_checksum = false;
+
+  /**
+   * @return true when none of the failure flags is set.
+   */
+  bool allFalse() const {
+    return !(flow_url_broken || transaction_url_broken || empty_transaction_url || no_delete || invalid_checksum);
+  }
+};
+
+/**
+ * Runs the flow once against a set of mock site-to-site endpoints and
+ * asserts on the resulting log output.
+ *
+ * @param test_file_location flow configuration passed to the harness
+ * @param isSecure forwarded to the harness and the location responder
+ * @param url base url under which the mock endpoints are registered
+ * @param profile selects the failure the endpoints simulate, if any
+ */
+void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) {
+  SiteToSiteTestHarness harness(isSecure);
+
+  SiteToSiteLocationResponder responder(isSecure);
+
+  TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url);
+
+  std::string transaction_id = transaction_response.getTransactionId();
+
+  harness.setKeyDir("");
+
+  std::string controller_loc = url + "/controller";
+
+  harness.setUrl(controller_loc, &responder);
+
+  std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+  std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+
+  std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+  std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+
+  harness.setUrl(transaction_url, &transaction_response);
+
+  std::string peer_url = url + "/site-to-site/peers";
+
+  PeerResponder peer_response(url);
+
+  harness.setUrl(peer_url, &peer_response);
+
+  std::string flow_url = action_url + "/" + transaction_id + "/flow-files";
+
+  FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum);
+  flowResponder.setFlowUrl(flow_url);
+  auto producedFlows = flowResponder.getFlows();
+
+  TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url);
+  std::string transaction_output_id = transaction_response_output.getTransactionId();
+  transaction_response_output.setFeed(producedFlows);
+
+  harness.setUrl(transaction_output_url, &transaction_response_output);
+
+  std::string flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files";
+
+  FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum);
+  flowOutputResponder.setFlowUrl(flow_output_url);
+  flowOutputResponder.setFeed(producedFlows);
+
+  harness.setUrl(flow_url, &flowResponder);
+  harness.setUrl(flow_output_url, &flowOutputResponder);
+
+  // setUrl receives a raw handler pointer, so the delete responders have
+  // to stay alive until harness.run() has returned. They are therefore
+  // created at function scope and only their registration is conditional.
+  std::string delete_url = transaction_url + "/" + transaction_id;
+  DeleteTransactionResponder deleteResponse(delete_url, "201 OK", 12);
+
+  std::string delete_output_url = transaction_output_url + "/" + transaction_output_id;
+  DeleteTransactionResponder deleteOutputResponse(delete_output_url, "201 OK", producedFlows);
+
+  if (!profile.no_delete) {
+    harness.setUrl(delete_url, &deleteResponse);
+    harness.setUrl(delete_output_url, &deleteOutputResponse);
+  }
+
+  harness.run(test_file_location);
+
+  // exactly one branch is checked per run; the order of the checks
+  // decides which one wins when several flags are set.
+  std::stringstream assertStr;
+  if (profile.allFalse()) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.empty_transaction_url) {
+    assert(LogTestController::getInstance().contains("Location is empty") == true);
+  } else if (profile.transaction_url_broken) {
+    assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true);
+  } else if (profile.invalid_checksum) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site transaction " << transaction_id << " CRC not matched";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site delete transaction " << transaction_id;
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.no_delete) {
+    assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true);
+  } else {
+    // remaining case: only flow_url_broken is set.
+    assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  }
+  LogTestController::getInstance().reset();
+}
+
+/**
+ * Runs the site-to-site scenario once without failures and once for each
+ * simulated failure.
+ *
+ * Arguments: argv[1] flow configuration file, argv[2] key directory,
+ * argv[3] base url of the mock server. Arguments that are not supplied
+ * are left as empty strings.
+ */
+int main(int argc, char **argv) {
+  transaction_id = 0;
+  transaction_id_output = 0;
+  std::string key_dir, test_file_location, url;
+  // every argument is checked on its own so that argv is never read past
+  // argc when fewer than three arguments are given.
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+  if (argc > 3) {
+    url = argv[3];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  // baseline: no failure is simulated.
+  {
+    struct test_profile profile;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.flow_url_broken = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.empty_transaction_url = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.transaction_url_broken = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.no_delete = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.invalid_checksum = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/sitetositehttp/CivetStream.h
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/sitetositehttp/CivetStream.h b/libminifi/test/curl-tests/sitetositehttp/CivetStream.h
new file mode 100644
index 0000000..571b0ca
--- /dev/null
+++ b/libminifi/test/curl-tests/sitetositehttp/CivetStream.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
+
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <future>
+#include <vector>
+
+#include "io/BaseStream.h"
+#include "civetweb.h"
+#include "CivetServer.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Stream that reads from a civetweb connection via mg_read(). The seek
+ * and write operations are no-ops.
+ */
+class CivetStream : public io::BaseStream {
+ public:
+  /**
+   * Creates a stream on top of the given civetweb connection. Only the
+   * pointer is stored; the stream never releases the connection.
+   * @param conn connection that readData() reads from
+   */
+  explicit CivetStream(struct mg_connection *conn)
+      : io::BaseStream(), conn(conn) {
+
+  }
+
+  virtual ~CivetStream() {
+  }
+  /**
+   * Does nothing; the offset is ignored.
+   * @param offset unused
+   */
+  void seek(uint64_t offset){
+
+  }
+
+  /**
+   * @return the value of BaseStream::readBuffer
+   */
+  const uint64_t getSize() const {
+    return BaseStream::readBuffer;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf. buf is grown to buflen elements
+   * when it is smaller, and shrunk to the number of bytes read when
+   * fewer than buflen bytes arrived.
+   * @param buf buffer in which we extract data
+   * @param buflen maximum number of bytes to read
+   * @return number of bytes read, 0 when buflen is 0, or a negative value
+   * when buflen is negative or the underlying read reports an error
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+    if (buflen < 0) {
+      return -1;
+    }
+    if (buflen == 0) {
+      return 0;
+    }
+    // size(), not capacity(), decides whether the elements exist.
+    if (buf.size() < static_cast<size_t>(buflen)) {
+      buf.resize(buflen);
+    }
+    int ret = readData(buf.data(), buflen);
+
+    if (ret < 0) {
+      // a negative count must not reach resize(); buf is left as it is.
+      return ret;
+    }
+    if (ret < buflen) {
+      buf.resize(ret);
+    }
+    return ret;
+  }
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen maximum number of bytes to read
+   * @return the result of mg_read()
+   */
+  virtual int readData(uint8_t *buf, int buflen) {
+    return mg_read(conn,buf,buflen);
+  }
+
+  /**
+   * Not supported: nothing is written.
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   * @return always 0
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen) {
+    return 0;
+  }
+
+  /**
+   * Not supported: nothing is written.
+   * @param value value to write
+   * @param size size of value
+   * @return always 0
+   */
+  virtual int writeData(uint8_t *value, int size) {
+    return 0;
+  }
+
+ protected:
+
+  /**
+   * Creates a vector of sizeof(t) bytes and fills it with a single
+   * readData() call. The result of that call is not checked.
+   * @param t object whose size determines how many bytes are requested
+   * @returns vector.
+   */
+  template<typename T>
+  inline std::vector<uint8_t> readBuffer(const T& t) {
+    std::vector<uint8_t> buf;
+    buf.resize(sizeof t);
+    readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+    return buf;
+  }
+
+  // declared only; no definition is part of this header.
+  void reset();
+
+  //size_t pos;
+  struct mg_connection *conn;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h
new file mode 100644
index 0000000..3fe4623
--- /dev/null
+++ b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "civetweb.h"
+#include "CivetServer.h"
+#include "concurrentqueue.h"
+
+
+
+#include "CivetStream.h"
+#include "io/CRCStream.h"
+#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+static std::atomic<int> transaction_id;
+static std::atomic<int> transaction_id_output;
+
+/**
+ * Plain holder for one flow file as seen by the mock server: its
+ * attributes, its payload and the total serialized size.
+ */
+class FlowObj {
+ public:
+  FlowObj()
+      : total_size(0) {
+
+  }
+  /**
+   * Move constructor. The parameter is a non-const rvalue reference so
+   * that the containers are really moved instead of copied; the
+   * initializers follow the declaration order of the members.
+   * @param other object whose contents are taken over
+   */
+  explicit FlowObj(FlowObj &&other)
+      : total_size(other.total_size),
+        attributes(std::move(other.attributes)),
+        data(std::move(other.data)) {
+
+  }
+  // size assigned by the code that fills in the object.
+  uint64_t total_size;
+  // attribute name to attribute value.
+  std::map<std::string, std::string> attributes;
+  // payload bytes.
+  std::vector<uint8_t> data;
+
+};
+
+/**
+ * Answers GET requests with a small controller description whose
+ * "siteToSiteSecure" field reflects the flag given at construction.
+ */
+class SiteToSiteLocationResponder : public CivetHandler {
+ public:
+  /**
+   * @param isSecure value reported as "siteToSiteSecure"
+   */
+  explicit SiteToSiteLocationResponder(bool isSecure)
+      : isSecure(isSecure) {
+  }
+  /**
+   * Writes the response headers followed by the JSON body.
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "{"
+        "\"revision\": {"
+        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+        "},"
+        "\"controller\": {"
+        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+        "\"name\": \"NiFi Flow\","
+        "\"siteToSiteSecure\": ";
+    site2site_rest_resp += (isSecure ? "true" : "false");
+    site2site_rest_resp += "}}";
+    // the length is streamed instead of being passed to a printf
+    // conversion, so no format specifier has to match size_t.
+    std::stringstream headers;
+    headers << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
+    mg_printf(conn, "%s", headers.str().c_str());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+ protected:
+  bool isSecure;
+};
+
+/**
+ * Answers GET requests with a fixed peer list that contains a single
+ * peer on localhost.
+ */
+class PeerResponder : public CivetHandler {
+ public:
+
+  /**
+   * @param base_url stored; not used when building the response
+   */
+  explicit PeerResponder(const std::string base_url)
+      : base_url(base_url) {
+  }
+
+  /**
+   * Writes the response headers followed by the fixed JSON body.
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    const std::string body = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082,  \"secure\": false, \"flowFileCount\" : 0 }] }";
+    std::string head = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: ";
+    head += std::to_string(body.length());
+    head += "\r\nConnection: close\r\n\r\n";
+    mg_printf(conn, "%s", head.c_str());
+    mg_printf(conn, "%s", body.c_str());
+    return true;
+  }
+
+ protected:
+  std::string base_url;
+};
+
+/**
+ * Answers the POST that creates a transaction. The reply carries the
+ * intent header and, unless disabled, a Location header that points to
+ * the transaction created for this responder.
+ */
+class TransactionResponder : public CivetHandler {
+ public:
+
+  /**
+   * @param base_url prefix of the Location header
+   * @param port_id port id placed into the Location header
+   * @param input_port true for an input port, false for an output port
+   * @param wrong_uri when true the intent header carries "ohstuff"
+   * @param empty_transaction_uri when true no Location header is sent
+   */
+  explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
+      : flow_files_feed_(nullptr),
+        base_url(base_url),
+        wrong_uri(wrong_uri),
+        empty_transaction_uri(empty_transaction_uri),
+        input_port(input_port),
+        port_id(port_id) {
+
+    // fetch_add hands out the previous counter value and increments in
+    // one atomic step, so two responders never obtain the same suffix.
+    if (input_port) {
+      transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96";
+      transaction_id_str += std::to_string(transaction_id.fetch_add(1));
+    } else {
+      transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95";
+      transaction_id_str += std::to_string(transaction_id_output.fetch_add(1));
+    }
+  }
+
+  /**
+   * Writes a 201 response with an empty body.
+   * @return always true
+   */
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "";
+    std::stringstream headers;
+    headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nx-location-uri-intent: ";
+    if (wrong_uri)
+      headers << "ohstuff\r\n";
+    else
+      headers << "transaction-url\r\n";
+
+    std::string port_type;
+
+    if (input_port)
+      port_type = "input-ports";
+    else
+      port_type = "output-ports";
+    if (!empty_transaction_uri)
+      headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n";
+    headers << "Connection: close\r\n\r\n";
+    mg_printf(conn, "%s", headers.str().c_str());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+  /**
+   * Stores the feed pointer; the responder does not take ownership.
+   */
+  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+    flow_files_feed_ = feed;
+  }
+
+  /**
+   * @return the transaction id generated in the constructor
+   */
+  std::string getTransactionId() {
+    return transaction_id_str;
+  }
+ protected:
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+  std::string transaction_id_str;
+  std::string base_url;
+  bool wrong_uri;
+  bool empty_transaction_uri;
+  bool input_port;
+  std::string port_id;
+};
+
+/**
+ * Mock flow-file endpoint. POST parses one flow file from the request
+ * and queues it; GET serializes every flow file found in the feed.
+ */
+class FlowFileResponder : public CivetHandler {
+
+  // flow files accepted through handlePost.
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_;
+  // flow files served through handleGet; may be null.
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+
+ public:
+
+  /**
+   * @param input_port stored; we are running an input port
+   * @param wrong_uri when true handlePost answers with 404
+   * @param invalid_checksum when true handlePost returns a bogus checksum
+   */
+  explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum)
+      : flow_files_feed_(nullptr),
+        wrong_uri(wrong_uri),
+        input_port(input_port),
+        invalid_checksum(invalid_checksum) {
+  }
+
+  /**
+   * @return pointer to the queue filled by handlePost
+   */
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() {
+    return &flow_files_;
+  }
+
+  /**
+   * Stores the queue that handleGet serves from.
+   */
+  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+    flow_files_feed_ = feed;
+  }
+
+  /**
+   * Reads one flow file (attribute count, attributes, payload length and
+   * payload) from the connection and answers with the CRC of the stream,
+   * or with a fixed wrong value when invalid_checksum is set.
+   * @return always true
+   */
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "";
+    std::stringstream headers;
+
+    if (!wrong_uri) {
+      minifi::io::CivetStream civet_stream(conn);
+      minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream);
+      uint32_t num_attributes = 0;
+      uint64_t total_size = 0;
+      total_size += stream.read(num_attributes);
+
+      auto flow = std::make_shared<FlowObj>();
+
+      for (uint32_t i = 0; i < num_attributes; i++) {
+        std::string name, value;
+        total_size += stream.readUTF(name, true);
+        total_size += stream.readUTF(value, true);
+        flow->attributes[name] = value;
+      }
+      uint64_t length = 0;
+      total_size += stream.read(length);
+
+      total_size += length;
+      flow->data.resize(length);
+      flow->total_size = total_size;
+
+      // the read is done outside of assert so that it still happens when
+      // assertions are compiled out.
+      const auto payload_read = stream.readData(flow->data.data(), length);
+      assert(payload_read == length);
+      (void) payload_read;
+
+      assert(flow->attributes["path"] == ".");
+      assert(!flow->attributes["uuid"].empty());
+      assert(!flow->attributes["filename"].empty());
+
+      if (!invalid_checksum) {
+        site2site_rest_resp = std::to_string(stream.getCRC());
+        flow_files_.enqueue(flow);
+      } else {
+        site2site_rest_resp = "Imawrongchecksumshortandstout";
+      }
+
+      headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
+    } else {
+      headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n";
+    }
+
+    mg_printf(conn, "%s", headers.str().c_str());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+  /**
+   * Drains the feed and writes all drained flow files in one response.
+   * An empty 200 response is sent when no feed is set or it is empty.
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+
+    // the feed is optional, so guard against a responder without one.
+    if (flow_files_feed_ != nullptr && flow_files_feed_->size_approx() > 0) {
+      std::shared_ptr<FlowObj> flow;
+      std::vector<std::shared_ptr<FlowObj>> flows;
+      uint64_t total = 0;
+
+      while (flow_files_feed_->try_dequeue(flow)) {
+        flows.push_back(flow);
+        total += flow->total_size;
+      }
+      // the cast makes the argument match %llu on every platform.
+      mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+                "Content-Length: %llu\r\n"
+                "Content-Type: application/octet-stream\r\n"
+                "Connection: close\r\n\r\n",
+                static_cast<unsigned long long>(total));
+      minifi::io::BaseStream serializer;
+      minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer);
+      for (const auto &queued : flows) {
+        uint32_t num_attributes = queued->attributes.size();
+        stream.write(num_attributes);
+        for (const auto &entry : queued->attributes) {
+          stream.writeUTF(entry.first);
+          stream.writeUTF(entry.second);
+        }
+        uint64_t length = queued->data.size();
+        stream.write(length);
+        stream.writeData(queued->data.data(), length);
+      }
+      mg_write(conn, serializer.getBuffer(), total);
+    } else {
+      std::cout << "Nothing to transfer feed" << std::endl;
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
+                "close\r\nContent-Length: 0\r\n");
+      mg_printf(conn, "Content-Type: text/plain\r\n\r\n");
+
+    }
+    return true;
+  }
+
+  /**
+   * Stores the url of this endpoint.
+   */
+  void setFlowUrl(std::string flowUrl) {
+    base_url = flowUrl;
+  }
+
+ protected:
+// base url
+  std::string base_url;
+// set the wrong url
+  bool wrong_uri;
+// we are running an input port
+  bool input_port;
+// invalid checksum is returned.
+  bool invalid_checksum;
+};
+
+/**
+ * Answers DELETE requests with the status line given at construction
+ * and an empty body.
+ */
+class DeleteTransactionResponder : public CivetHandler {
+ public:
+
+  /**
+   * @param base_url stored url of the endpoint
+   * @param response_code status text placed after "HTTP/1.1 "
+   * @param expected_resp_code stored in its decimal string form
+   */
+  explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code)
+      : flow_files_feed_(nullptr),
+        base_url(base_url),
+        expected_resp_code_str(std::to_string(expected_resp_code)),
+        response_code(response_code) {
+  }
+
+  /**
+   * @param base_url stored url of the endpoint
+   * @param response_code status text placed after "HTTP/1.1 "
+   * @param feed stored feed pointer
+   */
+  explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed)
+      : flow_files_feed_(feed),
+        base_url(base_url),
+        response_code(response_code) {
+  }
+
+  /**
+   * Writes the configured status line, the headers and an empty body.
+   * @return always true
+   */
+  bool handleDelete(CivetServer *server, struct mg_connection *conn) {
+
+    const std::string body = "";
+    // the parameter is fetched, but its value does not influence the reply.
+    std::string requested_code;
+    CivetServer::getParam(conn, "responseCode", requested_code);
+    std::string head = "HTTP/1.1 ";
+    head += response_code;
+    head += "\r\nContent-Type: application/json\r\nContent-Length: ";
+    head += std::to_string(body.length());
+    head += "\r\n";
+    head += "Connection: close\r\n\r\n";
+    mg_printf(conn, "%s", head.c_str());
+    mg_printf(conn, "%s", body.c_str());
+    return true;
+  }
+
+  /**
+   * Replaces the stored feed pointer.
+   */
+  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+    flow_files_feed_ = feed;
+  }
+
+ protected:
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+  std::string base_url;
+  std::string expected_resp_code_str;
+  std::string response_code;
+};
+
+#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/integration/IntegrationBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index a854508..7626cdf 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -41,9 +41,7 @@ int ssl_enable(void *ssl_context, void *user_data) {
   return 0;
 }
 
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(3));
-}
+
 
 class IntegrationBase {
  public:
@@ -66,6 +64,10 @@ class IntegrationBase {
 
   virtual void runAssertions() = 0;
 
+  virtual void waitToVerifyProcessor() {  // virtual, so a subclass can supply its own wait
+    std::this_thread::sleep_for(std::chrono::seconds(3));  // default: block the calling thread for 3 seconds
+  }
+
  protected:
 
   virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
@@ -160,6 +162,11 @@ void IntegrationBase::setUrl(std::string url, CivetHandler *handler) {
   parse_http_components(url, port, scheme, path);
   struct mg_callbacks callback;
   if (url.find("localhost") != std::string::npos) {
+
+    if (server != nullptr){
+      server->addHandler(path,handler);
+      return;
+    }
     if (scheme == "https" && !key_dir.empty()) {
       std::string cert = "";
       cert = key_dir + "nifi-cert.pem";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 5845767..7db235f 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -73,7 +73,7 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
   ptr.release();
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 10001, 1);
+  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 10005, 1);
 
   controller->load();
   controller->start();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml b/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml
new file mode 100644
index 0000000..a2b0747
--- /dev/null
+++ b/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: invoke
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.InvokeHTTP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          HTTP Method: GET
+          Remote URL: http://localhost:10015/geturl
+    - name: LogAttribute
+      id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: response
+      Properties:
+        Log Level: info
+        Log Payload: true
+
+Connections:
+    - name: TransferFilesToRPG
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: invoke
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+    - name: TransferFilesToRPG2
+      id: 2438e3c8-015a-1000-79ca-83af40ec1917
+      source name: LogAttribute
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
+      source relationship name: success
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Remote Processing Groups:
+    

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/C2VerifyServeResults.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/C2VerifyServeResults.yml b/libminifi/test/resources/C2VerifyServeResults.yml
new file mode 100644
index 0000000..18db3dd
--- /dev/null
+++ b/libminifi/test/resources/C2VerifyServeResults.yml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: invoke
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.InvokeHTTP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          HTTP Method: GET
+          Remote URL: http://localhost:10013/geturl
+    - name: LogAttribute
+      id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: response
+      Properties:
+        Log Level: info
+        Log Payload: true
+
+Connections:
+    - name: TransferFilesToRPG
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: invoke
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+    - name: TransferFilesToRPG2
+      id: 2438e3c8-015a-1000-79ca-83af40ec1917
+      source name: LogAttribute
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
+      source relationship name: success
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Remote Processing Groups:
+    

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPGetSecure.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml
index 1ac82bd..a5a0bee 100644
--- a/libminifi/test/resources/TestHTTPGetSecure.yml
+++ b/libminifi/test/resources/TestHTTPGetSecure.yml
@@ -33,7 +33,7 @@ Processors:
       Properties:
           SSL Context Service: SSLContextService
           HTTP Method: GET
-          Remote URL: https://localhost:10003/geturl
+          Remote URL: https://localhost:10004/geturl
           Disable Peer Verification: true
     - name: LogAttribute
       id: 2438e3c8-015a-1000-79ca-83af40ec1992

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPPost.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPPost.yml b/libminifi/test/resources/TestHTTPPost.yml
index 32e4f42..e4dbf88 100644
--- a/libminifi/test/resources/TestHTTPPost.yml
+++ b/libminifi/test/resources/TestHTTPPost.yml
@@ -43,7 +43,7 @@ Processors:
       auto-terminated relationships list:
       Properties:
           Base Path: urlofchampions
-          Listening Port: 10004
+          Listening Port: 10007
     - name: Invoke
       id: 2438e3c8-015a-1000-79ca-83af40ec1992
       class: org.apache.nifi.processors.standard.InvokeHTTP
@@ -58,7 +58,7 @@ Processors:
       Properties:
           HTTP Method: POST
           Content-type: text/html
-          Remote URL: http://localhost:10004/urlofchampions
+          Remote URL: http://localhost:10007/urlofchampions
     - name: Loggit
       id: 2438e3c8-015a-1000-79ca-83af40ec1993
       class: org.apache.nifi.processors.standard.LogAttribute

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
index 110783c..f31f3e7 100644
--- a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
+++ b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
@@ -43,7 +43,7 @@ Processors:
       auto-terminated relationships list:
       Properties:
           Base Path: urlofchampions
-          Listening Port: 10004
+          Listening Port: 10006
     - name: Invoke
       id: 2438e3c8-015a-1000-79ca-83af40ec1992
       class: org.apache.nifi.processors.standard.InvokeHTTP
@@ -59,7 +59,7 @@ Processors:
           HTTP Method: POST
           Use Chunked Encoding: true
           Content-type: text/html
-          Remote URL: http://localhost:10004/urlofchampions
+          Remote URL: http://localhost:10006/urlofchampions
     - name: Loggit
       id: 2438e3c8-015a-1000-79ca-83af40ec1993
       class: org.apache.nifi.processors.standard.LogAttribute

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPSiteToSite.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPSiteToSite.yml b/libminifi/test/resources/TestHTTPSiteToSite.yml
new file mode 100644
index 0000000..582f48b
--- /dev/null
+++ b/libminifi/test/resources/TestHTTPSiteToSite.yml
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+    name: MiNiFi Flow
+
+Processors:
+    - name: GetFile
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+      class: org.apache.nifi.processors.standard.GenerateFlowFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Input Directory: /tmp/site2siteGetFile
+          Keep Source File: true
+    - name: GetFile
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+
+
+Connections:
+    - name: GenerateFlowFileS2S
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 
+      source relationship name: success
+      destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+      queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+    - name: GenerateFlowFileS2S
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a203 
+      source relationship name: success
+      destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+      queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Groups:
+    - name: NiFi Flow
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
+      url: http://localhost:8082/nifi
+      timeout: 30 secs
+      yield period: 1 sec
+      Input Ports:
+          - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+            name: From Node A
+            max concurrent tasks: 1
+            use compression: false
+            Properties: # Deviates from spec and will later be removed when this is autonegotiated
+                Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+                Port: 8082
+                Host Name: 127.0.0.1
+      Output Ports:
+          - id: 471deef6-2a6e-4a7d-912a-81cc17e3a203
+            name: To Node A
+            max concurrent tasks: 1
+            use compression: false
+            Properties: # Deviates from spec and will later be removed when this is autonegotiated
+                Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a203
+                Port: 8082
+                Host Name: 127.0.0.1
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestSite2SiteRest.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestSite2SiteRest.yml b/libminifi/test/resources/TestSite2SiteRest.yml
index ca751b5..87534a8 100644
--- a/libminifi/test/resources/TestSite2SiteRest.yml
+++ b/libminifi/test/resources/TestSite2SiteRest.yml
@@ -46,7 +46,7 @@ Connections:
 Remote Processing Groups:
     - name: NiFi Flow
       id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
-      url: http://localhost:8082/nifi
+      url: http://localhost:8077/nifi
       timeout: 30 secs
       yield period: 10 sec
       Input Ports:
@@ -56,3 +56,5 @@ Remote Processing Groups:
             use compression: false
             Properties: # Deviates from spec and will later be removed when this is autonegotiated
                 Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+                Port: 8077
+                Host Name: 127.0.0.1

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestSite2SiteRestSecure.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestSite2SiteRestSecure.yml b/libminifi/test/resources/TestSite2SiteRestSecure.yml
index fd530ae..5fa572c 100644
--- a/libminifi/test/resources/TestSite2SiteRestSecure.yml
+++ b/libminifi/test/resources/TestSite2SiteRestSecure.yml
@@ -46,7 +46,7 @@ Connections:
 Remote Processing Groups:
     - name: NiFi Flow
       id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
-      url: https://localhost:8082/nifi
+      url: https://localhost:8072/nifi
       timeout: 30 secs
       yield period: 10 sec
       Input Ports:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/ThreadPoolAdjust.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/ThreadPoolAdjust.yml b/libminifi/test/resources/ThreadPoolAdjust.yml
new file mode 100644
index 0000000..8b3c989
--- /dev/null
+++ b/libminifi/test/resources/ThreadPoolAdjust.yml
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: generate
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.GenerateFlowFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+    - name: listen
+      id: 2438e3c8-015a-1000-79ca-83af40ec1994
+      class: org.apache.nifi.processors.standard.ListenHTTP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          Base Path: urlofchampions
+          Listening Port: 10016
+    - name: Invoke
+      id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      class: org.apache.nifi.processors.standard.InvokeHTTP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: 
+          - success
+      Properties:
+          HTTP Method: POST
+          Use Chunked Encoding: true
+          Content-type: text/html
+          Remote URL: http://localhost:10016/urlofchampions
+    - name: Loggit
+      id: 2438e3c8-015a-1000-79ca-83af40ec1993
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: 
+          - success
+      Properties:
+          LogLevel: debug
+
+Connections:    
+    - name: GenerateFlowFile/Invoke
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: invoke
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+    - name: Listen/Loggit
+      id: 2438e3c8-015a-1000-79ca-83af40ec1918
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1994
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1993
+      source relationship name: success
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+Remote Processing Groups:
+