You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/12 17:57:52 UTC

[2/2] nifi-minifi-cpp git commit: MINIFICPP-37: Create an executable to support basic localized devops operations.

MINIFICPP-37: Create an executable to support basic localized devops operations.

This includes stopping components, clearing queues, getting queue information, and updating the flow

MINIFICPP-37: Updates to allow host/port to be specified and allow any interface to be used


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/edc8858f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/edc8858f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/edc8858f

Branch: refs/heads/master
Commit: edc8858f05f2e6b6e239be0c9aac942bbb5f4e49
Parents: a6c7a9f
Author: Marc Parisi <ph...@apache.org>
Authored: Sun Jan 7 11:42:03 2018 -0500
Committer: Marc Parisi <ph...@apache.org>
Committed: Fri Jan 12 12:57:21 2018 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |    5 +
 LICENSE                                         |   22 +
 README.md                                       |   55 +
 conf/minifi.properties                          |    5 +-
 controller/CMakeLists.txt                       |   90 +
 controller/Controller.h                         |  200 ++
 controller/MiNiFiController.cpp                 |  217 ++
 extensions/http-curl/protocols/RESTReceiver.cpp |    4 +-
 extensions/http-curl/protocols/RESTReceiver.h   |    3 +-
 libminifi/include/c2/C2Agent.h                  |    2 +-
 libminifi/include/c2/ControllerSocketProtocol.h |   94 +
 libminifi/include/c2/HeartBeatReporter.h        |    7 +-
 libminifi/include/io/ClientSocket.h             |   22 +-
 libminifi/include/io/DescriptorStream.h         |  185 ++
 libminifi/include/io/ServerSocket.h             |   69 +
 libminifi/include/io/Sockets.h                  |    1 +
 libminifi/src/FlowController.cpp                |    4 +-
 libminifi/src/c2/C2Agent.cpp                    |   13 +-
 libminifi/src/c2/ControllerSocketProtocol.cpp   |  259 +++
 libminifi/src/io/ClientSocket.cpp               |   36 +-
 libminifi/src/io/DescriptorStream.cpp           |  196 ++
 libminifi/src/io/ServerSocket.cpp               |   81 +
 .../integration/ProvenanceReportingTest.cpp     |    2 +-
 libminifi/test/unit/ControllerTests.cpp         |  262 +++
 libminifi/test/unit/GetTCPTests.cpp             |    6 +-
 libminifi/test/unit/SocketTests.cpp             |    9 +-
 main/Main.h                                     |   57 +
 main/MiNiFiMain.cpp                             |   32 +-
 thirdparty/cxxopts/CHANGELOG.md                 |   45 +
 thirdparty/cxxopts/CMakeLists.txt               |   85 +
 thirdparty/cxxopts/INSTALL                      |   10 +
 thirdparty/cxxopts/LICENSE                      |   19 +
 thirdparty/cxxopts/README.md                    |  119 ++
 thirdparty/cxxopts/cxxopts-config.cmake.in      |    4 +
 thirdparty/cxxopts/include/cxxopts.hpp          | 1988 ++++++++++++++++++
 thirdparty/cxxopts/src/.gitignore               |    1 +
 thirdparty/cxxopts/src/CMakeLists.txt           |   24 +
 thirdparty/cxxopts/src/example.cpp              |  138 ++
 38 files changed, 4298 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 57afc95..1487417 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -148,6 +148,11 @@ if (NOT DISABLE_CIVET)
 createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" "extensions/civetweb")
 endif()
 
+if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER)
+	add_subdirectory(thirdparty/cxxopts)
+	add_subdirectory(controller)
+endif()
+
 ## Add the rocks DB extension
 if (NOT ROCKSDB_FOUND OR BUILD_ROCKSDB)
 	set(BUILD_RD "TRUE")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index a270b73..8cde4ea 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1053,3 +1053,25 @@ Redistribution and use in source and binary forms, with or without modification,
     Neither the name of the Eclipse Foundation, Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. 
 
 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+This product bundles 'cxxopts' which is available under an MIT license.
+
+Copyright (c) 2014 Jarryd Beck
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ed0b834..1682017 100644
--- a/README.md
+++ b/README.md
@@ -628,6 +628,61 @@ MiNiFi can then be stopped by issuing:
 MiNiFi can also be installed as a system service using minifi.sh with an optional "service name" (default: minifi)
 
     $ ./bin/minifi.sh install [service name]
+    
+### Managing MiNFI C++ through the MiNiFi Controller
+
+The MiNiFi controller is an executable in the bin directory that can be used to control the MiNFi C++ agent while it runs. Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy.
+
+The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required.
+The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows
+you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only
+minificontroller on the local host can control the agent:
+
+	$ controller.socket.host=localhost
+	$ controller.socket.port=9998
+	$ controller.socket.local.any.interface=true/false ( default false)
+
+These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller
+will be disabled in your deployment.
+
+ The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below.
+ 
+ #### Specifying connecting information
+ 
+   ./minificontroller --host "host name" --port "port"
+
+        * By default these options use those defined in minifi.properties and are not required
+
+ #### Start Command
+ 
+   ./minificontroller --start "component name"
+ 
+ #### Stop command 
+   ./minificontroller --stop "component name"
+   	  
+ #### List connections command
+   ./minificontroller --list connections
+      
+ #### List components command
+   ./minificontroller --list components
+ 
+ #### Clear connection command
+   ./minificontroller --clear "connection name"
+      
+ #### GetSize command
+   ./minificontroller --getsize "connection name"
+
+       * Returns the size of the connection. The current size along with the max will be reported
+ 
+ #### Update flow
+   ./minificontroller --updateflow "config yml"
+    
+       *Updates the flow file reference and performs a warm re-deploy.
+ 
+ #### Get full connection command     
+   ./minificontroller --getfull 
+   
+       *Provides a list of full connections, if any.
 
 ### Extensions
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 6c75305..a528c01 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -15,8 +15,6 @@
 
 # Core Properties #
 nifi.version=0.1.0
-#disable the c2 services
-nifi.c2.enable=false
 nifi.flow.configuration.file=./conf/config.yml
 nifi.administrative.yield.duration=30 sec
 # If a component has no work to do (is "bored"), how long should we wait before checking again for work?
@@ -33,3 +31,6 @@ nifi.https.client.pass.phrase=./conf/password
 nifi.https.client.ca.certificate=./conf/nifi-cert.pem
 #nifi.rest.api.user.name=admin
 #nifi.rest.api.password=password
+## enable the controller socket provider on port 9998
+controller.socket.host=localhost
+controller.socket.port=9998

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/controller/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt
new file mode 100644
index 0000000..da6f499
--- /dev/null
+++ b/controller/CMakeLists.txt
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(../main/ ../libminifi/include  ../libminifi/include/c2  ../libminifi/include/c2/protocols/  ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../libminifi/include/core/yaml  ../libminifi/include/core  ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ${CIVET_THIRDPARTY_ROOT}/include ../thirdparty/cxxopts/include  ../thirdparty/)
+
+include(CheckCXXCompilerFlag)
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Os")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -Os")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+add_executable(minificontroller MiNiFiController.cpp)
+if(THREADS_HAVE_PTHREAD_ARG)
+  target_compile_options(PUBLIC minificontroller "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minificontroller "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+# Include UUID
+find_package(UUID REQUIRED)
+
+# Include OpenSSL
+find_package(OpenSSL REQUIRED)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb
+target_link_libraries(minificontroller core-minifi)
+
+if (APPLE)
+	target_link_libraries (minificontroller -Wl,-all_load minifi)
+else ()
+	target_link_libraries (minificontroller -Wl,--whole-archive minifi -Wl,--no-whole-archive)
+endif ()
+
+
+target_link_libraries(minificontroller yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB} ${UUID_LIBRARIES} cxxopts)
+
+
+if (APPLE)
+	get_property(extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
+	foreach(EXTENSION ${extensions})
+		message("Linking against ${EXTENSION}")
+		target_link_libraries (minificontroller -Wl,-all_load ${EXTENSION})
+	endforeach()    
+else ()
+	get_property(extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
+	foreach(EXTENSION ${extensions})
+	  target_link_libraries (minificontroller -Wl,--whole-archive ${EXTENSION} -Wl,--no-whole-archive)
+	endforeach()
+endif ()
+
+set_target_properties(minificontroller
+        PROPERTIES OUTPUT_NAME minificontroller)
+
+install(TARGETS minificontroller
+        RUNTIME
+        DESTINATION bin
+        COMPONENT bin)
+
+
+add_custom_command(TARGET minificontroller POST_BUILD
+           COMMAND cat ${CMAKE_BINARY_DIR}/all.log)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/controller/Controller.h
----------------------------------------------------------------------
diff --git a/controller/Controller.h b/controller/Controller.h
new file mode 100644
index 0000000..9cd3ba1
--- /dev/null
+++ b/controller/Controller.h
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONTROLLER_CONTROLLER_H_
+#define CONTROLLER_CONTROLLER_H_
+
+#include "io/ClientSocket.h"
+#include "c2/ControllerSocketProtocol.h"
+
+/**
+ * Sends a single argument comment
+ * @param socket socket unique ptr.
+ * @param op operation to perform
+ * @param value value to send
+ */
+bool sendSingleCommand(std::unique_ptr<minifi::io::Socket> socket, uint8_t op, const std::string value) {
+  socket->initialize();
+  std::vector<uint8_t> data;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF(value);
+  socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize());
+  return true;
+}
+
+/**
+ * Stops a stopped component
+ * @param socket socket unique ptr.
+ * @param op operation to perform
+ */
+bool stopComponent(std::unique_ptr<minifi::io::Socket> socket, std::string component) {
+  return sendSingleCommand(std::move(socket), minifi::c2::Operation::STOP, component);
+}
+
+/**
+ * Starts a previously stopped component.
+ * @param socket socket unique ptr.
+ * @param op operation to perform
+ */
+bool startComponent(std::unique_ptr<minifi::io::Socket> socket, std::string component) {
+  return sendSingleCommand(std::move(socket), minifi::c2::Operation::START, component);
+}
+
+/**
+ * Clears a connection queue.
+ * @param socket socket unique ptr.
+ * @param op operation to perform
+ */
+bool clearConnection(std::unique_ptr<minifi::io::Socket> socket, std::string connection) {
+  return sendSingleCommand(std::move(socket), minifi::c2::Operation::CLEAR, connection);
+}
+
+/**
+ * Updates the flow to the provided file
+ */
+int updateFlow(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, std::string file) {
+  socket->initialize();
+  std::vector<uint8_t> data;
+  uint8_t op = minifi::c2::Operation::UPDATE;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF("flow");
+  stream.writeUTF(file);
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  // read the response
+  uint8_t resp = 0;
+  socket->readData(&resp, 1);
+  if (resp == minifi::c2::Operation::DESCRIBE) {
+    uint16_t connections = 0;
+    socket->read(connections);
+    out << connections << " are full" << std::endl;
+    for (int i = 0; i < connections; i++) {
+      std::string fullcomponent;
+      socket->readUTF(fullcomponent);
+      out << fullcomponent << " is full" << std::endl;
+    }
+  }
+  return 0;
+}
+
+/**
+ * Lists connections which are full
+ * @param socket socket ptr
+ */
+int getFullConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) {
+  socket->initialize();
+  std::vector<uint8_t> data;
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF("getfull");
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  // read the response
+  uint8_t resp = 0;
+  socket->readData(&resp, 1);
+  if (resp == minifi::c2::Operation::DESCRIBE) {
+    uint16_t connections = 0;
+    socket->read(connections);
+    out << connections << " are full" << std::endl;
+    for (int i = 0; i < connections; i++) {
+      std::string fullcomponent;
+      socket->readUTF(fullcomponent);
+      out << fullcomponent << " is full" << std::endl;
+    }
+  }
+  return 0;
+}
+
+/**
+ * Prints the connection size for the provided connection.
+ * @param socket socket ptr
+ * @param connection connection whose size will be returned.
+ */
+int getConnectionSize(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, std::string connection) {
+  socket->initialize();
+  std::vector<uint8_t> data;
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF("queue");
+  stream.writeUTF(connection);
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  // read the response
+  uint8_t resp = 0;
+  socket->readData(&resp, 1);
+  if (resp == minifi::c2::Operation::DESCRIBE) {
+    std::string size;
+    socket->readUTF(size);
+    out << "Size/Max of " << connection << " " << size << std::endl;
+  }
+  return 0;
+}
+
+int listComponents(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, bool show_header = true) {
+  socket->initialize();
+  minifi::io::BaseStream stream;
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  stream.writeData(&op, 1);
+  stream.writeUTF("components");
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  uint16_t responses = 0;
+  socket->readData(&op, 1);
+  socket->read(responses);
+  if (show_header)
+    out << "Components:" << std::endl;
+
+  for (int i = 0; i < responses; i++) {
+    std::string name;
+    socket->readUTF(name, false);
+    out << name << std::endl;
+  }
+  return 0;
+}
+
+int listConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, bool show_header = true) {
+  socket->initialize();
+  minifi::io::BaseStream stream;
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  stream.writeData(&op, 1);
+  stream.writeUTF("connections");
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  uint16_t responses = 0;
+  socket->readData(&op, 1);
+  socket->read(responses);
+  if (show_header)
+    out << "Connection Names:" << std::endl;
+
+  for (int i = 0; i < responses; i++) {
+    std::string name;
+    socket->readUTF(name, false);
+    out << name << std::endl;
+  }
+  return 0;
+}
+
+#endif /* CONTROLLER_CONTROLLER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/controller/MiNiFiController.cpp
----------------------------------------------------------------------
diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
new file mode 100644
index 0000000..65d9dbd
--- /dev/null
+++ b/controller/MiNiFiController.cpp
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <fcntl.h>
+#include <stdio.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <unistd.h>
+#include <yaml-cpp/yaml.h>
+#include <iostream>
+#include "io/BaseStream.h"
+
+#include "core/Core.h"
+
+#include "core/FlowConfiguration.h"
+#include "core/ConfigurationFactory.h"
+#include "core/RepositoryFactory.h"
+#include "FlowController.h"
+#include "Main.h"
+
+#include "Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+
+#include "cxxopts.hpp"
+
+int main(int argc, char **argv) {
+
+  std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("controller");
+
+  // assumes POSIX compliant environment
+  std::string minifiHome;
+  if (const char *env_p = std::getenv(MINIFI_HOME_ENV_KEY)) {
+    minifiHome = env_p;
+    logger->log_info("Using MINIFI_HOME=%s from environment.", minifiHome);
+  } else {
+    logger->log_info("MINIFI_HOME is not set; determining based on environment.");
+    char *path = nullptr;
+    char full_path[PATH_MAX];
+    path = realpath(argv[0], full_path);
+
+    if (path != nullptr) {
+      std::string minifiHomePath(path);
+      if (minifiHomePath.find_last_of("/\\") != std::string::npos) {
+        minifiHomePath = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));  //Remove /minifi from path
+        minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));    //Remove /bin from path
+      }
+    }
+
+    // attempt to use cwd as MINIFI_HOME
+    if (minifiHome.empty() || !validHome(minifiHome)) {
+      char cwd[PATH_MAX];
+      getcwd(cwd, PATH_MAX);
+      minifiHome = cwd;
+    }
+
+  }
+
+  if (!validHome(minifiHome)) {
+    logger->log_error("No valid MINIFI_HOME could be inferred. "
+                      "Please set MINIFI_HOME or run minifi from a valid location.");
+    return -1;
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->setHome(minifiHome);
+  configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
+
+  std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
+  log_properties->setHome(minifiHome);
+  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
+  logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
+
+  auto stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  std::string host = "localhost", portStr, caCert;
+  int port = -1;
+
+  cxxopts::Options options("MiNiFiController", "MiNiFi local agent controller");
+  options.positional_help("[optional args]").show_positional_help();
+
+  options.add_options()  //NOLINT
+  ("h,help", "Shows Help")  //NOLINT
+  ("host", "Specifies connecting host name", cxxopts::value<std::string>())  //NOLINT
+  ("port", "Specifies connecting host port", cxxopts::value<int>())  //NOLINT
+  ("stop", "Shuts down the provided component", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("start", "Starts provided component", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("l,list", "Provides a list of connections or processors", cxxopts::value<std::string>())  //NOLINT
+  ("c,clear", "Clears the associated connection queue", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>())  //NOLINT
+  ("getfull", "Reports a list of full connections")  //NOLINT
+  ("noheaders", "Removes headers from output streams");
+
+  bool show_headers = true;
+
+  try {
+    auto result = options.parse(argc, argv);
+
+    if (result.count("help")) {
+      std::cout << options.help( { "", "Group" }) << std::endl;
+      exit(0);
+    }
+
+    if (result.count("host")) {
+      host = result["host"].as<std::string>();
+    } else {
+      configuration->get("controller.socket.host", host);
+    }
+
+    if (result.count("port")) {
+      port = result["port"].as<int>();
+    } else {
+      if (port == -1 && configuration->get("controller.socket.port", portStr)) {
+        port = std::stoi(portStr);
+      }
+    }
+
+    if ((IsNullOrEmpty(host) && port == -1)) {
+      std::cout << "MiNiFi Controller is disabled" << std::endl;
+      exit(0);
+    } else
+
+    if (result.count("noheaders")) {
+      show_headers = false;
+    }
+
+    if (result.count("stop") > 0) {
+      auto& components = result["stop"].as<std::vector<std::string>>();
+      for (const auto& component : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (!stopComponent(std::move(socket), component))
+          std::cout << component << " requested to stop" << std::endl;
+        else
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+    }
+
+    if (result.count("start") > 0) {
+      auto& components = result["start"].as<std::vector<std::string>>();
+      for (const auto& component : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (!startComponent(std::move(socket), component))
+          std::cout << component << " requested to start" << std::endl;
+        else
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+    }
+
+    if (result.count("c") > 0) {
+      auto& components = result["c"].as<std::vector<std::string>>();
+      for (const auto& connection : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (!clearConnection(std::move(socket), connection))
+          std::cout << "Cleared " << connection << std::endl;
+        else
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+    }
+
+    if (result.count("getsize") > 0) {
+      auto& components = result["getsize"].as<std::vector<std::string>>();
+      for (const auto& component : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (getConnectionSize(std::move(socket), std::cout, component) < 0)
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+
+    }
+
+    if (result.count("l") > 0) {
+      auto& option = result["l"].as<std::string>();
+      auto socket = stream_factory_->createSocket(host, port);
+      if (option == "components") {
+        if (listComponents(std::move(socket), std::cout, show_headers) < 0)
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      } else if (option == "connections") {
+        if (listConnections(std::move(socket), std::cout, show_headers) < 0)
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+
+    }
+
+    if (result.count("getfull") > 0) {
+      auto socket = stream_factory_->createSocket(host, port);
+      if (getFullConnections(std::move(socket), std::cout) < 0)
+        std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+    }
+
+    if (result.count("updateflow") > 0) {
+      auto& flow_file = result["updateflow"].as<std::string>();
+      auto socket = stream_factory_->createSocket(host, port);
+      if (updateFlow(std::move(socket), std::cout, flow_file) < 0)
+        std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+    }
+  } catch (...) {
+    std::cout << options.help( { "", "Group" }) << std::endl;
+    exit(0);
+  }
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
index 4c46516..ac9d229 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -44,8 +44,8 @@ RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
       logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
 }
 
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
-  HeartBeatReporter::initialize(controller, configure);
+void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure) {
+  HeartBeatReporter::initialize(controller, updateSink, configure);
   logger_->log_debug("Initializing rest receiveer");
   if (nullptr != configuration_) {
     std::string listeningPort, rootUri, caCert;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
index 4793ee3..e19932c 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -50,7 +50,8 @@ class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
  public:
   RESTReceiver(std::string name, uuid_t uuid = nullptr);
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                          const std::shared_ptr<Configure> &configure) override;
   virtual int16_t heartbeat(const C2Payload &heartbeat) override;
 
  protected:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index b5d4d31..810eede 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -73,7 +73,7 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi
    */
   virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric);
 
-  int64_t getHeartBestDelay(){
+  int64_t getHeartBeatDelay(){
     std::lock_guard<std::mutex> lock(heartbeat_mutex);
     return heart_beat_period_;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/c2/ControllerSocketProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h b/libminifi/include/c2/ControllerSocketProtocol.h
new file mode 100644
index 0000000..156cf5c
--- /dev/null
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_CONTROLLERSOCKETPROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_CONTROLLERSOCKETPROTOCOL_H_
+
+#include "core/Resource.h"
+#include "HeartBeatReporter.h"
+#include "io/StreamFactory.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Creates a reporter that can handle basic c2 operations for a localized environment
+ * through a simple TCP socket.
+ */
+class ControllerSocketProtocol : public HeartBeatReporter {
+ public:
+
+  ControllerSocketProtocol(std::string name, uuid_t uuid = nullptr)
+      : HeartBeatReporter(name, uuid),
+        logger_(logging::LoggerFactory<ControllerSocketProtocol>::getLogger()) {
+
+  }
+
+  /**
+   * Initialize the socket protocol.
+   * @param controller controller service provider.
+   * @param updateSink update mechanism that will be used to stop/clear elements
+   * @param configuration configuration class.
+   */
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                          const std::shared_ptr<Configure> &configuration);
+
+  /**
+   * Handles the heartbeat
+   * @param payload incoming payload. From this function we only care about queue metrics.
+   */
+  virtual int16_t heartbeat(const C2Payload &payload);
+
+ protected:
+
+  /**
+   * Parses content from the content response.
+   */
+  void parse_content(const std::vector<C2ContentResponse> &content);
+
+  std::mutex controller_mutex_;
+
+  std::map<std::string, bool> queue_full_;
+
+  std::map<std::string, uint64_t> queue_size_;
+
+  std::map<std::string, uint64_t> queue_max_;
+
+  std::map<std::string, bool> component_map_;
+
+  std::unique_ptr<io::ServerSocket> server_socket_;
+
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(ControllerSocketProtocol);
+
+} /* namesapce c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_CONTROLLERSOCKETPROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/c2/HeartBeatReporter.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h
index 3d0fd49..81f8828 100644
--- a/libminifi/include/c2/HeartBeatReporter.h
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -39,11 +39,14 @@ class HeartBeatReporter : public core::Connectable {
   HeartBeatReporter(std::string name, uuid_t uuid)
       : core::Connectable(name, uuid),
         controller_(nullptr),
+        update_sink_(nullptr),
         configuration_(nullptr) {
   }
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                          const std::shared_ptr<Configure> &configure) {
     controller_ = controller;
+    update_sink_ = updateSink;
     configuration_ = configure;
   }
   virtual ~HeartBeatReporter() {
@@ -89,6 +92,8 @@ class HeartBeatReporter : public core::Connectable {
 
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
 
+  std::shared_ptr<state::StateMonitor> update_sink_;
+
   std::shared_ptr<Configure> configuration_;
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 216ef3d..ea445be 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -58,16 +58,6 @@ class SocketContext {
 class Socket : public BaseStream {
  public:
   /**
-   * Constructor that accepts host name, port and listeners. With this
-   * contructor we will be creating a server socket
-   * @param context the SocketContext
-   * @param hostname our host name
-   * @param port connecting port
-   * @param listeners number of listeners in the queue
-   */
-  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
-
-  /**
    * Constructor that creates a client socket.
    * @param context the SocketContext
    * @param hostname hostname we are connecting to.
@@ -219,6 +209,16 @@ class Socket : public BaseStream {
  protected:
 
   /**
+   * Constructor that accepts host name, port and listeners. With this
+   * contructor we will be creating a server socket
+   * @param context the SocketContext
+   * @param hostname our host name
+   * @param port connecting port
+   * @param listeners number of listeners in the queue
+   */
+  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+  /**
    * Creates a vector and returns the vector using the provided
    * type name.
    * @param t incoming object
@@ -255,6 +255,8 @@ class Socket : public BaseStream {
   std::string canonical_hostname_;
   uint16_t port_;
 
+  bool is_loopback_only_;
+
   // connection information
   int32_t socket_file_descriptor_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/DescriptorStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
new file mode 100644
index 0000000..e6d843c
--- /dev/null
+++ b/libminifi/include/io/DescriptorStream.h
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_
+#define LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_
+
+#include <iostream>
+#include <cstdint>
+#include <string>
+#include "EndianCheck.h"
+#include "BaseStream.h"
+#include "Serializable.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: File Stream Base stream extension. This is intended to be a thread safe access to
+ * read/write to the local file system.
+ *
+ * Design: Simply extends BaseStream and overrides readData/writeData to allow a sink to the
+ * fstream object.
+ */
+class DescriptorStream : public io::BaseStream {
+ public:
+  /**
+   * File Stream constructor that accepts an fstream shared pointer.
+   * It must already be initialized for read and write.
+   */
+  explicit DescriptorStream(int fd);
+
+  virtual ~DescriptorStream() {
+
+  }
+
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset);
+
+  const uint64_t getSize() const {
+    return -1;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Returns the underlying buffer
+   * @return vector's array
+   **/
+  const uint8_t *getBuffer() const {
+    throw std::runtime_error("Stream does not support this operation");
+  }
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint8_t &value);
+
+  /**
+   * reads two bytes from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint16_t &base_value, bool is_little_endian = false);
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(char &value);
+
+  /**
+   * reads a byte array from the stream
+   * @param value reference in which will set the result
+   * @param len length to read
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint8_t *value, int len);
+
+  /**
+   * reads four bytes from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint32_t &value, bool is_little_endian = false);
+
+  /**
+   * reads eight byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint64_t &value, bool is_little_endian = false);
+
+
+  /**
+   * read UTF from stream
+   * @param str reference string
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int readUTF(std::string &str, bool widen = false);
+
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+  std::recursive_mutex file_lock_;
+
+  int fd_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/ServerSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ServerSocket.h b/libminifi/include/io/ServerSocket.h
new file mode 100644
index 0000000..025d05f
--- /dev/null
+++ b/libminifi/include/io/ServerSocket.h
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_SERVERSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_SERVERSOCKET_H_
+
+#include "ClientSocket.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: Server socket abstraction that makes focusing the accept/block paradigm
+ * simpler.
+ */
+class ServerSocket : public Socket {
+ public:
+  explicit ServerSocket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+  virtual ~ServerSocket();
+
+  int16_t initialize(bool loopbackOnly){
+    is_loopback_only_ = loopbackOnly;
+    return Socket::initialize();
+  }
+
+  virtual int16_t initialize(){
+    return Socket::initialize();
+  }
+
+  /**
+   * Registers a call back and starts the read for the server socket.
+   */
+  void registerCallback(std::function<bool()> accept_function, std::function<void(int)> handler);
+
+ private:
+
+  void close_fd(int fd );
+
+  std::atomic<bool> running_;
+
+  std::thread server_read_thread_;
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_IO_SERVERSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/Sockets.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/Sockets.h b/libminifi/include/io/Sockets.h
index 2c0b163..78ea645 100644
--- a/libminifi/include/io/Sockets.h
+++ b/libminifi/include/io/Sockets.h
@@ -19,6 +19,7 @@
 #define LIBMINIFI_INCLUDE_IO_SOCKET_H_
 
 #include "ClientSocket.h"
+#include "ServerSocket.h"
 
 #ifdef OPENSSL_SUPPORT
 #include "tls/TLSSocket.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 7e06727..03452b6 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -365,9 +365,9 @@ void FlowController::initializeC2() {
 
     std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()),
                                                                        configuration_);
-    registerUpdateListener(agent, agent->getHeartBestDelay());
+    registerUpdateListener(agent, agent->getHeartBeatDelay());
 
-    state::StateManager::startMetrics(agent->getHeartBestDelay());
+    state::StateManager::startMetrics(agent->getHeartBeatDelay());
   }
   if (!c2_enabled_) {
     return;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index ae1629f..a8cc5b2 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -24,6 +24,7 @@
 #include <map>
 #include <string>
 #include <memory>
+#include "c2/ControllerSocketProtocol.h"
 #include "core/state/UpdateController.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -140,11 +141,21 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
         logger_->log_debug("Could not instantiate %s", reporter);
       } else {
         std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj);
-        shp_reporter->initialize(controller_, configuration_);
+        shp_reporter->initialize(controller_, update_sink_, configuration_);
         heartbeat_protocols_.push_back(shp_reporter);
       }
     }
   }
+
+  auto base_reporter = "ControllerSocketProtocol";
+  auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter);
+  if (heartbeat_reporter_obj == nullptr) {
+    logger_->log_debug("Could not instantiate %s", base_reporter);
+  } else {
+    std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj);
+    shp_reporter->initialize(controller_, update_sink_, configuration_);
+    heartbeat_protocols_.push_back(shp_reporter);
+  }
 }
 
 void C2Agent::performHeartBeat() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/c2/ControllerSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
new file mode 100644
index 0000000..22e8696
--- /dev/null
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -0,0 +1,259 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/ControllerSocketProtocol.h"
+#include "utils/StringUtils.h"
+#include "io/DescriptorStream.h"
+#include <utility>
+#include <memory>
+#include <vector>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                                          const std::shared_ptr<Configure> &configuration) {
+  HeartBeatReporter::initialize(controller, updateSink, configuration);
+  stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  std::string host = "localhost", port, limitStr;
+  bool anyInterface = false;
+  if (configuration_->get("controller.socket.local.any.interface", limitStr)) {
+    utils::StringUtils::StringToBool(limitStr, anyInterface);
+  }
+
+  // if host name isn't defined we will use localhost
+  configuration_->get("controller.socket.host", host);
+
+  if (nullptr != configuration_ && configuration_->get("controller.socket.port", port)) {
+    server_socket_ = std::unique_ptr<io::ServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
+    // if we have a localhost hostname and we did not manually specify any.interface we will
+    // bind only to the loopback adapter
+    if ((host == "localhost" || host == "127.0.0.1" || host == "::") && !anyInterface) {
+      server_socket_->initialize(true);
+    } else {
+      server_socket_->initialize(true);
+    }
+
+    auto check = [this]() -> bool {
+      return update_sink_->isRunning();
+    };
+
+    auto handler = [this](int fd) {
+      uint8_t head;
+      io::DescriptorStream stream(fd);
+      if (stream.read(head) != 1) {
+        logger_->log_debug("Connection broke with fd %d", fd);
+        return;
+      }
+      switch (head) {
+        case Operation::START:
+        {
+          std::string componentStr;
+          int size = stream.readUTF(componentStr);
+          if ( size != -1 ) {
+            auto components = update_sink_->getComponents(componentStr);
+            for (auto component : components) {
+              component->start();
+            }
+          } else {
+            logger_->log_debug("Connection broke with fd %d", fd);
+          }
+        }
+        break;
+        case Operation::STOP:
+        {
+          std::string componentStr;
+          int size = stream.readUTF(componentStr);
+          if ( size != -1 ) {
+            auto components = update_sink_->getComponents(componentStr);
+            for (auto component : components) {
+              component->stop(true, 1000);
+            }
+          } else {
+            logger_->log_debug("Connection broke with fd %d", fd);
+          }
+        }
+        break;
+        case Operation::CLEAR:
+        {
+          std::string connection;
+          int size = stream.readUTF(connection);
+          if ( size != -1 ) {
+            update_sink_->clearConnection(connection);
+          }
+        }
+        break;
+        case Operation::UPDATE:
+        {
+          std::string what;
+          int size = stream.readUTF(what);
+          if (size == -1) {
+            logger_->log_debug("Connection broke with fd %d", fd);
+            break;
+          }
+          if (what == "flow") {
+            std::string ff_loc;
+            int size = stream.readUTF(ff_loc);
+            std::ifstream tf(ff_loc);
+            std::string configuration((std::istreambuf_iterator<char>(tf)),
+                std::istreambuf_iterator<char>());
+            if (size == -1) {
+              logger_->log_debug("Connection broke with fd %d", fd);
+              break;
+            }
+            update_sink_->applyUpdate(configuration);
+          }
+        }
+        break;
+        case Operation::DESCRIBE:
+        {
+          std::string what;
+          int size = stream.readUTF(what);
+          if (size == -1) {
+            logger_->log_debug("Connection broke with fd %d", fd);
+            break;
+          }
+          if (what == "queue") {
+            std::string connection;
+            int size = stream.readUTF(connection);
+            if (size == -1) {
+              logger_->log_debug("Connection broke with fd %d", fd);
+              break;
+            }
+            std::stringstream response;
+            {
+              std::lock_guard<std::mutex> lock(controller_mutex_);
+              response << queue_size_[connection] << " / " << queue_max_[connection];
+            }
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            resp.writeUTF(response.str());
+            write(fd, resp.getBuffer(), resp.getSize());
+          } else if (what == "components") {
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            uint16_t size = update_sink_->getAllComponents().size();
+            resp.write(size);
+            for (const auto &component : update_sink_->getAllComponents()) {
+              resp.writeUTF(component->getComponentName());
+            }
+            write(fd, resp.getBuffer(), resp.getSize());
+          } else if (what == "connections") {
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            uint16_t size = queue_full_.size();
+            resp.write(size);
+            for (const auto &connection : queue_full_) {
+              resp.writeUTF(connection.first, false);
+            }
+            write(fd, resp.getBuffer(), resp.getSize());
+          } else if (what == "getfull") {
+            std::vector<std::string> full_connections;
+            {
+              std::lock_guard<std::mutex> lock(controller_mutex_);
+              for (auto conn : queue_full_) {
+                if (conn.second == true) {
+                  full_connections.push_back(conn.first);
+                }
+              }
+            }
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            uint16_t full_connection_count = full_connections.size();
+            resp.write(full_connection_count);
+            for (auto conn : full_connections) {
+              resp.writeUTF(conn);
+            }
+            write(fd, resp.getBuffer(), resp.getSize());
+          }
+        }
+        break;
+      }
+    };
+    server_socket_->registerCallback(check, handler);
+  } else {
+    server_socket_ = nullptr;
+  }
+}
+
+void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
+  for (const auto &payload_content : content) {
+    if (payload_content.name == "Components") {
+      for (auto content : payload_content.operation_arguments) {
+        bool is_enabled = false;
+        minifi::utils::StringUtils::StringToBool(content.second, is_enabled);
+        std::lock_guard<std::mutex> lock(controller_mutex_);
+        component_map_[content.first] = is_enabled;
+      }
+    }
+  }
+}
+
+int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
+  if (server_socket_ == nullptr)
+    return 0;
+  const std::vector<C2ContentResponse> &content = payload.getContent();
+  for (const auto pc : payload.getNestedPayloads()) {
+    if (pc.getLabel() == "metrics") {
+      for (const auto metrics_payload : pc.getNestedPayloads()) {
+        if (metrics_payload.getLabel() == "QueueMetrics") {
+          for (const auto queue_metrics : metrics_payload.getNestedPayloads()) {
+            auto metric_content = queue_metrics.getContent();
+            for (const auto &payload_content : queue_metrics.getContent()) {
+              uint64_t size = 0;
+              uint64_t max = 0;
+              for (auto content : payload_content.operation_arguments) {
+                if (content.first == "datasize") {
+                  size = std::stol(content.second);
+                } else if (content.first == "datasizemax") {
+                  max = std::stol(content.second);
+                }
+              }
+              std::lock_guard<std::mutex> lock(controller_mutex_);
+              if (size >= max) {
+                queue_full_[payload_content.name] = true;
+              } else {
+                queue_full_[payload_content.name] = false;
+              }
+              queue_size_[payload_content.name] = size;
+              queue_max_[payload_content.name] = max;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  parse_content(content);
+
+  std::vector<uint8_t> buffer;
+  buffer.resize(1024);
+
+  return 0;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 39fc982..d98c4ed 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -45,6 +45,7 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string
       addr_info_(0),
       socket_file_descriptor_(-1),
       socket_max_(0),
+      is_loopback_only_(false),
       listeners_(listeners),
       canonical_hostname_(""),
       logger_(logging::LoggerFactory<Socket>::getLogger()) {
@@ -59,6 +60,7 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string
 Socket::Socket(const Socket &&other)
     : requested_hostname_(std::move(other.requested_hostname_)),
       port_(std::move(other.port_)),
+      is_loopback_only_(false),
       addr_info_(std::move(other.addr_info_)),
       socket_file_descriptor_(other.socket_file_descriptor_),
       socket_max_(other.socket_max_.load()),
@@ -79,7 +81,7 @@ void Socket::closeStream() {
     addr_info_ = 0;
   }
   if (socket_file_descriptor_ >= 0) {
-    logging::LOG_INFO(logger_) <<  "Closing " << socket_file_descriptor_;
+    logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_;
     close(socket_file_descriptor_);
     socket_file_descriptor_ = -1;
   }
@@ -109,7 +111,11 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
     struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
     sa_loc->sin_family = AF_INET;
     sa_loc->sin_port = htons(port_);
-    sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+    if (is_loopback_only_) {
+      sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+    } else {
+      sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+    }
     if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
       logger_->log_error("Could not bind to socket", strerror(errno));
       return -1;
@@ -123,7 +129,11 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       // use any address if you are connecting to the local machine for testing
       // otherwise we must use the requested hostname
       if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") {
-        sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+        if (is_loopback_only_) {
+          sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        } else {
+          sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+        }
       } else {
         sa_loc->sin_addr.s_addr = addr;
       }
@@ -139,6 +149,8 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   if (listeners_ > 0) {
     if (listen(socket_file_descriptor_, listeners_) == -1) {
       return -1;
+    } else {
+      logger_->log_debug("Created connection with %d listeners", listeners_);
     }
   }
   // add the listener to the total set
@@ -209,7 +221,6 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
   }
 
   struct timeval tv;
-  int retval;
 
   read_fds_ = total_list_;
 
@@ -219,14 +230,9 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
   std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
 
   if (msec > 0)
-    retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
+    select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
   else
-    retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
-
-  if (retval < 0) {
-    logger_->log_error("Saw error during selection, error:%i %s", retval, strerror(errno));
-    return retval;
-  }
+    select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
 
   for (int i = 0; i <= socket_max_; i++) {
     if (FD_ISSET(i, &read_fds_)) {
@@ -252,7 +258,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
     }
   }
 
-  logger_->log_error("Could not find a suitable file descriptor");
+  logger_->log_debug("Could not find a suitable file descriptor or select timed out");
 
   return -1;
 }
@@ -390,8 +396,10 @@ int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
   while (buflen) {
     int16_t fd = select_descriptor(1000);
     if (fd < 0) {
-      logger_->log_debug("fd %d close %i", fd, buflen);
-      close(socket_file_descriptor_);
+      if (listeners_ <= 0) {
+        logger_->log_debug("fd %d close %i", fd, buflen);
+        close(socket_file_descriptor_);
+      }
       return -1;
     }
     int bytes_read = recv(fd, buf, buflen, 0);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/io/DescriptorStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
new file mode 100644
index 0000000..d50a39f
--- /dev/null
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -0,0 +1,196 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io/DescriptorStream.h"
+#include <fstream>
+#include <unistd.h>
+#include <vector>
+#include <memory>
+#include <string>
+#include "io/validation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+DescriptorStream::DescriptorStream(int fd)
+    : fd_(fd),
+      logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
+}
+
+void DescriptorStream::seek(uint64_t offset) {
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+  lseek(fd_, offset, 0x00);
+}
+
+int DescriptorStream::writeData(std::vector<uint8_t> &buf, int buflen) {
+  if (static_cast<int>(buf.capacity()) < buflen) {
+    return -1;
+  }
+  return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
+}
+
+// data stream overrides
+
+int DescriptorStream::writeData(uint8_t *value, int size) {
+  if (!IsNullOrEmpty(value)) {
+    std::lock_guard<std::recursive_mutex> lock(file_lock_);
+    if (::write(fd_, value, size) != size) {
+      return -1;
+    } else {
+      return size;
+    }
+  } else {
+    return -1;
+  }
+}
+
+template<typename T>
+inline std::vector<uint8_t> DescriptorStream::readBuffer(const T& t) {
+  std::vector<uint8_t> buf;
+  buf.resize(sizeof t);
+  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
+  return buf;
+}
+
+int DescriptorStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  if (static_cast<int>(buf.capacity()) < buflen) {
+    buf.resize(buflen);
+  }
+  int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+
+  if (ret < buflen) {
+    buf.resize(ret);
+  }
+  return ret;
+}
+
+int DescriptorStream::readData(uint8_t *buf, int buflen) {
+  if (!IsNullOrEmpty(buf)) {
+    auto size_read = ::read(fd_, buf, buflen);
+
+    if (size_read != buflen) {
+      return -1;
+    } else {
+      return buflen;
+    }
+
+  } else {
+    return -1;
+  }
+}
+
+/**
+ * reads a byte from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::read(uint8_t &value) {
+  return Serializable::read(value, reinterpret_cast<DataStream*>(this));
+}
+
+/**
+ * reads two bytes from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::read(uint16_t &base_value, bool is_little_endian) {
+  auto buf = readBuffer(base_value);
+  if (is_little_endian) {
+    base_value = (buf[0] << 8) | buf[1];
+  } else {
+    base_value = buf[0] | buf[1] << 8;
+  }
+  return 2;
+}
+
+/**
+ * reads a byte from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::read(char &value) {
+  return readData(reinterpret_cast<uint8_t*>(&value), 1);
+}
+
+/**
+ * reads a byte array from the stream
+ * @param value reference in which will set the result
+ * @param len length to read
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::read(uint8_t *value, int len) {
+  return readData(value, len);
+}
+
+/**
+ * reads four bytes from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::read(uint32_t &value, bool is_little_endian) {
+  auto buf = readBuffer(value);
+  if (is_little_endian) {
+    value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+  } else {
+    value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+  }
+  return 4;
+}
+
+/**
+ * reads eight byte from the stream
+ * @param value reference in which will set the result
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::read(uint64_t &value, bool is_little_endian) {
+  auto buf = readBuffer(value);
+
+  if (is_little_endian) {
+    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+        | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+  } else {
+    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+        | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+  }
+  return 8;
+}
+
+/**
+ * read UTF from stream
+ * @param str reference string
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+int DescriptorStream::readUTF(std::string &str, bool widen) {
+  return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen);
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/io/ServerSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ServerSocket.cpp b/libminifi/src/io/ServerSocket.cpp
new file mode 100644
index 0000000..1a72a0f
--- /dev/null
+++ b/libminifi/src/io/ServerSocket.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/ServerSocket.h"
+#include <netinet/tcp.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <cstdio>
+#include <memory>
+#include <utility>
+#include <vector>
+#include <cerrno>
+#include <iostream>
+#include <string>
+#include "io/validation.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+ServerSocket::ServerSocket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
+    : Socket(context, hostname, port, listeners),
+      running_(true),
+      logger_(logging::LoggerFactory<ServerSocket>::getLogger()) {
+}
+
+ServerSocket::~ServerSocket() {
+  running_ = false;
+  if (server_read_thread_.joinable())
+    server_read_thread_.join();
+}
+
+/**
+ * Initializes the socket
+ * @return result of the creation operation.
+ */
+void ServerSocket::registerCallback(std::function<bool()> accept_function, std::function<void(int)> handler) {
+  auto fx = [this](std::function<bool()> accept_function, std::function<void(int)> handler) {
+    while (running_) {
+      int fd = select_descriptor(1000);
+      if (fd >= 0) {
+        handler(fd);
+        close_fd(fd);
+      }
+    }
+  };
+  server_read_thread_ = std::thread(fx, accept_function, handler);
+}
+
+void ServerSocket::close_fd(int fd) {
+  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+  close(fd);
+  FD_CLR(fd, &total_list_);
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 7db235f..0e8d52f 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -73,7 +73,7 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
   ptr.release();
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 10005, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 10005, 1);
 
   controller->load();
   controller->start();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/unit/ControllerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
new file mode 100644
index 0000000..c761b6e
--- /dev/null
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "../../controller/Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+
+#include "state/UpdateController.h"
+
+class TestStateController : public minifi::state::StateController {
+ public:
+  TestStateController()
+      : is_running(false) {
+  }
+  virtual ~TestStateController() {
+  }
+
+  virtual std::string getComponentName() {
+    return "TestStateController";
+  }
+  /**
+   * Start the client
+   */
+  virtual int16_t start() {
+    is_running = true;
+    return 0;
+  }
+  /**
+   * Stop the client
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+    is_running = false;
+    return 0;
+  }
+
+  virtual bool isRunning() {
+    return is_running;
+  }
+
+  virtual int16_t pause() {
+    return 0;
+  }
+
+  std::atomic<bool> is_running;
+};
+
+class TestUpdateSink : public minifi::state::StateMonitor {
+ public:
+  explicit TestUpdateSink(std::shared_ptr<StateController> controller)
+      : is_running(true),
+        clear_calls(0),
+        controller(controller),
+        update_calls(0) {
+  }
+  virtual std::vector<std::shared_ptr<StateController>> getComponents(const std::string &name) {
+    std::vector<std::shared_ptr<StateController>> vec;
+    vec.push_back(controller);
+    return vec;
+  }
+
+  virtual std::vector<std::shared_ptr<StateController>> getAllComponents() {
+    std::vector<std::shared_ptr<StateController>> vec;
+    vec.push_back(controller);
+    return vec;
+  }
+
+  virtual std::string getComponentName() {
+    return "TestUpdateSink";
+  }
+  /**
+   * Start the client
+   */
+  virtual int16_t start() {
+    is_running = true;
+    return 0;
+  }
+  /**
+   * Stop the client
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+    is_running = false;
+    return 0;
+  }
+
+  virtual bool isRunning() {
+    return is_running;
+  }
+
+  virtual int16_t pause() {
+    return 0;
+  }
+
+  /**
+   * Operational controllers
+   */
+
+  /**
+   * Drain repositories
+   */
+  virtual int16_t drainRepositories() {
+    return 0;
+  }
+
+  /**
+   * Clear connection for the agent.
+   */
+  virtual int16_t clearConnection(const std::string &connection) {
+    clear_calls++;
+    return 0;
+  }
+
+  /**
+   * Apply an update with the provided string.
+   *
+   * < 0 is an error code
+   * 0 is success
+   */
+  virtual int16_t applyUpdate(const std::string &configuration) {
+    update_calls++;
+    return 0;
+  }
+
+  /**
+   * Apply an update that the agent must decode. This is useful for certain operations
+   * that can't be encapsulated within these definitions.
+   */
+  virtual int16_t applyUpdate(const std::shared_ptr<minifi::state::Update> &updateController) {
+    return 0;
+  }
+
+  /**
+   * Returns uptime for this module.
+   * @return uptime for the current state monitor.
+   */
+  virtual uint64_t getUptime() {
+    return 8765309;
+  }
+
+  std::atomic<bool> is_running;
+  std::atomic<uint32_t> clear_calls;
+  std::shared_ptr<StateController> controller;
+  std::atomic<uint32_t> update_calls;
+};
+
+TEST_CASE("TestGet", "[test1]") {
+  auto controller = std::make_shared<TestStateController>();
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set("controller.socket.host", "localhost");
+  configuration->set("controller.socket.port", "9997");
+  auto ptr = std::make_shared<TestUpdateSink>(controller);
+  minifi::c2::ControllerSocketProtocol protocol("testprotocol");
+  protocol.initialize(nullptr, ptr, configuration);
+
+  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  auto socket = stream_factory->createSocket("localhost", 9997);
+
+  startComponent(std::move(socket), "TestStateController");
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+  REQUIRE(controller->isRunning() == true);
+
+  socket = stream_factory->createSocket("localhost", 9997);
+
+  stopComponent(std::move(socket), "TestStateController");
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+  REQUIRE(controller->isRunning() == false);
+
+  socket = stream_factory->createSocket("localhost", 9997);
+  std::stringstream ss;
+  listComponents(std::move(socket), ss);
+
+  REQUIRE(ss.str().find("TestStateController") != std::string::npos);
+}
+
+TEST_CASE("TestClear", "[test1]") {
+  auto controller = std::make_shared<TestStateController>();
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set("controller.socket.host", "localhost");
+  configuration->set("controller.socket.port", "9997");
+  auto ptr = std::make_shared<TestUpdateSink>(controller);
+  minifi::c2::ControllerSocketProtocol protocol("testprotocol");
+  protocol.initialize(nullptr, ptr, configuration);
+
+  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  auto socket = stream_factory->createSocket("localhost", 9997);
+
+  startComponent(std::move(socket), "TestStateController");
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+  REQUIRE(controller->isRunning() == true);
+
+  socket = stream_factory->createSocket("localhost", 9997);
+
+  clearConnection(std::move(socket), "connection");
+
+  socket = stream_factory->createSocket("localhost", 9997);
+
+  clearConnection(std::move(socket), "connection");
+
+  socket = stream_factory->createSocket("localhost", 9997);
+
+  clearConnection(std::move(socket), "connection");
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+  REQUIRE(3 == ptr->clear_calls);
+}
+
+TEST_CASE("TestUpdate", "[test1]") {
+  auto controller = std::make_shared<TestStateController>();
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set("controller.socket.host", "localhost");
+  configuration->set("controller.socket.port", "9997");
+  auto ptr = std::make_shared<TestUpdateSink>(controller);
+  minifi::c2::ControllerSocketProtocol protocol("testprotocol");
+  protocol.initialize(nullptr, ptr, configuration);
+
+  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  auto socket = stream_factory->createSocket("localhost", 9997);
+
+  startComponent(std::move(socket), "TestStateController");
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+  REQUIRE(controller->isRunning() == true);
+
+  std::stringstream ss;
+
+  socket = stream_factory->createSocket("localhost", 9997);
+
+  updateFlow(std::move(socket), ss, "connection");
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+  REQUIRE(1 == ptr->update_calls);
+  REQUIRE(0 == ptr->clear_calls);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/unit/GetTCPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/GetTCPTests.cpp b/libminifi/test/unit/GetTCPTests.cpp
index 60db1ee..87121dc 100644
--- a/libminifi/test/unit/GetTCPTests.cpp
+++ b/libminifi/test/unit/GetTCPTests.cpp
@@ -47,7 +47,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
   std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9184, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9184, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -158,7 +158,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
 
   TestController testController;
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9182, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9182, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -284,7 +284,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
 
   LogTestController::getInstance().setDebug<minifi::io::Socket>();
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9182, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9182, 1);
 
   REQUIRE(-1 != server.initialize());
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 3e2760e..a4785f9 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -26,6 +26,7 @@
 #include "../TestBase.h"
 #include "io/StreamFactory.h"
 #include "io/ClientSocket.h"
+#include "io/ServerSocket.h"
 #include "io/tls/TLSSocket.h"
 #include "utils/ThreadPool.h"
 
@@ -55,7 +56,7 @@ TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
   buffer.push_back('a');
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -86,7 +87,7 @@ TEST_CASE("TestWriteEndian64", "[TestSocket4]") {
   buffer.push_back('a');
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -113,7 +114,7 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
   REQUIRE(-1 != server.initialize());
 
   org::apache::nifi::minifi::io::Socket client(socket_context, "localhost", 9183);
@@ -148,7 +149,7 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") {
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
 
   REQUIRE(-1 != server.initialize());