You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/12 17:51:24 UTC

[01/11] nifi-minifi-cpp git commit: MINIFI-368 exclude hidden files when scanning for src files [Forced Update!]

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 17fe045b4 -> 84f50b517 (forced update)


MINIFI-368 exclude hidden files when scanning for src files

This closes #125.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/22f45281
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/22f45281
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/22f45281

Branch: refs/heads/master
Commit: 22f45281ddcbf70a93cf68ae4e186d5d710bd2ea
Parents: eb14080
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Wed Aug 9 11:53:04 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 cmake/BuildTests.cmake | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/22f45281/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 2502537..29603bf 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -21,7 +21,7 @@ MACRO(GETSOURCEFILES result curdir)
   FILE(GLOB children RELATIVE ${curdir} ${curdir}/*)
   SET(dirlist "")
   FOREACH(child ${children})
-    IF( "${curdir}/${child}" MATCHES .*\\.cpp)
+    IF( "${child}" MATCHES ^[^.].*\\.cpp)
   
       LIST(APPEND dirlist ${child})
     ENDIF()


[11/11] nifi-minifi-cpp git commit: MINIFI-350 Added pytest-based system integration test framework and initial test cases

Posted by al...@apache.org.
MINIFI-350 Added pytest-based system integration test framework and initial test cases

This closes #126.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/84f50b51
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/84f50b51
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/84f50b51

Branch: refs/heads/master
Commit: 84f50b5173b6d3cb88ddd8a8f366b80483081da2
Parents: 35a47c7
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Thu Jul 13 10:42:35 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:14 2017 -0400

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 README.md                                       |  18 ++
 cmake/DockerConfig.cmake                        |   4 +-
 docker/DockerVerify.sh                          |  38 +++
 docker/test/integration/.gitignore              |   1 +
 docker/test/integration/README.md               | 184 ++++++++++++
 docker/test/integration/minifi/__init__.py      | 298 +++++++++++++++++++
 docker/test/integration/minifi/test/__init__.py | 191 ++++++++++++
 docker/test/integration/test_filesystem_ops.py  |  54 ++++
 docker/test/integration/test_http.py            |  36 +++
 10 files changed, 827 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2ba3164..c28b765 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,3 +54,7 @@ thirdparty/apache-rat/apache-rat*
 # Ignore source files that have been placed in the docker directory during build
 docker/minificppsource
 *.swp
+.cache
+.cproject
+.settings
+*.pyc

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index f971683..8830aaa 100644
--- a/README.md
+++ b/README.md
@@ -99,6 +99,11 @@ $ yum install cmake \
   boost-devel \
   libssl-dev \
   doxygen
+$ # (Optional) for building docker image
+$ yum install docker
+$ # (Optional) for system integration tests
+$ yum install docker python-virtualenv
+
 ```
 
 Aptitude based Linux Distributions
@@ -111,6 +116,10 @@ $ apt-get install cmake \
   uuid-dev uuid \
   libboost-all-dev libssl-dev \
   doxygen
+$ # (Optional) for building docker image
+$ apt-get install docker.io
+$ # (Optional) for system integration tests
+$ apt-get install docker.io python-virtualenv
 ```
 
 OS X Using Homebrew (with XCode Command Line Tools installed)
@@ -124,6 +133,9 @@ $ brew install cmake \
   doxygen
 $ brew install curl
 $ brew link curl --force
+$ # (Optional) for building docker image/running system integration tests
+$ # Install docker using instructions at https://docs.docker.com/docker-for-mac/install/
+$ sudo pip install virtualenv
 ```
 
 
@@ -221,6 +233,12 @@ Successfully built c390063d9bd1
 Built target docker
 ```
 
+- (Optional) Execute system integration tests using the docker image built locally on a docker daemon running locally.
+```
+~/Development/code/apache/nifi-minifi-cpp/build
+$ make docker-verify
+```
+
 ### Cleaning
 Remove the build directory created above.
 ```

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/cmake/DockerConfig.cmake
----------------------------------------------------------------------
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 41ca7f7..57270e4 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -21,4 +21,6 @@ add_custom_target(
     COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerBuild.sh 1000 1000 ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH} minificppsource ${CMAKE_SOURCE_DIR}
     WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/docker/)
 
-
+add_custom_target(
+    docker-verify
+    COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerVerify.sh)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/DockerVerify.sh
----------------------------------------------------------------------
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
new file mode 100755
index 0000000..be62c1f
--- /dev/null
+++ b/docker/DockerVerify.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+docker_dir="$( cd ${0%/*} && pwd )"
+
+# Create virutal environment for testing
+if [[ ! -d ./test-env-py2 ]]; then
+  echo "Creating virtual environment in ./test-env-py2" 1>&2
+  virtualenv ./test-env-py2
+fi
+
+echo "Activating virtual environment..." 1>&2
+. ./test-env-py2/bin/activate
+pip install --upgrade pip setuptools
+
+# Install test dependencies
+echo "Installing test dependencies..." 1>&2
+pip install --upgrade pytest docker PyYAML watchdog
+
+export MINIFI_VERSION=0.3.0
+export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
+pytest -s -v "${docker_dir}"/test/integration

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/.gitignore
----------------------------------------------------------------------
diff --git a/docker/test/integration/.gitignore b/docker/test/integration/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/docker/test/integration/.gitignore
@@ -0,0 +1 @@
+__pycache__

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/README.md
----------------------------------------------------------------------
diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md
new file mode 100644
index 0000000..6d8c066
--- /dev/null
+++ b/docker/test/integration/README.md
@@ -0,0 +1,184 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Apache MiNiFi Docker System Integration Tests
+
+Apache MiNiFi includes a suite of docker-based system integration tests. These
+tests are designed to test the integration between distinct MiNiFi instances as
+well as other systems which are available in docker, such as Apache NiFi.
+
+## Test Execution Lifecycle
+
+Each test involves the following stages as part of its execution lifecycle:
+
+### Definition of flows/Flow DSL
+
+Flows are defined using a python-native domain specific language (DSL). The DSL
+supports the standard primitives which make up a NiFi/MiNiFi flow, such as
+processors, connections, and controller services. Several processors defined in
+the DSL have optional, named parameters enabling concise flow expression.
+
+By default, all relationships are set to auto-terminate. If a relationship is
+used, it is automatically taken out of the auto\_terminate list.
+
+**Example Trivial Flow:**
+
+```python
+flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
+```
+
+#### Supported Processors
+
+The following processors/parameters are supported:
+
+**GetFile**
+
+- input\_dir
+
+**PutFile**
+
+- output\_dir
+
+**LogAttribute**
+
+**ListenHTTP**
+
+- port
+- cert=None
+
+**InvokeHTTP**
+
+- url
+- method='GET'
+- ssl\_context\_service=None
+
+### Definition of an output validator
+
+The output validator is responsible for checking the state of a cluster for
+valid output conditions. Currently, the only supported output validator is the
+SingleFileOutputValidator, which looks for a single file to be written to
+/tmp/output by a flow having a given string as its contents.
+
+**Example SingleFileOutputValidator:**
+
+```python
+SingleFileOutputValidator('example output')
+```
+
+This example SingleFileOutputValidator would validate that a single file is
+written with the contents 'example output.'
+
+### Creation of a DockerTestCluster
+
+DockerTestCluster instances are used to deploy one or more flow to a simulated
+or actual multi-host docker cluster. This enables testing of interactions
+between multiple system components, such as MiNiFi flows. Before the test
+cluster is destroyed, an assertion may be performed on the results of the
+*check\_output()* method of the cluster. This invokes the validator supplied at
+construction against the output state of the system.
+
+Creation of a DockerTestCluster is simple:
+
+**Example DockerTestCluster Instantiation:**
+
+```python
+with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+  ...
+  # Perform test operations
+  ...
+  assert cluster.check_output()
+```
+
+Note that a docker cluster must be created inside of a *with* structure to
+ensure that all resources are ccreated and destroyed cleanly. 
+
+### Insertion of test input data
+
+Although arbitrary NiFi flows can ingest data from a multitude of sources, a
+MiNiFi system integration test is expected to receive input via deterministed,
+controlled channels. The primary supported method of providing input to a
+MiNiFi system integration test is to insert data into the filesystem at
+/tmp/input.
+
+To write a string to the contents of a file in /tmp/input, use the
+*put\_test\_data()* method.
+
+**Example put\_test\_data() Usage:**
+
+```python
+cluster.put_test_data('test')
+```
+
+This writes a file with a random name to /tmp/input, with the contents 'test.'
+
+To provide a resource to a container, such as a TLS certificate, use the
+*put\_test\_resource()* method to write a resource file to /tmp/resources.
+
+**Example put\_test\_resource() Usage:**
+
+```python
+cluster.put_test_resource('test-resource', 'resource contents')
+```
+
+This writes a file to /tmp/resources/test-resource with the contents 'resource
+contents.'
+
+### Deployment of one or more flows
+
+Deployment of flows to a test cluster is performed using the *deploy\_flow()*
+method of a cluster. Each flow is deployed as a separate docker service having
+its own DNS name. If a name is not provided upon deployment, a random name will
+be used.
+
+**Example deploy\_flow() Usage:**
+
+```python
+cluster.deploy_flow(flow, name='test-flow')
+```
+
+### Execution of one or more flows
+
+Flows are executed immediately upon deployment and according to schedule
+properties defined in the flow.yml. As such, to minimize test latency it is
+important to ensure that test inputs are added to the test cluster before flows
+are deployed. Filesystem events are monitored using event APIs, ensuring that
+flows are executed immediately upon input availability and output is validated
+immediately after it is written to disk.
+
+### Output validation
+
+As soon as data is written to /tmp/output, the OutputValidator (defined
+according to the documentation above) is executed on the output. The
+*check\_output()* cluster method waits for up to 5 seconds for valid output.
+
+### Cluster teardown/cleanup
+
+The deployment of a test cluster involves creating one or more docker
+containers and networks, as well as temporary files/directories on the host
+system. All resources are cleaned up automatically as long as clusters are
+created within a *with* block.
+
+```python
+
+# Using the with block ensures that all cluster resources are cleaned up after
+# the test cluster is no longer needed.
+
+with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+  ...
+  # Perform test operations
+  ...
+  assert cluster.check_output()
+```
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
new file mode 100644
index 0000000..557b9a8
--- /dev/null
+++ b/docker/test/integration/minifi/__init__.py
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+import logging
+
+import yaml
+import docker
+
+
+class Cluster(object):
+    """
+    Base Cluster class. This is intended to be a generic interface
+    to different types of clusters. Clusters could be Kubernetes clusters,
+    Docker swarms, or cloud compute/container services.
+    """
+
+    def deploy_flow(self, flow):
+        """
+        Deploys a flow to the cluster.
+        """
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources.
+        """
+
+class SingleNodeDockerCluster(Cluster):
+    """
+    A "cluster" which consists of a single docker node. Useful for
+    testing or use-cases which do not span multiple compute nodes.
+    """
+
+    def __init__(self):
+        self.network = None
+        self.containers = []
+        self.tmp_files = []
+
+        # Get docker client
+        self.client = docker.from_env()
+
+    def deploy_flow(self, flow, name=None, vols={}):
+        """
+        Compiles the flow to YAML and maps it into the container using
+        the docker volumes API.
+        """
+
+        logging.info('Deploying flow...')
+
+        if name is None:
+            name = 'minifi-' + str(uuid.uuid4())
+            logging.info('Flow name was not provided; using generated name \'%s\'', name)
+
+        minifi_version = os.environ['MINIFI_VERSION']
+        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + minifi_version
+
+        # Write flow config
+        tmp_flow_file_name = '/tmp/.minifi-flow.' + str(uuid.uuid4())
+        self.tmp_files.append(tmp_flow_file_name)
+
+        yaml = flow_yaml(flow)
+
+        logging.info('Using generated flow config yml:\n%s', yaml)
+
+        with open(tmp_flow_file_name, 'w') as tmp_flow_file:
+            tmp_flow_file.write(yaml)
+
+        conf_file = tmp_flow_file_name
+
+        local_vols = {}
+        local_vols[conf_file] = {'bind': self.minifi_root + '/conf/config.yml', 'mode': 'ro'}
+        local_vols.update(vols)
+
+        logging.info('Creating and running docker container for flow...')
+
+        # Create network if necessary
+        if self.network is None:
+            net_name = 'minifi-' + str(uuid.uuid4())
+            logging.info('Creating network: %s', net_name)
+            self.network = self.client.networks.create(net_name)
+
+        container = self.client.containers.run(
+            'apacheminificpp:' + minifi_version,
+            detach=True,
+            name=name,
+            network=self.network.name,
+            volumes=local_vols)
+
+        logging.info('Started container \'%s\'', container.name)
+        self.containers.append(container)
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources
+        """
+
+        # Clean up containers
+        for container in self.containers:
+            logging.info('Cleaning up container: %s', container.name)
+            container.remove(v=True, force=True)
+
+        # Clean up network
+        if self.network is not None:
+            logging.info('Cleaning up network network: %s', self.network.name)
+            self.network.remove()
+
+        # Clean up tmp files
+        for tmp_file in self.tmp_files:
+            os.remove(tmp_file)
+
+
+class Processor(object):
+
+    def __init__(self,
+                 clazz,
+                 properties={},
+                 schedule={},
+                 name=None,
+                 auto_terminate=[]):
+        self.connections = {}
+        self.uuid = uuid.uuid4()
+
+        if name is None:
+            self.name = str(self.uuid)
+
+        self.clazz = clazz
+        self.properties = properties
+        self.auto_terminate = auto_terminate
+
+        self.out_proc = self
+
+        self.schedule = {
+            'scheduling strategy': 'EVENT_DRIVEN',
+            'scheduling period': '1 sec',
+            'penalization period': '30 sec',
+            'yield period': '1 sec',
+            'run duration nanos': 0
+        }
+        self.schedule.update(schedule)
+
+    def connect(self, connections):
+        for rel in connections:
+
+            # Ensure that rel is not auto-terminated
+            if rel in self.auto_terminate:
+                del self.auto_terminate[self.auto_terminate.index(rel)]
+
+            # Add to set of output connections for this rel
+            if not rel in self.connections:
+                self.connections[rel] = []
+            self.connections[rel].append(connections[rel])
+
+        return self
+
+    def __rshift__(self, other):
+        """
+        Right shift operator to support flow DSL, for example:
+
+            GetFile('/input') >> LogAttribute() >> PutFile('/output')
+
+        """
+
+        if (isinstance(other, tuple)):
+            if (isinstance(other[0], tuple)):
+                for rel_tuple in other:
+                    rel = {}
+                    rel[rel_tuple[0]] = rel_tuple[1]
+                    self.out_proc.connect(rel)
+            else:
+                rel = {}
+                rel[other[0]] = other[1]
+                self.out_proc.connect(rel)
+        else:
+            self.out_proc.connect({'success': other})
+            self.out_proc = other
+
+        return self
+
+
+def InvokeHTTP(url, method='GET'):
+    return Processor('InvokeHTTP',
+                     properties={'Remote URL': url,
+                                 'HTTP Method': method},
+                     auto_terminate=['success',
+                                     'response',
+                                     'retry',
+                                     'failure',
+                                     'no retry'])
+
+
+def ListenHTTP(port):
+    return Processor('ListenHTTP',
+                     properties={'Listening Port': port},
+                     auto_terminate=['success'])
+
+
+def LogAttribute():
+    return Processor('LogAttribute',
+                     auto_terminate=['success'])
+
+
+def GetFile(input_dir):
+    return Processor('GetFile',
+                     properties={'Input Directory': input_dir},
+                     schedule={'scheduling period': '0 sec'},
+                     auto_terminate=['success'])
+
+
+def PutFile(output_dir):
+    return Processor('PutFile',
+                     properties={'Output Directory': output_dir},
+                     auto_terminate=['success', 'failure'])
+
+
+def flow_yaml(processor, root=None, visited=[]):
+
+    if root is None:
+        res = {
+            'Flow Controller': {
+                'name': 'MiNiFi Flow'
+            },
+            'Processors': [],
+            'Connections': [],
+            'Remote Processing Groups': []
+        }
+    else:
+        res = root
+
+    visited.append(processor)
+
+    if hasattr(processor, 'name'):
+        proc_name = processor.name
+    else:
+        proc_name = str(processor.uuid)
+
+    res['Processors'].append({
+        'name': proc_name,
+        'id': str(processor.uuid),
+        'class': 'org.apache.nifi.processors.standard.' + processor.clazz,
+        'scheduling strategy': processor.schedule['scheduling strategy'],
+        'scheduling period': processor.schedule['scheduling period'],
+        'penalization period': processor.schedule['penalization period'],
+        'yield period': processor.schedule['yield period'],
+        'run duration nanos': processor.schedule['run duration nanos'],
+        'Properties': processor.properties,
+        'auto-terminated relationships list': processor.auto_terminate
+    })
+
+    for conn_name in processor.connections:
+        conn_procs = processor.connections[conn_name]
+
+        if isinstance(conn_procs, list):
+            for proc in conn_procs:
+                res['Connections'].append({
+                    'name': str(uuid.uuid4()),
+                    'source id': str(processor.uuid),
+                    'source relationship name': conn_name,
+                    'destination id': str(proc.uuid)
+                })
+                if proc not in visited:
+                    flow_yaml(proc, res, visited)
+        else:
+            res['Connections'].append({
+                'name': str(uuid.uuid4()),
+                'source id': str(processor.uuid),
+                'source relationship name': conn_name,
+                'destination id': str(conn_procs.uuid)
+            })
+            if conn_procs not in visited:
+                flow_yaml(conn_procs, res, visited)
+
+    if root is None:
+        return yaml.dump(res, default_flow_style=False)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
new file mode 100644
index 0000000..a7a3030
--- /dev/null
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+import shutil
+import uuid
+
+from threading import Event
+
+from os import listdir
+from os.path import isfile, join
+
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+
+from minifi import SingleNodeDockerCluster
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class DockerTestCluster(SingleNodeDockerCluster):
+
+    def __init__(self, output_validator):
+
+        # Create test input/output directories
+        test_cluster_id = str(uuid.uuid4())
+
+        self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
+        self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
+
+        logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
+        os.makedirs(self.tmp_test_input_dir, mode=0777)
+        logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir)
+        os.makedirs(self.tmp_test_output_dir, mode=0777)
+
+        # Point output validator to ephemeral output dir
+        self.output_validator = output_validator
+        output_validator.set_output_dir(self.tmp_test_output_dir)
+
+        # Start observing output dir
+        self.done_event = Event()
+        event_handler = OutputEventHandler(output_validator, self.done_event)
+        self.observer = Observer()
+        self.observer.schedule(event_handler, self.tmp_test_output_dir)
+        self.observer.start()
+
+        super(DockerTestCluster, self).__init__()
+
+    def deploy_flow(self, flow, name=None):
+        """
+        Performs a standard container flow deployment with the addition
+        of volumes supporting test input/output directories.
+        """
+
+        vols = {}
+        vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
+        vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+
+        super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
+
+    def put_test_data(self, contents):
+        """
+        Creates a randomly-named file in the test input dir and writes
+        the given content to it.
+        """
+
+        test_file_name = join(self.tmp_test_input_dir, str(uuid.uuid4()))
+        logging.info('Writing %d bytes of content to test file: %s', len(contents), test_file_name)
+
+        with open(test_file_name, 'w') as test_input_file:
+            test_input_file.write(contents)
+
+    def wait_for_output(self, timeout_seconds):
+        logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
+        self.done_event.wait(timeout_seconds)
+        self.observer.stop()
+        self.observer.join()
+
+    def log_minifi_output(self):
+
+        for container in self.containers:
+            container = self.client.containers.get(container.id)
+            logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs())
+            if container.status == 'running':
+                app_logs = container.exec_run('cat ' + self.minifi_root + '/minifi-app.log')
+                logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, app_logs)
+            else:
+                logging.info(container.status)
+                logging.info('Could not cat app logs for container \'%s\' because it is not running',
+                             container.name)
+            stats = container.stats(decode=True, stream=False)
+            logging.info('Container stats:\n%s', repr(stats))
+
+    def check_output(self):
+        """
+        Wait for flow output, validate it, and log minifi output.
+        """
+        self.wait_for_output(5)
+        self.log_minifi_output()
+
+        return self.output_validator.validate()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral test resources.
+        """
+
+        logging.info('Removing tmp test input dir: %s', self.tmp_test_input_dir)
+        shutil.rmtree(self.tmp_test_input_dir)
+        logging.info('Removing tmp test output dir: %s', self.tmp_test_output_dir)
+        shutil.rmtree(self.tmp_test_output_dir)
+
+        super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
+
+
+class OutputEventHandler(FileSystemEventHandler):
+
+    def __init__(self, validator, done_event):
+        self.validator = validator
+        self.done_event = done_event
+
+    def on_created(self, event):
+        logging.info('Output file created: ' + event.src_path)
+        self.check(event)
+
+    def on_modified(self, event):
+        logging.info('Output file modified: ' + event.src_path)
+        self.check(event)
+
+    def check(self, event):
+        if self.validator.validate():
+            logging.info('Output file is valid')
+            self.done_event.set()
+        else:
+            logging.info('Output file is invalid')
+
+
+class OutputValidator(object):
+    """
+    Base output validator class. Validators must implement
+    method validate, which returns a boolean.
+    """
+
+    def validate(self):
+        """
+        Return True if output is valid; False otherwise.
+        """
+
+class SingleFileOutputValidator(OutputValidator):
+    """
+    Validates the content of a single file in the given directory.
+    """
+
+    def __init__(self, expected_content):
+        self.valid = False
+        self.expected_content = expected_content
+
+    def set_output_dir(self, output_dir):
+        self.output_dir = output_dir
+
+    def validate(self):
+
+        if self.valid:
+            return True
+
+        listing = listdir(self.output_dir)
+
+        if len(listing) > 0:
+            out_file_name = listing[0]
+
+            with open(join(self.output_dir, out_file_name), 'r') as out_file:
+                contents = out_file.read()
+
+                if contents == self.expected_content:
+                    self.valid = True
+                    return True
+
+        return False

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/test_filesystem_ops.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
new file mode 100644
index 0000000..7a6f212
--- /dev/null
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_get_put():
+    """
+    Verify basic file get/put operations.
+    """
+
+    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+
+        cluster.put_test_data('test')
+        cluster.deploy_flow(flow)
+
+        assert cluster.check_output()
+
+
+def test_file_exists_failure():
+    """
+    Verify that putting to a file that already exists fails.
+    """
+
+    flow = (GetFile('/tmp/input') >>
+
+            # First put should succeed
+            PutFile('/tmp') >>
+
+            # Second put should fail (file exists)
+            PutFile('/tmp') >> (('success', LogAttribute()),
+                                ('failure', LogAttribute() >> PutFile('/tmp/output'))))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+
+        cluster.put_test_data('test')
+        cluster.deploy_flow(flow)
+
+        assert cluster.check_output()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/84f50b51/docker/test/integration/test_http.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
new file mode 100644
index 0000000..72c80bd
--- /dev/null
+++ b/docker/test/integration/test_http.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_invoke_listen():
+    """
+    Verify sending using InvokeHTTP to a receiver using ListenHTTP.
+    """
+
+    invoke_flow = (GetFile('/tmp/input') >> LogAttribute() >>
+                   InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
+
+    listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+
+        cluster.put_test_data('test')
+        cluster.deploy_flow(listen_flow, name='minifi-listen')
+        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
+
+        assert cluster.check_output()


[09/11] nifi-minifi-cpp git commit: MINIFI-388 Disable dynamic loading of TLS libs in civet, install libressl instead of openssl to resolve conflict with libcurl deps, and backport fix for civetweb which fixes compatibility with libressl

Posted by al...@apache.org.
MINIFI-388 Disable dynamic loading of TLS libs in civet, install libressl instead of openssl
to resolve conflict with libcurl deps, and backport fix for civetweb which fixes compatibility with libressl

This closes #131.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/35a47c7f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/35a47c7f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/35a47c7f

Branch: refs/heads/master
Commit: 35a47c7f42916c5c942824d35e252b7d51409f90
Parents: 0e24a34
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Mon Aug 21 12:19:54 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:04 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                           |  3 ++-
 docker/Dockerfile                        |  6 ++++--
 thirdparty/civetweb-1.9.1/src/civetweb.c | 26 +++++++++++++++++++++++++-
 3 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/35a47c7f/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 361f9a5..292bc8d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -101,9 +101,10 @@ set(prefix "lib")
 set(suffix ".a")
 set(JSONCPP_LIB "${JSONCPP_LIB_DIR}/lib/${prefix}jsoncpp${suffix}")
 
+set(CIVETWEB_ENABLE_SSL_DYNAMIC_LOADING OFF CACHE BOOL "Disable dynamic SSL library loading")
 set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library")
 add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
-add_subdirectory(thirdparty/civetweb-1.9.1)
+add_subdirectory(thirdparty/civetweb-1.9.1 EXCLUDE_FROM_ALL)
 include_directories(thirdparty/concurrentqueue)
 add_subdirectory(libminifi)
 add_subdirectory(main)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/35a47c7f/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 213015c..7688439 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -44,7 +44,8 @@ RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \
 	git \
 	unzip \
 	gpsd-dev \
-	openssl-dev
+	libressl-dev \
+	zlib-dev
 
 ENV USER minificpp
 ENV MINIFI_BASE_DIR /opt/minifi
@@ -86,7 +87,8 @@ RUN apk --update --no-cache upgrade && apk add --update --no-cache \
 	curl \
 	unzip \
 	gpsd \
-	openssl
+	libressl \
+	zlib
 
 # Start MiNiFi CPP in the foreground
 ENV USER minificpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/35a47c7f/thirdparty/civetweb-1.9.1/src/civetweb.c
----------------------------------------------------------------------
diff --git a/thirdparty/civetweb-1.9.1/src/civetweb.c b/thirdparty/civetweb-1.9.1/src/civetweb.c
index da491b6..ba916da 100644
--- a/thirdparty/civetweb-1.9.1/src/civetweb.c
+++ b/thirdparty/civetweb-1.9.1/src/civetweb.c
@@ -11826,6 +11826,9 @@ ssl_get_client_cert_info(struct mg_connection *conn)
 		unsigned char buf[256];
 		int len;
 		unsigned int ulen;
+		int ilen;
+		unsigned char *tmp_buf;
+		unsigned char *tmp_p;
 
 		/* Handle to algorithm used for fingerprint */
 		const EVP_MD *digest = EVP_get_digestbyname("sha1");
@@ -11856,7 +11859,24 @@ ssl_get_client_cert_info(struct mg_connection *conn)
 
 		/* Calculate SHA1 fingerprint and store as a hex string */
 		ulen = 0;
-		ASN1_digest((int (*)())i2d_X509, digest, (char *)cert, buf, &ulen);
+
+		/* ASN1_digest is deprecated. Do the calculation manually,
+		 * using EVP_Digest. */
+		ilen = i2d_X509(cert, NULL);
+		tmp_buf =
+			(ilen > 0)
+				? (unsigned char *)mg_malloc((unsigned)ilen + 1)
+				: NULL;
+		if (tmp_buf) {
+			tmp_p = tmp_buf;
+			(void)i2d_X509(cert, &tmp_p);
+			if (!EVP_Digest(
+					tmp_buf, (unsigned)ilen, buf, &ulen, digest, NULL)) {
+				ulen = 0;
+			}
+			mg_free(tmp_buf);
+		}
+
 		if (!hexdump2string(
 		        buf, (int)ulen, str_finger, (int)sizeof(str_finger))) {
 			*str_finger = 0;
@@ -12109,7 +12129,11 @@ set_ssl_option(struct mg_context *ctx)
 	SSL_CTX_set_options(ctx->ssl_ctx, ssl_get_protocol(protocol_ver));
 	SSL_CTX_set_options(ctx->ssl_ctx, SSL_OP_SINGLE_DH_USE);
 	SSL_CTX_set_options(ctx->ssl_ctx, SSL_OP_CIPHER_SERVER_PREFERENCE);
+/* BEGIN Backport of commit from civetweb.c https://github.com/civetweb/civetweb/commit/e849ce4b54c09d5b4441e371f17cf13368ac2234 */
+#if !defined(NO_SSL_DL)
 	SSL_CTX_set_ecdh_auto(ctx->ssl_ctx, 1);
+#endif /* NO_SSL_DL */
+/* END Backport of commit from civetweb.c https://github.com/civetweb/civetweb/commit/e849ce4b54c09d5b4441e371f17cf13368ac2234 */
 
 	/* If a callback has been specified, call it. */
 	callback_ret =


[07/11] nifi-minifi-cpp git commit: MINIFI-338: Improve wait decay per pull request comments

Posted by al...@apache.org.
MINIFI-338: Improve wait decay per pull request comments


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/35d23d09
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/35d23d09
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/35d23d09

Branch: refs/heads/master
Commit: 35d23d091a9b3c6be7eb09fa47b6d8c65bb5334f
Parents: ead9e84
Author: Marc <ma...@gmail.com>
Authored: Thu Jul 20 19:28:26 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/include/utils/ThreadPool.h | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/35d23d09/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 8ff3975..5335c81 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -129,6 +129,10 @@ class Worker {
   virtual uint64_t getTimeSlice() {
     return time_slice_;
   }
+  
+  virtual uint64_t getWaitTime(){
+    return run_determinant_->wait_time();
+  }
 
   Worker<T>(const Worker<T>&) = delete;
   Worker<T>& operator =(const Worker<T>&) = delete;
@@ -352,11 +356,19 @@ void ThreadPool<T>::run_tasks() {
   uint64_t wait_decay_ = 0;
   while (running_.load()) {
 
+    // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning
+    // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
+    // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
+    // there. This ensures we don't have arbitrarily long sleep cycles. 
+    if (wait_decay_ > 500000000L){
+     wait_decay_ = 100000000L;
+    }
     // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
     // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
     // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
     // be more likely to run. This is intentional.
-    if (wait_decay_ > 1000) {
+    
+    if (wait_decay_ > 2000) {
       std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
     }
     Worker<T> task;
@@ -376,9 +388,12 @@ void ThreadPool<T>::run_tasks() {
 
     bool wait_to_run = false;
     if (task.getTimeSlice() > 1) {
+      double wt = (double)task.getWaitTime();
       auto now = std::chrono::system_clock::now().time_since_epoch();
-      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
-      if (task.getTimeSlice() > ms.count()) {
+      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
+      // if our differential is < 10% of the wait time we will not put the task into a wait state
+      // since requeuing will break the time slice contract.
+      if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) {
         wait_to_run = true;
       }
     }
@@ -392,7 +407,7 @@ void ThreadPool<T>::run_tasks() {
       }
       worker_queue_.enqueue(std::move(task));
 
-      wait_decay_ += 100;
+      wait_decay_ += 25;
       continue;
     }
 


[05/11] nifi-minifi-cpp git commit: MINIFI-367 port tests to use boost::filesystem vs. stat.h for better portability

Posted by al...@apache.org.
MINIFI-367 port tests to use boost::filesystem vs. stat.h for better portability

This closes #124.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/8ea91a1d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/8ea91a1d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/8ea91a1d

Branch: refs/heads/master
Commit: 8ea91a1d1a40373e8254f068d2715c2a4269cb24
Parents: 35d23d0
Author: Andrew I. Christianson <ac...@hortonworks.com>
Authored: Tue Aug 8 13:16:19 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/CMakeLists.txt             |  3 ++-
 libminifi/test/unit/PutFileTests.cpp | 22 ++++++----------------
 2 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8ea91a1d/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index a78fd8f..5e63a30 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -80,9 +80,10 @@ target_link_libraries (minifi ${ZLIB_LIBRARIES})
 
 if (NOT IOS)
 # Include Boost System
-find_package(Boost COMPONENTS system REQUIRED)
+find_package(Boost COMPONENTS system filesystem REQUIRED)
 find_package(CURL)
 target_link_libraries(minifi ${Boost_SYSTEM_LIBRARY})
+target_link_libraries(minifi ${Boost_FILESYSTEM_LIBRARY})
 
 if (CURL_FOUND)
         include_directories(${CURL_INCLUDE_DIRS})

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8ea91a1d/libminifi/test/unit/PutFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/PutFileTests.cpp b/libminifi/test/unit/PutFileTests.cpp
index c18c72c..024b6fa 100644
--- a/libminifi/test/unit/PutFileTests.cpp
+++ b/libminifi/test/unit/PutFileTests.cpp
@@ -25,6 +25,8 @@
 #include <set>
 #include <fstream>
 
+#include <boost/filesystem.hpp>
+
 #include "../TestBase.h"
 #include "processors/ListenHTTP.h"
 #include "processors/LogAttribute.h"
@@ -45,18 +47,6 @@ TEST_CASE("Test Creation of PutFile", "[getfileCreate]") {
   REQUIRE(processor->getName() == "processorname");
 }
 
-uint64_t getModificationTime(std::string filename) {
-  struct stat result;
-  if (stat(filename.c_str(), &result) == 0) {
-#if !defined(_POSIX_C_SOURCE) || defined(_DARWIN_C_SOURCE)
-    return result.st_mtimespec.tv_sec;
-#else
-    return result.st_mtime;
-#endif
-  }
-  return 0;
-}
-
 TEST_CASE("PutFileTest", "[getfileputpfile]") {
   TestController testController;
 
@@ -231,7 +221,7 @@ TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") {
   file.open(movedFile.str(), std::ios::out);
   file << "tempFile";
   file.close();
-  uint64_t filemodtime = getModificationTime(movedFile.str());
+  auto filemodtime = boost::filesystem::last_write_time(movedFile.str());
 
   std::this_thread::sleep_for(std::chrono::milliseconds(1000));
   plan->reset();
@@ -252,7 +242,7 @@ TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") {
   // verify that the fle was moved
   REQUIRE(false == std::ifstream(ss.str()).good());
   REQUIRE(true == std::ifstream(movedFile.str()).good());
-  REQUIRE(filemodtime == getModificationTime(movedFile.str()));
+  REQUIRE(filemodtime == boost::filesystem::last_write_time(movedFile.str()));
   LogTestController::getInstance().reset();
 }
 
@@ -299,7 +289,7 @@ TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") {
   file.open(movedFile.str(), std::ios::out);
   file << "tempFile";
   file.close();
-  uint64_t filemodtime = getModificationTime(movedFile.str());
+  auto filemodtime = boost::filesystem::last_write_time(movedFile.str());
 
   std::this_thread::sleep_for(std::chrono::milliseconds(1000));
   plan->reset();
@@ -320,7 +310,7 @@ TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") {
   // verify that the fle was moved
   REQUIRE(false == std::ifstream(ss.str()).good());
   REQUIRE(true == std::ifstream(movedFile.str()).good());
-  REQUIRE(filemodtime != getModificationTime(movedFile.str()));
+  REQUIRE(filemodtime != boost::filesystem::last_write_time(movedFile.str()));
   LogTestController::getInstance().reset();
 }
 


[03/11] nifi-minifi-cpp git commit: MINIFI-375: Remove forward slash from urls

Posted by al...@apache.org.
MINIFI-375: Remove forward slash from urls

This closes #127.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/95343203
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/95343203
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/95343203

Branch: refs/heads/master
Commit: 953432038acbef807789ad7bbcc01240b16fc90d
Parents: 8ea91a1
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Aug 15 15:05:41 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/src/RemoteProcessorGroupPort.cpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/95343203/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 3c88e8f..05d5f7a 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -212,7 +212,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
   if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty())
       return;
 
-  std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/";
+  std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller";
 
   this->site2site_port_ = -1;
   configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_);
@@ -221,7 +221,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
   std::string token;
 
   if (!rest_user_name_.empty()) {
-    std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token/";
+    std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token";
     token = utils::get_token(loginUrl, this->rest_user_name_, this->rest_password_, this->securityConfig_);
     logger_->log_debug("Token from NiFi REST Api endpoint %s", token);
     if (token.empty())
@@ -285,7 +285,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
         logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_);
       }
     } else {
-      logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo");
+      logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", http_code, fullUrl);
     }
   } else {
     logger_->log_error(


[10/11] nifi-minifi-cpp git commit: MINIFI-389 Added support for one-way TLS to SSLContextService

Posted by al...@apache.org.
MINIFI-389 Added support for one-way TLS to SSLContextService

This closes #132.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0e24a343
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0e24a343
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0e24a343

Branch: refs/heads/master
Commit: 0e24a343653bada3b106606584bf9723609e02e0
Parents: a426b8d
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Fri Aug 25 16:24:12 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:04 2017 -0400

----------------------------------------------------------------------
 .../include/controllers/SSLContextService.h     | 39 ++++++++++++--------
 libminifi/src/controllers/SSLContextService.cpp | 37 +++++++++++--------
 2 files changed, 44 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e24a343/libminifi/include/controllers/SSLContextService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index 9093d5f..c48d30f 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -100,27 +100,34 @@ class SSLContextService : public core::controller::ControllerService {
   }
 
   bool configure_ssl_context(SSL_CTX *ctx) {
-    if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
-      logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
-      return false;
-    }
-    if (!IsNullOrEmpty(passphrase_)) {
-      SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
-      SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+    if (!IsNullOrEmpty(certificate)) {
+      if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
+        logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
+        return false;
+      }
+      if (!IsNullOrEmpty(passphrase_)) {
+        SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
+        SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+      }
     }
 
-    int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
-    if (retp != 1) {
-      logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno));
-      return false;
+    if (!IsNullOrEmpty(private_key_)) {
+      int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
+      if (retp != 1) {
+        logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_,
+                           std::strerror(errno));
+        return false;
+      }
+
+      if (!SSL_CTX_check_private_key(ctx)) {
+        logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
+        return false;
+      }
     }
 
-    if (!SSL_CTX_check_private_key(ctx)) {
-      logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
-      return false;
-    }
+    SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, nullptr);
+    int retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
 
-    retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
     if (retp == 0) {
       logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
       return false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e24a343/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 73c9e35..95ccbb0 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -51,27 +51,32 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
   method = TLSv1_2_client_method();
   SSL_CTX *ctx = SSL_CTX_new(method);
 
-  if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
-    logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
-    return nullptr;
-  }
-  if (!IsNullOrEmpty(passphrase_)) {
-    SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
-    SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+  if (!IsNullOrEmpty(certificate)) {
+    if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
+      logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
+      return nullptr;
+    }
+    if (!IsNullOrEmpty(passphrase_)) {
+      SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
+      SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+    }
   }
 
-  int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
-  if (retp != 1) {
-    logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno));
-    return nullptr;
-  }
+  if (!IsNullOrEmpty(private_key_)) {
+    int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
+    if (retp != 1) {
+      logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_,
+                         std::strerror(errno));
+      return nullptr;
+    }
 
-  if (!SSL_CTX_check_private_key(ctx)) {
-    logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
-    return nullptr;
+    if (!SSL_CTX_check_private_key(ctx)) {
+      logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
+      return nullptr;
+    }
   }
 
-  retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
+  int retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
   if (retp == 0) {
     logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
   }


[08/11] nifi-minifi-cpp git commit: MINIFI-375: Update integration test

Posted by al...@apache.org.
MINIFI-375: Update integration test

This closes #129.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a426b8d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a426b8d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a426b8d2

Branch: refs/heads/master
Commit: a426b8d2d1764f0b9033013dbca609ade7d3c3d2
Parents: 22f4528
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Aug 21 12:20:15 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:04 2017 -0400

----------------------------------------------------------------------
 libminifi/test/integration/Site2SiteRestTest.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a426b8d2/libminifi/test/integration/Site2SiteRestTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp b/libminifi/test/integration/Site2SiteRestTest.cpp
index 1773cdb..0e454af 100644
--- a/libminifi/test/integration/Site2SiteRestTest.cpp
+++ b/libminifi/test/integration/Site2SiteRestTest.cpp
@@ -86,7 +86,7 @@ int main(int argc, char **argv) {
 
   CivetServer server(cpp_options);
   ConfigHandler h_ex;
-  server.addHandler("/nifi-api/controller/", h_ex);
+  server.addHandler("/nifi-api/controller", h_ex);
   LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>();
 
   std::string key_dir, test_file_location;


[06/11] nifi-minifi-cpp git commit: MINIFI-338: Address linter errors

Posted by al...@apache.org.
MINIFI-338: Address linter errors


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ead9e84e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ead9e84e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ead9e84e

Branch: refs/heads/master
Commit: ead9e84ec163252b63e7889de925e6eac72e1b63
Parents: 747691d
Author: Marc Parisi <ph...@apache.org>
Authored: Wed Jul 5 12:13:45 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/src/FlowController.cpp             | 1 -
 libminifi/src/ThreadedSchedulingAgent.cpp    | 4 +---
 libminifi/src/TimerDrivenSchedulingAgent.cpp | 3 ---
 3 files changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ead9e84e/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 32fd298..b0fbffa 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -192,7 +192,6 @@ void FlowController::stop(bool force) {
     this->timer_scheduler_->stop();
     this->event_scheduler_->stop();
     running_ = false;
-
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ead9e84e/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index d6b8fae..82d4dfd 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <utility>
 #include <map>
 #include <thread>
 #include <iostream>
@@ -76,7 +77,6 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
 
   ThreadedSchedulingAgent *agent = this;
   for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
-
     // reference the disable function from serviceNode
     std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
       return agent->run(processor, processContext.get(), sessionFactory.get());
@@ -88,10 +88,8 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
     // we aren't terribly concerned with the result.
     std::future<uint64_t> future;
     thread_pool_.execute(std::move(functor), future);
-
   }
   logger_->log_info("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str());
-
   return;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ead9e84e/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 3276470..c3aaa69 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -37,14 +37,11 @@ uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> proces
       return processor->getYieldTime();
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      //std::this_thread::sleep_for(std::chrono::milliseconds(x));
       return this->bored_yield_duration_;
     }
     return processor->getSchedulingPeriodNano() / 1000000;
-    //std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
   }
   return 0;
-  //return;
 }
 
 } /* namespace minifi */


[02/11] nifi-minifi-cpp git commit: MINIFI-376 removed defunct references to curlbuild.h

Posted by al...@apache.org.
MINIFI-376 removed defunct references to curlbuild.h

This closes #128.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/eb140801
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/eb140801
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/eb140801

Branch: refs/heads/master
Commit: eb140801a1cf01eda3fa950d91ed51720ca0b26a
Parents: 9534320
Author: Andrew I. Christianson <ac...@hortonworks.com>
Authored: Wed Aug 16 11:56:34 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/include/utils/HTTPUtils.h         | 1 -
 libminifi/src/HttpConfigurationListener.cpp | 1 -
 libminifi/src/RemoteProcessorGroupPort.cpp  | 1 -
 libminifi/src/processors/InvokeHTTP.cpp     | 1 -
 4 files changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/eb140801/libminifi/include/utils/HTTPUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPUtils.h b/libminifi/include/utils/HTTPUtils.h
index 46aa67a..e47bc11 100644
--- a/libminifi/include/utils/HTTPUtils.h
+++ b/libminifi/include/utils/HTTPUtils.h
@@ -23,7 +23,6 @@
 #include <vector>
 #include <iostream>
 #include <string>
-#include <curl/curlbuild.h>
 #include <curl/easy.h>
 #include <openssl/ssl.h>
 #include "ByteInputCallBack.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/eb140801/libminifi/src/HttpConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp
index c16ca75..6b3a061 100644
--- a/libminifi/src/HttpConfigurationListener.cpp
+++ b/libminifi/src/HttpConfigurationListener.cpp
@@ -18,7 +18,6 @@
 
 #include "HttpConfigurationListener.h"
 #include "FlowController.h"
-#include <curl/curlbuild.h>
 #include <curl/easy.h>
 #include <iostream>
 #include <iterator>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/eb140801/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 05d5f7a..bcc3d49 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -21,7 +21,6 @@
 #include "RemoteProcessorGroupPort.h"
 
 #include <curl/curl.h>
-#include <curl/curlbuild.h>
 #include <curl/easy.h>
 #include <uuid/uuid.h>
 #include <algorithm>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/eb140801/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index 7dc75d2..81271a5 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -18,7 +18,6 @@
 
 #include "processors/InvokeHTTP.h"
 #include <regex.h>
-#include <curl/curlbuild.h>
 #include <curl/easy.h>
 #include <uuid/uuid.h>
 #include <memory>


[04/11] nifi-minifi-cpp git commit: MINIFI-338: Convert processor threads to use thread pools

Posted by al...@apache.org.
MINIFI-338: Convert processor threads to use thread pools


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/747691d7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/747691d7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/747691d7

Branch: refs/heads/master
Commit: 747691d7f3a787d7b38a4f67bac56d26001cf400
Parents: 5fca46f
Author: Marc Parisi <ph...@apache.org>
Authored: Fri Jun 30 10:05:15 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/include/EventDrivenSchedulingAgent.h |   2 +-
 libminifi/include/SchedulingAgent.h            |   2 +-
 libminifi/include/ThreadedSchedulingAgent.h    |  56 +++++-
 libminifi/include/TimerDrivenSchedulingAgent.h |   2 +-
 libminifi/include/core/Processor.h             |   5 +-
 libminifi/include/utils/ThreadPool.h           | 180 +++++++++++++++++++-
 libminifi/src/EventDrivenSchedulingAgent.cpp   |  13 +-
 libminifi/src/FlowController.cpp               |  15 +-
 libminifi/src/SchedulingAgent.cpp              |   4 +-
 libminifi/src/ThreadedSchedulingAgent.cpp      |  44 ++---
 libminifi/src/TimerDrivenSchedulingAgent.cpp   |  13 +-
 libminifi/test/unit/SocketTests.cpp            |   2 +-
 libminifi/test/unit/ThreadPoolTests.cpp        |   2 +-
 13 files changed, 281 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index c838b11..ca9f021 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   virtual ~EventDrivenSchedulingAgent() {
   }
   // Run function for the thread
-  void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
 
  private:
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 1ff3fac..130c088 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -84,7 +84,7 @@ class SchedulingAgent {
     running_ = true;
   }
   // stop
-  void stop() {
+  virtual void stop() {
     running_ = false;
     component_lifecycle_thread_pool_.shutdown();
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index b4db4bf..27b8b3a 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -20,6 +20,7 @@
 #ifndef __THREADED_SCHEDULING_AGENT_H__
 #define __THREADED_SCHEDULING_AGENT_H__
 
+#include <chrono>
 #include "properties/Configure.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Processor.h"
@@ -33,6 +34,47 @@ namespace nifi {
 namespace minifi {
 
 /**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : run_monitor_(run_monitor),
+        current_wait_(0) {
+
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other)
+      : AfterExecute(std::move(other)),
+        run_monitor_(std::move(other.run_monitor_)) {
+    current_wait_.store(other.current_wait_.load());
+  }
+  virtual bool isFinished(const uint64_t &result) {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const uint64_t &result) {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual int64_t wait_time() {
+    return current_wait_.load();
+  }
+ private:
+
+  std::atomic<uint64_t> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+/**
  * An abstract scheduling agent which creates and manages a pool of threads for
  * each processor scheduled.
  */
@@ -48,13 +90,18 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
                           std::shared_ptr<Configure> configuration)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
+
+    utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+    thread_pool_ = std::move(pool);
+    thread_pool_.start();
+
   }
   // Destructor
   virtual ~ThreadedSchedulingAgent() {
   }
 
   // Run function for the thread
-  virtual void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
+  virtual uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
 
  public:
   // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
@@ -62,9 +109,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
   virtual void unschedule(std::shared_ptr<core::Processor> processor);
 
+  virtual void stop();
+   protected:
+  utils::ThreadPool<uint64_t> thread_pool_;
+
  protected:
-  // Threads
-  std::map<std::string, std::vector<std::thread *>> _threads;
+
 
  private:
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 816bcec..1502c47 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -49,7 +49,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   /**
    * Run function that accepts the processor, context and session factory.
    */
-  void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
 
  private:
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 251ec47..0853c11 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -220,6 +220,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
   }
 
+  // Check all incoming connections for work
+  bool isWorkAvailable();
+
  protected:
 
   // Processor state
@@ -246,8 +249,6 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   // Yield Expiration
   std::atomic<uint64_t> yield_expiration_;
 
-  // Check all incoming connections for work
-  bool isWorkAvailable();
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Processor(const Processor &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 77772cd..8ff3975 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -33,6 +33,35 @@ namespace minifi {
 namespace utils {
 
 /**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual int64_t wait_time() {
+    return 0;
+  }
+};
+
+/**
  * Worker task
  * purpose: Provides a wrapper for the functor
  * and returns a future based on the template argument.
@@ -40,12 +69,29 @@ namespace utils {
 template<typename T>
 class Worker {
  public:
-  explicit Worker(std::function<T()> &task)
-      : task(task) {
+  explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
+      : task(task),
+        run_determinant_(std::move(run_determinant)),
+        identifier_(identifier),
+        time_slice_(0) {
+    promise = std::make_shared<std::promise<T>>();
+  }
+
+  explicit Worker(std::function<T()> &task, const std::string &identifier)
+      : task(task),
+        run_determinant_(nullptr),
+        identifier_(identifier),
+        time_slice_(0) {
     promise = std::make_shared<std::promise<T>>();
   }
 
-  explicit Worker() {
+  explicit Worker(const std::string identifier = "")
+      : identifier_(identifier),
+        time_slice_(0) {
+  }
+
+  virtual ~Worker() {
+
   }
 
   /**
@@ -53,16 +99,35 @@ class Worker {
    */
   Worker(Worker &&other)
       : task(std::move(other.task)),
-        promise(other.promise) {
+        promise(other.promise),
+        time_slice_(std::move(other.time_slice_)),
+        identifier_(std::move(other.identifier_)),
+        run_determinant_(std::move(other.run_determinant_)) {
   }
 
   /**
-   * Runs the task and takes the output from the funtor
+   * Runs the task and takes the output from the functor
    * setting the result into the promise
+   * @return whether or not to continue running
+   *   false == finished || error
+   *   true == run again
    */
-  void run() {
+  virtual bool run() {
     T result = task();
-    promise->set_value(result);
+    if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) {
+      promise->set_value(result);
+      return false;
+    }
+    time_slice_ = increment_time(run_determinant_->wait_time());
+    return true;
+  }
+
+  virtual void setIdentifier(const std::string identifier) {
+    identifier_ = identifier;
+  }
+
+  virtual uint64_t getTimeSlice() {
+    return time_slice_;
   }
 
   Worker<T>(const Worker<T>&) = delete;
@@ -72,8 +137,22 @@ class Worker {
 
   std::shared_ptr<std::promise<T>> getPromise();
 
- private:
+  const std::string &getIdentifier() {
+    return identifier_;
+  }
+ protected:
+
+  inline uint64_t increment_time(const uint64_t &time) {
+    std::chrono::time_point<std::chrono::system_clock> now =
+        std::chrono::system_clock::now();
+    auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
+    return millis + time;
+  }
+
+  std::string identifier_;
+  uint64_t time_slice_;
   std::function<T()> task;
+  std::unique_ptr<AfterExecute<T>> run_determinant_;
   std::shared_ptr<std::promise<T>> promise;
 };
 
@@ -81,6 +160,9 @@ template<typename T>
 Worker<T>& Worker<T>::operator =(Worker<T> && other) {
   task = std::move(other.task);
   promise = other.promise;
+  time_slice_ = std::move(other.time_slice_);
+  identifier_ = std::move(other.identifier_);
+  run_determinant_ = std::move(other.run_determinant_);
   return *this;
 }
 
@@ -125,6 +207,21 @@ class ThreadPool {
    * @return true if future can be created and thread pool is in a running state.
    */
   bool execute(Worker<T> &&task, std::future<T> &future);
+
+  /**
+   * attempts to stop tasks with the provided identifier.
+   * @param identifier for worker tasks. Note that these tasks won't
+   * immediately stop.
+   */
+  void stopTasks(const std::string &identifier);
+
+  /**
+   * Returns true if a task is running.
+   */
+  bool isRunning(const std::string &identifier) {
+    return task_status_[identifier] == true;
+  }
+
   /**
    * Starts the Thread Pool
    */
@@ -199,6 +296,8 @@ class ThreadPool {
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
   // notification for available work
   std::condition_variable tasks_available_;
+  // map to identify if a task should be
+  std::map<std::string, bool> task_status_;
   // manager mutex
   std::recursive_mutex manager_mutex_;
   // work queue mutex
@@ -218,6 +317,10 @@ class ThreadPool {
 template<typename T>
 bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
 
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
   future = std::move(task.getPromise()->get_future());
   bool enqueued = worker_queue_.enqueue(std::move(task));
   if (running_) {
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
 template<typename T>
 void ThreadPool<T>::run_tasks() {
   auto waitperiod = std::chrono::milliseconds(1) * 100;
+  uint64_t wait_decay_ = 0;
   while (running_.load()) {
 
+    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+    // be more likely to run. This is intentional.
+    if (wait_decay_ > 1000) {
+      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+    }
     Worker<T> task;
     if (!worker_queue_.try_dequeue(task)) {
+
       std::unique_lock<std::mutex> lock(worker_queue_mutex_);
       tasks_available_.wait_for(lock, waitperiod);
       continue;
     }
-    task.run();
+    else {
+
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      if (!task_status_[task.getIdentifier()]) {
+        continue;
+      }
+    }
+
+    bool wait_to_run = false;
+    if (task.getTimeSlice() > 1) {
+      auto now = std::chrono::system_clock::now().time_since_epoch();
+      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+      if (task.getTimeSlice() > ms.count()) {
+        wait_to_run = true;
+      }
+    }
+    // if we have to wait we re-queue the worker.
+    if (wait_to_run) {
+      {
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (!task_status_[task.getIdentifier()]) {
+          continue;
+        }
+      }
+      worker_queue_.enqueue(std::move(task));
+
+      wait_decay_ += 100;
+      continue;
+    }
+
+    const bool task_renew = task.run();
+    wait_decay_ = 0;
+    if (task_renew) {
+
+      {
+        // even if we have more work to do we will not
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (!task_status_[task.getIdentifier()]) {
+          continue;
+        }
+      }
+      worker_queue_.enqueue(std::move(task));
+
+    }
   }
   current_workers_--;
 
@@ -272,12 +427,19 @@ void ThreadPool<T>::start() {
 }
 
 template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
 void ThreadPool<T>::shutdown() {
   if (running_.load()) {
     std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
     running_.store(false);
 
     drain();
+    task_status_.clear();
     if (manager_thread_.joinable())
       manager_thread_.join();
     {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 8a2a874..db5ca08 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -32,22 +32,27 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-void EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
   while (this->running_) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
 
     if (processor->isYield()) {
       // Honor the yield
-      std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+      return processor->getYieldTime();
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
+      return this->bored_yield_duration_;
     }
 
     // Block until work is available
+
     processor->waitForWork(1000);
+
+    if (!processor->isWorkAvailable()) {
+      return 1000;
+    }
   }
-  return;
+  return 0;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6358ed0..32fd298 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -183,17 +183,16 @@ void FlowController::stop(bool force) {
   std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   if (running_) {
     // immediately indicate that we are not running
-    running_ = false;
-
     logger_->log_info("Stop Flow Controller");
-    this->timer_scheduler_->stop();
-    this->event_scheduler_->stop();
-    this->flow_file_repo_->stop();
-    this->provenance_repo_->stop();
-    // Wait for sometime for thread stop
-    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
     if (this->root_)
       this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+    this->flow_file_repo_->stop();
+    this->provenance_repo_->stop();
+    // stop after we've attempted to stop the processors.
+    this->timer_scheduler_->stop();
+    this->event_scheduler_->stop();
+    running_ = false;
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 1060830..e228ba5 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -46,7 +46,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::
     return serviceNode->enable();
   };
   // create a functor that will be submitted to the thread pool.
-  utils::Worker<bool> functor(f_ex);
+  utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
   std::future<bool> future;
@@ -59,7 +59,7 @@ void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller:
     return serviceNode->disable();
   };
   // create a functor that will be submitted to the thread pool.
-  utils::Worker<bool> functor(f_ex);
+  utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
   std::future<bool> future;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 7b4ce85..d6b8fae 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -61,8 +61,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
     return;
   }
 
-  std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr());
-  if (it != _threads.end()) {
+  if (thread_pool_.isRunning(processor->getUUIDStr())) {
     logger_->log_info("Can not schedule threads for processor %s because there are existing threads running");
     return;
   }
@@ -74,20 +73,33 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
   processor->onSchedule(processContext.get(), sessionFactory.get());
 
   std::vector<std::thread *> threads;
+
+  ThreadedSchedulingAgent *agent = this;
   for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
-    ThreadedSchedulingAgent *agent = this;
-    std::thread *thread = new std::thread([agent, processor, processContext, sessionFactory] () {
-      agent->run(processor, processContext.get(), sessionFactory.get());
-    });
-    thread->detach();
-    threads.push_back(thread);
-    logger_->log_info("Scheduled thread %d running for process %s", thread->get_id(), processor->getName().c_str());
+
+    // reference the disable function from serviceNode
+    std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
+      return agent->run(processor, processContext.get(), sessionFactory.get());
+    };
+    // create a functor that will be submitted to the thread pool.
+    std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_));
+    utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
+    // move the functor into the thread pool. While a future is returned
+    // we aren't terribly concerned with the result.
+    std::future<uint64_t> future;
+    thread_pool_.execute(std::move(functor), future);
+
   }
-  _threads[processor->getUUIDStr().c_str()] = threads;
+  logger_->log_info("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str());
 
   return;
 }
 
+void ThreadedSchedulingAgent::stop() {
+  SchedulingAgent::stop();
+  thread_pool_.shutdown();
+}
+
 void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
   std::lock_guard < std::mutex > lock(mutex_);
   logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
@@ -97,18 +109,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces
     return;
   }
 
-  std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr());
+  thread_pool_.stopTasks(processor->getUUIDStr());
 
-  if (it == _threads.end()) {
-    logger_->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str());
-    return;
-  }
-  for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) {
-    std::thread *thread = *itThread;
-    logger_->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), processor->getName().c_str());
-    delete thread;
-  }
-  _threads.erase(processor->getUUIDStr());
   processor->clearActiveTask();
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index b9a41ea..3276470 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,19 +29,22 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-void TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
   while (this->running_) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield()) {
       // Honor the yield
-      std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+      return processor->getYieldTime();
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
+      //std::this_thread::sleep_for(std::chrono::milliseconds(x));
+      return this->bored_yield_duration_;
     }
-    std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
+    return processor->getSchedulingPeriodNano() / 1000000;
+    //std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
   }
-  return;
+  return 0;
+  //return;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index a791b3f..0576d5f 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -200,7 +200,7 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") {
   std::vector<std::future<bool>> futures;
   for (int i = 0; i < 20; i++) {
     std::function<bool()> f_ex = createSocket;
-    utils::Worker<bool> functor(f_ex);
+    utils::Worker<bool> functor(f_ex, "id");
     std::future<bool> fut;
     REQUIRE(true == pool.execute(std::move(functor), fut));
     futures.push_back(std::move(fut));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 0bba767..670958a 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -29,7 +29,7 @@ bool function() {
 TEST_CASE("ThreadPoolTest1", "[TPT1]") {
   utils::ThreadPool<bool> pool(5);
   std::function<bool()> f_ex = function;
-  utils::Worker<bool> functor(f_ex);
+  utils::Worker<bool> functor(f_ex, "id");
   pool.start();
   std::future<bool> fut;
   REQUIRE(true == pool.execute(std::move(functor), fut));