You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/01 20:27:16 UTC
[2/6] nifi-minifi-cpp git commit: MINIFICPP-60: Add initial
implementation of Site to Site changes.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/sitetosite/SiteToSiteClient.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
new file mode 100644
index 0000000..e3a4719
--- /dev/null
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -0,0 +1,773 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sitetosite/SiteToSiteClient.h"
+#include <map>
+#include <string>
+#include <memory>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+/**
+ * Writes the string form of a request type to the peer.
+ * @param type request type to send.
+ * @return the value returned by peer_->writeUTF, or -1 when type is not
+ * below MAX_REQUEST_TYPE.
+ */
+int SiteToSiteClient::writeRequestType(RequestType type) {
+  // Only values below MAX_REQUEST_TYPE are used to index RequestTypeStr.
+  if (type < MAX_REQUEST_TYPE) {
+    return peer_->writeUTF(RequestTypeStr[type]);
+  }
+
+  return -1;
+}
+
+/**
+ * Reads a request type name from the peer and maps it back to its enum value.
+ * @param type receives the matched request type; left untouched on failure.
+ * @return the readUTF result when the name matched an entry between
+ * NEGOTIATE_FLOWFILE_CODEC and SHUTDOWN (inclusive); the non-positive
+ * readUTF result when the read failed; -1 when no entry matched.
+ */
+int SiteToSiteClient::readRequestType(RequestType &type) {
+  std::string wireName;
+  const int bytesRead = peer_->readUTF(wireName);
+  if (bytesRead <= 0) {
+    return bytesRead;
+  }
+
+  // Linear scan over the table of known request type names.
+  int candidate = NEGOTIATE_FLOWFILE_CODEC;
+  while (candidate <= SHUTDOWN) {
+    if (RequestTypeStr[candidate] == wireName) {
+      type = static_cast<RequestType>(candidate);
+      return bytesRead;
+    }
+    ++candidate;
+  }
+
+  return -1;
+}
+
+/**
+ * Reads a response from the peer: two marker bytes, one respond-code byte and,
+ * when the code's context says it has a description, a UTF message.
+ * @param transaction not used by this implementation.
+ * @param code receives the respond code once the third byte has been read.
+ * @param message receives the description when the code carries one.
+ * @return 3 + message.size() on success; -1 on a marker mismatch, an unknown
+ * code or a failed description read; the non-positive read result when the
+ * code byte itself could not be read.
+ */
+int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
+  uint8_t marker;
+
+  // First marker byte.
+  int bytesRead = peer_->read(marker);
+  if (bytesRead <= 0 || marker != CODE_SEQUENCE_VALUE_1) {
+    return -1;
+  }
+
+  // Second marker byte.
+  bytesRead = peer_->read(marker);
+  if (bytesRead <= 0 || marker != CODE_SEQUENCE_VALUE_2) {
+    return -1;
+  }
+
+  // The respond code itself.
+  uint8_t rawCode;
+  bytesRead = peer_->read(rawCode);
+  if (bytesRead <= 0) {
+    return bytesRead;
+  }
+  code = static_cast<RespondCode>(rawCode);
+
+  RespondCodeContext *codeContext = this->getRespondCodeContext(code);
+  if (codeContext == nullptr) {
+    // Not a valid respond code
+    return -1;
+  }
+
+  if (codeContext->hasDescription && peer_->readUTF(message) <= 0) {
+    return -1;
+  }
+
+  return 3 + message.size();
+}
+
+/**
+ * Removes a transaction from known_transactions_, logging its UUID first.
+ * Does nothing when the id is not registered.
+ * @param transactionID key of the transaction to remove.
+ */
+void SiteToSiteClient::deleteTransaction(std::string transactionID) {
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return;
+  }
+
+  // Hold our own reference while logging, then drop the map entry.
+  std::shared_ptr<Transaction> doomed = entry->second;
+  logger_->log_info("Site2Site delete transaction %s", doomed->getUUIDStr().c_str());
+  known_transactions_.erase(entry);
+}
+
+/**
+ * Writes a response to the peer: two marker bytes, the respond code and,
+ * when the code's context says it has a description, the message as UTF.
+ * @param transaction not used by this implementation.
+ * @param code respond code to send.
+ * @param message description sent only for codes that carry one.
+ * @return 3 when only the header was written; 3 plus the writeUTF result when
+ * a description was written; the non-positive writeUTF result when that write
+ * failed; -1 for an unknown code or a short header write.
+ */
+int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
+  RespondCodeContext *codeContext = this->getRespondCodeContext(code);
+  if (codeContext == nullptr) {
+    // Not a valid respond code
+    return -1;
+  }
+
+  uint8_t header[3];
+  header[0] = CODE_SEQUENCE_VALUE_1;
+  header[1] = CODE_SEQUENCE_VALUE_2;
+  header[2] = static_cast<uint8_t>(code);
+
+  int written = peer_->write(header, 3);
+  if (written != 3) {
+    return -1;
+  }
+
+  if (!codeContext->hasDescription) {
+    return 3;
+  }
+
+  written = peer_->writeUTF(message);
+  if (written > 0) {
+    return 3 + written;
+  }
+  return written;
+}
+
+/**
+ * Shuts the connection down: sends a SHUTDOWN request when the peer state is
+ * at least ESTABLISHED, forgets all known transactions, closes the peer and
+ * resets the state to IDLE.
+ */
+void SiteToSiteClient::tearDown() {
+  const bool atLeastEstablished = peer_state_ >= ESTABLISHED;
+  if (atLeastEstablished) {
+    logger_->log_info("Site2Site Protocol tearDown");
+    // need to write shutdown request; its result is intentionally not checked here
+    writeRequestType(SHUTDOWN);
+  }
+
+  known_transactions_.clear();
+  peer_->Close();
+  peer_state_ = IDLE;
+}
+
+/**
+ * Sends flow files taken from the session to the peer inside one SEND
+ * transaction, then confirms and completes that transaction.
+ *
+ * @param context process context; yielded whenever the transfer fails.
+ * @param session session the flow files are pulled from and removed from.
+ * @return false when the session has no flow file or bootstrap() fails;
+ * true once the transaction was confirmed, completed and deleted.
+ * @throws Exception (SITE2SITE_EXCEPTION) when the peer is not READY after
+ * bootstrap, the transaction cannot be created, or send/confirm/complete
+ * fails. Exceptions raised inside the transfer loop are rethrown after the
+ * transaction is deleted, the context yielded and the connection torn down.
+ */
+bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+ std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+ std::shared_ptr<Transaction> transaction = NULL;
+
+ // Nothing queued: no connection is established at all.
+ if (!flow) {
+ return false;
+ }
+
+ if (peer_state_ != READY) {
+ if (!bootstrap())
+ return false;
+ }
+
+ // bootstrap() reported success but the peer state is still not READY.
+ if (peer_state_ != READY) {
+ context->yield();
+ tearDown();
+ throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+ }
+
+ // Create the transaction
+ std::string transactionID;
+ transaction = createTransaction(transactionID, SEND);
+ if (transaction == NULL) {
+ context->yield();
+ tearDown();
+ throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+ }
+
+ bool continueTransaction = true;
+ uint64_t startSendingNanos = getTimeNano();
+
+ try {
+ while (continueTransaction) {
+ uint64_t startTime = getTimeMillis();
+ std::string payload;
+ DataPacket packet(getLogger(), transaction, flow->getAttributes(), payload);
+
+ // send() returns 0 on success, -1 on failure and -2 for a missing claim or size mismatch.
+ int16_t resp = send(transactionID, &packet, flow, session);
+ if (resp == -1) {
+ throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+ }
+
+ logger_->log_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str());
+ // Provenance is only reported for a fully successful send.
+ if (resp == 0) {
+ uint64_t endTime = getTimeMillis();
+ std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
+ std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
+ session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
+ }
+ // NOTE(review): the flow file is removed even when send() returned -2 — confirm this is intended.
+ session->remove(flow);
+
+ // Stop batching once the elapsed time exceeds _batchSendNanos.
+ uint64_t transferNanos = getTimeNano() - startSendingNanos;
+ if (transferNanos > _batchSendNanos)
+ break;
+
+ flow = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+ if (!flow) {
+ continueTransaction = false;
+ }
+ } // while true
+
+ if (!confirm(transactionID)) {
+ std::stringstream ss;
+ ss << "Confirm Failed for " << transactionID;
+ throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
+ }
+ if (!complete(transactionID)) {
+ std::stringstream ss;
+ ss << "Complete Failed for " << transactionID;
+ throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
+ }
+ logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes);
+ } catch (std::exception &exception) {
+ // Clean up, then let the caller see the original exception.
+ if (transaction)
+ deleteTransaction(transactionID);
+ context->yield();
+ tearDown();
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ // Same clean-up for exceptions not derived from std::exception.
+ if (transaction)
+ deleteTransaction(transactionID);
+ context->yield();
+ tearDown();
+ logger_->log_debug("Caught Exception during SiteToSiteClient::transferFlowFiles");
+ throw;
+ }
+
+ deleteTransaction(transactionID);
+
+ return true;
+}
+
+/**
+ * Runs the confirmation step of a transaction.
+ *
+ * RECEIVE direction: sends CONFIRM_TRANSACTION carrying our CRC and expects the
+ * peer to answer with CONFIRM_TRANSACTION.
+ * SEND direction: sends FINISH_TRANSACTION, expects CONFIRM_TRANSACTION from
+ * the peer and, when _currentVersion is above 3, compares the CRC in the peer's
+ * message with our own before answering CONFIRM_TRANSACTION or BAD_CHECKSUM.
+ *
+ * @param transactionID key of the transaction in known_transactions_.
+ * @return true when the transaction reached TRANSACTION_CONFIRMED; false when
+ * the peer is not READY, the transaction is unknown or in the wrong state, any
+ * read/write on the peer fails, or the peer answers with another code.
+ */
+bool SiteToSiteClient::confirm(std::string transactionID) {
+  int ret;
+
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return false;
+  }
+
+  auto it = this->known_transactions_.find(transactionID);
+  if (it == known_transactions_.end()) {
+    return false;
+  }
+  std::shared_ptr<Transaction> transaction = it->second;
+
+  // A receive transaction that never had data available is confirmed without any exchange.
+  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && transaction->getDirection() == RECEIVE) {
+    transaction->_state = TRANSACTION_CONFIRMED;
+    return true;
+  }
+
+  if (transaction->getState() != DATA_EXCHANGED) {
+    return false;
+  }
+
+  if (transaction->getDirection() == RECEIVE) {
+    if (transaction->isDataAvailable()) {
+      return false;
+    }
+    // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+    // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+    // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+    // session and then when we send the response back to the peer, the peer may have timed out and may not
+    // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+    // Critical Section involved in this transaction so that rather than the Critical Section being the
+    // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+    int64_t crcValue = transaction->getCRC();
+    std::string crc = std::to_string(crcValue);
+    // Log the exact string that goes on the wire; avoids a %d/64-bit specifier mismatch.
+    logger_->log_info("Site2Site Send confirm with CRC %s to transaction %s", crc.c_str(), transactionID.c_str());
+    ret = writeResponse(transaction, CONFIRM_TRANSACTION, crc);
+    if (ret <= 0) {
+      return false;
+    }
+
+    RespondCode code;
+    std::string message;
+    // The read result must be captured: on failure `code` may never have been assigned.
+    ret = readResponse(transaction, code, message);
+    if (ret <= 0) {
+      return false;
+    }
+
+    if (code == CONFIRM_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str());
+      transaction->_state = TRANSACTION_CONFIRMED;
+      return true;
+    } else if (code == BAD_CHECKSUM) {
+      logger_->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str());
+      return false;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+  }
+
+  // SEND direction.
+  logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.c_str());
+  ret = writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION");
+  if (ret <= 0) {
+    return false;
+  }
+
+  RespondCode code;
+  std::string message;
+  // A failed read must not fall through to inspecting an unassigned `code`.
+  ret = readResponse(transaction, code, message);
+  if (ret <= 0) {
+    return false;
+  }
+
+  // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+  if (code != CONFIRM_TRANSACTION) {
+    logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
+    return false;
+  }
+
+  logger_->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
+  if (this->_currentVersion > 3) {
+    int64_t crcValue = transaction->getCRC();
+    std::string crc = std::to_string(crcValue);
+    if (message != crc) {
+      logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
+      // Best effort notification; the confirmation has failed either way.
+      writeResponse(transaction, BAD_CHECKSUM, "BAD_CHECKSUM");
+      return false;
+    }
+    logger_->log_info("Site2Site transaction %s CRC matched", transactionID.c_str());
+  }
+
+  ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+  if (ret <= 0) {
+    return false;
+  }
+  transaction->_state = TRANSACTION_CONFIRMED;
+  return true;
+}
+
+/**
+ * Cancels a transaction: sends CANCEL_TRANSACTION to the peer, marks the
+ * transaction TRANSACTION_CANCELED and tears the connection down.
+ * Does nothing when the peer is not READY, the id is unknown, or the
+ * transaction is already canceled, completed or in error.
+ * @param transactionID key of the transaction in known_transactions_.
+ */
+void SiteToSiteClient::cancel(std::string transactionID) {
+  if (peer_state_ != READY) {
+    return;
+  }
+
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return;
+  }
+  // Keep our own reference: tearDown() below clears known_transactions_.
+  std::shared_ptr<Transaction> transaction = entry->second;
+
+  const bool alreadyFinal = transaction->getState() == TRANSACTION_CANCELED
+      || transaction->getState() == TRANSACTION_COMPLETED
+      || transaction->getState() == TRANSACTION_ERROR;
+  if (alreadyFinal) {
+    return;
+  }
+
+  this->writeResponse(transaction, CANCEL_TRANSACTION, "Cancel");
+  transaction->_state = TRANSACTION_CANCELED;
+
+  tearDown();
+}
+
+/**
+ * Marks a known transaction as TRANSACTION_ERROR and tears the connection
+ * down. Does nothing when the id is not registered.
+ * @param transactionID key of the transaction in known_transactions_.
+ */
+void SiteToSiteClient::error(std::string transactionID) {
+  auto entry = known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return;
+  }
+
+  // Keep our own reference: tearDown() clears known_transactions_.
+  std::shared_ptr<Transaction> failed = entry->second;
+  failed->_state = TRANSACTION_ERROR;
+  tearDown();
+}
+
+/**
+ * Completes a transaction.
+ *
+ * RECEIVE direction: when nothing was transferred the transaction is marked
+ * completed directly; otherwise TRANSACTION_FINISHED is sent to the peer first.
+ * SEND direction: waits for the peer to answer with TRANSACTION_FINISHED.
+ *
+ * @param transactionID key of the transaction in known_transactions_.
+ * @return true when the transaction reached TRANSACTION_COMPLETED; false when
+ * the peer is not READY, the id is unknown, transfers happened but the
+ * transaction is not confirmed, a peer read/write fails, or the peer answers
+ * with another code.
+ */
+bool SiteToSiteClient::complete(std::string transactionID) {
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+  if (peer_state_ != READY) {
+    return false;
+  }
+
+  auto entry = this->known_transactions_.find(transactionID);
+  if (entry == known_transactions_.end()) {
+    return false;
+  }
+  std::shared_ptr<Transaction> transaction = entry->second;
+
+  // Anything that moved data must have been confirmed before it can complete.
+  if (transaction->total_transfers_ > 0 && transaction->getState() != TRANSACTION_CONFIRMED) {
+    return false;
+  }
+
+  if (transaction->getDirection() != RECEIVE) {
+    RespondCode code;
+    std::string message;
+    const int readResult = readResponse(transaction, code, message);
+    if (readResult <= 0) {
+      return false;
+    }
+
+    if (code != TRANSACTION_FINISHED) {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+
+    logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str());
+    transaction->_state = TRANSACTION_COMPLETED;
+    return true;
+  }
+
+  // RECEIVE direction.
+  if (transaction->current_transfers_ != 0) {
+    logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str());
+    const int writeResult = this->writeResponse(transaction, TRANSACTION_FINISHED, "Finished");
+    if (writeResult <= 0) {
+      return false;
+    }
+  }
+  transaction->_state = TRANSACTION_COMPLETED;
+  return true;
+}
+
+/**
+ * Sends one data packet (attributes followed by content) on a SEND transaction.
+ *
+ * Content comes from the flow file when one is given (streamed through
+ * sitetosite::ReadCallback); otherwise a non-empty packet->payload_ is written.
+ *
+ * @param transactionID key of the transaction in known_transactions_.
+ * @param packet packet whose attributes (and possibly payload) are sent.
+ * @param flowFile flow file providing the content; may be empty.
+ * @param session session used to read the flow file content.
+ * @return 0 on success; -1 when the peer is not READY, the transaction is
+ * unknown, in the wrong state or direction, or a stream write fails; -2 when
+ * the flow file's resource claim does not exist or the number of bytes read
+ * does not match the flow file size.
+ */
+int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, const std::shared_ptr<FlowFileRecord> &flowFile, const std::shared_ptr<core::ProcessSession> &session) {
+  int ret;
+  std::shared_ptr<Transaction> transaction = NULL;
+
+  // The claim can be null (it is checked for null further down), so guard before dereferencing it.
+  if (flowFile && flowFile->getResourceClaim() != nullptr && !flowFile->getResourceClaim()->exists()) {
+    logger_->log_info("Claim %s does not exist for FlowFile %s", flowFile->getResourceClaim()->getContentFullPath().c_str(), flowFile->getUUIDStr().c_str());
+    return -2;
+  }
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return -1;
+  }
+
+  auto it = this->known_transactions_.find(transactionID);
+  if (it == known_transactions_.end()) {
+    return -1;
+  }
+  transaction = it->second;
+
+  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+    return -1;
+  }
+
+  if (transaction->getDirection() != SEND) {
+    logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+    return -1;
+  }
+
+  // Every packet after the first is announced with CONTINUE_TRANSACTION.
+  if (transaction->current_transfers_ > 0) {
+    ret = writeResponse(transaction, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
+    if (ret <= 0) {
+      return -1;
+    }
+  }
+
+  // start to write the packet: attribute count first
+  uint32_t numAttributes = packet->_attributes.size();
+  ret = transaction->getStream().write(numAttributes);
+  if (ret != 4) {
+    return -1;
+  }
+
+  // then each attribute as a key/value pair
+  std::map<std::string, std::string>::iterator itAttribute;
+  for (itAttribute = packet->_attributes.begin(); itAttribute != packet->_attributes.end(); itAttribute++) {
+    ret = transaction->getStream().writeUTF(itAttribute->first, true);
+    if (ret <= 0) {
+      return -1;
+    }
+    ret = transaction->getStream().writeUTF(itAttribute->second, true);
+    if (ret <= 0) {
+      return -1;
+    }
+    logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str());
+  }
+
+  uint64_t len = 0;
+  if (flowFile) {
+    // content length, then the content itself via the read callback
+    len = flowFile->getSize();
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      logger_->log_info("ret != 8");
+      return -1;
+    }
+    if (flowFile->getSize() > 0) {
+      sitetosite::ReadCallback callback(packet);
+      session->read(flowFile, &callback);
+      if (flowFile->getSize() != packet->_size) {
+        logger_->log_info("MisMatched sizes %d %d", flowFile->getSize(), packet->_size);
+        return -2;
+      }
+    }
+    if (packet->payload_.length() == 0 && len == 0) {
+      if (flowFile->getResourceClaim() == nullptr)
+        logger_->log_debug("no claim");
+      else
+        logger_->log_debug("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath().c_str());
+    }
+  } else if (packet->payload_.length() > 0) {
+    // no flow file: send the in-memory payload instead
+    len = packet->payload_.length();
+
+    ret = transaction->getStream().write(len);
+    if (ret != 8) {
+      return -1;
+    }
+
+    ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len);
+    if (ret != len) {
+      logger_->log_info("ret != len");
+      return -1;
+    }
+    packet->_size += len;
+  }
+
+  // bookkeeping for the transaction
+  transaction->current_transfers_++;
+  transaction->total_transfers_++;
+  transaction->_state = DATA_EXCHANGED;
+  transaction->_bytes += len;
+  logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes);
+
+  return 0;
+}
+
+/**
+ * Receives the header of the next data packet on a RECEIVE transaction:
+ * the attributes and the content length. The content itself is not read here.
+ *
+ * @param transactionID key of the transaction in known_transactions_.
+ * @param packet receives the attributes and the content size (_size).
+ * @param eof set to true when the transaction has no more packets.
+ * @return true when a packet header was read or the end of the transaction
+ * was reached (see eof); false when the peer is not READY, the transaction is
+ * unknown, in the wrong state or direction, a read fails, the attribute count
+ * exceeds MAX_NUM_ATTRIBUTES, or the peer sends an unexpected respond code.
+ */
+bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bool &eof) {
+  int ret;
+  std::shared_ptr<Transaction> transaction = NULL;
+
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return false;
+  }
+
+  auto it = this->known_transactions_.find(transactionID);
+
+  if (it == known_transactions_.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+    return false;
+  }
+
+  if (transaction->getDirection() != RECEIVE) {
+    logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+    return false;
+  }
+
+  if (!transaction->isDataAvailable()) {
+    eof = true;
+    return true;
+  }
+
+  if (transaction->current_transfers_ > 0) {
+    // if we already had a transfer before, check to see whether another one is available
+    RespondCode code;
+    std::string message;
+
+    ret = readResponse(transaction, code, message);
+
+    if (ret <= 0) {
+      return false;
+    }
+    if (code == CONTINUE_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str());
+      transaction->_dataAvailable = true;
+    } else if (code == FINISH_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str());
+      transaction->_dataAvailable = false;
+      eof = true;
+      return true;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code);
+      return false;
+    }
+  }
+
+  if (!transaction->isDataAvailable()) {
+    eof = true;
+    return true;
+  }
+
+  // start to read the packet
+  uint32_t numAttributes;
+  ret = transaction->getStream().read(numAttributes);
+  if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
+    return false;
+  }
+
+  // read the attributes
+  // numAttributes is 32 bits wide, so the specifier must be %u rather than %llu.
+  logger_->log_info("Site2Site transaction %s receives attribute key %u", transactionID.c_str(), numAttributes);
+  for (unsigned int i = 0; i < numAttributes; i++) {
+    std::string key;
+    std::string value;
+    ret = transaction->getStream().readUTF(key, true);
+    if (ret <= 0) {
+      return false;
+    }
+    ret = transaction->getStream().readUTF(value, true);
+    if (ret <= 0) {
+      return false;
+    }
+    packet->_attributes[key] = value;
+    logger_->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str());
+  }
+
+  uint64_t len;
+  ret = transaction->getStream().read(len);
+  if (ret <= 0) {
+    return false;
+  }
+
+  packet->_size = len;
+  if (len > 0) {
+    transaction->current_transfers_++;
+    transaction->total_transfers_++;
+  } else {
+    // NOTE(review): a zero-length record ends the transaction here even if it carried attributes — confirm intended.
+    logger_->log_info("Site2Site transaction %s receives attribute ?", transactionID.c_str());
+    transaction->_dataAvailable = false;
+    eof = true;
+    return true;
+  }
+  transaction->_state = DATA_EXCHANGED;
+  transaction->_bytes += len;
+  logger_->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes);
+
+  return true;
+}
+
+/**
+ * Receives flow files from the peer inside one RECEIVE transaction, creating a
+ * flow file in the session for every packet, then confirms and completes the
+ * transaction.
+ *
+ * @param context process context; yielded on failure and when nothing was received.
+ * @param session session used to create, write and transfer the flow files.
+ * @return false when bootstrap() fails; true once the transaction was
+ * completed and deleted.
+ * @throws Exception (SITE2SITE_EXCEPTION) when the peer is not READY after
+ * bootstrap, the transaction cannot be created, or receive/create/size
+ * check/confirm/complete fails. Exceptions raised inside the receive loop are
+ * rethrown after the transaction is deleted, the context yielded and the
+ * connection torn down.
+ */
+bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+ uint64_t bytes = 0;
+ int transfers = 0;
+ std::shared_ptr<Transaction> transaction = NULL;
+
+ if (peer_state_ != READY) {
+ if (!bootstrap()) {
+ return false;
+ }
+ }
+
+ // bootstrap() reported success but the peer state is still not READY.
+ if (peer_state_ != READY) {
+ context->yield();
+ tearDown();
+ throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+ }
+
+ // Create the transaction
+ std::string transactionID;
+ transaction = createTransaction(transactionID, RECEIVE);
+
+ if (transaction == NULL) {
+ context->yield();
+ tearDown();
+ throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+ }
+
+ try {
+ while (true) {
+ std::map<std::string, std::string> empty;
+ uint64_t startTime = getTimeMillis();
+ std::string payload;
+ DataPacket packet(getLogger(), transaction, empty, payload);
+ bool eof = false;
+
+ // receive() fills in the packet attributes and size, or flags eof.
+ if (!receive(transactionID, &packet, eof)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
+ }
+ if (eof) {
+ // transaction done
+ break;
+ }
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+
+ if (!flowFile) {
+ throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
+ }
+ // Copy every received attribute; remember the sender's UUID attribute for provenance.
+ std::map<std::string, std::string>::iterator it;
+ std::string sourceIdentifier;
+ for (it = packet._attributes.begin(); it != packet._attributes.end(); it++) {
+ if (it->first == FlowAttributeKey(UUID))
+ sourceIdentifier = it->second;
+ flowFile->addAttribute(it->first, it->second);
+ }
+
+ // Content is written through the write callback; the resulting size must match the announced one.
+ if (packet._size > 0) {
+ sitetosite::WriteCallback callback(&packet);
+ session->write(flowFile, &callback);
+ if (flowFile->getSize() != packet._size) {
+ throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
+ }
+ }
+ core::Relationship relation; // undefined relationship
+ uint64_t endTime = getTimeMillis();
+ std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
+ std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
+ session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime);
+ session->transfer(flowFile, relation);
+ // receive the transfer for the flow record
+ bytes += packet._size;
+ transfers++;
+ } // while true
+
+ // Confirmation is only attempted when at least one flow file was received.
+ if (transfers > 0 && !confirm(transactionID)) {
+ throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
+ }
+ if (!complete(transactionID)) {
+ std::stringstream transaction_str;
+ transaction_str << "Complete Transaction " << transactionID << " Failed";
+ throw Exception(SITE2SITE_EXCEPTION, transaction_str.str().c_str());
+ }
+ logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes);
+ // we yield the receive if we did not get anything
+ if (transfers == 0)
+ context->yield();
+ } catch (std::exception &exception) {
+ // Clean up, then let the caller see the original exception.
+ if (transaction)
+ deleteTransaction(transactionID);
+ context->yield();
+ tearDown();
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
+ } catch (...) {
+ // Same clean-up for exceptions not derived from std::exception.
+ if (transaction)
+ deleteTransaction(transactionID);
+ context->yield();
+ tearDown();
+ logger_->log_debug("Caught Exception during RawSiteToSiteClient::receiveFlowFiles");
+ throw;
+ }
+
+ deleteTransaction(transactionID);
+
+ return true;
+}
+} /* namespace sitetosite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/utils/ByteArrayCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
new file mode 100644
index 0000000..84edfde
--- /dev/null
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -0,0 +1,155 @@
+/**
+ * @file ByteArrayCallback.cpp
+ * ByteOutputCallback class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/ByteArrayCallback.h"
+#include <vector>
+#include <utility>
+#include <string>
+#include <memory>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Processes a stream: rewinds it and, when it is non-empty, reads
+ * stream->getSize() bytes from this callback's queue and then from the stream
+ * into a temporary buffer.
+ * @param stream stream to process.
+ * @return the stream size when it is non-empty, otherwise the current size_.
+ */
+int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  stream->seek(0);
+  if (stream->getSize() > 0) {
+    // Array form of unique_ptr so the new[] allocation is released with delete[].
+    std::unique_ptr<char[]> buffer(new char[stream->getSize()]);
+    // NOTE(review): the buffer is filled from the queue and then overwritten from the stream
+    // before being discarded — confirm this is the intended data flow.
+    readFully(buffer.get(), stream->getSize());
+    stream->readData(reinterpret_cast<uint8_t*>(buffer.get()), stream->getSize());
+    return stream->getSize();
+  }
+  return size_.load();
+}
+
+/**
+ * Drains the currently buffered bytes into a vector.
+ * @return a vector sized to the value of size_ observed at entry, filled by readFully.
+ */
+const std::vector<char> ByteOutputCallback::to_string() {
+  // Read size_ once so the buffer size and the requested read length cannot
+  // disagree if size_ changes between two separate loads.
+  const auto snapshot = size_.load();
+  std::vector<char> buffer;
+  buffer.resize(snapshot);
+  readFully(buffer.data(), snapshot);
+  return buffer;
+}
+
+/**
+ * Marks the callback as no longer alive and wakes every thread waiting on spinner_.
+ */
+void ByteOutputCallback::close() {
+  // Clear the flag before notifying so woken waiters observe the new value.
+  this->is_alive_ = false;
+  this->spinner_.notify_all();
+}
+
+/**
+ * @return the current value of size_.
+ */
+size_t ByteOutputCallback::getSize() {
+  return size_.load();
+}
+
+/**
+ * Probes vector_lock_ without blocking.
+ * @return true when the lock could not be acquired by try_lock, false
+ * otherwise (the lock is released again immediately).
+ */
+bool ByteOutputCallback::waitingOps() {
+  const bool acquired = vector_lock_.try_lock();
+  if (!acquired) {
+    return true;
+  }
+  vector_lock_.unlock();
+  return false;
+}
+
+/**
+ * Appends a block of bytes to the queue as one string and wakes waiters.
+ * When size_ already exceeds max_size_ the caller first waits until size_
+ * drops below max_size_ or the callback is closed.
+ * @param data pointer to the bytes to enqueue.
+ * @param size number of bytes to enqueue.
+ */
+void ByteOutputCallback::write(char *data, size_t size) {
+ size_t amount_to_write = size;
+ size_t pos = 0;
+ do {
+ // Checked once without the lock and again under it before waiting.
+ if (size_ > max_size_) {
+ std::unique_lock<std::recursive_mutex> lock(vector_lock_);
+ if (size_ > max_size_) {
+ spinner_.wait(lock, [&] {
+ return size_ < max_size_ || !is_alive_;});
+ }
+ // if we're not alive, we will let the write continue in the event that
+ // we do not wish to lose this data. In the event that we don't care, we've simply
+ // spent wasted cycles on locking and notification.
+ }
+ {
+ std::lock_guard<std::recursive_mutex> lock(vector_lock_);
+ // The whole input is pushed in one piece, so amount_to_write reaches 0 after the first pass.
+ vec.push(std::string(data + pos, size));
+ size_ += size;
+ pos += size;
+ amount_to_write -= size;
+ spinner_.notify_all();
+ }
+ } while (amount_to_write > 0);
+}
+
+/**
+ * Reads up to size bytes into buffer by delegating to read_current_str.
+ * @param buffer destination for the bytes.
+ * @param size number of bytes requested.
+ * @return the value returned by read_current_str.
+ */
+size_t ByteOutputCallback::readFully(char *buffer, size_t size) {
+  const size_t bytesCopied = read_current_str(buffer, size);
+  return bytesCopied;
+}
+
+/**
+ * Copies up to size bytes from the queued strings into buffer, waiting on
+ * spinner_ when fewer bytes are buffered than are still requested.
+ * @param buffer destination for the bytes.
+ * @param size number of bytes requested.
+ * @return size minus the bytes still outstanding when the loop ends, or 0
+ * when the callback was closed with nothing left buffered.
+ */
+size_t ByteOutputCallback::read_current_str(char *buffer, size_t size) {
+ size_t amount_to_read = size;
+ size_t curr_buf_pos = 0;
+ do {
+ {
+ std::lock_guard<std::recursive_mutex> lock(vector_lock_);
+
+ if (current_str_pos < current_str.length() && current_str.length() > 0) {
+ // Copy the smaller of what is left in the current string and what is still requested.
+ size_t str_remaining = current_str.length() - current_str_pos;
+ size_t current_str_read = str_remaining;
+ if (str_remaining > amount_to_read) {
+ current_str_read = amount_to_read;
+ }
+ memcpy(buffer + curr_buf_pos, current_str.data() + current_str_pos, current_str_read);
+ curr_buf_pos += current_str_read;
+ amount_to_read -= current_str_read;
+ current_str_pos += current_str_read;
+ size_ -= current_str_read;
+ // NOTE(review): unsigned arithmetic, so this is only true when the whole string was copied
+ // in this single pass; a string finished over several passes is replaced by the else branch
+ // on the next iteration instead — confirm intended.
+ if (current_str.length() - current_str_read <= 0) {
+ preload_next_str();
+ }
+ } else {
+ // Current string is exhausted (or empty): move on to the next queued one.
+ preload_next_str();
+ }
+ }
+ if (size_ < amount_to_read) {
+ {
+ std::unique_lock<std::recursive_mutex> lock(vector_lock_);
+ if (size_ < amount_to_read) {
+ spinner_.wait(lock, [&] {
+ return size_ >= amount_to_read || !is_alive_;});
+ }
+
+ // NOTE(review): returns 0 even if some bytes were already copied into buffer — confirm intended.
+ if (size_ == 0 && !is_alive_) {
+ return 0;
+ }
+ }
+ std::lock_guard<std::recursive_mutex> lock(vector_lock_);
+ preload_next_str();
+ }
+ } while (amount_to_read > 0 && (is_alive_ || size_ > 0) && current_str.length() > 0);
+
+ // Wake any thread waiting on spinner_ (write() waits there when size_ exceeds max_size_).
+ spinner_.notify_all();
+ return size - amount_to_read;
+}
+
+/**
+ * Replaces current_str with the next queued string, if any.
+ * current_str is always emptied first; when the queue is empty it stays empty
+ * and current_str_pos is left unchanged.
+ */
+void ByteOutputCallback::preload_next_str() {
+  // reset the current str.
+  current_str.clear();
+  if (vec.empty()) {
+    return;
+  }
+
+  current_str = std::move(vec.front());
+  current_str_pos = 0;
+  vec.pop();
+}
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index c18153c..d61bdac 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -144,7 +144,8 @@ void TestPlan::reset() {
}
}
-bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify) {
+
+bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
if (!finalized) {
finalize();
}
@@ -164,7 +165,7 @@ bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
if (verify != nullptr) {
- verify(context.get(), current_session.get());
+ verify(context, current_session);
} else {
logger_->log_info("Running %s", processor->getName());
processor->onTrigger(context, current_session);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 4eba0a8..c4bc7d7 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -115,7 +115,7 @@ class LogTestController {
std::ostringstream log_output;
std::shared_ptr<logging::Logger> logger_;
- private:
+ private:
class TestBootstrapLogger : public logging::Logger {
public:
TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
@@ -164,7 +164,7 @@ class TestPlan {
void reset();
- bool runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr);
+ bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
@@ -184,7 +184,6 @@ class TestPlan {
protected:
-
std::shared_ptr<logging::Logger> logger_;
void finalize();
@@ -230,8 +229,7 @@ class TestController {
utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
}
- std::shared_ptr<TestPlan> createPlan()
- {
+ std::shared_ptr<TestPlan> createPlan() {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
@@ -242,16 +240,15 @@ class TestController {
return std::make_shared<TestPlan>(content_repo, flow_repo, repo);
}
- void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr) {
- while (plan->runNextProcessor(verify) && runToCompletion)
- {
+ void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&)> verify =
+ nullptr) {
+
+ while (plan->runNextProcessor(verify) && runToCompletion) {
}
}
-
-
~TestController() {
for (auto dir : directories) {
DIR *created_dir;
@@ -282,8 +279,6 @@ class TestController {
protected:
-
-
std::mutex test_mutex;
//std::map<std::string,>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/C2NullConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp
index de8d3b8..d163a21 100644
--- a/libminifi/test/curl-tests/C2NullConfiguration.cpp
+++ b/libminifi/test/curl-tests/C2NullConfiguration.cpp
@@ -92,11 +92,9 @@ class VerifyC2Server : public IntegrationBase {
std::string url = "";
inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
- std::cout << "url is " << url << std::endl;
std::string port, scheme, path;
parse_http_components(url, port, scheme, path);
- std::cout << "path is " << path << std::endl;
configuration->set("c2.agent.protocol.class", "null");
configuration->set("c2.rest.url", "");
configuration->set("c2.rest.url.ack", "");
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/C2VerifyServeResults.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2VerifyServeResults.cpp b/libminifi/test/curl-tests/C2VerifyServeResults.cpp
index 101fc59..625a5f2 100644
--- a/libminifi/test/curl-tests/C2VerifyServeResults.cpp
+++ b/libminifi/test/curl-tests/C2VerifyServeResults.cpp
@@ -92,11 +92,9 @@ class VerifyC2Server : public IntegrationBase {
std::string url = "";
inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
- std::cout << "url is " << url << std::endl;
std::string port, scheme, path;
parse_http_components(url, port, scheme, path);
- std::cout << "path is " << path << std::endl;
configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver");
configuration->set("c2.rest.listener.port", port);
configuration->set("c2.agent.heartbeat.period", "10");
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/CMakeLists.txt b/libminifi/test/curl-tests/CMakeLists.txt
index 37e8b25..e162c7c 100644
--- a/libminifi/test/curl-tests/CMakeLists.txt
+++ b/libminifi/test/curl-tests/CMakeLists.txt
@@ -29,6 +29,7 @@ FOREACH(testfile ${CURL_INTEGRATION_TESTS})
target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/")
target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/")
target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/")
target_link_libraries(${testfilename} ${CURL_LIBRARIES} )
createTests("${testfilename}")
if (APPLE)
@@ -46,8 +47,8 @@ add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.y
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller")
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/controller")
add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp b/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
index 7a4ee35..d34d65e 100644
--- a/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
+++ b/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp
@@ -123,7 +123,6 @@ int main(int argc, char **argv) {
ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation());
}
assert(ssl_client->getCACertificate().length() > 0);
- std::cout << "Disabling ID" << std::endl;
// now let's disable one of the controller services.
std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
assert(cs_id != nullptr);
@@ -133,7 +132,6 @@ int main(int argc, char **argv) {
disabled = true;
waitToVerifyProcessor();
}
- std::cout << "Disabled ID" << std::endl;
{
std::lock_guard<std::mutex> lock(control_mutex);
controller->enableControllerService(cs_id);
@@ -141,7 +139,6 @@ int main(int argc, char **argv) {
waitToVerifyProcessor();
}
std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
- std::cout << "Disabling MockItLikeIts1995" << std::endl;
assert(cs_id->enabled());
{
std::lock_guard<std::mutex> lock(control_mutex);
@@ -149,7 +146,6 @@ int main(int argc, char **argv) {
disabled = true;
waitToVerifyProcessor();
}
-std::cout << "Disabled MockItLikeIts1995" << std::endl;
assert(cs_id->enabled() == false);
{
std::lock_guard<std::mutex> lock(control_mutex);
@@ -157,7 +153,6 @@ std::cout << "Disabled MockItLikeIts1995" << std::endl;
disabled = false;
waitToVerifyProcessor();
}
-std::cout << "Enabled ref for MockItLikeIts1995" << std::endl;
assert(cs_id->enabled() == true);
controller->waitUnload(60000);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp b/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp
new file mode 100644
index 0000000..775739d
--- /dev/null
+++ b/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "CivetServer.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "InvokeHTTP.h"
+#include "../TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "../include/core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "../TestServer.h"
+#include "../integration/IntegrationBase.h"
+#include "sitetositehttp/HTTPHandlers.h"
+#include "client/HTTPStream.h"
+
+/**
+ * Integration harness used by the HTTP site-to-site tests.
+ *
+ * On construction a temporary directory is requested from the embedded
+ * TestController; testSetup() then raises log levels, writes a small file into
+ * that directory and applies the site-to-site related configuration values.
+ */
+class SiteToSiteTestHarness : public IntegrationBase {
+ public:
+  /**
+   * @param isSecure stored for later use; not consulted by this class itself
+   */
+  explicit SiteToSiteTestHarness(bool isSecure)
+      : isSecure(isSecure) {
+    // The template buffer is a member instead of a constructor local because
+    // the pointer kept in dir is dereferenced later by testSetup().
+    // NOTE(review): createTempDirectory presumably hands back a pointer into
+    // the buffer it receives (mkdtemp semantics) - confirm against TestBase.h.
+    dir = testController.createTempDirectory(dir_template);
+  }
+
+  /**
+   * Enables verbose logging for the classes under test, creates
+   * "<dir>/tstFile.ext" containing the text "tempFile" and sets the
+   * configuration values the flow needs.
+   */
+  void testSetup() {
+    LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
+    LogTestController::getInstance().setDebug<minifi::sitetosite::HttpSiteToSiteClient>();
+    LogTestController::getInstance().setDebug<minifi::sitetosite::SiteToSiteClient>();
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    configuration->set("nifi.c2.enable", "false");
+    configuration->set("nifi.remote.input.http.enabled", "true");
+    configuration->set("nifi.remote.input.socket.port", "8082");
+  }
+
+  /**
+   * Sleeps for three seconds.
+   */
+  virtual void waitToVerifyProcessor() {
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+  }
+
+  /**
+   * Removes the file created by testSetup().
+   */
+  void cleanup() {
+    unlink(ss.str().c_str());
+  }
+
+  /**
+   * Intentionally empty: the log assertions are made by the caller once the
+   * harness has finished running.
+   */
+  void runAssertions() {
+  }
+
+ protected:
+  bool isSecure;
+  // backing storage for the temporary directory name template
+  char dir_template[sizeof("/tmp/ssth.XXXXXX")] = "/tmp/ssth.XXXXXX";
+  // directory returned by TestController::createTempDirectory
+  char *dir;
+  // accumulates the full path of the file written by testSetup()
+  std::stringstream ss;
+  TestController testController;
+};
+
+/**
+ * Describes which failure mode, if any, a single test pass simulates.
+ * All flags start out false, which corresponds to the fully working exchange.
+ */
+struct test_profile {
+  test_profile() = default;
+
+  /**
+   * @return true when none of the failure flags is set
+   */
+  bool allFalse() const {
+    const bool any_failure = flow_url_broken || transaction_url_broken || empty_transaction_url || no_delete || invalid_checksum;
+    return !any_failure;
+  }
+  // tests for a broken flow file url
+  bool flow_url_broken = false;
+  // transaction url will return incorrect information
+  bool transaction_url_broken = false;
+  // Location will be absent within the transaction response
+  bool empty_transaction_url = false;
+  // delete url is not supported.
+  bool no_delete = false;
+  // invalid checksum error
+  bool invalid_checksum = false;
+};
+
+/**
+ * Runs one complete site-to-site exchange against locally registered mock
+ * handlers and asserts on the log output that the selected profile is expected
+ * to produce.
+ *
+ * @param test_file_location flow configuration passed on to the harness
+ * @param isSecure forwarded to the harness and to the controller responder
+ * @param url base URL under which all mock endpoints are registered
+ * @param profile selects the failure mode to simulate (all false = success)
+ */
+void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) {
+  SiteToSiteTestHarness harness(isSecure);
+
+  SiteToSiteLocationResponder responder(isSecure);
+
+  TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url);
+
+  std::string transaction_id = transaction_response.getTransactionId();
+
+  harness.setKeyDir("");
+
+  std::string controller_loc = url + "/controller";
+
+  harness.setUrl(controller_loc, &responder);
+
+  std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+  std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+
+  std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+  std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+
+  harness.setUrl(transaction_url, &transaction_response);
+
+  std::string peer_url = url + "/site-to-site/peers";
+
+  PeerResponder peer_response(url);
+
+  harness.setUrl(peer_url, &peer_response);
+
+  std::string flow_url = action_url + "/" + transaction_id + "/flow-files";
+
+  FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum);
+  flowResponder.setFlowUrl(flow_url);
+  auto producedFlows = flowResponder.getFlows();
+
+  TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url);
+  std::string transaction_output_id = transaction_response_output.getTransactionId();
+  transaction_response_output.setFeed(producedFlows);
+
+  harness.setUrl(transaction_output_url, &transaction_response_output);
+
+  std::string flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files";
+
+  FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum);
+  flowOutputResponder.setFlowUrl(flow_output_url);
+  flowOutputResponder.setFeed(producedFlows);
+
+  harness.setUrl(flow_url, &flowResponder);
+  harness.setUrl(flow_output_url, &flowOutputResponder);
+
+  // The harness only receives raw handler pointers, so the delete responders
+  // have to stay alive until harness.run() has returned. They are therefore
+  // owned at function scope and merely created inside the conditional.
+  std::unique_ptr<DeleteTransactionResponder> deleteResponse;
+  std::unique_ptr<DeleteTransactionResponder> deleteOutputResponse;
+  if (!profile.no_delete) {
+    std::string delete_url = transaction_url + "/" + transaction_id;
+    deleteResponse.reset(new DeleteTransactionResponder(delete_url, "201 OK", 12));
+    harness.setUrl(delete_url, deleteResponse.get());
+
+    std::string delete_output_url = transaction_output_url + "/" + transaction_output_id;
+    deleteOutputResponse.reset(new DeleteTransactionResponder(delete_output_url, "201 OK", producedFlows));
+    harness.setUrl(delete_output_url, deleteOutputResponse.get());
+  }
+
+  harness.run(test_file_location);
+
+  // Each profile is recognised by the log lines it leaves behind.
+  std::stringstream assertStr;
+  if (profile.allFalse()) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.empty_transaction_url) {
+    assert(LogTestController::getInstance().contains("Location is empty") == true);
+  } else if (profile.transaction_url_broken) {
+    assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true);
+  } else if (profile.invalid_checksum) {
+    assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site transaction " << transaction_id << " CRC not matched";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+    assertStr.str(std::string());
+    assertStr << "Site2Site delete transaction " << transaction_id;
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  } else if (profile.no_delete) {
+    assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true);
+  } else {
+    // only flow_url_broken is left at this point
+    assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254";
+    assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+  }
+  LogTestController::getInstance().reset();
+}
+
+/**
+ * Runs the site-to-site exchange once with the default profile and once for
+ * each individual failure flag.
+ *
+ * Positional arguments, each optional: argv[1] is the test file location,
+ * argv[2] is read into key_dir (not used further in this function) and argv[3]
+ * is the base URL. A URL containing "https" switches the run to secure mode.
+ */
+int main(int argc, char **argv) {
+  transaction_id = 0;
+  transaction_id_output = 0;
+  std::string key_dir, test_file_location, url;
+  // Only touch the arguments that were actually supplied; argv[argc] is a null
+  // pointer and anything beyond it must not be read.
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+  }
+  if (argc > 3) {
+    url = argv[3];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  {
+    struct test_profile profile;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.flow_url_broken = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.empty_transaction_url = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.transaction_url_broken = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.no_delete = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    struct test_profile profile;
+    profile.invalid_checksum = true;
+    run_variance(test_file_location, isSecure, url, profile);
+  }
+
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/sitetositehttp/CivetStream.h
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/sitetositehttp/CivetStream.h b/libminifi/test/curl-tests/sitetositehttp/CivetStream.h
new file mode 100644
index 0000000..571b0ca
--- /dev/null
+++ b/libminifi/test/curl-tests/sitetositehttp/CivetStream.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_
+
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <future>
+#include <vector>
+
+#include "io/BaseStream.h"
+#include "civetweb.h"
+#include "CivetServer.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Read-only stream adapter over a civetweb connection: reads are forwarded to
+ * mg_read on the wrapped connection, writes are not implemented.
+ */
+class CivetStream : public io::BaseStream {
+ public:
+  /**
+   * Wraps an existing civetweb connection. The pointer is stored as-is; this
+   * class never closes or frees it.
+   * @param conn connection to read from
+   */
+  explicit CivetStream(struct mg_connection *conn)
+      : io::BaseStream(), conn(conn) {
+  }
+
+  virtual ~CivetStream() {
+  }
+  /**
+   * Skip to the specified offset. Not supported for a connection; does nothing.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset) {
+  }
+
+  /**
+   * @return the value of BaseStream::readBuffer
+   */
+  const uint64_t getSize() const {
+    return BaseStream::readBuffer;
+  }
+
+  // data stream extensions
+  /**
+   * Reads up to buflen bytes into buf. On return buf has been shrunk to the
+   * number of bytes actually read whenever fewer than buflen were delivered.
+   * @param buf buffer in which we extract data
+   * @param buflen maximum number of bytes to read
+   * @return number of bytes read, or a negative value on failure
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+    if (buflen < 0) {
+      // a negative length would otherwise turn into a huge unsigned size
+      return -1;
+    }
+    // Grow based on size(), not capacity(): bytes written beyond size() are
+    // not part of the vector and would be overwritten by a later resize.
+    if (buf.size() < static_cast<size_t>(buflen)) {
+      buf.resize(buflen);
+    }
+    int ret = readData(buf.data(), buflen);
+
+    if (ret < 0) {
+      // nothing usable was read; never pass a negative count to resize
+      buf.clear();
+      return ret;
+    }
+    if (ret < buflen) {
+      buf.resize(ret);
+    }
+    return ret;
+  }
+
+  /**
+   * Reads up to buflen bytes from the connection into buf.
+   * @param buf buffer in which we extract data
+   * @param buflen maximum number of bytes to read
+   * @return the result of mg_read
+   */
+  virtual int readData(uint8_t *buf, int buflen) {
+    return mg_read(conn, buf, buflen);
+  }
+
+  /**
+   * Writing is not implemented for this stream; nothing is sent.
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   * @return always 0
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen) {
+    return 0;
+  }
+
+  /**
+   * Writing is not implemented for this stream; nothing is sent.
+   * @param value value to write
+   * @param size size of value
+   * @return always 0
+   */
+  virtual int writeData(uint8_t *value, int size) {
+    return 0;
+  }
+
+ protected:
+
+  /**
+   * Reads sizeof(t) bytes from the connection into a freshly created vector.
+   * The result of the underlying read is not checked.
+   * @param t incoming object, used only for its size
+   * @returns vector of sizeof(t) bytes.
+   */
+  template<typename T>
+  inline std::vector<uint8_t> readBuffer(const T& t) {
+    std::vector<uint8_t> buf;
+    buf.resize(sizeof t);
+    readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+    return buf;
+  }
+
+  void reset();
+
+  //size_t pos;
+  struct mg_connection *conn;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h
new file mode 100644
index 0000000..3fe4623
--- /dev/null
+++ b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "civetweb.h"
+#include "CivetServer.h"
+#include "concurrentqueue.h"
+
+
+
+#include "CivetStream.h"
+#include "io/CRCStream.h"
+#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
+// Running counters whose current value is appended to the transaction id of
+// each TransactionResponder: one sequence for input ports, one for output
+// ports. Declared static in this header, so every translation unit that
+// includes it works on its own pair of counters.
+static std::atomic<int> transaction_id;
+static std::atomic<int> transaction_id_output;
+
+/**
+ * Plain holder for one flow file as seen by the mock handlers: its attributes,
+ * its content bytes and the serialized size recorded by whoever filled it in.
+ */
+class FlowObj {
+ public:
+  FlowObj()
+      : total_size(0) {
+  }
+  /**
+   * Move constructor. Takes a non-const rvalue so that the attribute map and
+   * the content buffer are really moved rather than copied; initializers are
+   * listed in member declaration order.
+   */
+  explicit FlowObj(FlowObj &&other)
+      : total_size(other.total_size),
+        attributes(std::move(other.attributes)),
+        data(std::move(other.data)) {
+  }
+  // size value stored by the code that populated this object
+  uint64_t total_size;
+  // attribute name -> attribute value
+  std::map<std::string, std::string> attributes;
+  // content bytes
+  std::vector<uint8_t> data;
+};
+
+/**
+ * Mock handler that answers GET requests with a small JSON document
+ * containing a "controller" object whose "siteToSiteSecure" field reflects the
+ * flag given at construction.
+ */
+class SiteToSiteLocationResponder : public CivetHandler {
+ public:
+  /**
+   * @param isSecure value reported in the "siteToSiteSecure" field
+   */
+  explicit SiteToSiteLocationResponder(bool isSecure)
+      : isSecure(isSecure) {
+  }
+  /**
+   * Writes a 200 response carrying the controller JSON.
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "{"
+        "\"revision\": {"
+        "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+        "},"
+        "\"controller\": {"
+        "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+        "\"name\": \"NiFi Flow\","
+        "\"siteToSiteSecure\": ";
+    site2site_rest_resp += (isSecure ? "true" : "false");
+    site2site_rest_resp += "}}";
+    // %lu expects unsigned long; length() yields size_t, which is a different
+    // type on some platforms, so convert explicitly.
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(site2site_rest_resp.length()));
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+ protected:
+  bool isSecure;
+};
+
+/**
+ * Mock handler for the peers endpoint. Every GET is answered with a fixed JSON
+ * list holding a single, non-secure peer on localhost port 8082.
+ */
+class PeerResponder : public CivetHandler {
+ public:
+
+  /**
+   * @param base_url stored in the handler; not used when building the reply
+   */
+  explicit PeerResponder(const std::string base_url)
+      : base_url(base_url) {
+  }
+
+  /**
+   * Sends the status line and headers first, then the JSON body.
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+    const std::string body = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }";
+
+    std::string head = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: ";
+    head += std::to_string(body.length());
+    head += "\r\nConnection: close\r\n\r\n";
+
+    mg_printf(conn, "%s", head.c_str());
+    mg_printf(conn, "%s", body.c_str());
+    return true;
+  }
+
+ protected:
+  std::string base_url;
+};
+
+/**
+ * Mock handler for the "create transaction" endpoint of one port. Each
+ * instance gets its own transaction id at construction and announces it in the
+ * Location header of its POST response.
+ */
+class TransactionResponder : public CivetHandler {
+ public:
+
+  /**
+   * @param base_url prefix used when building the Location header
+   * @param port_id id of the port, placed in the Location header
+   * @param input_port true for an input port, false for an output port
+   * @param wrong_uri when true the intent header carries "ohstuff" instead of
+   *        "transaction-url"
+   * @param empty_transaction_uri when true the Location header is left out
+   */
+  explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
+      : flow_files_feed_(nullptr),
+        base_url(base_url),
+        wrong_uri(wrong_uri),
+        empty_transaction_uri(empty_transaction_uri),
+        input_port(input_port),
+        port_id(port_id) {
+
+    // Post-increment on the atomic fetches and advances the counter in one
+    // step, so two responders can never observe the same value.
+    if (input_port) {
+      transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96";
+      transaction_id_str += std::to_string(transaction_id++);
+    } else {
+      transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95";
+      transaction_id_str += std::to_string(transaction_id_output++);
+    }
+  }
+
+  /**
+   * Answers with "201 OK", an empty body, the x-location-uri-intent header and
+   * (unless suppressed) the Location of the new transaction.
+   * @return always true
+   */
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "";
+    std::stringstream headers;
+    headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nx-location-uri-intent: ";
+    if (wrong_uri)
+      headers << "ohstuff\r\n";
+    else
+      headers << "transaction-url\r\n";
+
+    std::string port_type;
+
+    if (input_port)
+      port_type = "input-ports";
+    else
+      port_type = "output-ports";
+    if (!empty_transaction_uri)
+      headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n";
+    headers << "Connection: close\r\n\r\n";
+    mg_printf(conn, "%s", headers.str().c_str());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+  /**
+   * Stores the queue pointer; this class does not read from it.
+   */
+  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+    flow_files_feed_ = feed;
+  }
+
+  /**
+   * @return the transaction id assigned at construction
+   */
+  std::string getTransactionId() {
+    return transaction_id_str;
+  }
+ protected:
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+  std::string transaction_id_str;
+  std::string base_url;
+  bool wrong_uri;
+  bool empty_transaction_uri;
+  bool input_port;
+  std::string port_id;
+};
+
+/**
+ * Mock handler for the flow-files endpoint of a transaction.
+ *
+ * POST: deserializes one flow file from the request, queues it and replies
+ * with the CRC of the bytes read (or with a deliberately wrong checksum).
+ * GET: serializes everything waiting in the feed queue and sends it back.
+ */
+class FlowFileResponder : public CivetHandler {
+
+  // flow files received through handlePost
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_;
+  // queue drained by handleGet; null until setFeed is called
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+
+ public:
+
+  /**
+   * @param input_port stored flag telling whether this serves an input port
+   * @param wrong_uri when true every POST is answered with 404
+   * @param invalid_checksum when true POST replies carry a bogus checksum and
+   *        the received flow file is not queued
+   */
+  explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum)
+      : flow_files_feed_(nullptr),
+        wrong_uri(wrong_uri),
+        input_port(input_port),
+        invalid_checksum(invalid_checksum) {
+  }
+
+  /**
+   * @return pointer to the queue of flow files received via POST
+   */
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() {
+    return &flow_files_;
+  }
+
+  /**
+   * @param feed queue that handleGet will drain
+   */
+  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+    flow_files_feed_ = feed;
+  }
+
+  /**
+   * Reads one serialized flow file (attribute count, name/value pairs, content
+   * length, content) from the request and answers 202 with a checksum body.
+   * @return always true
+   */
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    std::string site2site_rest_resp = "";
+    std::stringstream headers;
+
+    if (!wrong_uri) {
+      minifi::io::CivetStream civet_stream(conn);
+      minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream);
+      // zero-initialized so that a failed read cannot leave garbage counts
+      uint32_t num_attributes = 0;
+      uint64_t total_size = 0;
+      total_size += stream.read(num_attributes);
+
+      auto flow = std::make_shared<FlowObj>();
+
+      for (uint32_t i = 0; i < num_attributes; i++) {
+        std::string name, value;
+        total_size += stream.readUTF(name, true);
+        total_size += stream.readUTF(value, true);
+        flow->attributes[name] = value;
+      }
+      uint64_t length = 0;
+      total_size += stream.read(length);
+
+      total_size += length;
+      flow->data.resize(length);
+      flow->total_size = total_size;
+
+      // The read is done outside of assert() so that the payload is still
+      // consumed when assertions are compiled out.
+      const int payload_read = stream.readData(flow->data.data(), length);
+      assert(payload_read >= 0 && static_cast<uint64_t>(payload_read) == length);
+      (void) payload_read;
+
+      assert(flow->attributes["path"] == ".");
+      assert(!flow->attributes["uuid"].empty());
+      assert(!flow->attributes["filename"].empty());
+
+      if (!invalid_checksum) {
+        site2site_rest_resp = std::to_string(stream.getCRC());
+        flow_files_.enqueue(flow);
+      } else {
+        site2site_rest_resp = "Imawrongchecksumshortandstout";
+      }
+
+      headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
+    } else {
+      headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n";
+    }
+
+    mg_printf(conn, "%s", headers.str().c_str());
+    mg_printf(conn, "%s", site2site_rest_resp.c_str());
+    return true;
+  }
+
+  /**
+   * Drains the feed queue and sends the serialized flow files as the response
+   * body. When no feed was set or the feed is empty, an empty 200 response is
+   * sent instead.
+   * @return always true
+   */
+  bool handleGet(CivetServer *server, struct mg_connection *conn) {
+
+    // A responder without a feed has nothing to hand out; treat it like an
+    // empty queue instead of dereferencing a null pointer.
+    if (flow_files_feed_ != nullptr && flow_files_feed_->size_approx() > 0) {
+      std::shared_ptr<FlowObj> flow;
+      std::vector<std::shared_ptr<FlowObj>> flows;
+      uint64_t total = 0;
+
+      while (flow_files_feed_->try_dequeue(flow)) {
+        flows.push_back(flow);
+        total += flow->total_size;
+      }
+      // %llu expects unsigned long long, which uint64_t is not on every platform
+      mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+                "Content-Length: %llu\r\n"
+                "Content-Type: application/octet-stream\r\n"
+                "Connection: close\r\n\r\n",
+                static_cast<unsigned long long>(total));
+      minifi::io::BaseStream serializer;
+      minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer);
+      for (const auto &queued : flows) {
+        uint32_t num_attributes = queued->attributes.size();
+        stream.write(num_attributes);
+        for (const auto &entry : queued->attributes) {
+          stream.writeUTF(entry.first);
+          stream.writeUTF(entry.second);
+        }
+        uint64_t length = queued->data.size();
+        stream.write(length);
+        stream.writeData(queued->data.data(), length);
+      }
+      mg_write(conn, serializer.getBuffer(), total);
+    } else {
+      std::cout << "Nothing to transfer feed" << std::endl;
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
+                "close\r\nContent-Length: 0\r\n");
+      mg_printf(conn, "Content-Type: text/plain\r\n\r\n");
+
+    }
+    return true;
+  }
+
+  /**
+   * @param flowUrl stored as base_url
+   */
+  void setFlowUrl(std::string flowUrl) {
+    base_url = flowUrl;
+  }
+
+ protected:
+// base url
+  std::string base_url;
+// set the wrong url
+  bool wrong_uri;
+// we are running an input port
+  bool input_port;
+// invalid checksum is returned.
+  bool invalid_checksum;
+};
+
+/**
+ * Mock handler for DELETE requests on a transaction URL. The status text to
+ * send back is fixed at construction; the body is always empty.
+ */
+class DeleteTransactionResponder : public CivetHandler {
+ public:
+
+  /**
+   * @param base_url stored in the handler
+   * @param response_code status text placed after "HTTP/1.1 "
+   * @param expected_resp_code kept in its decimal string form
+   */
+  explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code)
+      : flow_files_feed_(nullptr),
+        base_url(base_url),
+        expected_resp_code_str(std::to_string(expected_resp_code)),
+        response_code(response_code) {
+  }
+
+  /**
+   * @param base_url stored in the handler
+   * @param response_code status text placed after "HTTP/1.1 "
+   * @param feed queue pointer kept by the handler
+   */
+  explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed)
+      : flow_files_feed_(feed),
+        base_url(base_url),
+        response_code(response_code) {
+  }
+
+  /**
+   * Sends the configured status line with a zero-length JSON body.
+   * @return always true
+   */
+  bool handleDelete(CivetServer *server, struct mg_connection *conn) {
+
+    // The "responseCode" request parameter is fetched but its value is not
+    // used when building the reply.
+    std::string requested_code;
+    CivetServer::getParam(conn, "responseCode", requested_code);
+
+    const std::string body = "";
+    std::string head = "HTTP/1.1 ";
+    head += response_code;
+    head += "\r\nContent-Type: application/json\r\nContent-Length: ";
+    head += std::to_string(body.length());
+    head += "\r\n";
+    head += "Connection: close\r\n\r\n";
+
+    mg_printf(conn, "%s", head.c_str());
+    mg_printf(conn, "%s", body.c_str());
+    return true;
+  }
+
+  /**
+   * @param feed queue pointer to keep
+   */
+  void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) {
+    flow_files_feed_ = feed;
+  }
+
+ protected:
+  moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_;
+  std::string base_url;
+  std::string expected_resp_code_str;
+  std::string response_code;
+};
+
+#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/integration/IntegrationBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index a854508..7626cdf 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -41,9 +41,7 @@ int ssl_enable(void *ssl_context, void *user_data) {
return 0;
}
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(3));
-}
+
class IntegrationBase {
public:
@@ -66,6 +64,10 @@ class IntegrationBase {
virtual void runAssertions() = 0;
+ // Blocks the calling thread for three seconds. Declared virtual, so a
+ // subclass may supply its own wait.
+ virtual void waitToVerifyProcessor() {
+   const std::chrono::seconds pause_before_verify(3);
+   std::this_thread::sleep_for(pause_before_verify);
+ }
+
protected:
virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
@@ -160,6 +162,11 @@ void IntegrationBase::setUrl(std::string url, CivetHandler *handler) {
parse_http_components(url, port, scheme, path);
struct mg_callbacks callback;
if (url.find("localhost") != std::string::npos) {
+
+ if (server != nullptr){
+ server->addHandler(path,handler);
+ return;
+ }
if (scheme == "https" && !key_dir.empty()) {
std::string cert = "";
cert = key_dir + "nifi-cert.pem";
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 5845767..7db235f 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -73,7 +73,7 @@ int main(int argc, char **argv) {
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
ptr.release();
std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
- org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 10001, 1);
+ org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 10005, 1);
controller->load();
controller->start();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml b/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml
new file mode 100644
index 0000000..a2b0747
--- /dev/null
+++ b/libminifi/test/resources/C2VerifyHeartbeatAndStop.yml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Flow definition: an InvokeHTTP processor issues GET requests to
+# http://localhost:10015/geturl and its "success" relationship is routed to a
+# LogAttribute processor. No remote processing groups are configured.
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+ # HTTP client; timer driven with a 1 sec scheduling period.
+ - name: invoke
+ id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ class: org.apache.nifi.processors.standard.InvokeHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ HTTP Method: GET
+ Remote URL: http://localhost:10015/geturl
+ # Logger; auto-terminates the "response" relationship and logs payloads at info level.
+ - name: LogAttribute
+ id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: response
+ Properties:
+ Log Level: info
+ Log Payload: true
+
+Connections:
+ # invoke (…1991) "success" -> LogAttribute (…1992).
+ # NOTE(review): the connection names mention "RPG" although the Remote
+ # Processing Groups section of this flow is empty.
+ # NOTE(review): presumably a max work queue size of 0 means "no count limit" — TODO confirm.
+ - name: TransferFilesToRPG
+ id: 2438e3c8-015a-1000-79ca-83af40ec1997
+ source name: invoke
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ source relationship name: success
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ # Source and destination are the same processor (…1992): LogAttribute's
+ # "success" relationship is connected back to LogAttribute itself.
+ - name: TransferFilesToRPG2
+ id: 2438e3c8-015a-1000-79ca-83af40ec1917
+ source name: LogAttribute
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ source relationship name: success
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+
+# Intentionally left without entries in this flow.
+Remote Processing Groups:
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/C2VerifyServeResults.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/C2VerifyServeResults.yml b/libminifi/test/resources/C2VerifyServeResults.yml
new file mode 100644
index 0000000..18db3dd
--- /dev/null
+++ b/libminifi/test/resources/C2VerifyServeResults.yml
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Flow definition: an InvokeHTTP processor issues GET requests to
+# http://localhost:10013/geturl and its "success" relationship is routed to a
+# LogAttribute processor. No remote processing groups are configured.
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+ # HTTP client; timer driven with a 1 sec scheduling period.
+ - name: invoke
+ id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ class: org.apache.nifi.processors.standard.InvokeHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ HTTP Method: GET
+ Remote URL: http://localhost:10013/geturl
+ # Logger; auto-terminates the "response" relationship and logs payloads at info level.
+ - name: LogAttribute
+ id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: response
+ Properties:
+ Log Level: info
+ Log Payload: true
+
+Connections:
+ # invoke (…1991) "success" -> LogAttribute (…1992).
+ # NOTE(review): the connection names mention "RPG" although the Remote
+ # Processing Groups section of this flow is empty.
+ # NOTE(review): presumably a max work queue size of 0 means "no count limit" — TODO confirm.
+ - name: TransferFilesToRPG
+ id: 2438e3c8-015a-1000-79ca-83af40ec1997
+ source name: invoke
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ source relationship name: success
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ # Source and destination are the same processor (…1992): LogAttribute's
+ # "success" relationship is connected back to LogAttribute itself.
+ - name: TransferFilesToRPG2
+ id: 2438e3c8-015a-1000-79ca-83af40ec1917
+ source name: LogAttribute
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ source relationship name: success
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+
+# Intentionally left without entries in this flow.
+Remote Processing Groups:
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPGetSecure.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml
index 1ac82bd..a5a0bee 100644
--- a/libminifi/test/resources/TestHTTPGetSecure.yml
+++ b/libminifi/test/resources/TestHTTPGetSecure.yml
@@ -33,7 +33,7 @@ Processors:
Properties:
SSL Context Service: SSLContextService
HTTP Method: GET
- Remote URL: https://localhost:10003/geturl
+ Remote URL: https://localhost:10004/geturl
Disable Peer Verification: true
- name: LogAttribute
id: 2438e3c8-015a-1000-79ca-83af40ec1992
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPPost.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPPost.yml b/libminifi/test/resources/TestHTTPPost.yml
index 32e4f42..e4dbf88 100644
--- a/libminifi/test/resources/TestHTTPPost.yml
+++ b/libminifi/test/resources/TestHTTPPost.yml
@@ -43,7 +43,7 @@ Processors:
auto-terminated relationships list:
Properties:
Base Path: urlofchampions
- Listening Port: 10004
+ Listening Port: 10007
- name: Invoke
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.InvokeHTTP
@@ -58,7 +58,7 @@ Processors:
Properties:
HTTP Method: POST
Content-type: text/html
- Remote URL: http://localhost:10004/urlofchampions
+ Remote URL: http://localhost:10007/urlofchampions
- name: Loggit
id: 2438e3c8-015a-1000-79ca-83af40ec1993
class: org.apache.nifi.processors.standard.LogAttribute
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
index 110783c..f31f3e7 100644
--- a/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
+++ b/libminifi/test/resources/TestHTTPPostChunkedEncoding.yml
@@ -43,7 +43,7 @@ Processors:
auto-terminated relationships list:
Properties:
Base Path: urlofchampions
- Listening Port: 10004
+ Listening Port: 10006
- name: Invoke
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.InvokeHTTP
@@ -59,7 +59,7 @@ Processors:
HTTP Method: POST
Use Chunked Encoding: true
Content-type: text/html
- Remote URL: http://localhost:10004/urlofchampions
+ Remote URL: http://localhost:10006/urlofchampions
- name: Loggit
id: 2438e3c8-015a-1000-79ca-83af40ec1993
class: org.apache.nifi.processors.standard.LogAttribute
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestHTTPSiteToSite.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPSiteToSite.yml b/libminifi/test/resources/TestHTTPSiteToSite.yml
new file mode 100644
index 0000000..582f48b
--- /dev/null
+++ b/libminifi/test/resources/TestHTTPSiteToSite.yml
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Flow definition: a GenerateFlowFile processor feeds input port …a204 of the
+# remote processing group at http://localhost:8082/nifi, and output port
+# …a203 of that group feeds a LogAttribute processor.
+Flow Controller:
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+ name: MiNiFi Flow
+
+Processors:
+ # NOTE(review): both processors below are named "GetFile" although their
+ # classes are GenerateFlowFile and LogAttribute — confirm the names are intended.
+ # NOTE(review): "Input Directory" / "Keep Source File" look like GetFile
+ # properties; verify that GenerateFlowFile accepts or ignores them.
+ - name: GetFile
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 10 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ Input Directory: /tmp/site2siteGetFile
+ Keep Source File: true
+ # NOTE(review): this processor id (…a205) is the same value as the Flow
+ # Controller id above — confirm the overlap is harmless.
+ - name: GetFile
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 10 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+
+
+Connections:
+ # NOTE(review): both connections share the name "GenerateFlowFileS2S" and
+ # the id …a207 — verify that duplicate connection ids are tolerated.
+ # Processor …a206 (GenerateFlowFile) "success" -> remote input port …a204.
+ - name: GenerateFlowFileS2S
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+ source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+ source relationship name: success
+ destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+ # Remote output port …a203 "success" -> processor …a205 (LogAttribute).
+ - name: GenerateFlowFileS2S
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+ source id: 471deef6-2a6e-4a7d-912a-81cc17e3a203
+ source relationship name: success
+ destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Groups:
+ # Single remote group; both ports carry explicit Port / Host Name properties
+ # set to 8082 and 127.0.0.1, matching the port in the url below.
+ - name: NiFi Flow
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
+ url: http://localhost:8082/nifi
+ timeout: 30 secs
+ yield period: 1 sec
+ Input Ports:
+ - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ name: From Node A
+ max concurrent tasks: 1
+ use compression: false
+ Properties: # Deviates from spec and will later be removed when this is autonegotiated
+ Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ Port: 8082
+ Host Name: 127.0.0.1
+ Output Ports:
+ - id: 471deef6-2a6e-4a7d-912a-81cc17e3a203
+ name: To Node A
+ max concurrent tasks: 1
+ use compression: false
+ Properties: # Deviates from spec and will later be removed when this is autonegotiated
+ Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a203
+ Port: 8082
+ Host Name: 127.0.0.1
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestSite2SiteRest.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestSite2SiteRest.yml b/libminifi/test/resources/TestSite2SiteRest.yml
index ca751b5..87534a8 100644
--- a/libminifi/test/resources/TestSite2SiteRest.yml
+++ b/libminifi/test/resources/TestSite2SiteRest.yml
@@ -46,7 +46,7 @@ Connections:
Remote Processing Groups:
- name: NiFi Flow
id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
- url: http://localhost:8082/nifi
+ url: http://localhost:8077/nifi
timeout: 30 secs
yield period: 10 sec
Input Ports:
@@ -56,3 +56,5 @@ Remote Processing Groups:
use compression: false
Properties: # Deviates from spec and will later be removed when this is autonegotiated
Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ Port: 8077
+ Host Name: 127.0.0.1
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/TestSite2SiteRestSecure.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestSite2SiteRestSecure.yml b/libminifi/test/resources/TestSite2SiteRestSecure.yml
index fd530ae..5fa572c 100644
--- a/libminifi/test/resources/TestSite2SiteRestSecure.yml
+++ b/libminifi/test/resources/TestSite2SiteRestSecure.yml
@@ -46,7 +46,7 @@ Connections:
Remote Processing Groups:
- name: NiFi Flow
id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
- url: https://localhost:8082/nifi
+ url: https://localhost:8072/nifi
timeout: 30 secs
yield period: 10 sec
Input Ports:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/test/resources/ThreadPoolAdjust.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/ThreadPoolAdjust.yml b/libminifi/test/resources/ThreadPoolAdjust.yml
new file mode 100644
index 0000000..8b3c989
--- /dev/null
+++ b/libminifi/test/resources/ThreadPoolAdjust.yml
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Flow definition with four processors: GenerateFlowFile ("generate"),
+# ListenHTTP ("listen") on port 10016 under base path "urlofchampions",
+# InvokeHTTP ("Invoke") POSTing to http://localhost:10016/urlofchampions, and
+# LogAttribute ("Loggit"). No remote processing groups are configured.
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+ - name: generate
+ id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ # HTTP listener; its port and base path match the Remote URL used by "Invoke" below.
+ - name: listen
+ id: 2438e3c8-015a-1000-79ca-83af40ec1994
+ class: org.apache.nifi.processors.standard.ListenHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ Base Path: urlofchampions
+ Listening Port: 10016
+ # HTTP client; POSTs with chunked encoding and auto-terminates "success".
+ - name: Invoke
+ id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ class: org.apache.nifi.processors.standard.InvokeHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ HTTP Method: POST
+ Use Chunked Encoding: true
+ Content-type: text/html
+ Remote URL: http://localhost:10016/urlofchampions
+ # NOTE(review): the property key here is "LogLevel" (no space), whereas other
+ # flows in this change use "Log Level" — confirm which spelling is recognised.
+ - name: Loggit
+ id: 2438e3c8-015a-1000-79ca-83af40ec1993
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ LogLevel: debug
+
+Connections:
+ # By id this connects "generate" (…1991) to "Invoke" (…1992).
+ # NOTE(review): the "source name" (invoke) and "destination name"
+ # (LogAttribute) fields do not match the processors those ids belong to;
+ # presumably the ids are what get resolved — TODO confirm and fix the names.
+ - name: GenerateFlowFile/Invoke
+ id: 2438e3c8-015a-1000-79ca-83af40ec1997
+ source name: invoke
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ source relationship name: success
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ # "listen" (…1994) "success" -> "Loggit" (…1993).
+ - name: Listen/Loggit
+ id: 2438e3c8-015a-1000-79ca-83af40ec1918
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1994
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1993
+ source relationship name: success
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+# Intentionally left without entries in this flow.
+Remote Processing Groups:
+