You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/01/12 17:57:51 UTC

[1/2] nifi-minifi-cpp git commit: MINIFICPP-37: Create an executable to support basic localized devops operations.

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master a6c7a9f7b -> edc8858f0


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/main/Main.h
----------------------------------------------------------------------
diff --git a/main/Main.h b/main/Main.h
new file mode 100644
index 0000000..016673e
--- /dev/null
+++ b/main/Main.h
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MAIN_MAIN_H_
+#define MAIN_MAIN_H_
+
+
+//! Main thread sleep interval 1 second
+#define SLEEP_INTERVAL 1
+//! Main thread stop wait time, in milliseconds (30 seconds).
+//! Parenthesized so the expansion stays one operand inside larger expressions
+//! (e.g. `x / STOP_WAIT_TIME_MS`).
+#define STOP_WAIT_TIME_MS (30*1000)
+//! Default YAML location
+#define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml"
+//! Default properties file paths
+#define DEFAULT_NIFI_PROPERTIES_FILE "./conf/minifi.properties"
+#define DEFAULT_LOG_PROPERTIES_FILE "./conf/minifi-log.properties"
+#define DEFAULT_UID_PROPERTIES_FILE "./conf/minifi-uid.properties"
+//! Define home environment variable
+#define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
+
+/* Define Parser Values for Configuration YAML sections */
+#define CONFIG_YAML_PROCESSORS_KEY "Processors"
+#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
+#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
+#define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups"
+
+
+/**
+ * Validates a MINIFI_HOME value.
+ *
+ * The value is accepted when stat() succeeds on
+ * home_path + "/" + DEFAULT_NIFI_PROPERTIES_FILE.
+ *
+ * Declared inline because the body lives in this header: without it every
+ * translation unit that includes Main.h emits its own external definition
+ * and the program fails to link with duplicate symbols.
+ *
+ * @param home_path candidate MINIFI_HOME directory
+ * @return true if home_path represents a valid MINIFI_HOME
+ */
+inline bool validHome(const std::string &home_path) {
+  struct stat stat_result { };
+  const auto properties_file_path = home_path + "/" + DEFAULT_NIFI_PROPERTIES_FILE;
+  return stat(properties_file_path.c_str(), &stat_result) == 0;
+}
+
+
+
+
+
+#endif /* MAIN_MAIN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 2bf4ac6..7ebe972 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -34,26 +34,7 @@
 #include "core/ConfigurationFactory.h"
 #include "core/RepositoryFactory.h"
 #include "FlowController.h"
-
-//! Main thread sleep interval 1 second
-#define SLEEP_INTERVAL 1
-//! Main thread stop wait time
-#define STOP_WAIT_TIME_MS 30*1000
-//! Default YAML location
-#define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml"
-//! Default properties file paths
-#define DEFAULT_NIFI_PROPERTIES_FILE "./conf/minifi.properties"
-#define DEFAULT_LOG_PROPERTIES_FILE "./conf/minifi-log.properties"
-#define DEFAULT_UID_PROPERTIES_FILE "./conf/minifi-uid.properties"
-//! Define home environment variable
-#define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
-
-/* Define Parser Values for Configuration YAML sections */
-#define CONFIG_YAML_PROCESSORS_KEY "Processors"
-#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
-#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
-#define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups"
-
+#include "Main.h"
 // Variables that allow us to avoid a timed wait.
 sem_t *running;
 //! Flow Controller
@@ -75,17 +56,6 @@ void sigHandler(int signal) {
   }
 }
 
-/**
- * Validates a MINIFI_HOME value.
- * @param home_path
- * @return true if home_path represents a valid MINIFI_HOME
- */
-bool validHome(const std::string &home_path) {
-  struct stat stat_result { };
-  auto properties_file_path = home_path + "/" + DEFAULT_NIFI_PROPERTIES_FILE;
-  return (stat(properties_file_path.c_str(), &stat_result) == 0);
-}
-
 int main(int argc, char **argv) {
   std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("main");
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/CHANGELOG.md b/thirdparty/cxxopts/CHANGELOG.md
new file mode 100644
index 0000000..01b67b6
--- /dev/null
+++ b/thirdparty/cxxopts/CHANGELOG.md
@@ -0,0 +1,45 @@
+# Changelog
+
+This is the changelog for `cxxopts`, a C++11 library for parsing command line
+options. The project adheres to semantic versioning.
+
+## 2.1
+
+### Changed
+
+* Options with implicit arguments now require the `--option=value` form if
+  they are to be given an explicit value. This removes the ambiguity
+  when a positional argument could follow an option with an implicit value.
+  For example, `--foo value`, where `foo` has an implicit value, will be
+  parsed as `--foo=implicit` and a positional argument `value`.
+* Fixed an ambiguous overload in the `parse_positional` function when an
+  `initializer_list` was directly passed.
+
+### Bug Fixes
+
+* Building against GCC 4.9 was broken due to overly strict shadow warnings.
+
+## 2.0
+
+### Changed
+
+* `Options::parse` returns a ParseResult rather than storing the parse
+  result internally.
+* Options with default values now get counted as appearing once if they
+  were not specified by the user.
+
+### Added
+
+* A new `ParseResult` object that is the immutable result of parsing. It
+  responds to the same `count` and `operator[]` as `Options` of 1.x did.
+* The function `ParseResult::arguments` returns a vector of the parsed
+  arguments to iterate through in the order they were provided.
+* The symbol `cxxopts::version` for the version of the library.
+* Booleans can be specified with various strings and explicitly set false.
+
+## 1.x
+
+The 1.x series was the first major version of the library, with release numbers
+starting to follow semantic versioning, after 0.x being unstable.  It never had
+a changelog maintained for it. Releases mostly contained bug fixes, with the
+occasional feature added.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/CMakeLists.txt b/thirdparty/cxxopts/CMakeLists.txt
new file mode 100644
index 0000000..192b529
--- /dev/null
+++ b/thirdparty/cxxopts/CMakeLists.txt
@@ -0,0 +1,85 @@
+# Copyright (c) 2014 Jarryd Beck
+# 
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+# 
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+# 
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+cmake_minimum_required(VERSION 3.1)
+project(cxxopts)
+
+
+# Version written into the generated cxxopts-config-version.cmake file.
+# NOTE(review): include/cxxopts.hpp declares version {2, 1, 0} and the
+# changelog tops out at 2.1, while this says 1.2.0 - confirm which is intended.
+set(VERSION "1.2.0")
+
+option(CXXOPTS_BUILD_EXAMPLES "Set to ON to build examples" ON)
+
+# request c++11 without gnu extension for the whole project and enable more warnings
+set(CMAKE_CXX_STANDARD   11)
+set(CMAKE_CXX_EXTENSIONS OFF)
+if(MSVC)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W2")
+elseif(CMAKE_CXX_COMPILER_ID MATCHES "[Cc]lang" OR CMAKE_CXX_COMPILER_ID MATCHES "GNU")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror -Wextra -Wshadow")
+endif()
+
+# Header-only library: an INTERFACE target carries only usage requirements.
+add_library(cxxopts INTERFACE)
+
+# optionally, enable unicode support using the ICU library
+set(CXXOPTS_USE_UNICODE_HELP FALSE CACHE BOOL "Use ICU Unicode library")
+if(CXXOPTS_USE_UNICODE_HELP)
+    find_package(PkgConfig)
+    pkg_check_modules(ICU REQUIRED icu-uc)
+
+    # Consumers inherit the ICU flags and the CXXOPTS_USE_UNICODE definition
+    # that switches cxxopts.hpp to its icu::UnicodeString code path.
+    target_link_libraries(cxxopts INTERFACE ${ICU_LDFLAGS})
+    target_compile_options(cxxopts INTERFACE ${ICU_CFLAGS})
+    target_compile_definitions(cxxopts INTERFACE CXXOPTS_USE_UNICODE)
+endif()
+
+# Include path differs between using the build tree and an installed copy.
+target_include_directories(cxxopts INTERFACE
+    $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/include>
+    $<INSTALL_INTERFACE:include>
+    )
+
+include(CMakePackageConfigHelpers)
+set(CXXOPTS_CMAKE_DIR "lib/cmake/cxxopts" CACHE STRING
+  "Installation directory for cmake files, relative to ${CMAKE_INSTALL_PREFIX}.")
+set(version_config "${PROJECT_BINARY_DIR}/cxxopts-config-version.cmake")
+set(project_config "${PROJECT_BINARY_DIR}/cxxopts-config.cmake")
+set(targets_export_name cxxopts-targets)
+
+# Generate the version, config and target files into the build directory.
+write_basic_package_version_file(
+    ${version_config}
+    VERSION ${VERSION}
+    COMPATIBILITY AnyNewerVersion)
+configure_package_config_file(
+    ${PROJECT_SOURCE_DIR}/cxxopts-config.cmake.in
+    ${project_config}
+    INSTALL_DESTINATION ${CXXOPTS_CMAKE_DIR})
+export(TARGETS cxxopts NAMESPACE cxxopts::
+    FILE ${PROJECT_BINARY_DIR}/${targets_export_name}.cmake)
+
+# Install version, config and target files.
+install(
+    FILES ${project_config} ${version_config}
+    DESTINATION ${CXXOPTS_CMAKE_DIR})
+install(EXPORT ${targets_export_name} DESTINATION ${CXXOPTS_CMAKE_DIR}
+    NAMESPACE cxxopts::)
+
+# Install the header file and export the target
+install(TARGETS cxxopts EXPORT ${targets_export_name} DESTINATION lib)
+install(FILES ${PROJECT_SOURCE_DIR}/include/cxxopts.hpp DESTINATION include)
+
+# NOTE(review): expects a src/ directory with its own CMakeLists.txt next to
+# this file; it is not part of the visible portion of this commit.
+add_subdirectory(src)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/INSTALL
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/INSTALL b/thirdparty/cxxopts/INSTALL
new file mode 100644
index 0000000..e5f8b39
--- /dev/null
+++ b/thirdparty/cxxopts/INSTALL
@@ -0,0 +1,10 @@
+It is preferable to build out of source.
+
+cmake ${CXXOPTS_DIR}
+make
+
+
+You can use another build tool, such as ninja.
+
+cmake -G Ninja ${CXXOPTS_DIR}
+ninja

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/LICENSE
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/LICENSE b/thirdparty/cxxopts/LICENSE
new file mode 100644
index 0000000..324a203
--- /dev/null
+++ b/thirdparty/cxxopts/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2014 Jarryd Beck
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/README.md
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/README.md b/thirdparty/cxxopts/README.md
new file mode 100644
index 0000000..c5c171b
--- /dev/null
+++ b/thirdparty/cxxopts/README.md
@@ -0,0 +1,119 @@
+[![Build Status](https://travis-ci.org/jarro2783/cxxopts.svg?branch=master)](https://travis-ci.org/jarro2783/cxxopts)
+
+# Quick start
+
+This is a lightweight C++ option parser library, supporting the standard GNU
+style syntax for options.
+
+Options can be given as:
+
+    --long
+    --long=argument
+    --long argument
+    -a
+    -ab
+    -abc argument
+
+where c takes an argument, but a and b do not.
+
+Additionally, anything after `--` will be parsed as a positional argument.
+
+## Basics
+
+    #include <cxxopts.hpp>
+
+Create a cxxopts::Options instance.
+
+    cxxopts::Options options("MyProgram", "One line description of MyProgram");
+
+Then use `add_options`.
+
+    options.add_options()
+      ("d,debug", "Enable debugging")
+      ("f,file", "File name", cxxopts::value<std::string>())
+      ;
+
+Options are declared with a long name and an optional short name. A description
+must be provided. The third argument is the value type; if it is omitted, the
+option is boolean. Any type can be given as long as it can be parsed with
+`operator>>`.
+
+To parse the command line do:
+
+    auto result = options.parse(argc, argv);
+
+To retrieve an option use `result.count("option")` to get the number of times
+it appeared, and
+
+    result["opt"].as<type>()
+
+to get its value. If "opt" doesn't exist, or isn't of the right type, then an
+exception will be thrown.
+
+Note that the result of `options.parse` should only be used as long as the
+`options` object that created it is in scope.
+
+## Help groups
+
+Options can be placed into groups for the purposes of displaying help messages.
+To place options in a group, pass the group as a string to `add_options`. Then,
+when displaying the help, pass the groups that you would like displayed as a
+vector to the `help` function.
+
+## Positional Arguments
+
+Positional arguments can be optionally parsed into one or more options.
+To set up positional arguments, call
+
+    options.parse_positional({"first", "second", "last"})
+
+where "last" should be the name of an option with a container type, and the
+others should have a single value.
+
+## Default and implicit values
+
+An option can be declared with a default or an implicit value, or both.
+
+A default value is the value that an option takes when it is not specified
+on the command line. The following specifies a default value for an option:
+
+    cxxopts::value<std::string>()->default_value("value")
+
+An implicit value is the value that an option takes when it is given on the
+command line without an argument. The following specifies an implicit value:
+
+    cxxopts::value<std::string>()->implicit_value("implicit")
+
+If an option had both, then not specifying it would give the value `"value"`,
+writing it on the command line as `--option` would give the value `"implicit"`,
+and writing `--option=another` would give it the value `"another"`.
+
+Note that default and implicit values are always stored as strings,
+regardless of the type that you want to store them in. They will be parsed as
+though they were given on the command line.
+
+## Boolean values
+
+Boolean options have a default implicit value of `"true"`, which can be
+overridden. The effect is that writing `-o` by itself will set option `o` to
+`true`. However, they can also be written with various strings using either
+`=value` or the next argument.
+
+## Custom help
+
+The string after the program name on the first line of the help can be
+completely replaced by calling `options.custom_help`. Note that you might
+also want to override the positional help by calling `options.positional_help`.
+
+# Linking
+
+This is a header only library.
+
+# Requirements
+
+The only build requirement is a C++ compiler that supports C++11 regular
+expressions. For example GCC >= 4.9 or clang with libc++.
+
+# TODO list
+
+* Allow unrecognised options.
+* Various help strings.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/cxxopts-config.cmake.in
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/cxxopts-config.cmake.in b/thirdparty/cxxopts/cxxopts-config.cmake.in
new file mode 100644
index 0000000..c9efaf1
--- /dev/null
+++ b/thirdparty/cxxopts/cxxopts-config.cmake.in
@@ -0,0 +1,4 @@
+@PACKAGE_INIT@
+
+include(${CMAKE_CURRENT_LIST_DIR}/@targets_export_name@.cmake)
+check_required_components(cxxopts)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/include/cxxopts.hpp
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/include/cxxopts.hpp b/thirdparty/cxxopts/include/cxxopts.hpp
new file mode 100644
index 0000000..6a6a85f
--- /dev/null
+++ b/thirdparty/cxxopts/include/cxxopts.hpp
@@ -0,0 +1,1988 @@
+/*
+
+Copyright (c) 2014, 2015, 2016, 2017 Jarryd Beck
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
+*/
+
+#ifndef CXXOPTS_HPP_INCLUDED
+#define CXXOPTS_HPP_INCLUDED
+
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <exception>
+#include <iostream>
+#include <limits>
+#include <map>
+#include <memory>
+#include <regex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+namespace cxxopts
+{
+  // Library version, readable as cxxopts::version.major / .minor / .patch.
+  // The value recorded here is 2.1.0.
+  static constexpr struct {
+    uint8_t major, minor, patch;
+  } version = {2, 1, 0};
+}
+
+//when we ask cxxopts to use Unicode, help strings are processed using ICU,
+//which results in the correct lengths being computed for strings when they
+//are formatted for the help output
+//it is necessary to make sure that <unicode/unistr.h> can be found by the
+//compiler, and that icu-uc is linked in to the binary.
+
+#ifdef CXXOPTS_USE_UNICODE
+#include <unicode/unistr.h>
+
+namespace cxxopts
+{
+  // With CXXOPTS_USE_UNICODE defined, the library string type is ICU's
+  // UnicodeString.
+  typedef icu::UnicodeString String;
+
+  // Builds the library string type from UTF-8 encoded text.
+  inline
+  String
+  toLocalString(std::string s)
+  {
+    return icu::UnicodeString::fromUTF8(std::move(s));
+  }
+
+  // Forward iterator over a UnicodeString, identified by the string pointer
+  // and an index into it. Dereferencing yields char32At(index).
+  class UnicodeStringIterator : public
+    std::iterator<std::forward_iterator_tag, int32_t>
+  {
+    public:
+
+    UnicodeStringIterator(const icu::UnicodeString* string, int32_t pos)
+    : s(string)
+    , i(pos)
+    {
+    }
+
+    value_type
+    operator*() const
+    {
+      return s->char32At(i);
+    }
+
+    // Two iterators are equal when they refer to the same string object and
+    // the same index.
+    bool
+    operator==(const UnicodeStringIterator& rhs) const
+    {
+      return s == rhs.s && i == rhs.i;
+    }
+
+    bool
+    operator!=(const UnicodeStringIterator& rhs) const
+    {
+      return !(*this == rhs);
+    }
+
+    UnicodeStringIterator&
+    operator++()
+    {
+      ++i;
+      return *this;
+    }
+
+    // Returns a new iterator advanced by `v`; const-qualified because it does
+    // not modify this iterator, so it is usable on const iterators too.
+    UnicodeStringIterator
+    operator+(int32_t v) const
+    {
+      return UnicodeStringIterator(s, i + v);
+    }
+
+    private:
+    const icu::UnicodeString* s;
+    int32_t i;
+  };
+
+  // Appends string `a` to `s` and returns `s`.
+  inline
+  String&
+  stringAppend(String&s, String a)
+  {
+    return s.append(std::move(a));
+  }
+
+  // Appends `n` copies of code point `c` to `s` and returns `s`.
+  inline
+  String&
+  stringAppend(String& s, int n, UChar32 c)
+  {
+    for (int i = 0; i != n; ++i)
+    {
+      s.append(c);
+    }
+
+    return s;
+  }
+
+  // Appends every element of [begin, end) to `s` and returns `s`.
+  template <typename Iterator>
+  String&
+  stringAppend(String& s, Iterator begin, Iterator end)
+  {
+    while (begin != end)
+    {
+      s.append(*begin);
+      ++begin;
+    }
+
+    return s;
+  }
+
+  // Returns s.length() converted to size_t.
+  inline
+  size_t
+  stringLength(const String& s)
+  {
+    return s.length();
+  }
+
+  // Converts the library string type back to a UTF-8 std::string.
+  inline
+  std::string
+  toUTF8String(const String& s)
+  {
+    std::string result;
+    s.toUTF8String(result);
+
+    return result;
+  }
+
+  // True when `s` holds no text.
+  inline
+  bool
+  empty(const String& s)
+  {
+    return s.isEmpty();
+  }
+}
+
+namespace std
+{
+  // Iterator positioned at index 0 of `s`.
+  inline
+  cxxopts::UnicodeStringIterator
+  begin(const icu::UnicodeString& s)
+  {
+    return {&s, 0};
+  }
+
+  // Iterator positioned at index s.length(), i.e. one past the last unit.
+  inline
+  cxxopts::UnicodeStringIterator
+  end(const icu::UnicodeString& s)
+  {
+    return {&s, s.length()};
+  }
+}
+
+//ifdef CXXOPTS_USE_UNICODE
+#else
+
+namespace cxxopts
+{
+  // Without CXXOPTS_USE_UNICODE the library string type is plain std::string.
+  typedef std::string String;
+
+  // Identity conversion to the library string type. The argument is forwarded
+  // so that an rvalue is moved into the result instead of being copied
+  // (a plain `return t;` copies, because `t` is a named reference).
+  template <typename T>
+  T
+  toLocalString(T&& t)
+  {
+    return std::forward<T>(t);
+  }
+
+  // Number of chars in `s`.
+  inline
+  size_t
+  stringLength(const String& s)
+  {
+    return s.length();
+  }
+
+  // Appends string `a` to `s` and returns `s`.
+  inline
+  String&
+  stringAppend(String&s, String a)
+  {
+    return s.append(std::move(a));
+  }
+
+  // Appends `n` copies of `c` to `s` and returns `s`.
+  inline
+  String&
+  stringAppend(String& s, size_t n, char c)
+  {
+    return s.append(n, c);
+  }
+
+  // Appends the range [begin, end) to `s` and returns `s`.
+  template <typename Iterator>
+  String&
+  stringAppend(String& s, Iterator begin, Iterator end)
+  {
+    return s.append(begin, end);
+  }
+
+  // Identity conversion to std::string; the argument is forwarded.
+  template <typename T>
+  std::string
+  toUTF8String(T&& t)
+  {
+    return std::forward<T>(t);
+  }
+
+  // True when `s` has no characters.
+  inline
+  bool
+  empty(const std::string& s)
+  {
+    return s.empty();
+  }
+}
+
+//ifdef CXXOPTS_USE_UNICODE
+#endif
+
+namespace cxxopts
+{
+  namespace
+  {
+    // Opening and closing quote strings placed around option and argument
+    // text in the exception messages below: a plain apostrophe when _WIN32
+    // is defined, typographic single quotes otherwise.
+#ifdef _WIN32
+    const std::string LQUOTE = "\'";
+    const std::string RQUOTE = "\'";
+#else
+    const std::string LQUOTE = "‘";
+    const std::string RQUOTE = "’";
+#endif
+  }
+
+  // Abstract interface for the value attached to an option. Every operation
+  // is pure virtual; abstract_value<T> further down derives from it.
+  // Inheriting enable_shared_from_this makes shared_from_this() available to
+  // implementations.
+  class Value : public std::enable_shared_from_this<Value>
+  {
+    public:
+
+    virtual ~Value() = default;
+
+    virtual std::shared_ptr<Value> clone() const = 0;
+
+    // Parse from the given text, or (no-argument form) without any text.
+    virtual void parse(const std::string& text) const = 0;
+    virtual void parse() const = 0;
+
+    // Queries.
+    virtual bool has_default() const = 0;
+    virtual bool is_container() const = 0;
+    virtual bool has_implicit() const = 0;
+
+    virtual std::string get_default_value() const = 0;
+    virtual std::string get_implicit_value() const = 0;
+
+    // Setters for the default / implicit value; each hands back a
+    // shared_ptr<Value>.
+    virtual std::shared_ptr<Value> default_value(const std::string& value) = 0;
+    virtual std::shared_ptr<Value> implicit_value(const std::string& value) = 0;
+
+    virtual bool is_boolean() const = 0;
+  };
+
+  // Base class of every cxxopts exception; stores the message that what()
+  // returns.
+  class OptionException : public std::exception
+  {
+    public:
+    OptionException(const std::string& message)
+    : m_message(message)
+    {
+    }
+
+    const char*
+    what() const noexcept override
+    {
+      return m_message.c_str();
+    }
+
+    private:
+    std::string m_message;
+  };
+
+  // Base for the option-specification errors declared below.
+  class OptionSpecException : public OptionException
+  {
+    public:
+    OptionSpecException(const std::string& message)
+    : OptionException(message)
+    {
+    }
+  };
+
+  // Base for the command-line parsing errors declared below.
+  class OptionParseException : public OptionException
+  {
+    public:
+    OptionParseException(const std::string& message)
+    : OptionException(message)
+    {
+    }
+  };
+
+  // Concrete exception types. Each one only formats its message, wrapping the
+  // offending text in LQUOTE/RQUOTE.
+  //
+  // The literals are plain narrow strings rather than u8"...": they are pure
+  // ASCII, so the bytes are the same, and from C++20 on a u8 literal is an
+  // array of char8_t that cannot be concatenated with std::string.
+
+  // Spec error: an option with this name already exists.
+  class option_exists_error : public OptionSpecException
+  {
+    public:
+    option_exists_error(const std::string& option)
+    : OptionSpecException("Option " + LQUOTE + option + RQUOTE + " already exists")
+    {
+    }
+  };
+
+  // Spec error: the option format string is invalid.
+  class invalid_option_format_error : public OptionSpecException
+  {
+    public:
+    invalid_option_format_error(const std::string& format)
+    : OptionSpecException("Invalid option format " + LQUOTE + format + RQUOTE)
+    {
+    }
+  };
+
+  // Parse error: the named option does not exist.
+  class option_not_exists_exception : public OptionParseException
+  {
+    public:
+    option_not_exists_exception(const std::string& option)
+    : OptionParseException("Option " + LQUOTE + option + RQUOTE + " does not exist")
+    {
+    }
+  };
+
+  // Parse error: the named option is missing an argument.
+  class missing_argument_exception : public OptionParseException
+  {
+    public:
+    missing_argument_exception(const std::string& option)
+    : OptionParseException(
+        "Option " + LQUOTE + option + RQUOTE + " is missing an argument"
+      )
+    {
+    }
+  };
+
+  // Parse error: the named option requires an argument.
+  class option_requires_argument_exception : public OptionParseException
+  {
+    public:
+    option_requires_argument_exception(const std::string& option)
+    : OptionParseException(
+        "Option " + LQUOTE + option + RQUOTE + " requires an argument"
+      )
+    {
+    }
+  };
+
+  // Parse error: an argument was given to an option that takes none.
+  class option_not_has_argument_exception : public OptionParseException
+  {
+    public:
+    option_not_has_argument_exception
+    (
+      const std::string& option,
+      const std::string& arg
+    )
+    : OptionParseException(
+        "Option " + LQUOTE + option + RQUOTE +
+        " does not take an argument, but argument " +
+        LQUOTE + arg + RQUOTE + " given"
+      )
+    {
+    }
+  };
+
+  // Parse error: the named option is not present.
+  class option_not_present_exception : public OptionParseException
+  {
+    public:
+    option_not_present_exception(const std::string& option)
+    : OptionParseException("Option " + LQUOTE + option + RQUOTE + " not present")
+    {
+    }
+  };
+
+  // Parse error: the argument text failed to parse into the requested type.
+  class argument_incorrect_type : public OptionParseException
+  {
+    public:
+    argument_incorrect_type
+    (
+      const std::string& arg
+    )
+    : OptionParseException(
+        "Argument " + LQUOTE + arg + RQUOTE + " failed to parse"
+      )
+    {
+    }
+  };
+
+  // Parse error: a required option is not present.
+  class option_required_exception : public OptionParseException
+  {
+    public:
+    option_required_exception(const std::string& option)
+    : OptionParseException(
+        "Option " + LQUOTE + option + RQUOTE + " is required but not present"
+      )
+    {
+    }
+  };
+
+  namespace values
+  {
+    namespace
+    {
+      // Regular expressions used by the integer and bool parsers below.
+      // integer_pattern groups: 1 = minus sign, 2 = "0x" prefix, 3 = digits,
+      // 4 = the zero alternative.
+      std::regex integer_pattern("(-)?(0x)?([1-9a-zA-Z][0-9a-zA-Z]*)|((0x)?0)");
+      std::regex truthy_pattern("(t|T)(rue)?");
+      // Note that this pattern also matches the empty string.
+      std::regex falsy_pattern("((f|F)(alse)?)?");
+    }
+
+    namespace detail
+    {
+      // Range check for a parsed magnitude `u` against the target type T.
+      // The bool parameter selects the signed (true) or unsigned (false)
+      // specialization.
+      template <typename T, bool B>
+      struct SignedCheck;
+
+      // Signed T: throws argument_incorrect_type(text) when the magnitude is
+      // larger than |min| (negative input) or larger than max (otherwise).
+      template <typename T>
+      struct SignedCheck<T, true>
+      {
+        template <typename U>
+        void
+        operator()(bool negative, U u, const std::string& text)
+        {
+          if (negative)
+          {
+            // |min| itself is not representable in T, so negating min()
+            // overflows. Compare (u - 1) against |min| - 1 instead, which is
+            // computed as -(min + 1) without leaving T's range.
+            const U limit =
+              static_cast<U>(-(std::numeric_limits<T>::min() + 1));
+
+            if (u != 0 && static_cast<U>(u - 1) > limit)
+            {
+              throw argument_incorrect_type(text);
+            }
+          }
+          else
+          {
+            if (u > static_cast<U>(std::numeric_limits<T>::max()))
+            {
+              throw argument_incorrect_type(text);
+            }
+          }
+        }
+      };
+
+      // Unsigned T: nothing to check here.
+      template <typename T>
+      struct SignedCheck<T, false>
+      {
+        template <typename U>
+        void
+        operator()(bool, U, const std::string&) {}
+      };
+
+      // Dispatches to the SignedCheck specialization matching T's signedness.
+      template <typename T, typename U>
+      void
+      check_signed_range(bool negative, U value, const std::string& text)
+      {
+        SignedCheck<T, std::numeric_limits<T>::is_signed>()(negative, value, text);
+      }
+    }
+
+    // Returns -t as an R, for a magnitude `t` that the range check has
+    // already accepted (0 <= t <= |min of R|). Selected via std::true_type,
+    // i.e. when the target type is signed.
+    template <typename R, typename T>
+    R
+    checked_negate(T&& t, const std::string&, std::true_type)
+    {
+      if (t == 0)
+      {
+        return static_cast<R>(0);
+      }
+
+      // `t` may equal |min of R|, which does not fit into R, so casting it
+      // and then negating would overflow. (t - 1) always fits; negate that
+      // and subtract one. The unary minus is applied to the signed R, which
+      // also keeps MSVC C4146 quiet.
+      // See https://github.com/jarro2783/cxxopts/issues/62 for more details.
+      return static_cast<R>(-static_cast<R>(t - 1) - 1);
+    }
+
+    // Overload selected via std::false_type (integer_parser passes
+    // integral_constant<bool, is_signed>): it ignores the value and always
+    // throws argument_incorrect_type(text), so it never returns normally.
+    template <typename R, typename T>
+    T
+    checked_negate(T&&, const std::string& text, std::false_type)
+    {
+      throw argument_incorrect_type(text);
+    }
+
+    // Parses `text` as a decimal or 0x-prefixed hexadecimal integer and
+    // stores it in `value`.
+    //
+    // Throws argument_incorrect_type when the text does not match
+    // integer_pattern, contains a digit that is not valid for the base, does
+    // not fit into T, or carries a minus sign while T is unsigned.
+    template <typename T>
+    void
+    integer_parser(const std::string& text, T& value)
+    {
+      std::smatch match;
+
+      if (!std::regex_match(text, match, integer_pattern))
+      {
+        throw argument_incorrect_type(text);
+      }
+
+      // Group 4 is the zero alternative of the pattern ("0" or "0x0").
+      if (match.length(4) > 0)
+      {
+        value = 0;
+        return;
+      }
+
+      using US = typename std::make_unsigned<T>::type;
+
+      constexpr auto umax = std::numeric_limits<US>::max();
+      constexpr bool is_signed = std::numeric_limits<T>::is_signed;
+      const bool negative = match.length(1) > 0;
+      const uint8_t base = match.length(2) > 0 ? 16 : 10;
+
+      const auto value_match = match[3];
+
+      // The magnitude is accumulated in the unsigned counterpart of T.
+      US result = 0;
+
+      for (auto iter = value_match.first; iter != value_match.second; ++iter)
+      {
+        US digit = 0;
+
+        if (*iter >= '0' && *iter <= '9')
+        {
+          digit = static_cast<US>(*iter - '0');
+        }
+        else if (base == 16 && *iter >= 'a' && *iter <= 'f')
+        {
+          digit = static_cast<US>(*iter - 'a' + 10);
+        }
+        else if (base == 16 && *iter >= 'A' && *iter <= 'F')
+        {
+          digit = static_cast<US>(*iter - 'A' + 10);
+        }
+        else
+        {
+          // The pattern admits any letter; reject those outside the base.
+          throw argument_incorrect_type(text);
+        }
+
+        // Overflow test by division: `result * base` can itself wrap around
+        // for wide types, which would let an out-of-range value slip through.
+        if (result > (umax - digit) / base)
+        {
+          throw argument_incorrect_type(text);
+        }
+
+        result = static_cast<US>(result * base + digit);
+      }
+
+      detail::check_signed_range<T>(negative, result, text);
+
+      if (negative)
+      {
+        // Tag dispatch: the false_type overload throws for unsigned T.
+        value = checked_negate<T>(result,
+          text,
+          std::integral_constant<bool, is_signed>());
+      }
+      else
+      {
+        value = result;
+      }
+    }
+
+    // Extracts `value` from `text` with operator>> on a std::stringstream;
+    // throws argument_incorrect_type(text) if the stream is in a failed state
+    // afterwards.
+    template <typename T>
+    void stringstream_parser(const std::string& text, T& value)
+    {
+      std::stringstream stream(text);
+      stream >> value;
+
+      if (stream.fail())
+      {
+        throw argument_incorrect_type(text);
+      }
+    }
+
+    // parse_value overloads for the fixed-width integer types. Each one
+    // forwards to integer_parser.
+    inline void parse_value(const std::string& text, uint8_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, int8_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, uint16_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, int16_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, uint32_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, int32_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, uint64_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    inline void parse_value(const std::string& text, int64_t& value)
+    {
+      integer_parser(text, value);
+    }
+
+    // Parses a bool: text matching truthy_pattern gives true, text matching
+    // falsy_pattern gives false, anything else throws
+    // argument_incorrect_type(text). The truthy pattern is tried first.
+    inline
+    void
+    parse_value(const std::string& text, bool& value)
+    {
+      if (std::regex_match(text, truthy_pattern))
+      {
+        value = true;
+      }
+      else if (std::regex_match(text, falsy_pattern))
+      {
+        value = false;
+      }
+      else
+      {
+        throw argument_incorrect_type(text);
+      }
+    }
+    /* String overload: no parsing is needed, the text is copied verbatim. */
+    inline
+    void
+    parse_value(const std::string& text, std::string& value)
+    {
+      value = text;
+    }
+
+    // The fallback parser. Every type that has not been overloaded explicitly
+    // is handed to stringstream_parser.  It has to be placed in the source
+    // code before all other more specialized templates.
+    template <typename T>
+    void
+    parse_value(const std::string& text, T& value) {
+      stringstream_parser(text, value);
+    }
+    /* Container overload: parses `text` as one T and appends it to `value`,
+     * so every call adds exactly one element.
+     */
+    template <typename T>
+    void
+    parse_value(const std::string& text, std::vector<T>& value)
+    {
+      T v;
+      parse_value(text, v);
+      value.push_back(v);
+    }
+    /* Trait telling whether T is treated as a container of values.
+     * The primary template answers false.
+     */
+    template <typename T>
+    struct type_is_container
+    {
+      static constexpr bool value = false;
+    };
+    /* Specialization: std::vector<T> is reported as a container. */
+    template <typename T>
+    struct type_is_container<std::vector<T>>
+    {
+      static constexpr bool value = true;
+    };
+    /* Value implementation that parses option text into a T.
+     *
+     * The parsed result lives either in storage owned by this object
+     * (m_result) or in a caller-supplied T; m_store names the location that
+     * parse() writes to.  The class also records the optional default and
+     * implicit value strings.
+     */
+    template <typename T>
+    class abstract_value : public Value
+    {
+      using Self = abstract_value<T>;
+
+      public:
+      abstract_value()  // owns its storage: m_store points into m_result
+      : m_result(std::make_shared<T>())
+      , m_store(m_result.get())
+      {
+      }
+      /* Parses into the caller-provided *t; m_result stays empty. */
+      abstract_value(T* t)
+      : m_store(t)
+      {
+      }
+
+      virtual ~abstract_value() = default;
+      /* Copy: when rhs owns its storage the copy gets a fresh,
+       * value-initialized T (the value held by rhs is NOT copied); otherwise
+       * both objects write to the same external T.  The default/implicit
+       * flags and strings are copied.
+       */
+      abstract_value(const abstract_value& rhs)
+      {
+        if (rhs.m_result)
+        {
+          m_result = std::make_shared<T>();
+          m_store = m_result.get();
+        }
+        else
+        {
+          m_store = rhs.m_store;
+        }
+
+        m_default = rhs.m_default;
+        m_implicit = rhs.m_implicit;
+        m_default_value = rhs.m_default_value;
+        m_implicit_value = rhs.m_implicit_value;
+      }
+      /* Parses `text` into *m_store (const method, but writes through the pointer). */
+      void
+      parse(const std::string& text) const
+      {
+        parse_value(text, *m_store);
+      }
+
+      bool
+      is_container() const
+      {
+        return type_is_container<T>::value;
+      }
+      /* Parses the recorded default value string into *m_store. */
+      void
+      parse() const
+      {
+        parse_value(m_default_value, *m_store);
+      }
+
+      bool
+      has_default() const
+      {
+        return m_default;
+      }
+
+      bool
+      has_implicit() const
+      {
+        return m_implicit;
+      }
+      /* Records `value` as the default text and returns shared_from_this(). */
+      std::shared_ptr<Value>
+      default_value(const std::string& value)
+      {
+        m_default = true;
+        m_default_value = value;
+        return shared_from_this();
+      }
+      /* Records `value` as the implicit text and returns shared_from_this(). */
+      std::shared_ptr<Value>
+      implicit_value(const std::string& value)
+      {
+        m_implicit = true;
+        m_implicit_value = value;
+        return shared_from_this();
+      }
+
+      std::string
+      get_default_value() const
+      {
+        return m_default_value;
+      }
+
+      std::string
+      get_implicit_value() const
+      {
+        return m_implicit_value;
+      }
+
+      bool
+      is_boolean() const
+      {
+        return std::is_same<T, bool>::value;
+      }
+      /* Returns the stored value: *m_store, or *m_result when m_store is null. */
+      const T&
+      get() const
+      {
+        if (m_store == nullptr)
+        {
+          return *m_result;
+        }
+        else
+        {
+          return *m_store;
+        }
+      }
+
+      protected:
+      std::shared_ptr<T> m_result;  // owned storage; empty when an external T is used
+      T* m_store;                   // destination written by parse()
+
+      bool m_default = false;
+      bool m_implicit = false;
+
+      std::string m_default_value;
+      std::string m_implicit_value;
+    };
+    /* Concrete value type for a generic T; it inherits abstract_value's
+     * constructors and only adds clone().
+     */
+    template <typename T>
+    class standard_value : public abstract_value<T>
+    {
+      public:
+      using abstract_value<T>::abstract_value;
+
+      std::shared_ptr<Value>
+      clone() const  // copy-constructs a new standard_value<T> from *this
+      {
+        return std::make_shared<standard_value<T>>(*this);
+      }
+    };
+    /* Specialization for bool: both constructors call set_implicit(), so a
+     * boolean option always has the implicit value "true".
+     */
+    template <>
+    class standard_value<bool> : public abstract_value<bool>
+    {
+      public:
+      ~standard_value() = default;
+
+      standard_value()
+      {
+        set_implicit();
+      }
+
+      standard_value(bool* b)
+      : abstract_value(b)
+      {
+        set_implicit();
+      }
+
+      std::shared_ptr<Value>
+      clone() const  // copy-constructs a new standard_value<bool> from *this
+      {
+        return std::make_shared<standard_value<bool>>(*this);
+      }
+
+      private:
+
+      void
+      set_implicit()  // marks the value as implicit with the text "true"
+      {
+        m_implicit = true;
+        m_implicit_value = "true";
+      }
+    };
+  }
+  /* Creates a default-constructed standard_value<T> and returns it as a Value. */
+  template <typename T>
+  std::shared_ptr<Value>
+  value()
+  {
+    return std::make_shared<values::standard_value<T>>();
+  }
+  /* Creates a standard_value<T> constructed from the address of `t`; the
+   * value keeps that pointer, so `t` has to stay alive while it is used.
+   */
+  template <typename T>
+  std::shared_ptr<Value>
+  value(T& t)
+  {
+    return std::make_shared<values::standard_value<T>>(&t);
+  }
+
+  class OptionAdder;
+
+  // Describes one registered option: its short and long names, its help
+  // description and the Value prototype used to create per-parse storage.
+  class OptionDetails
+  {
+    public:
+    // short_/long_ : option names (either may be empty)
+    // desc         : help text
+    // val          : value prototype; cloned by make_storage()
+    OptionDetails
+    (
+      const std::string& short_,
+      const std::string& long_,
+      const String& desc,
+      std::shared_ptr<const Value> val
+    )
+    : m_short(short_)
+    , m_long(long_)
+    , m_desc(desc)
+    , m_value(val)
+    , m_count(0)
+    {
+    }
+
+    // Copy constructor.  The names are copied as well (previously they were
+    // left empty, so a copy lost its short/long name), and the value is
+    // cloned rather than shared.  A null value in rhs stays null instead of
+    // being dereferenced.
+    OptionDetails(const OptionDetails& rhs)
+    : m_short(rhs.m_short)
+    , m_long(rhs.m_long)
+    , m_desc(rhs.m_desc)
+    , m_value(rhs.m_value ? rhs.m_value->clone() : nullptr)
+    , m_count(rhs.m_count)
+    {
+    }
+
+    OptionDetails(OptionDetails&& rhs) = default;
+
+    // Help text of the option.
+    const String&
+    description() const
+    {
+      return m_desc;
+    }
+
+    // The value prototype; dereferences m_value without a null check.
+    const Value& value() const {
+        return *m_value;
+    }
+
+    // Returns a clone of the value prototype to hold one parse's result.
+    std::shared_ptr<Value>
+    make_storage() const
+    {
+      return m_value->clone();
+    }
+
+    const std::string&
+    short_name() const
+    {
+      return m_short;
+    }
+
+    const std::string&
+    long_name() const
+    {
+      return m_long;
+    }
+
+    private:
+    std::string m_short;
+    std::string m_long;
+    String m_desc;
+    std::shared_ptr<const Value> m_value;
+    int m_count;
+  };
+  /* Plain record of what the help formatter needs for one option; filled by Options::add_option. */
+  struct HelpOptionDetails
+  {
+    std::string s;               // short name
+    std::string l;               // long name
+    String desc;                 // description text
+    bool has_default;
+    std::string default_value;
+    bool has_implicit;
+    std::string implicit_value;
+    std::string arg_help;        // placeholder shown for the argument; "arg" is used when empty
+    bool is_container;
+    bool is_boolean;
+  };
+  /* Help entries collected for one option group; Options::add_option appends to `options`. */
+  struct HelpGroupDetails
+  {
+    std::string name;
+    std::string description;
+    std::vector<HelpOptionDetails> options;
+  };
+  /* Result slot for one option within a ParseResult: holds the storage
+   * created from the option's value prototype and counts how many times
+   * the option was parsed (a parsed default is counted as well).
+   */
+  class OptionValue
+  {
+    public:
+    void
+    parse
+    (
+      std::shared_ptr<const OptionDetails> details,
+      const std::string& text
+    )
+    {
+      ensure_value(details);
+      ++m_count;
+      m_value->parse(text);
+    }
+    /* Parses the option's recorded default value and bumps the count. */
+    void
+    parse_default(std::shared_ptr<const OptionDetails> details)
+    {
+      ensure_value(details);
+      m_value->parse();
+      m_count++;
+    }
+
+    size_t
+    count() const
+    {
+      return m_count;
+    }
+    /* Returns the parsed value as T.  With CXXOPTS_NO_RTTI the cast is an
+     * unchecked static_cast.  NOTE(review): m_value is dereferenced without
+     * a null check, so this appears to require that parse() or
+     * parse_default() ran first -- confirm callers guard with count().
+     */
+    template <typename T>
+    const T&
+    as() const
+    {
+#ifdef CXXOPTS_NO_RTTI
+      return static_cast<const values::standard_value<T>&>(*m_value).get();
+#else
+      return dynamic_cast<const values::standard_value<T>&>(*m_value).get();
+#endif
+    }
+
+    private:
+    void
+    ensure_value(std::shared_ptr<const OptionDetails> details)  // creates the storage on first use
+    {
+      if (m_value == nullptr)
+      {
+        m_value = details->make_storage();
+      }
+    }
+
+    std::shared_ptr<Value> m_value;
+    size_t m_count = 0;
+  };
+  /* One (key, raw text) pair; ParseResult::parse_option records one per
+   * parsed option, using the option's long name as the key.
+   */
+  class KeyValue
+  {
+    public:
+    KeyValue(std::string key_, std::string value_)
+    : m_key(std::move(key_))
+    , m_value(std::move(value_))
+    {
+    }
+
+    const
+    std::string&
+    key() const
+    {
+      return m_key;
+    }
+
+    const std::string
+    value() const  // returns a copy, unlike key()
+    {
+      return m_value;
+    }
+    /* Parses the stored text into a T with values::parse_value and returns it by value. */
+    template <typename T>
+    T
+    as() const
+    {
+      T result;
+      values::parse_value(m_value, result);
+      return result;
+    }
+
+    private:
+    std::string m_key;
+    std::string m_value;
+  };
+  /* Outcome of parsing a command line: per-option values and counts plus
+   * the sequence of parsed (name, text) pairs.  The constructor runs the
+   * parse immediately.
+   */
+  class ParseResult
+  {
+    public:
+
+    ParseResult(
+      const std::unordered_map<std::string, std::shared_ptr<OptionDetails>>&,
+      std::vector<std::string>,
+      int&, char**&);
+    /* Count recorded for option `o`; 0 when no option of that name is registered. */
+    size_t
+    count(const std::string& o) const
+    {
+      auto iter = m_options.find(o);
+      if (iter == m_options.end())
+      {
+        return 0;
+      }
+
+      auto riter = m_results.find(iter->second);
+
+      return riter->second.count();
+    }
+    /* Returns the OptionValue for `option`; throws option_not_present_exception
+     * when no option of that name is registered.
+     */
+    const OptionValue&
+    operator[](const std::string& option) const
+    {
+      auto iter = m_options.find(option);
+
+      if (iter == m_options.end())
+      {
+        throw option_not_present_exception(option);
+      }
+
+      auto riter = m_results.find(iter->second);
+
+      return riter->second;
+    }
+    /* The (long name, text) pairs in the order the options were parsed. */
+    const std::vector<KeyValue>&
+    arguments() const
+    {
+      return m_sequential;
+    }
+
+    private:
+
+    OptionValue&
+    get_option(std::shared_ptr<OptionDetails>);
+
+    void
+    parse(int& argc, char**& argv);
+
+    void
+    add_to_option(const std::string& option, const std::string& arg);
+
+    bool
+    consume_positional(std::string a);
+
+    void
+    parse_option
+    (
+      std::shared_ptr<OptionDetails> value,
+      const std::string& name,
+      const std::string& arg = ""
+    );
+
+    void
+    parse_default(std::shared_ptr<OptionDetails> details);
+
+    void
+    checked_parse_arg
+    (
+      int argc,
+      char* argv[],
+      int& current,
+      std::shared_ptr<OptionDetails> value,
+      const std::string& name
+    );
+
+    const std::unordered_map<std::string, std::shared_ptr<OptionDetails>>
+      &m_options;  // reference to the map passed to the constructor, not a copy
+    std::vector<std::string> m_positional;
+    std::vector<std::string>::iterator m_next_positional;
+    std::unordered_set<std::string> m_positional_set;
+    std::unordered_map<std::shared_ptr<OptionDetails>, OptionValue> m_results;
+
+    std::vector<KeyValue> m_sequential;
+  };
+  /* Registry of the options a program accepts.  Options are added through
+   * add_options()/add_option(), parse() produces a ParseResult, and help()
+   * renders the usage text.
+   */
+  class Options
+  {
+    public:
+
+    Options(std::string program, std::string help_string = "")
+    : m_program(std::move(program))
+    , m_help_string(toLocalString(std::move(help_string)))
+    , m_custom_help("[OPTION...]")
+    , m_positional_help("positional parameters")
+    , m_show_positional(false)
+    , m_next_positional(m_positional.end())
+    {
+    }
+
+    Options&
+    positional_help(std::string help_text)  // text shown after the option summary in the usage line
+    {
+      m_positional_help = std::move(help_text);
+      return *this;
+    }
+
+    Options&
+    custom_help(std::string help_text)  // replaces the default "[OPTION...]" summary
+    {
+      m_custom_help = std::move(help_text);
+      return *this;
+    }
+
+    Options&
+    show_positional_help()  // list positional container options in the help output too
+    {
+      m_show_positional = true;
+      return *this;
+    }
+
+    ParseResult
+    parse(int& argc, char**& argv);
+
+    OptionAdder
+    add_options(std::string group = "");
+
+    void
+    add_option
+    (
+      const std::string& group,
+      const std::string& s,
+      const std::string& l,
+      std::string desc,
+      std::shared_ptr<const Value> value,
+      std::string arg_help
+    );
+
+    //parse positional arguments into the given option
+    void
+    parse_positional(std::string option);
+
+    void
+    parse_positional(std::vector<std::string> options);
+
+    void
+    parse_positional(std::initializer_list<std::string> options);
+
+    std::string
+    help(const std::vector<std::string>& groups = {""}) const;
+
+    const std::vector<std::string>
+    groups() const;
+
+    const HelpGroupDetails&
+    group_help(const std::string& group) const;
+
+    private:
+
+    void
+    add_one_option
+    (
+      const std::string& option,
+      std::shared_ptr<OptionDetails> details
+    );
+
+    String
+    help_one_group(const std::string& group) const;
+
+    void
+    generate_group_help
+    (
+      String& result,
+      const std::vector<std::string>& groups
+    ) const;
+
+    void
+    generate_all_groups_help(String& result) const;
+
+    std::string m_program;
+    String m_help_string;
+    std::string m_custom_help;
+    std::string m_positional_help;
+    bool m_show_positional;
+
+    std::unordered_map<std::string, std::shared_ptr<OptionDetails>> m_options;  // keyed by short AND long name
+    std::vector<std::string> m_positional;
+    std::vector<std::string>::iterator m_next_positional;
+    std::unordered_set<std::string> m_positional_set;
+
+    //mapping from groups to help options
+    std::map<std::string, HelpGroupDetails> m_help;
+  };
+  /* Helper returned by Options::add_options(): each operator() call adds
+   * one option to the bound group and returns *this so calls can be chained.
+   */
+  class OptionAdder
+  {
+    public:
+
+    OptionAdder(Options& options, std::string group)
+    : m_options(options), m_group(std::move(group))
+    {
+    }
+
+    OptionAdder&
+    operator()
+    (
+      const std::string& opts,
+      const std::string& desc,
+      std::shared_ptr<const Value> value
+        = ::cxxopts::value<bool>(),  // options default to a boolean flag
+      std::string arg_help = ""
+    );
+
+    private:
+    Options& m_options;
+    std::string m_group;
+  };
+
+  namespace
+  {
+    constexpr int OPTION_LONGEST = 30;  // cap on the option column width in help output
+    constexpr int OPTION_DESC_GAP = 2;  // spaces between the option column and the description
+    /* Command-line token: "--name" with optional "=value" (groups 1-3), or "-abc" (group 4). */
+    std::basic_regex<char> option_matcher
+      ("--([[:alnum:]][-_[:alnum:]]+)(=(.*))?|-([[:alnum:]]+)");
+    /* Option specification given to OptionAdder: optional "c," (group 2) then a name (group 3). */
+    std::basic_regex<char> option_specifier
+      ("(([[:alnum:]]),)?[ ]*([[:alnum:]][-_[:alnum:]]*)?");
+    /* Builds the left-hand help column for one option: "  -s, --long arg". */
+    String
+    format_option
+    (
+      const HelpOptionDetails& o
+    )
+    {
+      auto& s = o.s;
+      auto& l = o.l;
+
+      String result = "  ";
+
+      if (s.size() > 0)
+      {
+        result += "-" + toLocalString(s) + ",";
+      }
+      else
+      {
+        result += "   ";  // same width as "-s," so the long names line up
+      }
+
+      if (l.size() > 0)
+      {
+        result += " --" + toLocalString(l);
+      }
+
+      auto arg = o.arg_help.size() > 0 ? toLocalString(o.arg_help) : "arg";
+
+      if (!o.is_boolean)  // boolean options show no argument placeholder
+      {
+        if (o.has_implicit)
+        {
+          result += " [=" + arg + "(=" + toLocalString(o.implicit_value) + ")]";
+        }
+        else
+        {
+          result += " " + arg;
+        }
+      }
+
+      return result;
+    }
+    /* Builds the wrapped description column for one option.
+     *
+     * " (default: X)" is appended when the option has a default.  The text
+     * is then scanned character by character; once the running count
+     * exceeds `width` a line break is inserted and the continuation is
+     * prefixed via stringAppend(result, start, ' ') (presumably `start`
+     * spaces -- the helper is defined elsewhere).
+     */
+    String
+    format_description
+    (
+      const HelpOptionDetails& o,
+      size_t start,
+      size_t width
+    )
+    {
+      auto desc = o.desc;
+
+      if (o.has_default)
+      {
+        desc += toLocalString(" (default: " + o.default_value + ")");
+      }
+
+      String result;
+
+      auto current = std::begin(desc);
+      auto startLine = current;
+      auto lastSpace = current;
+
+      auto size = size_t{};
+
+      while (current != std::end(desc))
+      {
+        if (*current == ' ')
+        {
+          lastSpace = current;
+        }
+
+        if (size > width)
+        {
+          if (lastSpace == startLine)  // no space seen on this line: break right after `current`
+          {
+            stringAppend(result, startLine, current + 1);
+            stringAppend(result, "\n");
+            stringAppend(result, start, ' ');
+            startLine = current + 1;
+            lastSpace = startLine;
+          }
+          else  // break at the last space; the space itself is dropped
+          {
+            stringAppend(result, startLine, lastSpace);
+            stringAppend(result, "\n");
+            stringAppend(result, start, ' ');
+            startLine = lastSpace + 1;
+          }
+          size = 0;  // NOTE(review): characters after lastSpace already on the new line are not counted
+        }
+        else
+        {
+          ++size;
+        }
+
+        ++current;
+      }
+
+      //append whatever is left
+      stringAppend(result, startLine, current);
+
+      return result;
+    }
+  }
+/* Binds the option map by reference, takes its own copy of the positional
+ * option names and parses argc/argv right away.
+ */
+inline
+ParseResult::ParseResult
+(
+  const std::unordered_map<std::string, std::shared_ptr<OptionDetails>>& options,
+  std::vector<std::string> positional,
+  int& argc, char**& argv
+)
+: m_options(options)
+, m_positional(std::move(positional))
+, m_next_positional(m_positional.begin())
+{
+  parse(argc, argv);
+}
+/* Returns an OptionAdder bound to this Options object and the given group name. */
+inline
+OptionAdder
+Options::add_options(std::string group)
+{
+  return OptionAdder(*this, std::move(group));
+}
+/* Adds one option described by `opts` ("s,long", "long" or "s") to the
+ * bound group.  Throws invalid_option_format_error when `opts` does not
+ * match option_specifier, names nothing, or combines a short name with a
+ * one-character long name.  Returns *this for chaining.
+ */
+inline
+OptionAdder&
+OptionAdder::operator()
+(
+  const std::string& opts,
+  const std::string& desc,
+  std::shared_ptr<const Value> value,
+  std::string arg_help
+)
+{
+  std::match_results<const char*> result;
+  std::regex_match(opts.c_str(), result, option_specifier);
+
+  if (result.empty())
+  {
+    throw invalid_option_format_error(opts);
+  }
+  /* Group 2 is the single character before the comma, group 3 the name after it. */
+  const auto& short_match = result[2];
+  const auto& long_match = result[3];
+
+  if (!short_match.length() && !long_match.length())
+  {
+    throw invalid_option_format_error(opts);
+  } else if (long_match.length() == 1 && short_match.length())
+  {
+    throw invalid_option_format_error(opts);
+  }
+  /* A one-character name in the long position is registered as the short name; the tuple is {short, long}. */
+  auto option_names = []
+  (
+    const std::sub_match<const char*>& short_,
+    const std::sub_match<const char*>& long_
+  )
+  {
+    if (long_.length() == 1)
+    {
+      return std::make_tuple(long_.str(), short_.str());
+    }
+    else
+    {
+      return std::make_tuple(short_.str(), long_.str());
+    }
+  }(short_match, long_match);
+
+  m_options.add_option
+  (
+    m_group,
+    std::get<0>(option_names),
+    std::get<1>(option_names),
+    desc,
+    value,
+    std::move(arg_help)
+  );
+
+  return *this;
+}
+/* Parses the default value of `details` into its result slot (created on demand by operator[]). */
+inline
+void
+ParseResult::parse_default(std::shared_ptr<OptionDetails> details)
+{
+  m_results[details].parse_default(details);
+}
+/* Parses `arg` into the result slot of `value` and records the pair
+ * (long name, arg) in m_sequential.  The `name` parameter is unused.
+ */
+inline
+void
+ParseResult::parse_option
+(
+  std::shared_ptr<OptionDetails> value,
+  const std::string& /*name*/,
+  const std::string& arg
+)
+{
+  auto& result = m_results[value];
+  result.parse(value, arg);
+
+  m_sequential.emplace_back(value->long_name(), arg);
+}
+
+// Parses option `name` when its argument, if any, would be the next element
+// of argv.
+//
+// An option with an implicit value always takes that implicit value and the
+// following argument is left untouched.  Otherwise the next argument is
+// consumed (advancing `current`), or missing_argument_exception is thrown
+// when there is no next argument.
+inline
+void
+ParseResult::checked_parse_arg
+(
+  int argc,
+  char* argv[],
+  int& current,
+  std::shared_ptr<OptionDetails> value,
+  const std::string& name
+)
+{
+  const Value& spec = value->value();
+
+  if (spec.has_implicit())
+  {
+    parse_option(value, name, spec.get_implicit_value());
+    return;
+  }
+
+  const int next = current + 1;
+
+  if (next >= argc)
+  {
+    throw missing_argument_exception(name);
+  }
+
+  // Advance only after a successful parse, as the argument is consumed then.
+  parse_option(value, name, argv[next]);
+  current = next;
+}
+/* Looks up `option` by name and parses `arg` into it; throws
+ * option_not_exists_exception when the name is not registered.
+ */
+inline
+void
+ParseResult::add_to_option(const std::string& option, const std::string& arg)
+{
+  auto iter = m_options.find(option);
+
+  if (iter == m_options.end())
+  {
+    throw option_not_exists_exception(option);
+  }
+
+  parse_option(iter->second, option, arg);
+}
+
+// Tries to store `a` in the next positional option that can still take it.
+//
+// Positional names that are not registered options, and non-container
+// options that already have a count, are skipped.  A non-container option
+// is used once and the cursor moves past it; a container option keeps the
+// cursor so it also receives later arguments.  Returns true when `a` was
+// stored, false when the positional list is exhausted.
+inline
+bool
+ParseResult::consume_positional(std::string a)
+{
+  for (; m_next_positional != m_positional.end(); ++m_next_positional)
+  {
+    const auto found = m_options.find(*m_next_positional);
+    if (found == m_options.end())
+    {
+      continue;
+    }
+
+    // operator[] creates the result slot when it does not exist yet.
+    const OptionValue& slot = m_results[found->second];
+    const bool is_list = found->second->value().is_container();
+
+    if (!is_list && slot.count() != 0)
+    {
+      continue;
+    }
+
+    add_to_option(*m_next_positional, a);
+
+    if (!is_list)
+    {
+      ++m_next_positional;
+    }
+    return true;
+  }
+
+  return false;
+}
+/* Single-name convenience overload: forwards to the vector overload. */
+inline
+void
+Options::parse_positional(std::string option)
+{
+  parse_positional(std::vector<std::string>{std::move(option)});
+}
+/* Replaces the ordered list of positional option names and resets the
+ * cursor to its start.  NOTE(review): m_positional_set is only added to,
+ * never cleared, so names from an earlier call remain in it -- confirm
+ * this is intended.
+ */
+inline
+void
+Options::parse_positional(std::vector<std::string> options)
+{
+  m_positional = std::move(options);
+  m_next_positional = m_positional.begin();
+
+  m_positional_set.insert(m_positional.begin(), m_positional.end());
+}
+/* Initializer-list convenience overload: forwards to the vector overload. */
+inline
+void
+Options::parse_positional(std::initializer_list<std::string> options)
+{
+  parse_positional(std::vector<std::string>(std::move(options)));
+}
+/* Parses argc/argv against the registered options and returns the result.
+ * m_options is handed to ParseResult by reference, so the result refers
+ * to this object's option map; the positional names are copied.
+ */
+inline
+ParseResult
+Options::parse(int& argc, char**& argv)
+{
+  ParseResult result(m_options, m_positional, argc, argv);
+  return result;
+}
+
+// Parses argv[1..argc) against the registered options.
+//
+// Each argument is either an option ("--name", "--name=value", or a cluster
+// of short options "-abc") or a positional argument.  Positional arguments
+// are offered to the positional options; those that nothing accepts are
+// kept.  A bare "--" ends option processing and everything after it is
+// treated as positional.  On return argv has been compacted to argv[0]
+// followed by the kept arguments, and argc is their count.
+//
+// Throws option_not_exists_exception for an unknown option,
+// option_requires_argument_exception for a non-final short option in a
+// cluster that has no implicit value, and whatever checked_parse_arg /
+// parse_option throw.
+//
+// Defaults are applied only after ALL command-line input, including the
+// arguments following "--", has been consumed.  Applying them earlier gave
+// a defaulted positional option a non-zero count before the post-"--"
+// arguments were seen, so it could never receive one (and a defaulted
+// container received the default in addition to the arguments).
+inline
+void
+ParseResult::parse(int& argc, char**& argv)
+{
+  int current = 1;
+
+  int nextKeep = 1;
+
+  bool consume_remaining = false;
+
+  // `<` rather than `!=` so that an empty argument vector (argc == 0) never
+  // enters the loop.
+  while (current < argc)
+  {
+    if (strcmp(argv[current], "--") == 0)
+    {
+      consume_remaining = true;
+      ++current;
+      break;
+    }
+
+    std::match_results<const char*> result;
+    std::regex_match(argv[current], result, option_matcher);
+
+    if (result.empty())
+    {
+      //not a flag
+
+      //if true is returned here then it was consumed, otherwise it is
+      //ignored
+      if (consume_positional(argv[current]))
+      {
+      }
+      else
+      {
+        argv[nextKeep] = argv[current];
+        ++nextKeep;
+      }
+      //if we return from here then it was parsed successfully, so continue
+    }
+    else
+    {
+      //short or long option?
+      if (result[4].length() != 0)
+      {
+        const std::string& s = result[4];
+
+        for (std::size_t i = 0; i != s.size(); ++i)
+        {
+          std::string name(1, s[i]);
+          auto iter = m_options.find(name);
+
+          if (iter == m_options.end())
+          {
+            throw option_not_exists_exception(name);
+          }
+
+          auto value = iter->second;
+
+          if (i + 1 == s.size())
+          {
+            //it must be the last argument
+            checked_parse_arg(argc, argv, current, value, name);
+          }
+          else if (value->value().has_implicit())
+          {
+            parse_option(value, name, value->value().get_implicit_value());
+          }
+          else
+          {
+            //error
+            throw option_requires_argument_exception(name);
+          }
+        }
+      }
+      else if (result[1].length() != 0)
+      {
+        const std::string& name = result[1];
+
+        auto iter = m_options.find(name);
+
+        if (iter == m_options.end())
+        {
+          throw option_not_exists_exception(name);
+        }
+
+        auto opt = iter->second;
+
+        //equals provided for long option?
+        if (result[2].length() != 0)
+        {
+          //parse the option given
+
+          parse_option(opt, name, result[3]);
+        }
+        else
+        {
+          //parse the next argument
+          checked_parse_arg(argc, argv, current, opt, name);
+        }
+      }
+
+    }
+
+    ++current;
+  }
+
+  if (consume_remaining)
+  {
+    while (current < argc)
+    {
+      if (!consume_positional(argv[current])) {
+        break;
+      }
+      ++current;
+    }
+
+    //adjust argv for any that couldn't be swallowed
+    while (current < argc) {
+      argv[nextKeep] = argv[current];
+      ++nextKeep;
+      ++current;
+    }
+  }
+
+  // Fill in defaults for every option that received no value.  operator[]
+  // also creates the result slot, so every registered option has one
+  // afterwards.
+  for (auto& opt : m_options)
+  {
+    auto& detail = opt.second;
+    auto& value = detail->value();
+
+    auto& store = m_results[detail];
+
+    if(!store.count() && value.has_default()){
+      parse_default(detail);
+    }
+  }
+
+  // nextKeep never exceeds argc when argc >= 1; the guard only keeps an
+  // empty argument vector (argc == 0) from being reported as one element.
+  if (nextKeep < argc)
+  {
+    argc = nextKeep;
+  }
+
+}
+/* Registers one option: a single OptionDetails is stored under its short
+ * name and under its long name (whichever are non-empty), and a help entry
+ * is appended to `group`.
+ */
+inline
+void
+Options::add_option
+(
+  const std::string& group,
+  const std::string& s,
+  const std::string& l,
+  std::string desc,
+  std::shared_ptr<const Value> value,
+  std::string arg_help
+)
+{
+  auto stringDesc = toLocalString(std::move(desc));
+  auto option = std::make_shared<OptionDetails>(s, l, stringDesc, value);
+
+  if (s.size() > 0)
+  {
+    add_one_option(s, option);
+  }
+
+  if (l.size() > 0)
+  {
+    add_one_option(l, option);
+  }
+
+  //add the help details
+  auto& options = m_help[group];
+
+  options.options.emplace_back(HelpOptionDetails{s, l, stringDesc,
+      value->has_default(), value->get_default_value(),
+      value->has_implicit(), value->get_implicit_value(),
+      std::move(arg_help),
+      value->is_container(),
+      value->is_boolean()});
+}
+/* Inserts `details` under the name `option`; throws option_exists_error when the name is already taken. */
+inline
+void
+Options::add_one_option
+(
+  const std::string& option,
+  std::shared_ptr<OptionDetails> details
+)
+{
+  auto in = m_options.emplace(option, details);
+
+  if (!in.second)
+  {
+    throw option_exists_error(option);
+  }
+}
+/* Renders the help text of group `g`, or "" when the group is unknown.
+ *
+ * First pass: format the option column of every listed option and find the
+ * widest one (capped at OPTION_LONGEST).  Second pass: emit each option
+ * followed by its wrapped description.  Container options named as
+ * positional are left out unless m_show_positional is set; both passes
+ * apply the same filter so `format` and the options stay in step.
+ */
+inline
+String
+Options::help_one_group(const std::string& g) const
+{
+  typedef std::vector<std::pair<String, String>> OptionHelp;
+
+  auto group = m_help.find(g);
+  if (group == m_help.end())
+  {
+    return "";
+  }
+
+  OptionHelp format;
+
+  size_t longest = 0;
+
+  String result;
+
+  if (!g.empty())
+  {
+    result += toLocalString(" " + g + " options:\n");
+  }
+
+  for (const auto& o : group->second.options)
+  {
+    if (o.is_container &&
+        m_positional_set.find(o.l) != m_positional_set.end() &&
+        !m_show_positional)
+    {
+      continue;
+    }
+
+    auto s = format_option(o);
+    longest = std::max(longest, stringLength(s));
+    format.push_back(std::make_pair(s, String()));
+  }
+
+  longest = std::min(longest, static_cast<size_t>(OPTION_LONGEST));
+
+  //widest allowed description
+  auto allowed = size_t{76} - longest - OPTION_DESC_GAP;
+
+  auto fiter = format.begin();
+  for (const auto& o : group->second.options)
+  {
+    if (o.is_container &&
+        m_positional_set.find(o.l) != m_positional_set.end() &&
+        !m_show_positional)
+    {
+      continue;
+    }
+
+    auto d = format_description(o, longest + OPTION_DESC_GAP, allowed);
+
+    result += fiter->first;
+    if (stringLength(fiter->first) > longest)  // option text too wide: description starts on its own line
+    {
+      result += '\n';
+      result += toLocalString(std::string(longest + OPTION_DESC_GAP, ' '));
+    }
+    else  // pad the option text up to the description column
+    {
+      result += toLocalString(std::string(longest + OPTION_DESC_GAP -
+        stringLength(fiter->first),
+        ' '));
+    }
+    result += d;
+    result += '\n';
+
+    ++fiter;
+  }
+
+  return result;
+}
+/* Appends the help text of each group in `print_groups` to `result`.
+ * Groups whose help text is empty are skipped; a blank line follows every
+ * emitted group except the one at the last index.
+ */
+inline
+void
+Options::generate_group_help
+(
+  String& result,
+  const std::vector<std::string>& print_groups
+) const
+{
+  for (size_t i = 0; i != print_groups.size(); ++i)
+  {
+    const String& group_help_text = help_one_group(print_groups[i]);
+    if (empty(group_help_text))
+    {
+      continue;
+    }
+    result += group_help_text;
+    if (i < print_groups.size() - 1)
+    {
+      result += '\n';
+    }
+  }
+}
+/* Appends the help text of every group in m_help, in the map's key order. */
+inline
+void
+Options::generate_all_groups_help(String& result) const
+{
+  std::vector<std::string> all_groups;
+  all_groups.reserve(m_help.size());
+
+  for (auto& group : m_help)
+  {
+    all_groups.push_back(group.first);
+  }
+
+  generate_group_help(result, all_groups);
+}
+/* Builds the complete help text: the help string, a "Usage:" line with the
+ * program name, the custom help summary and (when positional options are
+ * configured) the positional help, then the requested groups.  An empty
+ * `help_groups` vector selects every group.  The result is returned as
+ * UTF-8.
+ */
+inline
+std::string
+Options::help(const std::vector<std::string>& help_groups) const
+{
+  String result = m_help_string + "\nUsage:\n  " +
+    toLocalString(m_program) + " " + toLocalString(m_custom_help);
+
+  if (m_positional.size() > 0 && m_positional_help.size() > 0) {
+    result += " " + toLocalString(m_positional_help);
+  }
+
+  result += "\n\n";
+
+  if (help_groups.size() == 0)
+  {
+    generate_all_groups_help(result);
+  }
+  else
+  {
+    generate_group_help(result, help_groups);
+  }
+
+  return toUTF8String(result);
+}
+
+// Returns the names of all groups that have help entries, in the key order
+// of the m_help map.
+inline
+const std::vector<std::string>
+Options::groups() const
+{
+  std::vector<std::string> names;
+
+  for (const auto& entry : m_help)
+  {
+    names.push_back(entry.first);
+  }
+
+  return names;
+}
+/* Returns the help details of `group` via map::at, which throws std::out_of_range for an unknown group. */
+inline
+const HelpGroupDetails&
+Options::group_help(const std::string& group) const
+{
+  return m_help.at(group);
+}
+
+}
+
+#endif //CXXOPTS_HPP_INCLUDED

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/src/.gitignore
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/src/.gitignore b/thirdparty/cxxopts/src/.gitignore
new file mode 100644
index 0000000..33a9488
--- /dev/null
+++ b/thirdparty/cxxopts/src/.gitignore
@@ -0,0 +1 @@
+example

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/src/CMakeLists.txt b/thirdparty/cxxopts/src/CMakeLists.txt
new file mode 100644
index 0000000..eec97b7
--- /dev/null
+++ b/thirdparty/cxxopts/src/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright (c) 2014 Jarryd Beck
+# 
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+# 
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+# 
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+# Build the example program only when CXXOPTS_BUILD_EXAMPLES is set.
+if(CXXOPTS_BUILD_EXAMPLES)
+    add_executable(example example.cpp)
+    target_link_libraries(example cxxopts)
+endif()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/thirdparty/cxxopts/src/example.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/cxxopts/src/example.cpp b/thirdparty/cxxopts/src/example.cpp
new file mode 100644
index 0000000..b541774
--- /dev/null
+++ b/thirdparty/cxxopts/src/example.cpp
@@ -0,0 +1,138 @@
+/*
+
+Copyright (c) 2014 Jarryd Beck
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
+*/
+
+#include <iostream>
+
+#include "cxxopts.hpp"
+
+int main(int argc, char* argv[])
+{
+  try
+  {
+    cxxopts::Options options(argv[0], " - example command line options");
+    options
+      .positional_help("[optional args]")
+      .show_positional_help();
+
+    bool apple = false;
+
+    options.add_options()
+      ("a,apple", "an apple", cxxopts::value<bool>(apple))
+      ("b,bob", "Bob")
+      ("f, file", "File", cxxopts::value<std::vector<std::string>>(), "FILE")
+      ("i,input", "Input", cxxopts::value<std::string>())
+      ("o,output", "Output file", cxxopts::value<std::string>()
+          ->default_value("a.out")->implicit_value("b.def"), "BIN")
+      ("positional",
+        "Positional arguments: these are the arguments that are entered "
+        "without an option", cxxopts::value<std::vector<std::string>>())
+      ("long-description",
+        "thisisareallylongwordthattakesupthewholelineandcannotbebrokenataspace")
+      ("help", "Print help")
+      ("int", "An integer", cxxopts::value<int>(), "N")
+      ("float", "A floating point number", cxxopts::value<float>())
+      ("option_that_is_too_long_for_the_help", "A very long option")
+    #ifdef CXXOPTS_USE_UNICODE
+      ("unicode", u8"A help option with non-ascii: à. Here the size of the"
+        " string should be correct")
+    #endif
+    ;
+
+    options.add_options("Group")
+      ("c,compile", "compile")
+      ("d,drop", "drop", cxxopts::value<std::vector<std::string>>());
+
+    options.parse_positional({"input", "output", "positional"});
+
+    auto result = options.parse(argc, argv);
+
+    if (result.count("help"))
+    {
+      std::cout << options.help({"", "Group"}) << std::endl;
+      exit(0);
+    }
+
+    if (apple)
+    {
+      std::cout << "Saw option ‘a’ " << result.count("a") << " times " <<
+        std::endl;
+    }
+
+    if (result.count("b"))
+    {
+      std::cout << "Saw option ‘b’" << std::endl;
+    }
+
+    if (result.count("f"))
+    {
+      auto& ff = result["f"].as<std::vector<std::string>>();
+      std::cout << "Files" << std::endl;
+      for (const auto& f : ff)
+      {
+        std::cout << f << std::endl;
+      }
+    }
+
+    if (result.count("input"))
+    {
+      std::cout << "Input = " << result["input"].as<std::string>()
+        << std::endl;
+    }
+
+    if (result.count("output"))
+    {
+      std::cout << "Output = " << result["output"].as<std::string>()
+        << std::endl;
+    }
+
+    if (result.count("positional"))
+    {
+      std::cout << "Positional = {";
+      auto& v = result["positional"].as<std::vector<std::string>>();
+      for (const auto& s : v) {
+        std::cout << s << ", ";
+      }
+      std::cout << "}" << std::endl;
+    }
+
+    if (result.count("int"))
+    {
+      std::cout << "int = " << result["int"].as<int>() << std::endl;
+    }
+
+    if (result.count("float"))
+    {
+      std::cout << "float = " << result["float"].as<float>() << std::endl;
+    }
+
+    std::cout << "Arguments remain = " << argc << std::endl;
+
+  } catch (const cxxopts::OptionException& e)
+  {
+    std::cout << "error parsing options: " << e.what() << std::endl;
+    exit(1);
+  }
+
+  return 0;
+}


[2/2] nifi-minifi-cpp git commit: MINIFICPP-37: Create an executable to support basic localized devops operations.

Posted by ph...@apache.org.
MINIFICPP-37: Create an executable to support basic localized devops operations.

This includes stopping components, clearing queues, getting queue information, and updating the flow

MINIFICPP-37: Updates to allow host/port to be specified and allow any interface to be used


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/edc8858f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/edc8858f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/edc8858f

Branch: refs/heads/master
Commit: edc8858f05f2e6b6e239be0c9aac942bbb5f4e49
Parents: a6c7a9f
Author: Marc Parisi <ph...@apache.org>
Authored: Sun Jan 7 11:42:03 2018 -0500
Committer: Marc Parisi <ph...@apache.org>
Committed: Fri Jan 12 12:57:21 2018 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |    5 +
 LICENSE                                         |   22 +
 README.md                                       |   55 +
 conf/minifi.properties                          |    5 +-
 controller/CMakeLists.txt                       |   90 +
 controller/Controller.h                         |  200 ++
 controller/MiNiFiController.cpp                 |  217 ++
 extensions/http-curl/protocols/RESTReceiver.cpp |    4 +-
 extensions/http-curl/protocols/RESTReceiver.h   |    3 +-
 libminifi/include/c2/C2Agent.h                  |    2 +-
 libminifi/include/c2/ControllerSocketProtocol.h |   94 +
 libminifi/include/c2/HeartBeatReporter.h        |    7 +-
 libminifi/include/io/ClientSocket.h             |   22 +-
 libminifi/include/io/DescriptorStream.h         |  185 ++
 libminifi/include/io/ServerSocket.h             |   69 +
 libminifi/include/io/Sockets.h                  |    1 +
 libminifi/src/FlowController.cpp                |    4 +-
 libminifi/src/c2/C2Agent.cpp                    |   13 +-
 libminifi/src/c2/ControllerSocketProtocol.cpp   |  259 +++
 libminifi/src/io/ClientSocket.cpp               |   36 +-
 libminifi/src/io/DescriptorStream.cpp           |  196 ++
 libminifi/src/io/ServerSocket.cpp               |   81 +
 .../integration/ProvenanceReportingTest.cpp     |    2 +-
 libminifi/test/unit/ControllerTests.cpp         |  262 +++
 libminifi/test/unit/GetTCPTests.cpp             |    6 +-
 libminifi/test/unit/SocketTests.cpp             |    9 +-
 main/Main.h                                     |   57 +
 main/MiNiFiMain.cpp                             |   32 +-
 thirdparty/cxxopts/CHANGELOG.md                 |   45 +
 thirdparty/cxxopts/CMakeLists.txt               |   85 +
 thirdparty/cxxopts/INSTALL                      |   10 +
 thirdparty/cxxopts/LICENSE                      |   19 +
 thirdparty/cxxopts/README.md                    |  119 ++
 thirdparty/cxxopts/cxxopts-config.cmake.in      |    4 +
 thirdparty/cxxopts/include/cxxopts.hpp          | 1988 ++++++++++++++++++
 thirdparty/cxxopts/src/.gitignore               |    1 +
 thirdparty/cxxopts/src/CMakeLists.txt           |   24 +
 thirdparty/cxxopts/src/example.cpp              |  138 ++
 38 files changed, 4298 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 57afc95..1487417 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -148,6 +148,11 @@ if (NOT DISABLE_CIVET)
 createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" "extensions/civetweb")
 endif()
 
+if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER)
+	add_subdirectory(thirdparty/cxxopts)
+	add_subdirectory(controller)
+endif()
+
 ## Add the rocks DB extension
 if (NOT ROCKSDB_FOUND OR BUILD_ROCKSDB)
 	set(BUILD_RD "TRUE")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index a270b73..8cde4ea 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1053,3 +1053,25 @@ Redistribution and use in source and binary forms, with or without modification,
     Neither the name of the Eclipse Foundation, Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. 
 
 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+This product bundles 'cxxopts' which is available under an MIT license.
+
+Copyright (c) 2014 Jarryd Beck
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ed0b834..1682017 100644
--- a/README.md
+++ b/README.md
@@ -628,6 +628,61 @@ MiNiFi can then be stopped by issuing:
 MiNiFi can also be installed as a system service using minifi.sh with an optional "service name" (default: minifi)
 
     $ ./bin/minifi.sh install [service name]
+    
+### Managing MiNiFi C++ through the MiNiFi Controller
+
+The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs. Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy.
+
+The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required.
+The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows
+you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only
+minificontroller on the local host can control the agent:
+
+	$ controller.socket.host=localhost
+	$ controller.socket.port=9998
+	$ controller.socket.local.any.interface=true/false ( default false)
+
+These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller
+will be disabled in your deployment.
+
+ The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below.
+ 
+ #### Specifying connecting information
+ 
+   ./minificontroller --host "host name" --port "port"
+
+        * By default these options use those defined in minifi.properties and are not required
+
+ #### Start Command
+ 
+   ./minificontroller --start "component name"
+ 
+ #### Stop command 
+   ./minificontroller --stop "component name"
+   	  
+ #### List connections command
+   ./minificontroller --list connections
+      
+ #### List components command
+   ./minificontroller --list components
+ 
+ #### Clear connection command
+   ./minificontroller --clear "connection name"
+      
+ #### GetSize command
+   ./minificontroller --getsize "connection name"
+
+       * Returns the size of the connection. The current size along with the max will be reported
+ 
+ #### Update flow
+   ./minificontroller --updateflow "config yml"
+    
+       *Updates the flow file reference and performs a warm re-deploy.
+ 
+ #### Get full connection command     
+   ./minificontroller --getfull 
+   
+       *Provides a list of full connections, if any.
 
 ### Extensions
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 6c75305..a528c01 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -15,8 +15,6 @@
 
 # Core Properties #
 nifi.version=0.1.0
-#disable the c2 services
-nifi.c2.enable=false
 nifi.flow.configuration.file=./conf/config.yml
 nifi.administrative.yield.duration=30 sec
 # If a component has no work to do (is "bored"), how long should we wait before checking again for work?
@@ -33,3 +31,6 @@ nifi.https.client.pass.phrase=./conf/password
 nifi.https.client.ca.certificate=./conf/nifi-cert.pem
 #nifi.rest.api.user.name=admin
 #nifi.rest.api.password=password
+## enable the controller socket provider on port 9998
+controller.socket.host=localhost
+controller.socket.port=9998

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/controller/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt
new file mode 100644
index 0000000..da6f499
--- /dev/null
+++ b/controller/CMakeLists.txt
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
+
+include_directories(../main/ ../libminifi/include  ../libminifi/include/c2  ../libminifi/include/c2/protocols/  ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics  ../libminifi/include/core/yaml  ../libminifi/include/core  ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ${CIVET_THIRDPARTY_ROOT}/include ../thirdparty/cxxopts/include  ../thirdparty/)
+
+include(CheckCXXCompilerFlag)
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Os")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -Os")
+else()
+ message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
+
+# Build the controller executable and wire in thread support when the
+# toolchain reports it (both variables are set by CMake's FindThreads module).
+add_executable(minificontroller MiNiFiController.cpp)
+if(THREADS_HAVE_PTHREAD_ARG)
+  # Signature is target_compile_options(<target> <scope> <options>...); the
+  # target name must come before the PUBLIC keyword.
+  target_compile_options(minificontroller PUBLIC "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minificontroller "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+# Include UUID
+find_package(UUID REQUIRED)
+
+# Include OpenSSL
+find_package(OpenSSL REQUIRED)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb
+target_link_libraries(minificontroller core-minifi)
+
+if (APPLE)
+	target_link_libraries (minificontroller -Wl,-all_load minifi)
+else ()
+	target_link_libraries (minificontroller -Wl,--whole-archive minifi -Wl,--no-whole-archive)
+endif ()
+
+
+target_link_libraries(minificontroller yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB} ${UUID_LIBRARIES} cxxopts)
+
+
+if (APPLE)
+	get_property(extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
+	foreach(EXTENSION ${extensions})
+		message("Linking against ${EXTENSION}")
+		target_link_libraries (minificontroller -Wl,-all_load ${EXTENSION})
+	endforeach()    
+else ()
+	get_property(extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
+	foreach(EXTENSION ${extensions})
+	  target_link_libraries (minificontroller -Wl,--whole-archive ${EXTENSION} -Wl,--no-whole-archive)
+	endforeach()
+endif ()
+
+set_target_properties(minificontroller
+        PROPERTIES OUTPUT_NAME minificontroller)
+
+install(TARGETS minificontroller
+        RUNTIME
+        DESTINATION bin
+        COMPONENT bin)
+
+
+add_custom_command(TARGET minificontroller POST_BUILD
+           COMMAND cat ${CMAKE_BINARY_DIR}/all.log)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/controller/Controller.h
----------------------------------------------------------------------
diff --git a/controller/Controller.h b/controller/Controller.h
new file mode 100644
index 0000000..9cd3ba1
--- /dev/null
+++ b/controller/Controller.h
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONTROLLER_CONTROLLER_H_
+#define CONTROLLER_CONTROLLER_H_
+
+#include "io/ClientSocket.h"
+#include "c2/ControllerSocketProtocol.h"
+
+/**
+ * Sends a single argument command.
+ * @param socket socket unique ptr.
+ * @param op operation to perform
+ * @param value value to send
+ * @return true if the command was written to the socket, false otherwise.
+ */
+bool sendSingleCommand(std::unique_ptr<minifi::io::Socket> socket, uint8_t op, const std::string value) {
+  socket->initialize();
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF(value);
+  // writeData reports failure with a negative value, as checked by the other commands in this header.
+  return socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) >= 0;
+}
+
+/**
+ * Requests that the named component be stopped.
+ * @param socket socket unique ptr.
+ * @param component name of the component to stop
+ */
+bool stopComponent(std::unique_ptr<minifi::io::Socket> socket, std::string component) {
+  return sendSingleCommand(std::move(socket), minifi::c2::Operation::STOP, component);
+}
+
+/**
+ * Starts a previously stopped component.
+ * @param socket socket unique ptr.
+ * @param component name of the component to start
+ */
+bool startComponent(std::unique_ptr<minifi::io::Socket> socket, std::string component) {
+  return sendSingleCommand(std::move(socket), minifi::c2::Operation::START, component);
+}
+
+/**
+ * Clears a connection queue.
+ * @param socket socket unique ptr.
+ * @param connection name of the connection whose queue is cleared
+ */
+bool clearConnection(std::unique_ptr<minifi::io::Socket> socket, std::string connection) {
+  return sendSingleCommand(std::move(socket), minifi::c2::Operation::CLEAR, connection);
+}
+
+/**
+ * Asks the agent to update its flow to the provided file and prints the response.
+ *
+ * The request is an UPDATE operation followed by the strings "flow" and the
+ * file name. If the agent answers with a DESCRIBE response, the count of full
+ * connections and the name of each one are written to out.
+ *
+ * @param socket socket unique ptr.
+ * @param out stream that receives the textual report
+ * @param file flow file the agent should switch to
+ * @return -1 if the request could not be written to the socket, 0 otherwise.
+ */
+int updateFlow(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, std::string file) {
+  socket->initialize();
+  uint8_t op = minifi::c2::Operation::UPDATE;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF("flow");
+  stream.writeUTF(file);
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  // read the response: one operation byte, then (for DESCRIBE) a count and that many names
+  uint8_t resp = 0;
+  socket->readData(&resp, 1);
+  if (resp == minifi::c2::Operation::DESCRIBE) {
+    uint16_t connections = 0;
+    socket->read(connections);
+    out << connections << " are full" << std::endl;
+    for (int i = 0; i < connections; i++) {
+      std::string fullcomponent;
+      socket->readUTF(fullcomponent);
+      out << fullcomponent << " is full" << std::endl;
+    }
+  }
+  return 0;
+}
+
+/**
+ * Lists connections which are full.
+ *
+ * The request is a DESCRIBE operation followed by the string "getfull". If the
+ * agent answers with a DESCRIBE response, the count of full connections and
+ * the name of each one are written to out.
+ *
+ * @param socket socket ptr
+ * @param out stream that receives the textual report
+ * @return -1 if the request could not be written to the socket, 0 otherwise.
+ */
+int getFullConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) {
+  socket->initialize();
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF("getfull");
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  // read the response: one operation byte, then (for DESCRIBE) a count and that many names
+  uint8_t resp = 0;
+  socket->readData(&resp, 1);
+  if (resp == minifi::c2::Operation::DESCRIBE) {
+    uint16_t connections = 0;
+    socket->read(connections);
+    out << connections << " are full" << std::endl;
+    for (int i = 0; i < connections; i++) {
+      std::string fullcomponent;
+      socket->readUTF(fullcomponent);
+      out << fullcomponent << " is full" << std::endl;
+    }
+  }
+  return 0;
+}
+
+/**
+ * Prints the connection size for the provided connection.
+ *
+ * The request is a DESCRIBE operation followed by the strings "queue" and the
+ * connection name. If the agent answers with a DESCRIBE response, the string
+ * it returns is printed after the "Size/Max of <connection>" label.
+ *
+ * @param socket socket ptr
+ * @param out stream that receives the textual report
+ * @param connection connection whose size will be returned.
+ * @return -1 if the request could not be written to the socket, 0 otherwise.
+ */
+int getConnectionSize(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, std::string connection) {
+  socket->initialize();
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  minifi::io::BaseStream stream;
+  stream.writeData(&op, 1);
+  stream.writeUTF("queue");
+  stream.writeUTF(connection);
+  if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+    return -1;
+  }
+  // read the response: one operation byte, then (for DESCRIBE) the size string
+  uint8_t resp = 0;
+  socket->readData(&resp, 1);
+  if (resp == minifi::c2::Operation::DESCRIBE) {
+    std::string size;
+    socket->readUTF(size);
+    out << "Size/Max of " << connection << " " << size << std::endl;
+  }
+  return 0;
+}
+
+/**
+ * Requests the list of components from the agent and prints one name per line.
+ * @param socket socket unique ptr.
+ * @param out stream that receives the names
+ * @param show_header when true, a "Components:" line is printed before the names
+ * @return -1 if the request could not be written to the socket, 0 otherwise.
+ */
+int listComponents(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, bool show_header = true) {
+  socket->initialize();
+
+  // request: DESCRIBE operation followed by the item to describe
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  minifi::io::BaseStream request;
+  request.writeData(&op, 1);
+  request.writeUTF("components");
+  const auto written = socket->writeData(const_cast<uint8_t*>(request.getBuffer()), request.getSize());
+  if (written < 0) {
+    return -1;
+  }
+
+  // response: one operation byte, then a count, then that many names
+  socket->readData(&op, 1);
+  uint16_t count = 0;
+  socket->read(count);
+
+  if (show_header) {
+    out << "Components:" << std::endl;
+  }
+
+  uint16_t remaining = count;
+  while (remaining > 0) {
+    std::string name;
+    socket->readUTF(name, false);
+    out << name << std::endl;
+    --remaining;
+  }
+  return 0;
+}
+
+/**
+ * Requests the list of connections from the agent and prints one name per line.
+ * @param socket socket unique ptr.
+ * @param out stream that receives the names
+ * @param show_header when true, a "Connection Names:" line is printed before the names
+ * @return -1 if the request could not be written to the socket, 0 otherwise.
+ */
+int listConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, bool show_header = true) {
+  socket->initialize();
+
+  // request: DESCRIBE operation followed by the item to describe
+  uint8_t op = minifi::c2::Operation::DESCRIBE;
+  minifi::io::BaseStream request;
+  request.writeData(&op, 1);
+  request.writeUTF("connections");
+  const auto written = socket->writeData(const_cast<uint8_t*>(request.getBuffer()), request.getSize());
+  if (written < 0) {
+    return -1;
+  }
+
+  // response: one operation byte, then a count, then that many names
+  socket->readData(&op, 1);
+  uint16_t count = 0;
+  socket->read(count);
+
+  if (show_header) {
+    out << "Connection Names:" << std::endl;
+  }
+
+  uint16_t remaining = count;
+  while (remaining > 0) {
+    std::string name;
+    socket->readUTF(name, false);
+    out << name << std::endl;
+    --remaining;
+  }
+  return 0;
+}
+
+#endif /* CONTROLLER_CONTROLLER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/controller/MiNiFiController.cpp
----------------------------------------------------------------------
diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
new file mode 100644
index 0000000..65d9dbd
--- /dev/null
+++ b/controller/MiNiFiController.cpp
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <fcntl.h>
+#include <stdio.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <unistd.h>
+#include <yaml-cpp/yaml.h>
+#include <iostream>
+#include "io/BaseStream.h"
+
+#include "core/Core.h"
+
+#include "core/FlowConfiguration.h"
+#include "core/ConfigurationFactory.h"
+#include "core/RepositoryFactory.h"
+#include "FlowController.h"
+#include "Main.h"
+
+#include "Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+
+#include "cxxopts.hpp"
+
+/**
+ * Entry point of the MiNiFi controller command line tool.
+ *
+ * Resolves MINIFI_HOME (environment variable, then the location of the
+ * executable, then the current working directory), loads the agent and log
+ * properties from it, parses the command line and sends each requested
+ * command to the agent over a freshly created socket.
+ *
+ * @param argc number of command line arguments
+ * @param argv command line arguments
+ * @return -1 when no valid MINIFI_HOME can be determined, 0 otherwise.
+ */
+int main(int argc, char **argv) {
+
+  std::shared_ptr<logging::Logger> logger = logging::LoggerConfiguration::getConfiguration().getLogger("controller");
+
+  // assumes POSIX compliant environment
+  std::string minifiHome;
+  if (const char *env_p = std::getenv(MINIFI_HOME_ENV_KEY)) {
+    minifiHome = env_p;
+    logger->log_info("Using MINIFI_HOME=%s from environment.", minifiHome);
+  } else {
+    logger->log_info("MINIFI_HOME is not set; determining based on environment.");
+    char *path = nullptr;
+    char full_path[PATH_MAX];
+    path = realpath(argv[0], full_path);
+
+    if (path != nullptr) {
+      std::string minifiHomePath(path);
+      if (minifiHomePath.find_last_of("/\\") != std::string::npos) {
+        minifiHomePath = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));  //Remove /minifi from path
+        minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\"));    //Remove /bin from path
+      }
+    }
+
+    // attempt to use cwd as MINIFI_HOME
+    if (minifiHome.empty() || !validHome(minifiHome)) {
+      char cwd[PATH_MAX];
+      // getcwd returns nullptr on failure and leaves the buffer contents undefined,
+      // so the buffer is only used when the call succeeded
+      if (getcwd(cwd, PATH_MAX) != nullptr) {
+        minifiHome = cwd;
+      }
+    }
+
+  }
+
+  if (!validHome(minifiHome)) {
+    logger->log_error("No valid MINIFI_HOME could be inferred. "
+                      "Please set MINIFI_HOME or run minifi from a valid location.");
+    return -1;
+  }
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->setHome(minifiHome);
+  configuration->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
+
+  std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
+  log_properties->setHome(minifiHome);
+  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
+  logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
+
+  auto stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  std::string host = "localhost", portStr, caCert;
+  int port = -1;
+
+  cxxopts::Options options("MiNiFiController", "MiNiFi local agent controller");
+  options.positional_help("[optional args]").show_positional_help();
+
+  options.add_options()  //NOLINT
+  ("h,help", "Shows Help")  //NOLINT
+  ("host", "Specifies connecting host name", cxxopts::value<std::string>())  //NOLINT
+  ("port", "Specifies connecting host port", cxxopts::value<int>())  //NOLINT
+  ("stop", "Shuts down the provided component", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("start", "Starts provided component", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("l,list", "Provides a list of connections or processors", cxxopts::value<std::string>())  //NOLINT
+  ("c,clear", "Clears the associated connection queue", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>())  //NOLINT
+  ("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>())  //NOLINT
+  ("getfull", "Reports a list of full connections")  //NOLINT
+  ("noheaders", "Removes headers from output streams");
+
+  bool show_headers = true;
+
+  try {
+    auto result = options.parse(argc, argv);
+
+    if (result.count("help")) {
+      std::cout << options.help( { "", "Group" }) << std::endl;
+      exit(0);
+    }
+
+    // command line values take precedence over the ones from the properties file
+    if (result.count("host")) {
+      host = result["host"].as<std::string>();
+    } else {
+      configuration->get("controller.socket.host", host);
+    }
+
+    if (result.count("port")) {
+      port = result["port"].as<int>();
+    } else {
+      if (port == -1 && configuration->get("controller.socket.port", portStr)) {
+        port = std::stoi(portStr);
+      }
+    }
+
+    if ((IsNullOrEmpty(host) && port == -1)) {
+      std::cout << "MiNiFi Controller is disabled" << std::endl;
+      exit(0);
+    }
+
+    if (result.count("noheaders")) {
+      show_headers = false;
+    }
+
+    // stop/start/clear return true once the command has been sent
+    if (result.count("stop") > 0) {
+      auto& components = result["stop"].as<std::vector<std::string>>();
+      for (const auto& component : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (stopComponent(std::move(socket), component))
+          std::cout << component << " requested to stop" << std::endl;
+        else
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+    }
+
+    if (result.count("start") > 0) {
+      auto& components = result["start"].as<std::vector<std::string>>();
+      for (const auto& component : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (startComponent(std::move(socket), component))
+          std::cout << component << " requested to start" << std::endl;
+        else
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+    }
+
+    if (result.count("c") > 0) {
+      auto& components = result["c"].as<std::vector<std::string>>();
+      for (const auto& connection : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (clearConnection(std::move(socket), connection))
+          std::cout << "Cleared " << connection << std::endl;
+        else
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+    }
+
+    // the remaining commands return a negative value when the request could not be sent
+    if (result.count("getsize") > 0) {
+      auto& components = result["getsize"].as<std::vector<std::string>>();
+      for (const auto& component : components) {
+        auto socket = stream_factory_->createSocket(host, port);
+        if (getConnectionSize(std::move(socket), std::cout, component) < 0)
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+
+    }
+
+    if (result.count("l") > 0) {
+      auto& option = result["l"].as<std::string>();
+      auto socket = stream_factory_->createSocket(host, port);
+      if (option == "components") {
+        if (listComponents(std::move(socket), std::cout, show_headers) < 0)
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      } else if (option == "connections") {
+        if (listConnections(std::move(socket), std::cout, show_headers) < 0)
+          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+      }
+
+    }
+
+    if (result.count("getfull") > 0) {
+      auto socket = stream_factory_->createSocket(host, port);
+      if (getFullConnections(std::move(socket), std::cout) < 0)
+        std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+    }
+
+    if (result.count("updateflow") > 0) {
+      auto& flow_file = result["updateflow"].as<std::string>();
+      auto socket = stream_factory_->createSocket(host, port);
+      if (updateFlow(std::move(socket), std::cout, flow_file) < 0)
+        std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+    }
+  } catch (...) {
+    // any failure while parsing or dispatching (e.g. a bad option or a non-numeric port) falls back to the usage text
+    std::cout << options.help( { "", "Group" }) << std::endl;
+    exit(0);
+  }
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/extensions/http-curl/protocols/RESTReceiver.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
index 4c46516..ac9d229 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -44,8 +44,8 @@ RESTReceiver::RESTReceiver(std::string name, uuid_t uuid)
       logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
 }
 
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
-  HeartBeatReporter::initialize(controller, configure);
+void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure) {
+  HeartBeatReporter::initialize(controller, updateSink, configure);
   logger_->log_debug("Initializing rest receiveer");
   if (nullptr != configuration_) {
     std::string listeningPort, rootUri, caCert;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/extensions/http-curl/protocols/RESTReceiver.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
index 4793ee3..e19932c 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -50,7 +50,8 @@ class RESTReceiver : public RESTProtocol, public HeartBeatReporter {
  public:
   RESTReceiver(std::string name, uuid_t uuid = nullptr);
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                          const std::shared_ptr<Configure> &configure) override;
   virtual int16_t heartbeat(const C2Payload &heartbeat) override;
 
  protected:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index b5d4d31..810eede 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -73,7 +73,7 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi
    */
   virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric);
 
-  int64_t getHeartBestDelay(){
+  int64_t getHeartBeatDelay(){
     std::lock_guard<std::mutex> lock(heartbeat_mutex);
     return heart_beat_period_;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/c2/ControllerSocketProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h b/libminifi/include/c2/ControllerSocketProtocol.h
new file mode 100644
index 0000000..156cf5c
--- /dev/null
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_CONTROLLERSOCKETPROTOCOL_H_
+#define LIBMINIFI_INCLUDE_C2_CONTROLLERSOCKETPROTOCOL_H_
+
+#include "core/Resource.h"
+#include "HeartBeatReporter.h"
+#include "io/StreamFactory.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose: Creates a reporter that can handle basic c2 operations for a localized environment
+ * through a simple TCP socket.
+ */
+class ControllerSocketProtocol : public HeartBeatReporter {
+ public:
+  // Sets up the base reporter and the class logger; server_socket_ is only created by initialize().
+  ControllerSocketProtocol(std::string name, uuid_t uuid = nullptr)
+      : HeartBeatReporter(name, uuid),
+        logger_(logging::LoggerFactory<ControllerSocketProtocol>::getLogger()) {
+
+  }
+
+  /**
+   * Initialize the socket protocol.
+   * @param controller controller service provider.
+   * @param updateSink update mechanism that will be used to stop/clear elements
+   * @param configuration configuration class.
+   */
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                          const std::shared_ptr<Configure> &configuration);
+
+  /**
+   * Handles the heartbeat; the implementation always returns 0.
+   * @param payload incoming payload. From this function we only care about queue metrics.
+   */
+  virtual int16_t heartbeat(const C2Payload &payload);
+
+ protected:
+
+  /**
+   * Parses content from the content response; only entries named "Components" are used.
+   */
+  void parse_content(const std::vector<C2ContentResponse> &content);
+  // Taken by the implementation whenever it updates the queue_* maps or component_map_.
+  std::mutex controller_mutex_;
+  // Content name -> true when the last reported datasize was >= datasizemax.
+  std::map<std::string, bool> queue_full_;
+  // Content name -> last reported "datasize" value.
+  std::map<std::string, uint64_t> queue_size_;
+  // Content name -> last reported "datasizemax" value.
+  std::map<std::string, uint64_t> queue_max_;
+  // Key of each "Components" operation argument -> its value parsed as a bool.
+  std::map<std::string, bool> component_map_;
+  // Created in initialize() when controller.socket.port is configured; nullptr otherwise.
+  std::unique_ptr<io::ServerSocket> server_socket_;
+  // Built in initialize() from the configuration passed to it.
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+
+ private:
+  // Logger obtained for this class in the constructor.
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(ControllerSocketProtocol);
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_CONTROLLERSOCKETPROTOCOL_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/c2/HeartBeatReporter.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h
index 3d0fd49..81f8828 100644
--- a/libminifi/include/c2/HeartBeatReporter.h
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -39,11 +39,14 @@ class HeartBeatReporter : public core::Connectable {
   HeartBeatReporter(std::string name, uuid_t uuid)
       : core::Connectable(name, uuid),
         controller_(nullptr),
+        update_sink_(nullptr),
         configuration_(nullptr) {
   }
 
-  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                          const std::shared_ptr<Configure> &configure) {
     controller_ = controller;
+    update_sink_ = updateSink;
     configuration_ = configure;
   }
   virtual ~HeartBeatReporter() {
@@ -89,6 +92,8 @@ class HeartBeatReporter : public core::Connectable {
 
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
 
+  std::shared_ptr<state::StateMonitor> update_sink_;
+
   std::shared_ptr<Configure> configuration_;
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 216ef3d..ea445be 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -58,16 +58,6 @@ class SocketContext {
 class Socket : public BaseStream {
  public:
   /**
-   * Constructor that accepts host name, port and listeners. With this
-   * contructor we will be creating a server socket
-   * @param context the SocketContext
-   * @param hostname our host name
-   * @param port connecting port
-   * @param listeners number of listeners in the queue
-   */
-  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
-
-  /**
    * Constructor that creates a client socket.
    * @param context the SocketContext
    * @param hostname hostname we are connecting to.
@@ -219,6 +209,16 @@ class Socket : public BaseStream {
  protected:
 
   /**
+   * Constructor that accepts host name, port and listeners. With this
+   * constructor we will be creating a server socket
+   * @param context the SocketContext
+   * @param hostname our host name
+   * @param port connecting port
+   * @param listeners number of listeners in the queue
+   */
+  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+  /**
    * Creates a vector and returns the vector using the provided
    * type name.
    * @param t incoming object
@@ -255,6 +255,8 @@ class Socket : public BaseStream {
   std::string canonical_hostname_;
   uint16_t port_;
 
+  bool is_loopback_only_;
+
   // connection information
   int32_t socket_file_descriptor_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/DescriptorStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
new file mode 100644
index 0000000..e6d843c
--- /dev/null
+++ b/libminifi/include/io/DescriptorStream.h
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_
+#define LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_
+
+#include <iostream>
+#include <cstdint>
+#include <string>
+#include "EndianCheck.h"
+#include "BaseStream.h"
+#include "Serializable.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: Descriptor based BaseStream extension. It is constructed from an int descriptor
+ * rather than from a file stream object.
+ *
+ * Design: Simply extends BaseStream and overrides readData/writeData and the read overloads.
+ * NOTE(review): file_lock_ suggests access is serialized — confirm in the implementation.
+ */
+class DescriptorStream : public io::BaseStream {
+ public:
+  /**
+   * Descriptor stream constructor that accepts an integer descriptor.
+   * NOTE(review): presumably fd must already be open for read and write — confirm.
+   */
+  explicit DescriptorStream(int fd);
+
+  virtual ~DescriptorStream() {
+
+  }
+
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset);
+
+  const uint64_t getSize() const {
+    return -1;  // converted to uint64_t, i.e. the largest representable value
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Not supported by this stream.
+   * @throws std::runtime_error always
+   **/
+  const uint8_t *getBuffer() const {
+    throw std::runtime_error("Stream does not support this operation");
+  }
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which we will set the result
+   *
+   * @return resulting read size
+   **/
+  virtual int read(uint8_t &value);
+
+  /**
+   * reads two bytes from the stream
+   * @param base_value reference in which we will set the result
+   * @param is_little_endian byte-order flag, false by default
+   * @return resulting read size
+   **/
+  virtual int read(uint16_t &base_value, bool is_little_endian = false);
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which we will set the result
+   *
+   * @return resulting read size
+   **/
+  virtual int read(char &value);
+
+  /**
+   * reads a byte array from the stream
+   * @param value reference in which we will set the result
+   * @param len length to read
+   *
+   * @return resulting read size
+   **/
+  virtual int read(uint8_t *value, int len);
+
+  /**
+   * reads four bytes from the stream
+   * @param value reference in which we will set the result
+   * @param is_little_endian byte-order flag, false by default
+   * @return resulting read size
+   **/
+  virtual int read(uint32_t &value, bool is_little_endian = false);
+
+  /**
+   * reads eight bytes from the stream
+   * @param value reference in which we will set the result
+   * @param is_little_endian byte-order flag, false by default
+   * @return resulting read size
+   **/
+  virtual int read(uint64_t &value, bool is_little_endian = false);
+
+
+  /**
+   * read UTF from stream
+   * @param str reference string
+   * @param widen false by default; NOTE(review): effect is defined by the implementation — confirm
+   * @return resulting read size
+   **/
+  virtual int readUTF(std::string &str, bool widen = false);
+
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+  std::recursive_mutex file_lock_;
+
+  int fd_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_DESCRIPTORSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/ServerSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ServerSocket.h b/libminifi/include/io/ServerSocket.h
new file mode 100644
index 0000000..025d05f
--- /dev/null
+++ b/libminifi/include/io/ServerSocket.h
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_SERVERSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_SERVERSOCKET_H_
+
+#include "ClientSocket.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: Server socket abstraction that makes it simpler to focus on the accept/block
+ * paradigm.
+ */
+class ServerSocket : public Socket {
+ public:
+  explicit ServerSocket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+  virtual ~ServerSocket();
+  // Stores the loopback-only choice in is_loopback_only_ and then runs Socket::initialize().
+  int16_t initialize(bool loopbackOnly){
+    is_loopback_only_ = loopbackOnly;
+    return Socket::initialize();
+  }
+  // Without this declaration the overload above would hide the no-argument Socket::initialize().
+  virtual int16_t initialize(){
+    return Socket::initialize();
+  }
+
+  /**
+   * Registers a call back and starts the read for the server socket.
+   */
+  void registerCallback(std::function<bool()> accept_function, std::function<void(int)> handler);
+
+ private:
+
+  void close_fd(int fd );
+
+  std::atomic<bool> running_;
+
+  std::thread server_read_thread_;
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_IO_SERVERSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/include/io/Sockets.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/Sockets.h b/libminifi/include/io/Sockets.h
index 2c0b163..78ea645 100644
--- a/libminifi/include/io/Sockets.h
+++ b/libminifi/include/io/Sockets.h
@@ -19,6 +19,7 @@
 #define LIBMINIFI_INCLUDE_IO_SOCKET_H_
 
 #include "ClientSocket.h"
+#include "ServerSocket.h"
 
 #ifdef OPENSSL_SUPPORT
 #include "tls/TLSSocket.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 7e06727..03452b6 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -365,9 +365,9 @@ void FlowController::initializeC2() {
 
     std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()),
                                                                        configuration_);
-    registerUpdateListener(agent, agent->getHeartBestDelay());
+    registerUpdateListener(agent, agent->getHeartBeatDelay());
 
-    state::StateManager::startMetrics(agent->getHeartBestDelay());
+    state::StateManager::startMetrics(agent->getHeartBeatDelay());
   }
   if (!c2_enabled_) {
     return;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index ae1629f..a8cc5b2 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -24,6 +24,7 @@
 #include <map>
 #include <string>
 #include <memory>
+#include "c2/ControllerSocketProtocol.h"
 #include "core/state/UpdateController.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -140,11 +141,21 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
         logger_->log_debug("Could not instantiate %s", reporter);
       } else {
         std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj);
-        shp_reporter->initialize(controller_, configuration_);
+        shp_reporter->initialize(controller_, update_sink_, configuration_);
         heartbeat_protocols_.push_back(shp_reporter);
       }
     }
   }
+
+  auto base_reporter = "ControllerSocketProtocol";
+  auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter);
+  if (heartbeat_reporter_obj == nullptr) {
+    logger_->log_debug("Could not instantiate %s", base_reporter);
+  } else {
+    std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj);
+    shp_reporter->initialize(controller_, update_sink_, configuration_);
+    heartbeat_protocols_.push_back(shp_reporter);
+  }
 }
 
 void C2Agent::performHeartBeat() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/c2/ControllerSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
new file mode 100644
index 0000000..22e8696
--- /dev/null
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -0,0 +1,259 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/ControllerSocketProtocol.h"
+#include "utils/StringUtils.h"
+#include "io/DescriptorStream.h"
+#include <utility>
+#include <memory>
+#include <vector>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+                                          const std::shared_ptr<Configure> &configuration) {
+  HeartBeatReporter::initialize(controller, updateSink, configuration);
+  stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configuration);
+
+  std::string host = "localhost", port, limitStr;
+  bool anyInterface = false;
+  if (nullptr != configuration_) {  // checked before the first dereference
+    if (configuration_->get("controller.socket.local.any.interface", limitStr)) {
+      utils::StringUtils::StringToBool(limitStr, anyInterface);
+    }
+    // if host name isn't defined we will use localhost
+    configuration_->get("controller.socket.host", host);
+  }
+
+  if (nullptr != configuration_ && configuration_->get("controller.socket.port", port)) {
+    server_socket_ = std::unique_ptr<io::ServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
+    // if we have a localhost hostname and we did not manually specify any.interface we will
+    // bind only to the loopback adapter; in every other case the loopback restriction is lifted.
+    // NOTE(review): "::" is the IPv6 unspecified address ("::1" is loopback) — confirm the intent.
+    const bool loopbackOnly = (host == "localhost" || host == "127.0.0.1" || host == "::") && !anyInterface;
+    server_socket_->initialize(loopbackOnly);
+
+    auto check = [this]() -> bool {
+      return update_sink_->isRunning();
+    };
+
+    auto handler = [this](int fd) {
+      uint8_t head;
+      io::DescriptorStream stream(fd);
+      if (stream.read(head) != 1) {
+        logger_->log_debug("Connection broke with fd %d", fd);
+        return;
+      }
+      switch (head) {
+        case Operation::START:
+        {
+          std::string componentStr;
+          if (stream.readUTF(componentStr) != -1) {
+            for (const auto &component : update_sink_->getComponents(componentStr)) {
+              component->start();
+            }
+          } else {
+            logger_->log_debug("Connection broke with fd %d", fd);
+          }
+        }
+        break;
+        case Operation::STOP:
+        {
+          std::string componentStr;
+          if (stream.readUTF(componentStr) != -1) {
+            for (const auto &component : update_sink_->getComponents(componentStr)) {
+              component->stop(true, 1000);
+            }
+          } else {
+            logger_->log_debug("Connection broke with fd %d", fd);
+          }
+        }
+        break;
+        case Operation::CLEAR:
+        {
+          std::string connection;
+          if (stream.readUTF(connection) != -1) {
+            update_sink_->clearConnection(connection);
+          }
+        }
+        break;
+        case Operation::UPDATE:
+        {
+          std::string what;
+          if (stream.readUTF(what) == -1) {
+            logger_->log_debug("Connection broke with fd %d", fd);
+            break;
+          }
+          if (what == "flow") {
+            std::string ff_loc;
+            if (stream.readUTF(ff_loc) == -1) {
+              logger_->log_debug("Connection broke with fd %d", fd);
+              break;
+            }
+            // open the file only once the path arrived intact; never apply an update from an unreadable file
+            std::ifstream tf(ff_loc);
+            if (!tf.is_open()) {
+              logger_->log_debug("Could not open flow file %s", ff_loc.c_str());
+              break;
+            }
+            const std::string flowConfig((std::istreambuf_iterator<char>(tf)), std::istreambuf_iterator<char>());
+            update_sink_->applyUpdate(flowConfig);
+          }
+        }
+        break;
+        case Operation::DESCRIBE:
+        {
+          std::string what;
+          if (stream.readUTF(what) == -1) {
+            logger_->log_debug("Connection broke with fd %d", fd);
+            break;
+          }
+          if (what == "queue") {
+            std::string connection;
+            if (stream.readUTF(connection) == -1) {
+              logger_->log_debug("Connection broke with fd %d", fd);
+              break;
+            }
+            std::stringstream response;
+            {
+              std::lock_guard<std::mutex> lock(controller_mutex_);
+              const auto sizeIt = queue_size_.find(connection);  // find(), not operator[]: unknown names must not grow the maps
+              const auto maxIt = queue_max_.find(connection);
+              response << (sizeIt != queue_size_.end() ? sizeIt->second : 0) << " / " << (maxIt != queue_max_.end() ? maxIt->second : 0);
+            }
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            resp.writeUTF(response.str());
+            write(fd, resp.getBuffer(), resp.getSize());
+          } else if (what == "components") {
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            const auto allComponents = update_sink_->getAllComponents();  // one snapshot keeps count and names consistent
+            uint16_t size = allComponents.size();
+            resp.write(size);
+            for (const auto &component : allComponents) {
+              resp.writeUTF(component->getComponentName());
+            }
+            write(fd, resp.getBuffer(), resp.getSize());
+          } else if (what == "connections") {
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            {
+              std::lock_guard<std::mutex> lock(controller_mutex_);  // same lock heartbeat() holds while writing queue_full_
+              uint16_t size = queue_full_.size();
+              resp.write(size);
+              for (const auto &connection : queue_full_) {
+                resp.writeUTF(connection.first, false);
+              }
+            }
+            write(fd, resp.getBuffer(), resp.getSize());
+          } else if (what == "getfull") {
+            std::vector<std::string> full_connections;
+            {
+              std::lock_guard<std::mutex> lock(controller_mutex_);
+              for (const auto &conn : queue_full_) {
+                if (conn.second) {
+                  full_connections.push_back(conn.first);
+                }
+              }
+            }
+            io::BaseStream resp;
+            resp.writeData(&head, 1);
+            uint16_t full_connection_count = full_connections.size();
+            resp.write(full_connection_count);
+            for (const auto &conn : full_connections) {
+              resp.writeUTF(conn);
+            }
+            write(fd, resp.getBuffer(), resp.getSize());
+          }
+        }
+        break;
+      }
+    };
+    server_socket_->registerCallback(check, handler);
+  } else {
+    server_socket_ = nullptr;
+  }
+}
+
+void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
+  for (const auto &payload_content : content) {
+    if (payload_content.name == "Components") {  // every other content entry is ignored here
+      for (const auto &argument : payload_content.operation_arguments) {  // by reference, and not named like the parameter
+        bool is_enabled = false;
+        minifi::utils::StringUtils::StringToBool(argument.second, is_enabled);
+        std::lock_guard<std::mutex> lock(controller_mutex_);  // guards component_map_
+        component_map_[argument.first] = is_enabled;
+      }
+    }
+  }
+}
+
+/**
+ * Caches queue state from a heartbeat; does nothing when no control socket was configured.
+ * For each content entry below metrics/QueueMetrics it records "datasize" and "datasizemax"
+ * (0 when absent) and marks the entry full when size >= max, then passes the top-level
+ * content to parse_content().
+ * @param payload heartbeat payload to inspect
+ * @return always 0
+ */
+int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
+  if (server_socket_ == nullptr)
+    return 0;
+  // iterate by reference so the nested payloads are not copied at every level
+  for (const auto &pc : payload.getNestedPayloads()) {
+    if (pc.getLabel() == "metrics") {
+      for (const auto &metrics_payload : pc.getNestedPayloads()) {
+        if (metrics_payload.getLabel() == "QueueMetrics") {
+          for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) {
+            for (const auto &payload_content : queue_metrics.getContent()) {
+              uint64_t size = 0;
+              uint64_t max = 0;
+              for (const auto &argument : payload_content.operation_arguments) {
+                if (argument.first == "datasize") {
+                  size = std::stoull(argument.second);  // unsigned 64-bit parse to match the target type
+                } else if (argument.first == "datasizemax") {
+                  max = std::stoull(argument.second);
+                }
+              }
+              std::lock_guard<std::mutex> lock(controller_mutex_);
+              queue_full_[payload_content.name] = size >= max;
+              queue_size_[payload_content.name] = size;
+              queue_max_[payload_content.name] = max;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  parse_content(payload.getContent());
+
+  return 0;
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 39fc982..d98c4ed 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -45,6 +45,7 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string
       addr_info_(0),
       socket_file_descriptor_(-1),
       socket_max_(0),
+      is_loopback_only_(false),
       listeners_(listeners),
       canonical_hostname_(""),
       logger_(logging::LoggerFactory<Socket>::getLogger()) {
@@ -59,6 +60,7 @@ Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string
 Socket::Socket(const Socket &&other)
     : requested_hostname_(std::move(other.requested_hostname_)),
       port_(std::move(other.port_)),
+      is_loopback_only_(false),
       addr_info_(std::move(other.addr_info_)),
       socket_file_descriptor_(other.socket_file_descriptor_),
       socket_max_(other.socket_max_.load()),
@@ -79,7 +81,7 @@ void Socket::closeStream() {
     addr_info_ = 0;
   }
   if (socket_file_descriptor_ >= 0) {
-    logging::LOG_INFO(logger_) <<  "Closing " << socket_file_descriptor_;
+    logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_;
     close(socket_file_descriptor_);
     socket_file_descriptor_ = -1;
   }
@@ -109,7 +111,11 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
     struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
     sa_loc->sin_family = AF_INET;
     sa_loc->sin_port = htons(port_);
-    sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+    if (is_loopback_only_) {
+      sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+    } else {
+      sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+    }
     if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
       logger_->log_error("Could not bind to socket", strerror(errno));
       return -1;
@@ -123,7 +129,11 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       // use any address if you are connecting to the local machine for testing
       // otherwise we must use the requested hostname
       if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") {
-        sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+        if (is_loopback_only_) {
+          sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        } else {
+          sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+        }
       } else {
         sa_loc->sin_addr.s_addr = addr;
       }
@@ -139,6 +149,8 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   if (listeners_ > 0) {
     if (listen(socket_file_descriptor_, listeners_) == -1) {
       return -1;
+    } else {
+      logger_->log_debug("Created connection with %d listeners", listeners_);
     }
   }
   // add the listener to the total set
@@ -209,7 +221,6 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
   }
 
   struct timeval tv;
-  int retval;
 
   read_fds_ = total_list_;
 
@@ -219,14 +230,9 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
   std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
 
   if (msec > 0)
-    retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
+    select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
   else
-    retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
-
-  if (retval < 0) {
-    logger_->log_error("Saw error during selection, error:%i %s", retval, strerror(errno));
-    return retval;
-  }
+    select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
 
   for (int i = 0; i <= socket_max_; i++) {
     if (FD_ISSET(i, &read_fds_)) {
@@ -252,7 +258,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
     }
   }
 
-  logger_->log_error("Could not find a suitable file descriptor");
+  logger_->log_debug("Could not find a suitable file descriptor or select timed out");
 
   return -1;
 }
@@ -390,8 +396,10 @@ int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
   while (buflen) {
     int16_t fd = select_descriptor(1000);
     if (fd < 0) {
-      logger_->log_debug("fd %d close %i", fd, buflen);
-      close(socket_file_descriptor_);
+      if (listeners_ <= 0) {
+        logger_->log_debug("fd %d close %i", fd, buflen);
+        close(socket_file_descriptor_);
+      }
       return -1;
     }
     int bytes_read = recv(fd, buf, buflen, 0);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/io/DescriptorStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
new file mode 100644
index 0000000..d50a39f
--- /dev/null
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -0,0 +1,196 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io/DescriptorStream.h"
+#include <fstream>
+#include <unistd.h>
+#include <vector>
+#include <memory>
+#include <string>
+#include "io/validation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Constructs a stream that operates on the supplied file descriptor.
+ * @param fd descriptor used by the read, write and seek operations in this file
+ */
+DescriptorStream::DescriptorStream(int fd)
+    : fd_(fd),
+      logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
+}
+
+/**
+ * Repositions the descriptor to an absolute byte offset.
+ * @param offset offset measured from the beginning of the file
+ */
+void DescriptorStream::seek(uint64_t offset) {
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+  // SEEK_SET names the absolute positioning that the bare literal 0x00 only implied.
+  if (lseek(fd_, static_cast<off_t>(offset), SEEK_SET) == static_cast<off_t>(-1)) {
+    // seek() has no return value, so a failure is at least made visible in the log.
+    logger_->log_error("Could not seek on descriptor %d", fd_);
+  }
+}
+
+/**
+ * Writes the first buflen bytes of buf to the descriptor.
+ * @param buf source buffer
+ * @param buflen number of bytes to write; must lie in [0, buf.size()]
+ * @return -1 when buflen is out of range, otherwise the result of the pointer overload
+ */
+int DescriptorStream::writeData(std::vector<uint8_t> &buf, int buflen) {
+  // Validate against size() rather than capacity(): storage past size() holds no valid elements.
+  if (buflen < 0 || buf.size() < static_cast<size_t>(buflen)) {
+    return -1;
+  }
+  // data() is well defined on an empty vector, unlike &buf[0].
+  return writeData(buf.data(), buflen);
+}
+
+// data stream overrides
+
+/**
+ * Writes size bytes starting at value to the descriptor.
+ * @param value source bytes
+ * @param size number of bytes to write
+ * @return size when every byte was written, -1 otherwise
+ */
+int DescriptorStream::writeData(uint8_t *value, int size) {
+  if (IsNullOrEmpty(value) || size < 0) {
+    return -1;
+  }
+  std::lock_guard<std::recursive_mutex> lock(file_lock_);
+  int written = 0;
+  // write() may legitimately accept fewer bytes than requested, so keep going until
+  // everything has been handed over. The do/while preserves the single zero-length
+  // write that a size of 0 produced before.
+  do {
+    const ssize_t ret = ::write(fd_, value + written, static_cast<size_t>(size - written));
+    if (ret < 0) {
+      return -1;
+    }
+    if (ret == 0 && written < size) {
+      // No progress was made; bail out instead of spinning.
+      return -1;
+    }
+    written += static_cast<int>(ret);
+  } while (written < size);
+  return size;
+}
+
+/**
+ * Reads sizeof(T) bytes from the stream into a new, zero-initialised buffer.
+ * The result of the underlying readData call is not inspected here, so the
+ * returned buffer always has sizeof(T) elements.
+ * @param t value whose size determines how many bytes are requested
+ * @return buffer of sizeof(T) bytes
+ */
+template<typename T>
+inline std::vector<uint8_t> DescriptorStream::readBuffer(const T& t) {
+  const auto width = sizeof(t);
+  std::vector<uint8_t> bytes(width);
+  readData(bytes.data(), width);
+  return bytes;
+}
+
+/**
+ * Reads buflen bytes from the descriptor into buf, growing buf when it is too small.
+ * @param buf destination buffer; resized to at least buflen elements before reading
+ * @param buflen number of bytes requested
+ * @return result of the pointer overload, or -1 when buflen is negative
+ */
+int DescriptorStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  if (buflen < 0) {
+    return -1;
+  }
+  // Compare against size(), not capacity(): writing into reserved but unsized storage is invalid.
+  if (buf.size() < static_cast<size_t>(buflen)) {
+    buf.resize(buflen);
+  }
+  const int ret = readData(buf.data(), buflen);
+  if (ret < 0) {
+    // A failed read must not reach resize(): a negative count converts to an enormous size.
+    return ret;
+  }
+  if (ret < buflen) {
+    buf.resize(ret);
+  }
+  return ret;
+}
+
+/**
+ * Reads exactly buflen bytes from the descriptor into buf.
+ * @param buf destination memory
+ * @param buflen number of bytes requested
+ * @return buflen when the full amount was read, -1 on error, on end of input
+ *         before buflen bytes arrived, or when buf is null
+ */
+int DescriptorStream::readData(uint8_t *buf, int buflen) {
+  if (IsNullOrEmpty(buf) || buflen < 0) {
+    return -1;
+  }
+  int total = 0;
+  // read() may return fewer bytes than requested without that being an error,
+  // so keep reading until the request is satisfied. The do/while keeps the single
+  // zero-length read that a buflen of 0 produced before.
+  do {
+    const ssize_t got = ::read(fd_, buf + total, static_cast<size_t>(buflen - total));
+    if (got < 0) {
+      return -1;
+    }
+    if (got == 0 && total < buflen) {
+      // End of input before the requested amount was available.
+      return -1;
+    }
+    total += static_cast<int>(got);
+  } while (total < buflen);
+  return buflen;
+}
+
+/**
+ * Reads a single byte from this stream.
+ * @param value reference that receives the byte
+ * @return result of Serializable::read invoked on this stream
+ **/
+int DescriptorStream::read(uint8_t &value) {
+  // NOTE(review): reinterpret_cast assumes DataStream sits at the same address as this object - confirm against the class declaration.
+  return Serializable::read(value, reinterpret_cast<DataStream*>(this));
+}
+
+/**
+ * Reads two bytes from the stream and assembles them into a 16-bit value.
+ * @param base_value reference that receives the result; untouched on failure
+ * @param is_little_endian when true the first byte read becomes the most
+ *        significant byte, otherwise it becomes the least significant byte
+ * @return 2 on success, -1 when the bytes could not be read
+ **/
+int DescriptorStream::read(uint16_t &base_value, bool is_little_endian) {
+  uint8_t buf[sizeof(base_value)];
+  const int wanted = static_cast<int>(sizeof(buf));
+  // Surface a failed read instead of decoding a zero-filled buffer.
+  if (readData(buf, wanted) != wanted) {
+    return -1;
+  }
+  if (is_little_endian) {
+    base_value = static_cast<uint16_t>((buf[0] << 8) | buf[1]);
+  } else {
+    base_value = static_cast<uint16_t>(buf[0] | (buf[1] << 8));
+  }
+  return wanted;
+}
+
+/**
+ * Reads a single character from the stream.
+ * @param value reference that receives the character
+ * @return result of readData for a one byte request
+ **/
+int DescriptorStream::read(char &value) {
+  return readData(reinterpret_cast<uint8_t*>(&value), 1);
+}
+
+/**
+ * Reads a byte array from the stream.
+ * @param value destination memory
+ * @param len number of bytes to read
+ * @return result of readData for the same arguments
+ **/
+int DescriptorStream::read(uint8_t *value, int len) {
+  return readData(value, len);
+}
+
+/**
+ * Reads four bytes from the stream and assembles them into a 32-bit value.
+ * @param value reference that receives the result; untouched on failure
+ * @param is_little_endian when true the first byte read becomes the most
+ *        significant byte, otherwise it becomes the least significant byte
+ * @return 4 on success, -1 when the bytes could not be read
+ **/
+int DescriptorStream::read(uint32_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)];
+  const int wanted = static_cast<int>(sizeof(buf));
+  // Surface a failed read instead of decoding a zero-filled buffer.
+  if (readData(buf, wanted) != wanted) {
+    return -1;
+  }
+  // Widen each byte to uint32_t before shifting: a uint8_t promotes to signed int,
+  // and shifting a byte >= 0x80 left by 24 would overflow it.
+  const uint32_t b0 = buf[0];
+  const uint32_t b1 = buf[1];
+  const uint32_t b2 = buf[2];
+  const uint32_t b3 = buf[3];
+  if (is_little_endian) {
+    value = (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+  } else {
+    value = b0 | (b1 << 8) | (b2 << 16) | (b3 << 24);
+  }
+  return wanted;
+}
+
+/**
+ * Reads eight bytes from the stream and assembles them into a 64-bit value.
+ * @param value reference that receives the result; untouched on failure
+ * @param is_little_endian when true the first byte read becomes the most
+ *        significant byte, otherwise it becomes the least significant byte
+ * @return 8 on success, -1 when the bytes could not be read
+ **/
+int DescriptorStream::read(uint64_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)];
+  const int wanted = static_cast<int>(sizeof(buf));
+  // Surface a failed read instead of decoding a zero-filled buffer.
+  if (readData(buf, wanted) != wanted) {
+    return -1;
+  }
+  uint64_t assembled = 0;
+  for (int i = 0; i < wanted; ++i) {
+    // Byte i lands at the top when is_little_endian is set, at the bottom otherwise.
+    const int shift = (is_little_endian ? (wanted - 1 - i) : i) * 8;
+    assembled |= static_cast<uint64_t>(buf[i]) << shift;
+  }
+  value = assembled;
+  return wanted;
+}
+
+/**
+ * Reads a UTF string from this stream.
+ * @param str reference that receives the string
+ * @param widen forwarded unchanged to Serializable::readUTF
+ * @return result of Serializable::readUTF invoked on this stream
+ **/
+int DescriptorStream::readUTF(std::string &str, bool widen) {
+  // NOTE(review): reinterpret_cast assumes DataStream sits at the same address as this object - confirm against the class declaration.
+  return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen);
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/src/io/ServerSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ServerSocket.cpp b/libminifi/src/io/ServerSocket.cpp
new file mode 100644
index 0000000..1a72a0f
--- /dev/null
+++ b/libminifi/src/io/ServerSocket.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/ServerSocket.h"
+#include <netinet/tcp.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <cstdio>
+#include <memory>
+#include <utility>
+#include <vector>
+#include <cerrno>
+#include <iostream>
+#include <string>
+#include "io/validation.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Constructs a server socket by forwarding all arguments to the Socket base class.
+ * The instance starts with running_ set to true.
+ * @param context socket context handed to Socket
+ * @param hostname host name handed to Socket
+ * @param port port handed to Socket
+ * @param listeners listener count handed to Socket
+ * NOTE(review): the default of -1 converts to 65535 because the parameter is uint16_t - confirm this is intended.
+ */
+ServerSocket::ServerSocket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
+    : Socket(context, hostname, port, listeners),
+      running_(true),
+      logger_(logging::LoggerFactory<ServerSocket>::getLogger()) {
+}
+
+/**
+ * Clears running_ so the loop started by registerCallback exits, then joins
+ * the worker thread if one was started.
+ */
+ServerSocket::~ServerSocket() {
+  running_ = false;
+  if (server_read_thread_.joinable())
+    server_read_thread_.join();
+}
+
+/**
+ * Starts the worker thread that services this server socket. While running_
+ * is set, the worker waits up to one second for a descriptor via
+ * select_descriptor, passes each descriptor it obtains to handler and then
+ * closes it through close_fd.
+ * Calling this again replaces the previous worker.
+ * @param accept_function forwarded to the worker; it is not invoked by this implementation
+ * @param handler invoked with every descriptor returned by select_descriptor
+ */
+void ServerSocket::registerCallback(std::function<bool()> accept_function, std::function<void(int)> handler) {
+  // Assigning to a std::thread that is still joinable calls std::terminate,
+  // so stop and reap an earlier worker before starting a new one.
+  if (server_read_thread_.joinable()) {
+    running_ = false;
+    server_read_thread_.join();
+    running_ = true;
+  }
+  auto fx = [this](std::function<bool()> accept_function, std::function<void(int)> handler) {
+    while (running_) {
+      int fd = select_descriptor(1000);
+      if (fd >= 0) {
+        handler(fd);
+        close_fd(fd);
+      }
+    }
+  };
+  server_read_thread_ = std::thread(fx, accept_function, handler);
+}
+
+/**
+ * Closes a descriptor and removes it from total_list_ while holding selection_mutex_.
+ * @param fd descriptor to close
+ */
+void ServerSocket::close_fd(int fd) {
+  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+  close(fd);
+  FD_CLR(fd, &total_list_);
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 7db235f..0e8d52f 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -73,7 +73,7 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
   ptr.release();
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 10005, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 10005, 1);
 
   controller->load();
   controller->start();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/unit/ControllerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
new file mode 100644
index 0000000..c761b6e
--- /dev/null
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "../../controller/Controller.h"
+#include "c2/ControllerSocketProtocol.h"
+
+#include "state/UpdateController.h"
+
+/**
+ * StateController test double that only tracks whether it was started or stopped.
+ */
+class TestStateController : public minifi::state::StateController {
+ public:
+  TestStateController()
+      : is_running(false) {
+  }
+
+  virtual ~TestStateController() {
+  }
+
+  virtual std::string getComponentName() {
+    return "TestStateController";
+  }
+
+  /**
+   * Marks the component as running.
+   * @return always 0
+   */
+  virtual int16_t start() {
+    is_running.store(true);
+    return 0;
+  }
+
+  /**
+   * Marks the component as stopped; both arguments are ignored.
+   * @return always 0
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+    is_running.store(false);
+    return 0;
+  }
+
+  virtual bool isRunning() {
+    return is_running.load();
+  }
+
+  /**
+   * Does nothing.
+   * @return always 0
+   */
+  virtual int16_t pause() {
+    return 0;
+  }
+
+  // Set by start(), cleared by stop().
+  std::atomic<bool> is_running;
+};
+
+/**
+ * StateMonitor test double: exposes a single controller and counts how often
+ * clearConnection and the string form of applyUpdate are invoked.
+ */
+class TestUpdateSink : public minifi::state::StateMonitor {
+ public:
+  explicit TestUpdateSink(std::shared_ptr<StateController> controller)
+      : is_running(true),
+        clear_calls(0),
+        controller(controller),
+        update_calls(0) {
+  }
+
+  /**
+   * @param name ignored
+   * @return a vector holding only the wrapped controller
+   */
+  virtual std::vector<std::shared_ptr<StateController>> getComponents(const std::string &name) {
+    return std::vector<std::shared_ptr<StateController>>(1, controller);
+  }
+
+  /**
+   * @return a vector holding only the wrapped controller
+   */
+  virtual std::vector<std::shared_ptr<StateController>> getAllComponents() {
+    return std::vector<std::shared_ptr<StateController>>(1, controller);
+  }
+
+  virtual std::string getComponentName() {
+    return "TestUpdateSink";
+  }
+
+  /**
+   * Marks the sink as running.
+   * @return always 0
+   */
+  virtual int16_t start() {
+    is_running.store(true);
+    return 0;
+  }
+
+  /**
+   * Marks the sink as stopped; both arguments are ignored.
+   * @return always 0
+   */
+  virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+    is_running.store(false);
+    return 0;
+  }
+
+  virtual bool isRunning() {
+    return is_running.load();
+  }
+
+  /**
+   * Does nothing.
+   * @return always 0
+   */
+  virtual int16_t pause() {
+    return 0;
+  }
+
+  /**
+   * Operational controllers
+   */
+
+  /**
+   * Does nothing.
+   * @return always 0
+   */
+  virtual int16_t drainRepositories() {
+    return 0;
+  }
+
+  /**
+   * Counts the call in clear_calls; the connection name is ignored.
+   * @return always 0
+   */
+  virtual int16_t clearConnection(const std::string &connection) {
+    clear_calls.fetch_add(1);
+    return 0;
+  }
+
+  /**
+   * Counts the call in update_calls; the configuration text is ignored.
+   * @return always 0
+   */
+  virtual int16_t applyUpdate(const std::string &configuration) {
+    update_calls.fetch_add(1);
+    return 0;
+  }
+
+  /**
+   * Accepts an update object without acting on it or counting it.
+   * @return always 0
+   */
+  virtual int16_t applyUpdate(const std::shared_ptr<minifi::state::Update> &updateController) {
+    return 0;
+  }
+
+  /**
+   * @return a fixed value of 8765309
+   */
+  virtual uint64_t getUptime() {
+    return 8765309;
+  }
+
+  std::atomic<bool> is_running;
+  // Number of clearConnection invocations.
+  std::atomic<uint32_t> clear_calls;
+  std::shared_ptr<StateController> controller;
+  // Number of applyUpdate(const std::string&) invocations.
+  std::atomic<uint32_t> update_calls;
+};
+
+// Starts and stops the test controller over the controller socket, then checks
+// that the component listing mentions it.
+TEST_CASE("TestGet", "[test1]") {
+  auto controller = std::make_shared<TestStateController>();
+  auto ptr = std::make_shared<TestUpdateSink>(controller);
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set("controller.socket.host", "localhost");
+  configuration->set("controller.socket.port", "9997");
+
+  minifi::c2::ControllerSocketProtocol protocol("testprotocol");
+  protocol.initialize(nullptr, ptr, configuration);
+
+  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  // Every command goes out over its own freshly created socket.
+  auto connect = [&stream_factory]() {
+    return stream_factory->createSocket("localhost", 9997);
+  };
+  const auto settle = std::chrono::milliseconds(500);
+
+  startComponent(connect(), "TestStateController");
+  std::this_thread::sleep_for(settle);
+  REQUIRE(controller->isRunning() == true);
+
+  stopComponent(connect(), "TestStateController");
+  std::this_thread::sleep_for(settle);
+  REQUIRE(controller->isRunning() == false);
+
+  std::stringstream ss;
+  listComponents(connect(), ss);
+  REQUIRE(ss.str().find("TestStateController") != std::string::npos);
+}
+
+// Issues three clear-connection commands and checks that the sink counted all of them.
+TEST_CASE("TestClear", "[test1]") {
+  auto controller = std::make_shared<TestStateController>();
+  auto ptr = std::make_shared<TestUpdateSink>(controller);
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set("controller.socket.host", "localhost");
+  configuration->set("controller.socket.port", "9997");
+
+  minifi::c2::ControllerSocketProtocol protocol("testprotocol");
+  protocol.initialize(nullptr, ptr, configuration);
+
+  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  // Every command goes out over its own freshly created socket.
+  auto connect = [&stream_factory]() {
+    return stream_factory->createSocket("localhost", 9997);
+  };
+  const auto settle = std::chrono::milliseconds(500);
+
+  startComponent(connect(), "TestStateController");
+  std::this_thread::sleep_for(settle);
+  REQUIRE(controller->isRunning() == true);
+
+  for (int attempt = 0; attempt < 3; ++attempt) {
+    clearConnection(connect(), "connection");
+  }
+  std::this_thread::sleep_for(settle);
+
+  REQUIRE(3 == ptr->clear_calls);
+}
+
+// Sends one flow update and checks that it was counted as an update, not as a clear.
+TEST_CASE("TestUpdate", "[test1]") {
+  auto controller = std::make_shared<TestStateController>();
+  auto ptr = std::make_shared<TestUpdateSink>(controller);
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  configuration->set("controller.socket.host", "localhost");
+  configuration->set("controller.socket.port", "9997");
+
+  minifi::c2::ControllerSocketProtocol protocol("testprotocol");
+  protocol.initialize(nullptr, ptr, configuration);
+
+  auto stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
+  // Every command goes out over its own freshly created socket.
+  auto connect = [&stream_factory]() {
+    return stream_factory->createSocket("localhost", 9997);
+  };
+  const auto settle = std::chrono::milliseconds(500);
+
+  startComponent(connect(), "TestStateController");
+  std::this_thread::sleep_for(settle);
+  REQUIRE(controller->isRunning() == true);
+
+  std::stringstream ss;
+  updateFlow(connect(), ss, "connection");
+  std::this_thread::sleep_for(settle);
+
+  REQUIRE(1 == ptr->update_calls);
+  REQUIRE(0 == ptr->clear_calls);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/unit/GetTCPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/GetTCPTests.cpp b/libminifi/test/unit/GetTCPTests.cpp
index 60db1ee..87121dc 100644
--- a/libminifi/test/unit/GetTCPTests.cpp
+++ b/libminifi/test/unit/GetTCPTests.cpp
@@ -47,7 +47,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
   std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9184, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9184, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -158,7 +158,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
 
   TestController testController;
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9182, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9182, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -284,7 +284,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
 
   LogTestController::getInstance().setDebug<minifi::io::Socket>();
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9182, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9182, 1);
 
   REQUIRE(-1 != server.initialize());
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/edc8858f/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 3e2760e..a4785f9 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -26,6 +26,7 @@
 #include "../TestBase.h"
 #include "io/StreamFactory.h"
 #include "io/ClientSocket.h"
+#include "io/ServerSocket.h"
 #include "io/tls/TLSSocket.h"
 #include "utils/ThreadPool.h"
 
@@ -55,7 +56,7 @@ TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
   buffer.push_back('a');
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -86,7 +87,7 @@ TEST_CASE("TestWriteEndian64", "[TestSocket4]") {
   buffer.push_back('a');
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
 
   REQUIRE(-1 != server.initialize());
 
@@ -113,7 +114,7 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
   REQUIRE(-1 != server.initialize());
 
   org::apache::nifi::minifi::io::Socket client(socket_context, "localhost", 9183);
@@ -148,7 +149,7 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") {
 
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
 
-  org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1);
+  org::apache::nifi::minifi::io::ServerSocket server(socket_context, "localhost", 9183, 1);
 
   REQUIRE(-1 != server.initialize());