You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/01 20:27:17 UTC

[3/6] nifi-minifi-cpp git commit: MINIFICPP-60: Add initial implementation of Site to Site changes.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp
deleted file mode 100644
index 8b8b646..0000000
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ /dev/null
@@ -1,1261 +0,0 @@
-/**
- * @file Site2SiteClientProtocol.cpp
- * Site2SiteClientProtocol class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sys/time.h>
-#include <stdio.h>
-#include <time.h>
-#include <chrono>
-#include <map>
-#include <string>
-#include <memory>
-#include <thread>
-#include <random>
-#include <iostream>
-#include <vector>
-#include "io/CRCStream.h"
-#include "Site2SitePeer.h"
-#include "Site2SiteClientProtocol.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-// UUID generator used by Site2SiteClientProtocol (handShake() uses it to create the comms identifier).
-std::shared_ptr<utils::IdGenerator> Site2SiteClientProtocol::id_generator_ = utils::IdGenerator::getIdGenerator();
-// Transaction's static UUID generator, obtained from the same utils::IdGenerator::getIdGenerator() call.
-std::shared_ptr<utils::IdGenerator> Transaction::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-/**
- * Opens the peer connection and negotiates the protocol version.
- * Only allowed while the peer state is IDLE; on success the state is set to
- * ESTABLISHED.
- * @return true if the peer was opened and the version negotiation succeeded,
- *         false otherwise
- */
-bool Site2SiteClientProtocol::establish() {
-  if (_peerState != IDLE) {
-    logger_->log_error("Site2Site peer state is not idle while try to establish");
-    return false;
-  }
-
-  if (!peer_->Open()) {
-    logger_->log_error("Site2Site peer socket open failed");
-    return false;
-  }
-
-  // The protocol version has to be agreed on before the link counts as established
-  if (!initiateResourceNegotiation()) {
-    logger_->log_error("Site2Site Protocol Version Negotiation failed");
-    return false;
-  }
-
-  logger_->log_info("Site2Site socket established");
-  _peerState = ESTABLISHED;
-  return true;
-}
-
-/**
- * Negotiates the Site2Site protocol version with the remote peer.
- *
- * Writes the resource name and the currently selected version, then reads a
- * one-byte status code. When the peer answers DIFFERENT_RESOURCE_VERSION, the
- * next entry of _supportedVersion that does not exceed the peer's version is
- * selected and the negotiation is retried recursively.
- *
- * @return true if the peer accepted the version or replied with an
- *         unrecognised status code; false if the state is not IDLE, on I/O
- *         failure, on NEGOTIATED_ABORT, or when no further supported version
- *         is acceptable to the peer.
- */
-bool Site2SiteClientProtocol::initiateResourceNegotiation() {
-  // Negotiate the version
-  if (_peerState != IDLE) {
-    logger_->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
-    return false;
-  }
-
-  logger_->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion);
-
-  int ret = peer_->writeUTF(this->getResourceName());
-
-  logger_->log_info("result of writing resource name is %i", ret);
-  if (ret <= 0) {
-    logger_->log_debug("result of writing resource name is %i", ret);
-    return false;
-  }
-
-  ret = peer_->write(_currentVersion);
-
-  if (ret <= 0) {
-    logger_->log_info("result of writing version is %i", ret);
-    return false;
-  }
-
-  uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    // This is a failed read of the peer's reply, not a failed write
-    logger_->log_info("result of reading version status code is %i", ret);
-    return false;
-  }
-  logger_->log_info("status code is %i", statusCode);
-  switch (statusCode) {
-    case RESOURCE_OK:
-      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
-      return true;
-    case DIFFERENT_RESOURCE_VERSION: {
-      uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
-      }
-      logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
-      // Only look at versions after the one just rejected, so the recursion terminates
-      for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
-        if (serverVersion >= _supportedVersion[i]) {
-          _currentVersion = _supportedVersion[i];
-          _currentVersionIndex = i;
-          return initiateResourceNegotiation();
-        }
-      }
-      return false;
-    }
-    case NEGOTIATED_ABORT:
-      logger_->log_info("Site2Site Negotiate protocol response ABORT");
-      return false;
-    default:
-      // NOTE(review): an unknown status code is treated as success, as in the
-      // original implementation - confirm this is intended.
-      logger_->log_info("Negotiate protocol response unknown code %d", statusCode);
-      return true;
-  }
-}
-
-/**
- * Negotiates the flow file codec version with the remote peer.
- *
- * Requires the HANDSHAKED state. Sends the codec resource name and the
- * currently selected codec version, then evaluates the peer's one-byte reply.
- * On DIFFERENT_RESOURCE_VERSION the next suitable entry of
- * _supportedCodecVersion is chosen and the negotiation is repeated.
- *
- * @return true if the peer accepted the version or sent an unrecognised
- *         status code; false on state mismatch, I/O failure, abort, or when no
- *         remaining supported version fits.
- */
-bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
-  if (_peerState != HANDSHAKED) {
-    logger_->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
-    return false;
-  }
-
-  logger_->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion);
-
-  int rc = peer_->writeUTF(this->getCodecResourceName());
-  if (rc <= 0) {
-    logger_->log_debug("result of getCodecResourceName is %i", rc);
-    return false;
-  }
-
-  rc = peer_->write(_currentCodecVersion);
-  if (rc <= 0) {
-    logger_->log_debug("result of _currentCodecVersion is %i", rc);
-    return false;
-  }
-
-  uint8_t statusCode;
-  rc = peer_->read(statusCode);
-  if (rc <= 0) {
-    return false;
-  }
-
-  if (statusCode == RESOURCE_OK) {
-    logger_->log_info("Site2Site Codec Negotiate version OK");
-    return true;
-  }
-
-  if (statusCode == NEGOTIATED_ABORT) {
-    logger_->log_info("Site2Site Codec Negotiate response ABORT");
-    return false;
-  }
-
-  if (statusCode != DIFFERENT_RESOURCE_VERSION) {
-    // Unrecognised replies are reported but not treated as a failure
-    logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
-    return true;
-  }
-
-  // The peer wants another version: read it and try the next candidate we support
-  uint32_t serverVersion;
-  rc = peer_->read(serverVersion);
-  if (rc <= 0) {
-    return false;
-  }
-  logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
-
-  const unsigned int versionCount = sizeof(_supportedCodecVersion) / sizeof(uint32_t);
-  for (unsigned int idx = _currentCodecVersionIndex + 1; idx < versionCount; ++idx) {
-    if (_supportedCodecVersion[idx] > serverVersion) {
-      continue;
-    }
-    _currentCodecVersion = _supportedCodecVersion[idx];
-    _currentCodecVersionIndex = idx;
-    return initiateCodecResourceNegotiation();
-  }
-  return false;
-}
-
-/**
- * Performs the Site2Site handshake on an ESTABLISHED connection.
- *
- * Sends a freshly generated communications identifier, the peer URL (protocol
- * version 3 and later), and the handshake properties, then reads the peer's
- * response. On PROPERTIES_OK the peer state becomes HANDSHAKED.
- *
- * @return true if the peer replied PROPERTIES_OK; false on state mismatch,
- *         any I/O failure, or any other response code.
- */
-bool Site2SiteClientProtocol::handShake() {
-  if (_peerState != ESTABLISHED) {
-    logger_->log_error("Site2Site peer state is not established while handshake");
-    return false;
-  }
-  logger_->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str());
-  uuid_t uuid;
-  // Generate the global UUID for the com identify
-  id_generator_->generate(uuid);
-  char uuidStr[37];
-  uuid_unparse_lower(uuid, uuidStr);
-  _commsIdentifier = uuidStr;
-
-  int ret = peer_->writeUTF(_commsIdentifier);
-
-  if (ret <= 0) {
-    return false;
-  }
-
-  std::map<std::string, std::string> properties;
-  properties[HandShakePropertyStr[GZIP]] = "false";
-  properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
-  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut);
-  // Batch settings are only sent from protocol version 5 on, and only when configured
-  if (this->_currentVersion >= 5) {
-    if (this->_batchCount > 0)
-      properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount);
-    if (this->_batchSize > 0)
-      properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize);
-    if (this->_batchDuration > 0)
-      properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration);
-  }
-
-  // The peer URL is part of the handshake from protocol version 3 on
-  if (_currentVersion >= 3) {
-    ret = peer_->writeUTF(peer_->getURL());
-    if (ret <= 0) {
-      return false;
-    }
-  }
-
-  uint32_t size = properties.size();
-  ret = peer_->write(size);
-  if (ret <= 0) {
-    return false;
-  }
-
-  for (const auto &property : properties) {
-    ret = peer_->writeUTF(property.first);
-    if (ret <= 0) {
-      return false;
-    }
-    ret = peer_->writeUTF(property.second);
-    if (ret <= 0) {
-      return false;
-    }
-    logger_->log_info("Site2Site Protocol Send handshake properties %s %s", property.first.c_str(), property.second.c_str());
-  }
-
-  RespondCode code;
-  std::string message;
-
-  ret = this->readRespond(code, message);
-
-  if (ret <= 0) {
-    return false;
-  }
-
-  switch (code) {
-    case PROPERTIES_OK:
-      logger_->log_info("Site2Site HandShake Completed");
-      _peerState = HANDSHAKED;
-      return true;
-    case PORT_NOT_IN_VALID_STATE:
-    case UNKNOWN_PORT:
-    case PORTS_DESTINATION_FULL:
-      // Pass a C string to %s, consistent with the other log calls in this file
-      logger_->log_error("Site2Site HandShake Failed because destination port, %s, is either invalid or full", _portIdStr.c_str());
-      return false;
-    default:
-      logger_->log_info("HandShake Failed because of unknown respond code %d", code);
-      return false;
-  }
-}
-
-/**
- * Shuts the connection down and resets the protocol to the IDLE state.
- * If the connection is at least ESTABLISHED a SHUTDOWN request is written
- * first. All transactions still held in _transactionMap are deleted.
- */
-void Site2SiteClientProtocol::tearDown() {
-  const bool connected = (_peerState >= ESTABLISHED);
-  if (connected) {
-    logger_->log_info("Site2Site Protocol tearDown");
-    // need to write shutdown request
-    writeRequestType(SHUTDOWN);
-  }
-
-  // The map owns the raw Transaction pointers, so release them before clearing
-  for (auto &entry : _transactionMap) {
-    delete entry.second;
-  }
-  _transactionMap.clear();
-
-  peer_->Close();
-  _peerState = IDLE;
-}
-
-/**
- * Requests the list of peers from the remote side.
- *
- * Establishes and handshakes the connection, sends REQUEST_PEER_LIST, and
- * reads the advertised number of peer entries. The connection is always torn
- * down before returning.
- *
- * @param peer output vector; one entry is appended per peer read from the
- *             stream. Entries appended before a failure are left in place.
- * @return true if the whole list was read, false on any failure.
- */
-bool Site2SiteClientProtocol::getPeerList(std::vector<minifi::Site2SitePeerStatus> &peer) {
-  if (!establish() || !handShake()) {
-    tearDown();
-    return false;
-  }
-
-  int status = this->writeRequestType(REQUEST_PEER_LIST);
-  if (status <= 0) {
-    tearDown();
-    return false;
-  }
-
-  uint32_t number;
-  status = peer_->read(number);
-  if (status <= 0) {
-    tearDown();
-    return false;
-  }
-
-  // Unsigned index: 'number' is a uint32_t taken from the wire
-  for (uint32_t i = 0; i < number; i++) {
-    std::string host;
-    uint32_t port;
-    uint8_t secure;
-    uint32_t count;
-
-    // Fields are read in wire order; stop at the first failing read.
-    // 'count' is not used afterwards but has to be consumed from the stream.
-    status = peer_->readUTF(host);
-    if (status > 0)
-      status = peer_->read(port);
-    if (status > 0)
-      status = peer_->read(secure);
-    if (status > 0)
-      status = peer_->read(count);
-    if (status <= 0) {
-      tearDown();
-      return false;
-    }
-
-    // Distinct name so the int return code above is not shadowed
-    minifi::Site2SitePeerStatus peerStatus;
-    peerStatus.host_ = host;
-    peerStatus.isSecure_ = secure;
-    peerStatus.port_ = port;
-    peer.push_back(peerStatus);
-    logger_->log_info("Site2Site Peer host %s, port %d, Secure %d", host.c_str(), port, secure);
-  }
-
-  tearDown();
-  return true;
-}
-
-/**
- * Writes the textual name of a request type to the peer.
- * @param type request type to send
- * @return the result of peer_->writeUTF(), or -1 if type is not below
- *         MAX_REQUEST_TYPE
- */
-int Site2SiteClientProtocol::writeRequestType(RequestType type) {
-  if (type < MAX_REQUEST_TYPE) {
-    return peer_->writeUTF(RequestTypeStr[type]);
-  }
-  return -1;
-}
-
-/**
- * Reads a request type name from the peer and maps it to its enum value.
- * @param type set to the matching request type on success
- * @return the result of peer_->readUTF() on success or read failure, -1 if
- *         the name matches no entry between NEGOTIATE_FLOWFILE_CODEC and
- *         SHUTDOWN
- */
-int Site2SiteClientProtocol::readRequestType(RequestType &type) {
-  std::string name;
-  const int bytesRead = peer_->readUTF(name);
-  if (bytesRead <= 0) {
-    return bytesRead;
-  }
-
-  int candidate = NEGOTIATE_FLOWFILE_CODEC;
-  while (candidate <= SHUTDOWN) {
-    if (RequestTypeStr[candidate] == name) {
-      type = static_cast<RequestType>(candidate);
-      return bytesRead;
-    }
-    ++candidate;
-  }
-
-  return -1;
-}
-
-/**
- * Reads a response from the peer: two fixed sequence bytes, one code byte
- * and, for codes that carry a description, a UTF message.
- * @param code    set to the received response code
- * @param message set to the description when the code has one
- * @return 3 plus the message length on success; -1 if a sequence byte is
- *         missing or wrong, the code is unknown, or the message cannot be
- *         read; the read result if reading the code byte fails
- */
-int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) {
-  uint8_t sequence1;
-  int status = peer_->read(sequence1);
-  if (status <= 0 || sequence1 != CODE_SEQUENCE_VALUE_1) {
-    return -1;
-  }
-
-  uint8_t sequence2;
-  status = peer_->read(sequence2);
-  if (status <= 0 || sequence2 != CODE_SEQUENCE_VALUE_2) {
-    return -1;
-  }
-
-  uint8_t rawCode;
-  status = peer_->read(rawCode);
-  if (status <= 0) {
-    return status;
-  }
-  code = static_cast<RespondCode>(rawCode);
-
-  RespondCodeContext *context = this->getRespondCodeContext(code);
-  if (context == NULL) {
-    // Not a valid respond code
-    return -1;
-  }
-
-  if (context->hasDescription) {
-    status = peer_->readUTF(message);
-    if (status <= 0) {
-      return -1;
-    }
-  }
-  return 3 + message.size();
-}
-
-/**
- * Writes a response to the peer: two fixed sequence bytes, the code byte and,
- * for codes that carry a description, the message.
- * @param code    response code to send
- * @param message description, written only when the code has one
- * @return 3 (plus the writeUTF result when a description was written) on
- *         success; -1 for an unknown code or a short header write; the
- *         writeUTF result if writing the description fails
- */
-int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) {
-  RespondCodeContext *context = this->getRespondCodeContext(code);
-  if (context == NULL) {
-    // Not a valid respond code
-    return -1;
-  }
-
-  uint8_t header[3];
-  header[0] = CODE_SEQUENCE_VALUE_1;
-  header[1] = CODE_SEQUENCE_VALUE_2;
-  header[2] = (uint8_t) code;
-
-  int written = peer_->write(header, 3);
-  if (written != 3) {
-    return -1;
-  }
-
-  if (!context->hasDescription) {
-    return 3;
-  }
-
-  written = peer_->writeUTF(message);
-  return (written > 0) ? (3 + written) : written;
-}
-
-/**
- * Sends the NEGOTIATE_FLOWFILE_CODEC request and negotiates the codec
- * version. Requires the HANDSHAKED state; on success the state becomes READY.
- * @return true if the codec was negotiated, false otherwise
- */
-bool Site2SiteClientProtocol::negotiateCodec() {
-  if (_peerState != HANDSHAKED) {
-    logger_->log_error("Site2Site peer state is not handshaked while negotiate codec");
-    return false;
-  }
-
-  logger_->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str());
-
-  if (this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC) <= 0) {
-    return false;
-  }
-
-  // Negotiate the codec version
-  if (!initiateCodecResourceNegotiation()) {
-    logger_->log_error("Site2Site Codec Version Negotiation failed");
-    return false;
-  }
-
-  logger_->log_info("Site2Site Codec Completed and move to READY state for data transfer");
-  _peerState = READY;
-  return true;
-}
-
-/**
- * Brings the protocol into the READY state.
- * Does nothing when already READY. Otherwise tears the connection down and
- * runs establish(), handShake() and negotiateCodec() in sequence; if any of
- * them fails the peer is yielded and the connection is torn down again.
- * @return true if the protocol is READY afterwards, false otherwise
- */
-bool Site2SiteClientProtocol::bootstrap() {
-  if (_peerState == READY) {
-    return true;
-  }
-
-  // Start from a clean slate before reconnecting
-  tearDown();
-
-  const bool ready = establish() && handShake() && negotiateCodec();
-  if (!ready) {
-    peer_->yield();
-    tearDown();
-    return false;
-  }
-
-  logger_->log_info("Site2Site Ready For data transaction");
-  return true;
-}
-
-/**
- * Starts a new transaction with the peer and registers it in _transactionMap.
- *
- * For RECEIVE the RECEIVE_FLOWFILES request is sent and the peer's reply
- * (MORE_DATA / NO_MORE_DATA) decides whether data is available. For any other
- * direction the SEND_FLOWFILES request is sent.
- *
- * @param transactionID set to the UUID string of the created transaction
- * @param direction     transfer direction of the transaction
- * @return the new transaction (owned by _transactionMap), or NULL if the
- *         protocol cannot reach READY, a request/response fails, or the peer
- *         replies with an unexpected code
- */
-Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) {
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    return NULL;
-  }
-
-  if (direction != RECEIVE) {
-    int sendStatus = writeRequestType(SEND_FLOWFILES);
-    if (sendStatus <= 0) {
-      return NULL;
-    }
-
-    org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> sendStream(peer_.get());
-    Transaction *created = new Transaction(direction, sendStream);
-    _transactionMap[created->getUUIDStr()] = created;
-    transactionID = created->getUUIDStr();
-    logger_->log_info("Site2Site create transaction %s", created->getUUIDStr().c_str());
-    return created;
-  }
-
-  int status = writeRequestType(RECEIVE_FLOWFILES);
-  if (status <= 0) {
-    return NULL;
-  }
-
-  RespondCode code;
-  std::string message;
-  status = readRespond(code, message);
-  if (status <= 0) {
-    return NULL;
-  }
-
-  org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> receiveStream(peer_.get());
-
-  // Translate the peer's reply into the data-available flag
-  bool dataAvailable;
-  if (code == MORE_DATA) {
-    dataAvailable = true;
-    logger_->log_info("Site2Site peer indicates that data is available");
-  } else if (code == NO_MORE_DATA) {
-    dataAvailable = false;
-    logger_->log_info("Site2Site peer indicates that no data is available");
-  } else {
-    logger_->log_info("Site2Site got unexpected response %d when asking for data", code);
-    return NULL;
-  }
-
-  Transaction *created = new Transaction(direction, receiveStream);
-  _transactionMap[created->getUUIDStr()] = created;
-  transactionID = created->getUUIDStr();
-  created->setDataAvailable(dataAvailable);
-  logger_->log_info("Site2Site create transaction %s", created->getUUIDStr().c_str());
-  return created;
-}
-
-/**
- * Reads the header of the next flow record of a RECEIVE transaction:
- * the attributes and the content length. The content itself is not read here.
- *
- * @param transactionID id of a transaction registered in _transactionMap
- * @param packet        receives the attributes (_attributes) and length (_size)
- * @param eof           set to true when the peer has no (more) data
- * @return true if a record header was read or eof was reached; false if the
- *         protocol is not READY, the transaction is unknown or in the wrong
- *         state/direction, or any read fails
- */
-bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) {
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    return false;
-  }
-
-  auto found = this->_transactionMap.find(transactionID);
-  if (found == _transactionMap.end()) {
-    return false;
-  }
-  Transaction *txn = found->second;
-
-  const bool usableState = (txn->getState() == TRANSACTION_STARTED || txn->getState() == DATA_EXCHANGED);
-  if (!usableState) {
-    logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
-    return false;
-  }
-
-  if (txn->getDirection() != RECEIVE) {
-    logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
-    return false;
-  }
-
-  if (!txn->isDataAvailable()) {
-    eof = true;
-    return true;
-  }
-
-  int status;
-  if (txn->_transfers > 0) {
-    // After the first record the peer tells us whether another one follows
-    RespondCode code;
-    std::string message;
-
-    status = readRespond(code, message);
-    if (status <= 0) {
-      return false;
-    }
-
-    switch (code) {
-      case CONTINUE_TRANSACTION:
-        logger_->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str());
-        txn->_dataAvailable = true;
-        break;
-      case FINISH_TRANSACTION:
-        logger_->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str());
-        txn->_dataAvailable = false;
-        break;
-      default:
-        logger_->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code);
-        return false;
-    }
-  }
-
-  if (!txn->isDataAvailable()) {
-    eof = true;
-    return true;
-  }
-
-  // start to read the packet
-  uint32_t attributeCount;
-  status = txn->getStream().read(attributeCount);
-  if (status <= 0 || attributeCount > MAX_NUM_ATTRIBUTES) {
-    return false;
-  }
-
-  // read the attributes
-  for (unsigned int n = 0; n < attributeCount; ++n) {
-    std::string name;
-    std::string content;
-    status = txn->getStream().readUTF(name, true);
-    if (status <= 0) {
-      return false;
-    }
-    status = txn->getStream().readUTF(content, true);
-    if (status <= 0) {
-      return false;
-    }
-    packet->_attributes[name] = content;
-    logger_->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), name.c_str(), content.c_str());
-  }
-
-  uint64_t contentLength;
-  status = txn->getStream().read(contentLength);
-  if (status <= 0) {
-    return false;
-  }
-
-  // Record the transfer in both the packet and the transaction bookkeeping
-  packet->_size = contentLength;
-  txn->_transfers++;
-  txn->_state = DATA_EXCHANGED;
-  txn->_bytes += contentLength;
-  logger_->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), txn->_transfers, txn->_bytes);
-
-  return true;
-}
-
-/**
- * Sends one flow record (attributes plus content) within a SEND transaction.
- *
- * The content comes either from flowFile (read through session) or, when no
- * flow file is given, from packet->payload_.
- *
- * @param transactionID id of a transaction registered in _transactionMap
- * @param packet        attributes to send; also carries the payload when
- *                      flowFile is null
- * @param flowFile      flow file whose content is sent, may be null
- * @param session       session used to read the flow file content
- * @return 0 on success; -1 if the protocol is not READY, the transaction is
- *         unknown or in the wrong state/direction, or a write fails; -2 if
- *         the flow file's resource claim does not exist or the number of
- *         bytes sent does not match the flow file size
- */
-int16_t Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session) {
-  int ret;
-  Transaction *transaction = NULL;
-
-  if (flowFile) {
-    // A flow file may have no resource claim at all (handled further down),
-    // so only call exists() on a non-null claim.
-    auto claim = flowFile->getResourceClaim();
-    if (claim != nullptr && !claim->exists()) {
-      logger_->log_info("Claim %s does not exist for FlowFile %s", claim->getContentFullPath(), flowFile->getUUIDStr());
-      return -2;
-    }
-  }
-
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    return -1;
-  }
-
-  std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-  if (it == _transactionMap.end()) {
-    return -1;
-  } else {
-    transaction = it->second;
-  }
-
-  if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
-    logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
-    return -1;
-  }
-
-  if (transaction->getDirection() != SEND) {
-    logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
-    return -1;
-  }
-
-  // Every record after the first is announced with CONTINUE_TRANSACTION
-  if (transaction->_transfers > 0) {
-    ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
-    if (ret <= 0) {
-      return -1;
-    }
-  }
-
-  // start to write the packet
-  uint32_t numAttributes = packet->_attributes.size();
-  ret = transaction->getStream().write(numAttributes);
-  if (ret != 4) {
-    return -1;
-  }
-
-  std::map<std::string, std::string>::iterator itAttribute;
-  for (itAttribute = packet->_attributes.begin(); itAttribute != packet->_attributes.end(); itAttribute++) {
-    ret = transaction->getStream().writeUTF(itAttribute->first, true);
-
-    if (ret <= 0) {
-      return -1;
-    }
-    ret = transaction->getStream().writeUTF(itAttribute->second, true);
-    if (ret <= 0) {
-      return -1;
-    }
-    logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str());
-  }
-
-  uint64_t len = 0;
-  if (flowFile) {
-    len = flowFile->getSize();
-    ret = transaction->getStream().write(len);
-    if (ret != 8) {
-      logger_->log_info("ret != 8");
-      return -1;
-    }
-    if (flowFile->getSize() > 0) {
-      Site2SiteClientProtocol::ReadCallback callback(packet);
-      session->read(flowFile, &callback);
-      if (flowFile->getSize() != packet->_size) {
-        logger_->log_info("MisMatched sizes %d %d", flowFile->getSize(), packet->_size);
-        return -2;
-      }
-    }
-    if (packet->payload_.length() == 0 && len == 0) {
-      if (flowFile->getResourceClaim() == nullptr)
-        logger_->log_debug("no claim");
-      else
-        logger_->log_debug("Flowfile empty %s", flowFile->getResourceClaim()->getContentFullPath());
-    }
-  } else if (packet->payload_.length() > 0) {
-    len = packet->payload_.length();
-
-    ret = transaction->getStream().write(len);
-    if (ret != 8) {
-      return -1;
-    }
-
-    ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len);
-    if (ret != len) {
-      logger_->log_info("ret != len");
-      return -1;
-    }
-    packet->_size += len;
-  }
-  // NOTE(review): with no flow file and an empty payload no length field is
-  // written at all - confirm the peer does not expect one in that case.
-
-  transaction->_transfers++;
-  transaction->_state = DATA_EXCHANGED;
-  transaction->_bytes += len;
-  logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
-
-  return 0;
-}
-
-/**
- * Receives all flow files the peer offers in one RECEIVE transaction and
- * transfers them into the given session.
- *
- * For every record a flow file is created, the received attributes are added,
- * the content is written through WriteCallback, a provenance receive event is
- * reported and the flow file is transferred. After the last record the
- * transaction is confirmed and completed.
- *
- * @param context process context; yielded on failure and when nothing was
- *                received
- * @param session session used to create, write and transfer the flow files
- * @throws Exception (SITE2SITE_EXCEPTION) if the handshake, transaction
- *         creation, receive, size check, confirm or complete step fails.
- *         Exceptions raised inside the transfer loop are rethrown after the
- *         transaction is deleted and the connection is torn down.
- */
-void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, core::ProcessSession *session) {
-  uint64_t bytes = 0;
-  int transfers = 0;
-  Transaction *transaction = NULL;
-
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    context->yield();
-    tearDown();
-    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-  }
-
-  // Create the transaction
-  std::string transactionID;
-  transaction = createTransaction(transactionID, RECEIVE);
-
-  if (transaction == NULL) {
-    context->yield();
-    tearDown();
-    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-  }
-
-  try {
-    while (true) {
-      std::map<std::string, std::string> empty;
-      // startTime/endTime bracket one record; the difference is passed to the provenance reporter
-      uint64_t startTime = getTimeMillis();
-      std::string payload;
-      DataPacket packet(this, transaction, empty, payload);
-      bool eof = false;
-
-      if (!receive(transactionID, &packet, eof)) {
-        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
-      }
-      if (eof) {
-        // transaction done
-        break;
-      }
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-
-      if (!flowFile) {
-        throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
-      }
-      std::map<std::string, std::string>::iterator it;
-      std::string sourceIdentifier;
-      // Copy every received attribute; the UUID attribute doubles as the source identifier
-      for (it = packet._attributes.begin(); it != packet._attributes.end(); it++) {
-        if (it->first == FlowAttributeKey(UUID))
-          sourceIdentifier = it->second;
-        flowFile->addAttribute(it->first, it->second);
-      }
-
-      if (packet._size > 0) {
-        Site2SiteClientProtocol::WriteCallback callback(&packet);
-        session->write(flowFile, &callback);
-        // The written flow file must have exactly the announced length
-        if (flowFile->getSize() != packet._size) {
-          throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
-        }
-      }
-      core::Relationship relation;  // undefined relationship
-      uint64_t endTime = getTimeMillis();
-      std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
-      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
-      session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime);
-      session->transfer(flowFile, relation);
-      // receive the transfer for the flow record
-      bytes += packet._size;
-      transfers++;
-    }  // while true
-
-    if (!confirm(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
-    }
-    if (!complete(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
-    }
-    logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes);
-    // we yield the receive if we did not get anything
-    if (transfers == 0)
-      context->yield();
-  } catch (std::exception &exception) {
-    // Delete the transaction first, then yield and tear down, before rethrowing to the caller
-    if (transaction)
-      deleteTransaction(transactionID);
-    context->yield();
-    tearDown();
-    logger_->log_debug("Caught Exception %s", exception.what());
-    throw;
-  } catch (...) {
-    // Same cleanup for exceptions not derived from std::exception
-    if (transaction)
-      deleteTransaction(transactionID);
-    context->yield();
-    tearDown();
-    logger_->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles");
-    throw;
-  }
-
-  // Success path: the finished transaction is no longer needed
-  deleteTransaction(transactionID);
-
-  return;
-}
-
-/**
- * Runs the confirmation phase of a transaction.
- *
- * RECEIVE: once the peer has signalled that no more data follows, our CRC is
- * sent with CONFIRM_TRANSACTION and the peer's answer is awaited.
- * SEND: FINISH_TRANSACTION is sent, the peer's CONFIRM_TRANSACTION (carrying
- * its CRC) is read, and for protocol versions above 3 the CRC is compared
- * with ours before answering CONFIRM_TRANSACTION or BAD_CHECKSUM.
- *
- * @param transactionID id of a transaction registered in _transactionMap
- * @return true if the transaction reached TRANSACTION_CONFIRMED; false if the
- *         protocol is not READY, the transaction is unknown or in the wrong
- *         state, any read/write fails, or the checksums do not match
- */
-bool Site2SiteClientProtocol::confirm(std::string transactionID) {
-  int ret;
-  Transaction *transaction = NULL;
-
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    return false;
-  }
-
-  std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-  if (it == _transactionMap.end()) {
-    return false;
-  } else {
-    transaction = it->second;
-  }
-
-  // A receive transaction for which the peer had no data needs no round trip
-  if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && transaction->getDirection() == RECEIVE) {
-    transaction->_state = TRANSACTION_CONFIRMED;
-    return true;
-  }
-
-  if (transaction->getState() != DATA_EXCHANGED)
-    return false;
-
-  if (transaction->getDirection() == RECEIVE) {
-    if (transaction->isDataAvailable())
-      return false;
-    // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
-    // to peer so that we can verify that the connection is still open. This is a two-phase commit,
-    // which helps to prevent the chances of data duplication. Without doing this, we may commit the
-    // session and then when we send the response back to the peer, the peer may have timed out and may not
-    // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
-    // Critical Section involved in this transaction so that rather than the Critical Section being the
-    // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
-    int64_t crcValue = transaction->getCRC();
-    std::string crc = std::to_string(crcValue);
-    logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), transactionID.c_str());
-    ret = writeRespond(CONFIRM_TRANSACTION, crc);
-    if (ret <= 0)
-      return false;
-    RespondCode code;
-    std::string message;
-    // Keep the read result: without it the check below would test the stale
-    // write result and 'code' could be used uninitialised.
-    ret = readRespond(code, message);
-    if (ret <= 0)
-      return false;
-
-    if (code == CONFIRM_TRANSACTION) {
-      logger_->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str());
-      transaction->_state = TRANSACTION_CONFIRMED;
-      return true;
-    } else if (code == BAD_CHECKSUM) {
-      logger_->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str());
-      return false;
-    } else {
-      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
-      return false;
-    }
-  } else {
-    logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.c_str());
-    ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
-    if (ret <= 0)
-      return false;
-    RespondCode code;
-    std::string message;
-    // Same as above: the result of the read has to be checked, not the write
-    ret = readRespond(code, message);
-    if (ret <= 0)
-      return false;
-
-    // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
-    if (code == CONFIRM_TRANSACTION) {
-      logger_->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
-      // The checksum is only compared for protocol versions above 3
-      if (this->_currentVersion > 3) {
-        int64_t crcValue = transaction->getCRC();
-        std::string crc = std::to_string(crcValue);
-        if (message == crc) {
-          logger_->log_info("Site2Site transaction %s CRC matched", transactionID.c_str());
-          ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
-          if (ret <= 0)
-            return false;
-          transaction->_state = TRANSACTION_CONFIRMED;
-          return true;
-        } else {
-          logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
-          // Best effort: the transaction fails whether or not this write succeeds
-          ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
-          return false;
-        }
-      }
-      ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
-      if (ret <= 0)
-        return false;
-      transaction->_state = TRANSACTION_CONFIRMED;
-      return true;
-    } else {
-      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
-      return false;
-    }
-  }
-}
-
-/**
- * Cancels a transaction: sends CANCEL_TRANSACTION to the peer, marks the
- * transaction TRANSACTION_CANCELED and tears the connection down.
- * Does nothing if the protocol is not READY, the transaction is unknown, or
- * it is already canceled, completed or in error.
- * @param transactionID id of the transaction to cancel
- */
-void Site2SiteClientProtocol::cancel(std::string transactionID) {
-  if (_peerState != READY) {
-    return;
-  }
-
-  auto entry = this->_transactionMap.find(transactionID);
-  if (entry == _transactionMap.end()) {
-    return;
-  }
-  Transaction *txn = entry->second;
-
-  // Transactions that already reached a final state are left untouched
-  const bool finished = txn->getState() == TRANSACTION_CANCELED
-      || txn->getState() == TRANSACTION_COMPLETED
-      || txn->getState() == TRANSACTION_ERROR;
-  if (finished) {
-    return;
-  }
-
-  this->writeRespond(CANCEL_TRANSACTION, "Cancel");
-  txn->_state = TRANSACTION_CANCELED;
-
-  tearDown();
-}
-
-/**
- * Deletes a transaction and removes it from _transactionMap.
- * Does nothing if no transaction with that id is registered.
- * @param transactionID id of the transaction to delete
- */
-void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
-  auto entry = this->_transactionMap.find(transactionID);
-  if (entry == _transactionMap.end()) {
-    return;
-  }
-
-  Transaction *doomed = entry->second;
-  logger_->log_info("Site2Site delete transaction %s", doomed->getUUIDStr().c_str());
-  delete doomed;
-  _transactionMap.erase(entry);
-}
-
-void Site2SiteClientProtocol::error(std::string transactionID) {
-  Transaction *transaction = NULL;
-
-  std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-  if (it == _transactionMap.end()) {
-    return;
-  } else {
-    transaction = it->second;
-  }
-
-  transaction->_state = TRANSACTION_ERROR;
-  tearDown();
-  return;
-}
-
-// Complete the transaction
-bool Site2SiteClientProtocol::complete(std::string transactionID) {
-  int ret;
-  Transaction *transaction = NULL;
-
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    return false;
-  }
-
-  std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
-
-  if (it == _transactionMap.end()) {
-    return false;
-  } else {
-    transaction = it->second;
-  }
-
-  if (transaction->getState() != TRANSACTION_CONFIRMED) {
-    return false;
-  }
-
-  if (transaction->getDirection() == RECEIVE) {
-    if (transaction->_transfers == 0) {
-      transaction->_state = TRANSACTION_COMPLETED;
-      return true;
-    } else {
-      logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str());
-      ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
-      if (ret <= 0) {
-        return false;
-      } else {
-        transaction->_state = TRANSACTION_COMPLETED;
-        return true;
-      }
-    }
-  } else {
-    RespondCode code;
-    std::string message;
-    int ret;
-
-    ret = readRespond(code, message);
-
-    if (ret <= 0)
-      return false;
-
-    if (code == TRANSACTION_FINISHED) {
-      logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str());
-      transaction->_state = TRANSACTION_COMPLETED;
-      return true;
-    } else {
-      logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code);
-      return false;
-    }
-  }
-}
-
-void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());
-
-  Transaction *transaction = NULL;
-
-  if (!flow) {
-    return;
-  }
-
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    context->yield();
-    tearDown();
-    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-  }
-
-  // Create the transaction
-  std::string transactionID;
-  transaction = createTransaction(transactionID, SEND);
-
-  if (transaction == NULL) {
-    context->yield();
-    tearDown();
-    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-  }
-
-  bool continueTransaction = true;
-  uint64_t startSendingNanos = getTimeNano();
-
-  try {
-    while (continueTransaction) {
-      uint64_t startTime = getTimeMillis();
-      std::string payload;
-      DataPacket packet(this, transaction, flow->getAttributes(), payload);
-
-      int16_t resp = send(transactionID, &packet, flow, session);
-      if (resp == -1) {
-        throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
-      }
-
-      logger_->log_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str());
-      if (resp == 0) {
-        uint64_t endTime = getTimeMillis();
-        std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
-        std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
-        session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
-      }
-      session->remove(flow);
-
-      uint64_t transferNanos = getTimeNano() - startSendingNanos;
-      if (transferNanos > _batchSendNanos)
-        break;
-
-      flow = std::static_pointer_cast<FlowFileRecord>(session->get());
-
-      if (!flow) {
-        continueTransaction = false;
-      }
-    }  // while true
-
-    if (!confirm(transactionID)) {
-      std::stringstream ss;
-      ss << "Confirm Failed for " << transactionID;
-      throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
-    }
-    if (!complete(transactionID)) {
-      std::stringstream ss;
-      ss << "Complete Failed for " << transactionID;
-      throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str());
-    }
-    logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
-  } catch (std::exception &exception) {
-    if (transaction)
-      deleteTransaction(transactionID);
-    context->yield();
-    tearDown();
-    logger_->log_debug("Caught Exception %s", exception.what());
-    throw;
-  } catch (...) {
-    if (transaction)
-      deleteTransaction(transactionID);
-    context->yield();
-    tearDown();
-    logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles");
-    throw;
-  }
-
-  deleteTransaction(transactionID);
-
-  return;
-}
-
-void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, std::map<std::string, std::string> attributes) {
-  Transaction *transaction = NULL;
-
-  if (payload.length() <= 0)
-    return;
-
-  if (_peerState != READY) {
-    bootstrap();
-  }
-
-  if (_peerState != READY) {
-    context->yield();
-    tearDown();
-    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
-  }
-
-  // Create the transaction
-  std::string transactionID;
-  transaction = createTransaction(transactionID, SEND);
-
-  if (transaction == NULL) {
-    context->yield();
-    tearDown();
-    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
-  }
-
-  try {
-    DataPacket packet(this, transaction, attributes, payload);
-
-    int16_t resp = send(transactionID, &packet, nullptr, session);
-    if (resp == -1) {
-      throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
-    }
-    logger_->log_info("Site2Site transaction %s send bytes length %d", transactionID.c_str(), payload.length());
-
-    if (!confirm(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
-    }
-    if (!complete(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
-    }
-    logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes);
-  } catch (std::exception &exception) {
-    if (transaction)
-      deleteTransaction(transactionID);
-    context->yield();
-    tearDown();
-    logger_->log_debug("Caught Exception %s", exception.what());
-    throw;
-  } catch (...) {
-    if (transaction)
-      deleteTransaction(transactionID);
-    context->yield();
-    tearDown();
-    logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferBytes");
-    throw;
-  }
-
-  deleteTransaction(transactionID);
-
-  return;
-}
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
deleted file mode 100644
index 7c46564..0000000
--- a/libminifi/src/Site2SitePeer.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * @file Site2SitePeer.cpp
- * Site2SitePeer class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "Site2SitePeer.h"
-#include <sys/time.h>
-#include <stdio.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include <memory>
-#include <iostream>
-#include "io/ClientSocket.h"
-#include "io/validation.h"
-#include "FlowController.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-bool Site2SitePeer::Open() {
-  if (IsNullOrEmpty(host_))
-    return false;
-
-  if (stream_->initialize() < 0)
-    return false;
-
-  uint16_t data_size = sizeof MAGIC_BYTES;
-
-  if (stream_->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size) != data_size) {
-    return false;
-  }
-
-  return true;
-}
-
-void Site2SitePeer::Close() {
-  if (stream_ != nullptr)
-    stream_->closeStream();
-}
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index d35f283..2bceac5 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -229,7 +229,7 @@ void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessio
   }
 }
 
-void Processor::onTrigger(std::shared_ptr<ProcessContext> context, std::shared_ptr<ProcessSessionFactory> sessionFactory) {
+void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSessionFactory> &sessionFactory) {
   auto session = sessionFactory->createSession();
 
   try {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index c556701..542d026 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -53,8 +53,8 @@ void SiteToSiteProvenanceReportingTask::initialize() {
   RemoteProcessorGroupPort::initialize();
 }
 
-void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<core::SerializableComponent>> &records,
-                                                      std::string &report) {
+void SiteToSiteProvenanceReportingTask::getJsonReport(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+                                                      std::vector<std::shared_ptr<core::SerializableComponent>> &records, std::string &report) {
   Json::Value array;
   for (auto sercomp : records) {
     std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast<provenance::ProvenanceEventRecord>(sercomp);
@@ -103,37 +103,18 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *cont
   report = writer.write(array);
 }
 
-void SiteToSiteProvenanceReportingTask::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void SiteToSiteProvenanceReportingTask::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
 }
 
-void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(true);
-
-  if (!protocol_) {
-    context->yield();
-    return;
-  }
-
+void SiteToSiteProvenanceReportingTask::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger");
-
-  if (!protocol_->bootstrap()) {
-    // bootstrap the client protocol if needeed
-    context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode()->getProcessor());
-    logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
-    returnProtocol(std::move(protocol_));
-    return;
-  }
-
   std::vector<std::shared_ptr<core::SerializableComponent>> records;
-
   logger_->log_debug("batch size %d records", batch_size_);
   size_t deserialized = batch_size_;
   std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
   std::function<std::shared_ptr<core::SerializableComponent>()> constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
   if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) {
     logger_->log_debug("Not sending because deserialized is %d", deserialized);
-    returnProtocol(std::move(protocol_));
     return;
   }
 
@@ -143,13 +124,21 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context,
   std::string jsonStr;
   this->getJsonReport(context, session, records, jsonStr);
   if (jsonStr.length() <= 0) {
-    returnProtocol(std::move(protocol_));
+    return;
+  }
+
+  auto protocol_ = getNextProtocol(true);
+
+  if (!protocol_) {
+    context->yield();
     return;
   }
 
   try {
     std::map<std::string, std::string> attributes;
-    protocol_->transferString(context, session, jsonStr, attributes);
+    if (!protocol_->transmitPayload(context, session, jsonStr, attributes)) {
+      context->yield();
+    }
   } catch (...) {
     // if transfer bytes failed, return instead of purge the provenance records
     return;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 370da21..aab22f7 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -296,7 +296,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
 
             YAML::Node currPort = portIter->as<YAML::Node>();
 
-            this->parsePortYaml(&currPort, group, SEND);
+            this->parsePortYaml(&currPort, group, sitetosite::SEND);
           }  // for node
         }
         YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
@@ -306,7 +306,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
 
             YAML::Node currPort = portIter->as<YAML::Node>();
 
-            this->parsePortYaml(&currPort, group, RECEIVE);
+            this->parsePortYaml(&currPort, group, sitetosite::RECEIVE);
           }  // for node
         }
       }
@@ -567,7 +567,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
   }
 }
 
-void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, TransferDirection direction) {
+void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction) {
   uuid_t uuid;
   std::shared_ptr<core::Processor> processor = NULL;
   std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 92c7cda..14ff5f0 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -32,6 +32,8 @@ namespace minifi {
 namespace io {
 
 int DataStream::writeData(uint8_t *value, int size) {
+  if (value == nullptr)
+    return 0;
   std::copy(value, value + size, std::back_inserter(buffer));
   return size;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/io/FileStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 93a38df..2b605eb 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -136,14 +136,14 @@ int FileStream::readData(uint8_t *buf, int buflen) {
     file_stream_->read(reinterpret_cast<char*>(buf), buflen);
     if ((file_stream_->rdstate() & (file_stream_->eofbit | file_stream_->failbit)) != 0) {
       file_stream_->clear();
-      size_t prev_offset = offset_;
       file_stream_->seekg(0, file_stream_->end);
       file_stream_->seekp(0, file_stream_->end);
       int len = file_stream_->tellg();
+      size_t ret = len - offset_;
       offset_ = len;
       length_ = len;
       logger_->log_info("%s eof bit, ended at %d", path_, offset_);
-      return offset_-prev_offset;
+      return ret;
     } else {
       offset_ += buflen;
       file_stream_->seekp(offset_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/io/NonConvertingStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/NonConvertingStream.cpp b/libminifi/src/io/NonConvertingStream.cpp
new file mode 100644
index 0000000..f40bede
--- /dev/null
+++ b/libminifi/src/io/NonConvertingStream.cpp
@@ -0,0 +1,190 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/NonConvertingStream.h"
+#include <vector>
+#include <string>
+#include "io/Serializable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+/**
+ * write 4 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+int NonConvertingStream::write(uint32_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
+}
+
+int NonConvertingStream::writeData(uint8_t *value, int size) {
+  if (composable_stream_ == this) {
+    return DataStream::writeData(value, size);
+  } else {
+    return composable_stream_->writeData(value, size);
+  }
+}
+
+/**
+ * write 2 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+int NonConvertingStream::write(uint16_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
+}
+
+/**
+ * write value to stream
+ * @param value non encoded value
+ * @param len length of value
+ * @param stream output stream
+ * @return resulting write size
+ **/
+int NonConvertingStream::write(uint8_t *value, int len) {
+  return Serializable::write(value, len, reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * write 8 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+int NonConvertingStream::write(uint64_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
+}
+
+/**
+ * write bool to stream
+ * @param value non encoded value
+ * @return resulting write size
+ **/
+int NonConvertingStream::write(bool value) {
+  uint8_t v = value;
+  return Serializable::write(v);
+}
+
+/**
+ * write UTF string to stream
+ * @param str string to write
+ * @return resulting write size
+ **/
+int NonConvertingStream::writeUTF(std::string str, bool widen) {
+  return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen);
+}
+
+/**
+ * reads a byte from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int NonConvertingStream::read(uint8_t &value) {
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * reads two bytes from the stream, honoring the requested byte order
+ * @param base_value reference that receives the result
+ * @param is_little_endian endianness determination, forwarded to Serializable::read
+ * @return resulting read size
+ **/
+int NonConvertingStream::read(uint16_t &base_value, bool is_little_endian) {
+  return Serializable::read(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
+}
+
+/**
+ * reads a byte from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int NonConvertingStream::read(char &value) {
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * reads a byte array from the stream
+ * @param value reference in which will set the result
+ * @param len length to read
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int NonConvertingStream::read(uint8_t *value, int len) {
+  return Serializable::read(value, len, reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+int NonConvertingStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  return Serializable::read(&buf[0], buflen, reinterpret_cast<DataStream*>(composable_stream_));
+}
+/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+int NonConvertingStream::readData(uint8_t *buf, int buflen) {
+  return Serializable::read(buf, buflen, reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * reads four bytes from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int NonConvertingStream::read(uint32_t &value, bool is_little_endian) {
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
+}
+
+/**
+ * reads eight bytes from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int NonConvertingStream::read(uint64_t &value, bool is_little_endian) {
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
+}
+
+/**
+ * read UTF from stream
+ * @param str reference string
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int NonConvertingStream::readUTF(std::string &str, bool widen) {
+  return Serializable::readUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen);
+}
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 5e57c80..342d2ba 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -126,7 +126,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
     uint16_t shortLength = 0;
     ret = read(shortLength, stream);
     utflen = shortLength;
-
     if (ret <= 0)
       return ret;
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/processors/GetTCP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp
index bcf3d58..0db082f 100644
--- a/libminifi/src/processors/GetTCP.cpp
+++ b/libminifi/src/processors/GetTCP.cpp
@@ -98,7 +98,7 @@ void GetTCP::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void GetTCP::onSchedule(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSessionFactory> sessionFactory) {
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   std::string value;
   stay_connected_ = true;
   if (context->getProperty(EndpointList.getName(), value)) {
@@ -222,7 +222,7 @@ void GetTCP::notifyStop() {
     socket_ring_buffer_.try_dequeue(socket_ptr);
   }
 }
-void GetTCP::onTrigger(std::shared_ptr<core::ProcessContext> context, std::shared_ptr<core::ProcessSession> session) {
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   // Perform directory list
   metrics_->iterations_++;
   std::lock_guard<std::mutex> lock(mutex_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/sitetosite/Peer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/Peer.cpp b/libminifi/src/sitetosite/Peer.cpp
new file mode 100644
index 0000000..e8678d7
--- /dev/null
+++ b/libminifi/src/sitetosite/Peer.cpp
@@ -0,0 +1,65 @@
+/**
+ * @file Peer.cpp
+ * SiteToSitePeer class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <memory>
+#include <iostream>
+
+#include "sitetosite/Peer.h"
+#include "io/ClientSocket.h"
+#include "io/validation.h"
+#include "FlowController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+bool SiteToSitePeer::Open() {
+  if (IsNullOrEmpty(host_))
+    return false;
+
+  if (stream_->initialize() < 0)
+    return false;
+
+  uint16_t data_size = sizeof MAGIC_BYTES;
+
+  if (stream_->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size) != data_size) {
+    return false;
+  }
+
+  return true;
+}
+
+void SiteToSitePeer::Close() {
+  if (stream_ != nullptr)
+    stream_->closeStream();
+}
+
+} /* namespace sitetosite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/726dc403/libminifi/src/sitetosite/RawSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
new file mode 100644
index 0000000..e162a36
--- /dev/null
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -0,0 +1,629 @@
+/**
+ * RawSiteToSiteClient class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <utility>
+#include <map>
+#include <string>
+#include <memory>
+#include <thread>
+#include <random>
+#include <iostream>
+#include <vector>
+
+#include "sitetosite/RawSocketProtocol.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+// Definitions of the static id generator members; each is initialized from
+// utils::IdGenerator::getIdGenerator().
+std::shared_ptr<utils::IdGenerator> RawSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator();
+std::shared_ptr<utils::IdGenerator> Transaction::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+/**
+ * Opens the peer connection and negotiates the protocol version.
+ *
+ * Only valid while the peer state is IDLE. On success the peer state becomes
+ * ESTABLISHED.
+ *
+ * @return true when the peer was opened and the version negotiation
+ * succeeded, false otherwise.
+ */
+bool RawSiteToSiteClient::establish() {
+  if (peer_state_ != IDLE) {
+    logger_->log_error("Site2Site peer state is not idle while try to establish");
+    return false;
+  }
+
+  if (!peer_->Open()) {
+    logger_->log_error("Site2Site peer socket open failed");
+    return false;
+  }
+
+  // Negotiate the version
+  if (!initiateResourceNegotiation()) {
+    logger_->log_error("Site2Site Protocol Version Negotiation failed");
+    return false;
+  }
+
+  logger_->log_info("Site2Site socket established");
+  peer_state_ = ESTABLISHED;
+  return true;
+}
+
+/**
+ * Negotiates the site-to-site protocol version with the remote peer.
+ *
+ * Writes the resource name and the currently selected version, then reads a
+ * one-byte status code. When the server answers DIFFERENT_RESOURCE_VERSION,
+ * the server's version is read and the negotiation is retried with the next
+ * entry of _supportedVersion (after the current index) that does not exceed
+ * the server's version.
+ *
+ * Only valid while the peer state is IDLE.
+ *
+ * @return true only when the server answers RESOURCE_OK; false on any I/O
+ * failure, when no common version exists, on NEGOTIATED_ABORT, or when the
+ * server sends a status code this client does not recognise.
+ */
+bool RawSiteToSiteClient::initiateResourceNegotiation() {
+  // Negotiate the version
+  if (peer_state_ != IDLE) {
+    logger_->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
+    return false;
+  }
+
+  logger_->log_info("Negotiate protocol version with destination port %s current version %d", port_id_str_, _currentVersion);
+
+  int ret = peer_->writeUTF(getResourceName());
+
+  logger_->log_info("result of writing resource name is %i", ret);
+  if (ret <= 0) {
+    logger_->log_debug("result of writing resource name is %i", ret);
+    // tearDown();
+    return false;
+  }
+
+  ret = peer_->write(_currentVersion);
+
+  if (ret <= 0) {
+    logger_->log_info("result of writing version is %i", ret);
+    return false;
+  }
+
+  uint8_t statusCode;
+  ret = peer_->read(statusCode);
+
+  if (ret <= 0) {
+    logger_->log_info("result of reading version status code  %i", ret);
+    return false;
+  }
+  logger_->log_info("status code is %i", statusCode);
+  switch (statusCode) {
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION: {
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        return false;
+      }
+      logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
+      // Fall back to the next supported version the server can also handle.
+      for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedVersion[i]) {
+          _currentVersion = _supportedVersion[i];
+          _currentVersionIndex = i;
+          return initiateResourceNegotiation();
+        }
+      }
+      // No remaining supported version is acceptable to the server.
+      return false;
+    }
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Negotiate protocol response ABORT");
+      return false;
+    default:
+      // An unrecognised status code means the negotiation did not succeed.
+      logger_->log_info("Negotiate protocol response unknown code %d", statusCode);
+      return false;
+  }
+}
+
+/**
+ * Negotiates the flow file codec version with the remote peer.
+ *
+ * Writes the codec resource name and the currently selected codec version,
+ * then reads a one-byte status code. When the server answers
+ * DIFFERENT_RESOURCE_VERSION, the server's version is read and the negotiation
+ * is retried with the next entry of _supportedCodecVersion (after the current
+ * index) that does not exceed the server's version.
+ *
+ * Only valid while the peer state is HANDSHAKED.
+ *
+ * @return true only when the server answers RESOURCE_OK; false on any I/O
+ * failure, when no common version exists, on NEGOTIATED_ABORT, or when the
+ * server sends a status code this client does not recognise.
+ */
+bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
+  // Negotiate the version
+  if (peer_state_ != HANDSHAKED) {
+    logger_->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
+    return false;
+  }
+
+  logger_->log_info("Negotiate Codec version with destination port %s current version %d", port_id_str_, _currentCodecVersion);
+
+  int ret = peer_->writeUTF(getCodecResourceName());
+
+  if (ret <= 0) {
+    logger_->log_debug("result of getCodecResourceName is %i", ret);
+    return false;
+  }
+
+  ret = peer_->write(_currentCodecVersion);
+
+  if (ret <= 0) {
+    logger_->log_debug("result of _currentCodecVersion is %i", ret);
+    return false;
+  }
+
+  uint8_t statusCode;
+  ret = peer_->read(statusCode);
+
+  if (ret <= 0) {
+    return false;
+  }
+
+  switch (statusCode) {
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Codec Negotiate version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION: {
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        return false;
+      }
+      logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
+      // Fall back to the next supported codec version the server can also handle.
+      for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedCodecVersion[i]) {
+          _currentCodecVersion = _supportedCodecVersion[i];
+          _currentCodecVersionIndex = i;
+          return initiateCodecResourceNegotiation();
+        }
+      }
+      // No remaining supported codec version is acceptable to the server.
+      return false;
+    }
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Codec Negotiate response ABORT");
+      return false;
+    default:
+      // An unrecognised status code means the negotiation did not succeed.
+      logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+      return false;
+  }
+}
+
+/**
+ * Performs the site-to-site handshake with the remote peer.
+ *
+ * Sends a freshly generated communications identifier, the peer URL (protocol
+ * version 3 and later), and the handshake properties, then reads the server's
+ * response. Batch settings are only sent for protocol version 5 and later, and
+ * only when they are positive.
+ *
+ * Only valid while the peer state is ESTABLISHED. On PROPERTIES_OK the peer
+ * state becomes HANDSHAKED.
+ *
+ * @return true when the server answered PROPERTIES_OK, false on any I/O
+ * failure or any other response code.
+ */
+bool RawSiteToSiteClient::handShake() {
+  if (peer_state_ != ESTABLISHED) {
+    logger_->log_error("Site2Site peer state is not established while handshake");
+    return false;
+  }
+  logger_->log_info("Site2Site Protocol Perform hand shake with destination port %s", port_id_str_);
+
+  // Generate the UUID that identifies this communication session
+  uuid_t uuid;
+  id_generator_->generate(uuid);
+  char uuidStr[37];
+  uuid_unparse_lower(uuid, uuidStr);
+  _commsIdentifier = uuidStr;
+
+  if (peer_->writeUTF(_commsIdentifier) <= 0) {
+    return false;
+  }
+
+  std::map<std::string, std::string> properties;
+  properties[HandShakePropertyStr[GZIP]] = "false";
+  properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_str_;
+  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeOut);
+  if (_currentVersion >= 5) {
+    if (_batchCount > 0) {
+      properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(_batchCount);
+    }
+    if (_batchSize > 0) {
+      properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(_batchSize);
+    }
+    if (_batchDuration > 0) {
+      properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration);
+    }
+  }
+
+  // The URL goes on the wire before the property count.
+  if (_currentVersion >= 3 && peer_->writeUTF(peer_->getURL()) <= 0) {
+    return false;
+  }
+
+  uint32_t size = properties.size();
+  if (peer_->write(size) <= 0) {
+    return false;
+  }
+
+  for (auto &entry : properties) {
+    if (peer_->writeUTF(entry.first) <= 0) {
+      return false;
+    }
+    if (peer_->writeUTF(entry.second) <= 0) {
+      return false;
+    }
+    logger_->log_info("Site2Site Protocol Send handshake properties %s %s", entry.first.c_str(), entry.second.c_str());
+  }
+
+  RespondCode code;
+  std::string message;
+
+  if (readRespond(nullptr, code, message) <= 0) {
+    return false;
+  }
+
+  if (code == PROPERTIES_OK) {
+    logger_->log_info("Site2Site HandShake Completed");
+    peer_state_ = HANDSHAKED;
+    return true;
+  }
+
+  if (code == PORT_NOT_IN_VALID_STATE || code == UNKNOWN_PORT || code == PORTS_DESTINATION_FULL) {
+    logger_->log_error("Site2Site HandShake Failed because destination port, %s, is either invalid or full", port_id_str_);
+    return false;
+  }
+
+  logger_->log_info("HandShake Failed because of unknown respond code %d", code);
+  return false;
+}
+
+/**
+ * Shuts the connection down and returns the client to the IDLE state.
+ *
+ * A SHUTDOWN request is written when the peer state is at least ESTABLISHED.
+ * The known transactions are always cleared and the peer is always closed.
+ */
+void RawSiteToSiteClient::tearDown() {
+  const bool connected = peer_state_ >= ESTABLISHED;
+  if (connected) {
+    logger_->log_info("Site2Site Protocol tearDown");
+    // need to write shutdown request
+    writeRequestType(SHUTDOWN);
+  }
+
+  known_transactions_.clear();
+  peer_->Close();
+  peer_state_ = IDLE;
+}
+
+/**
+ * Requests the list of peers from the remote instance.
+ *
+ * Establishes the connection, performs the handshake, sends
+ * REQUEST_PEER_LIST, and reads a count followed by one record per peer (host,
+ * port, secure flag, and a count value that is passed on to PeerStatus). The
+ * connection is torn down before returning, whether or not the request
+ * succeeded.
+ *
+ * @param peers receives one PeerStatus per record read; entries appended
+ * before a failure are left in place.
+ * @return true when every record was read, false on any failure.
+ */
+bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
+  if (!establish() || !handShake()) {
+    tearDown();
+    return false;
+  }
+
+  int status = writeRequestType(REQUEST_PEER_LIST);
+
+  if (status <= 0) {
+    tearDown();
+    return false;
+  }
+
+  uint32_t number = 0;
+  status = peer_->read(number);
+
+  if (status <= 0) {
+    tearDown();
+    return false;
+  }
+
+  // The index has the same unsigned type as the count read off the wire.
+  for (uint32_t i = 0; i < number; i++) {
+    std::string host;
+    status = peer_->readUTF(host);
+    if (status <= 0) {
+      tearDown();
+      return false;
+    }
+    uint32_t port;
+    status = peer_->read(port);
+    if (status <= 0) {
+      tearDown();
+      return false;
+    }
+    uint8_t secure;
+    status = peer_->read(secure);
+    if (status <= 0) {
+      tearDown();
+      return false;
+    }
+    uint32_t count;
+    status = peer_->read(count);
+    if (status <= 0) {
+      tearDown();
+      return false;
+    }
+    // Named distinctly so it does not shadow the I/O result variable above.
+    PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, port, secure), count, true);
+    peers.push_back(std::move(peer_status));
+    logger_->log_info("Site2Site Peer host %s, port %d, Secure %d", host, port, secure);
+  }
+
+  tearDown();
+  return true;
+}
+
+/**
+ * Writes the textual name of a request type to the peer.
+ *
+ * @param type request type to send.
+ * @return the result of the peer's writeUTF call, or -1 when type is not
+ * below MAX_REQUEST_TYPE.
+ */
+int RawSiteToSiteClient::writeRequestType(RequestType type) {
+  if (type < MAX_REQUEST_TYPE) {
+    return peer_->writeUTF(RequestTypeStr[type]);
+  }
+
+  return -1;
+}
+
+/**
+ * Reads a request type name from the peer and maps it to its enum value.
+ *
+ * @param type receives the matching request type; left untouched when the
+ * read fails or the name is not recognised.
+ * @return the result of the peer's readUTF call on success or read failure,
+ * or -1 when the name matches no request type between
+ * NEGOTIATE_FLOWFILE_CODEC and SHUTDOWN.
+ */
+int RawSiteToSiteClient::readRequestType(RequestType &type) {
+  std::string received;
+
+  const int ret = peer_->readUTF(received);
+  if (ret <= 0) {
+    return ret;
+  }
+
+  int candidate = NEGOTIATE_FLOWFILE_CODEC;
+  while (candidate <= SHUTDOWN) {
+    if (RequestTypeStr[candidate] == received) {
+      type = static_cast<RequestType>(candidate);
+      return ret;
+    }
+    ++candidate;
+  }
+
+  return -1;
+}
+
+/**
+ * Reads a response from the peer: two fixed sequence bytes, the respond code,
+ * and, for codes that carry one, a description string.
+ *
+ * @param transaction not used by this implementation.
+ * @param code receives the respond code once the third byte has been read.
+ * @param message receives the description when the code has one.
+ * @return 3 plus the message length on success; -1 when a sequence byte is
+ * missing or wrong, the code is unknown, or the description cannot be read;
+ * the read result when reading the code byte itself fails.
+ */
+int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
+  uint8_t byte;
+
+  int ret = peer_->read(byte);
+  if (ret <= 0 || byte != CODE_SEQUENCE_VALUE_1) {
+    return -1;
+  }
+
+  ret = peer_->read(byte);
+  if (ret <= 0 || byte != CODE_SEQUENCE_VALUE_2) {
+    return -1;
+  }
+
+  ret = peer_->read(byte);
+  if (ret <= 0) {
+    return ret;
+  }
+
+  code = static_cast<RespondCode>(byte);
+
+  RespondCodeContext *context = getRespondCodeContext(code);
+  if (context == nullptr) {
+    // Not a valid respond code
+    return -1;
+  }
+
+  if (context->hasDescription && peer_->readUTF(message) <= 0) {
+    return -1;
+  }
+
+  return 3 + message.size();
+}
+
+/**
+ * Writes a response to the peer: two fixed sequence bytes, the respond code,
+ * and, for codes that carry one, the description string.
+ *
+ * @param transaction not used by this implementation.
+ * @param code respond code to send.
+ * @param message description, written only when the code has one.
+ * @return 3 for a code without description; 3 plus the writeUTF result when
+ * the description was written; the non-positive writeUTF result when writing
+ * the description failed; -1 for an unknown code or a short header write.
+ */
+int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
+  RespondCodeContext *context = getRespondCodeContext(code);
+  if (context == nullptr) {
+    // Not a valid respond code
+    return -1;
+  }
+
+  uint8_t header[3];
+  header[0] = CODE_SEQUENCE_VALUE_1;
+  header[1] = CODE_SEQUENCE_VALUE_2;
+  header[2] = static_cast<uint8_t>(code);
+
+  if (peer_->write(header, 3) != 3) {
+    return -1;
+  }
+
+  if (!context->hasDescription) {
+    return 3;
+  }
+
+  const int written = peer_->writeUTF(message);
+  return written > 0 ? (3 + written) : written;
+}
+
+/**
+ * Sends the NEGOTIATE_FLOWFILE_CODEC request and negotiates the codec version.
+ *
+ * Only valid while the peer state is HANDSHAKED. On success the peer state
+ * becomes READY.
+ *
+ * @return true when the codec negotiation succeeded, false otherwise.
+ */
+bool RawSiteToSiteClient::negotiateCodec() {
+  if (peer_state_ != HANDSHAKED) {
+    logger_->log_error("Site2Site peer state is not handshaked while negotiate codec");
+    return false;
+  }
+
+  logger_->log_info("Site2Site Protocol Negotiate Codec with destination port %s", port_id_str_);
+
+  if (writeRequestType(NEGOTIATE_FLOWFILE_CODEC) <= 0) {
+    return false;
+  }
+
+  // Negotiate the codec version
+  if (!initiateCodecResourceNegotiation()) {
+    logger_->log_error("Site2Site Codec Version Negotiation failed");
+    return false;
+  }
+
+  logger_->log_info("Site2Site Codec Completed and move to READY state for data transfer");
+  peer_state_ = READY;
+  return true;
+}
+
+/**
+ * Brings the client to the READY state.
+ *
+ * Returns immediately when already READY. Otherwise the connection is torn
+ * down and rebuilt through establish(), handShake() and negotiateCodec(). On
+ * failure the peer is yielded and the connection is torn down again.
+ *
+ * @return true when the client is READY, false otherwise.
+ */
+bool RawSiteToSiteClient::bootstrap() {
+  if (peer_state_ == READY) {
+    return true;
+  }
+
+  tearDown();
+
+  const bool ready = establish() && handShake() && negotiateCodec();
+  if (!ready) {
+    peer_->yield();
+    tearDown();
+    return false;
+  }
+
+  logger_->log_info("Site2Site Ready For data transaction");
+  return true;
+}
+
+/**
+ * Creates a transaction in the given direction and registers it in
+ * known_transactions_ under its UUID string.
+ *
+ * The client is bootstrapped first when it is not READY. For RECEIVE, a
+ * RECEIVE_FLOWFILES request is sent and the server's response decides whether
+ * the transaction is flagged as having data available. For any other
+ * direction a SEND_FLOWFILES request is sent.
+ *
+ * @param transactionID receives the UUID string of the new transaction.
+ * @param direction transfer direction of the transaction.
+ * @return the new transaction, or an empty pointer when the client could not
+ * become READY, a request or response failed, or the server answered with an
+ * unexpected code.
+ */
+std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(std::string &transactionID, TransferDirection direction) {
+  std::shared_ptr<Transaction> transaction = nullptr;
+
+  if (peer_state_ != READY) {
+    bootstrap();
+  }
+
+  if (peer_state_ != READY) {
+    return transaction;
+  }
+
+  if (direction != RECEIVE) {
+    if (writeRequestType(SEND_FLOWFILES) <= 0) {
+      return nullptr;
+    }
+
+    org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(peer_.get());
+    transaction = std::make_shared<Transaction>(direction, crcstream);
+    known_transactions_[transaction->getUUIDStr()] = transaction;
+    transactionID = transaction->getUUIDStr();
+    logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+    return transaction;
+  }
+
+  if (writeRequestType(RECEIVE_FLOWFILES) <= 0) {
+    return transaction;
+  }
+
+  RespondCode code;
+  std::string message;
+
+  if (readRespond(nullptr, code, message) <= 0) {
+    return transaction;
+  }
+
+  org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(peer_.get());
+
+  if (code != MORE_DATA && code != NO_MORE_DATA) {
+    logger_->log_info("Site2Site got unexpected response %d when asking for data", code);
+    return nullptr;
+  }
+
+  // Both accepted codes create a transaction; they differ only in the data flag.
+  const bool dataAvailable = (code == MORE_DATA);
+  if (dataAvailable) {
+    logger_->log_info("Site2Site peer indicates that data is available");
+  } else {
+    logger_->log_info("Site2Site peer indicates that no data is available");
+  }
+
+  transaction = std::make_shared<Transaction>(direction, crcstream);
+  known_transactions_[transaction->getUUIDStr()] = transaction;
+  transactionID = transaction->getUUIDStr();
+  transaction->setDataAvailable(dataAvailable);
+  logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+  return transaction;
+}
+
+/**
+ * Sends a single payload with its attributes to the remote peer in one
+ * transaction (create, send, confirm, complete).
+ *
+ * @param context process context; yielded whenever the transfer fails.
+ * @param session process session handed to send().
+ * @param payload content to transmit; an empty payload is rejected.
+ * @param attributes attributes sent alongside the payload.
+ * @return true when the transaction completed; false when the payload is
+ * empty or bootstrapping fails.
+ * @throws Exception (SITE2SITE_EXCEPTION) when the peer is not READY after
+ * bootstrapping, when no transaction can be created, or when send, confirm or
+ * complete fails. Exceptions raised inside the transfer are rethrown after the
+ * transaction is deleted and the connection torn down.
+ */
+bool RawSiteToSiteClient::transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
+                                          std::map<std::string, std::string> attributes) {
+  if (payload.empty()) {
+    return false;
+  }
+
+  if (peer_state_ != READY && !bootstrap()) {
+    return false;
+  }
+
+  if (peer_state_ != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+  }
+
+  // Create the transaction
+  std::string transactionID;
+  std::shared_ptr<Transaction> transaction = createTransaction(transactionID, SEND);
+
+  if (transaction == nullptr) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+  }
+
+  try {
+    DataPacket packet(getLogger(), transaction, attributes, payload);
+
+    const int16_t resp = send(transactionID, &packet, nullptr, session);
+    if (resp == -1) {
+      throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+    }
+    logger_->log_info("Site2Site transaction %s send bytes length %d", transactionID.c_str(), payload.length());
+
+    if (!confirm(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+    }
+    if (!complete(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+    }
+    logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->current_transfers_, transaction->_bytes);
+  } catch (std::exception &e) {
+    // Clean up, then let the caller see the original exception.
+    if (transaction) {
+      deleteTransaction(transactionID);
+    }
+    context->yield();
+    tearDown();
+    logger_->log_debug("Caught Exception %s", e.what());
+    throw;
+  } catch (...) {
+    if (transaction) {
+      deleteTransaction(transactionID);
+    }
+    context->yield();
+    tearDown();
+    logger_->log_debug("Caught Exception during RawSiteToSiteClient::transferBytes");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+
+  return true;
+}
+
+} /* namespace sitetosite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */