You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/12 17:49:59 UTC
nifi-minifi-cpp git commit: MINIFI-350 Added pytest-based system
integration test framework and initial test cases
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master d422e725c -> 17fe045b4
MINIFI-350 Added pytest-based system integration test framework and initial test cases
This closes #126.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/17fe045b
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/17fe045b
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/17fe045b
Branch: refs/heads/master
Commit: 17fe045b48f74ba84035cffb77053ac7dbb50c94
Parents: d422e72
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Thu Jul 13 10:42:35 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 12:47:52 2017 -0400
----------------------------------------------------------------------
.gitignore | 4 +
README.md | 18 ++
cmake/DockerConfig.cmake | 4 +-
docker/DockerVerify.sh | 38 +++
docker/test/integration/.gitignore | 1 +
docker/test/integration/.ropeproject/config.py | 112 +++++++
docker/test/integration/README.md | 184 ++++++++++++
docker/test/integration/minifi/__init__.py | 298 +++++++++++++++++++
docker/test/integration/minifi/test/__init__.py | 191 ++++++++++++
docker/test/integration/test_filesystem_ops.py | 54 ++++
docker/test/integration/test_http.py | 36 +++
11 files changed, 939 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2ba3164..c28b765 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,3 +54,7 @@ thirdparty/apache-rat/apache-rat*
# Ignore source files that have been placed in the docker directory during build
docker/minificppsource
*.swp
+.cache
+.cproject
+.settings
+*.pyc
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index f971683..8830aaa 100644
--- a/README.md
+++ b/README.md
@@ -99,6 +99,11 @@ $ yum install cmake \
boost-devel \
libssl-dev \
doxygen
+$ # (Optional) for building docker image
+$ yum install docker
+$ # (Optional) for system integration tests
+$ yum install docker python-virtualenv
+
```
Aptitude based Linux Distributions
@@ -111,6 +116,10 @@ $ apt-get install cmake \
uuid-dev uuid \
libboost-all-dev libssl-dev \
doxygen
+$ # (Optional) for building docker image
+$ apt-get install docker.io
+$ # (Optional) for system integration tests
+$ apt-get install docker.io python-virtualenv
```
OS X Using Homebrew (with XCode Command Line Tools installed)
@@ -124,6 +133,9 @@ $ brew install cmake \
doxygen
$ brew install curl
$ brew link curl --force
+$ # (Optional) for building docker image/running system integration tests
+$ # Install docker using instructions at https://docs.docker.com/docker-for-mac/install/
+$ sudo pip install virtualenv
```
@@ -221,6 +233,12 @@ Successfully built c390063d9bd1
Built target docker
```
+- (Optional) Execute system integration tests using the docker image built locally on a docker daemon running locally.
+```
+~/Development/code/apache/nifi-minifi-cpp/build
+$ make docker-verify
+```
+
### Cleaning
Remove the build directory created above.
```
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/cmake/DockerConfig.cmake
----------------------------------------------------------------------
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 41ca7f7..57270e4 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -21,4 +21,6 @@ add_custom_target(
COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerBuild.sh 1000 1000 ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH} minificppsource ${CMAKE_SOURCE_DIR}
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/docker/)
-
+# Runs the docker-based system integration tests via docker/DockerVerify.sh.
+add_custom_target(docker-verify
+    COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerVerify.sh)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/DockerVerify.sh
----------------------------------------------------------------------
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
new file mode 100755
index 0000000..be62c1f
--- /dev/null
+++ b/docker/DockerVerify.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+docker_dir="$( cd ${0%/*} && pwd )"
+
+# Create virutal environment for testing
+if [[ ! -d ./test-env-py2 ]]; then
+ echo "Creating virtual environment in ./test-env-py2" 1>&2
+ virtualenv ./test-env-py2
+fi
+
+echo "Activating virtual environment..." 1>&2
+. ./test-env-py2/bin/activate
+pip install --upgrade pip setuptools
+
+# Install test dependencies
+echo "Installing test dependencies..." 1>&2
+pip install --upgrade pytest docker PyYAML watchdog
+
+export MINIFI_VERSION=0.3.0
+export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
+pytest -s -v "${docker_dir}"/test/integration
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/.gitignore
----------------------------------------------------------------------
diff --git a/docker/test/integration/.gitignore b/docker/test/integration/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/docker/test/integration/.gitignore
@@ -0,0 +1 @@
+__pycache__
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/.ropeproject/config.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/.ropeproject/config.py b/docker/test/integration/.ropeproject/config.py
new file mode 100644
index 0000000..0bf7750
--- /dev/null
+++ b/docker/test/integration/.ropeproject/config.py
@@ -0,0 +1,112 @@
+# The default ``config.py``
+# flake8: noqa
+
+
+def set_prefs(prefs):
+ """This function is called before opening the project"""
+
+ # Specify which files and folders to ignore in the project.
+ # Changes to ignored resources are not added to the history and
+ # VCSs. Also they are not returned in `Project.get_files()`.
+ # Note that ``?`` and ``*`` match all characters but slashes.
+ # '*.pyc': matches 'test.pyc' and 'pkg/test.pyc'
+ # 'mod*.pyc': matches 'test/mod1.pyc' but not 'mod/1.pyc'
+ # '.svn': matches 'pkg/.svn' and all of its children
+ # 'build/*.o': matches 'build/lib.o' but not 'build/sub/lib.o'
+ # 'build//*.o': matches 'build/lib.o' and 'build/sub/lib.o'
+ prefs['ignored_resources'] = ['*.pyc', '*~', '.ropeproject',
+ '.hg', '.svn', '_svn', '.git', '.tox']
+
+ # Specifies which files should be considered python files. It is
+ # useful when you have scripts inside your project. Only files
+ # ending with ``.py`` are considered to be python files by
+ # default.
+ #prefs['python_files'] = ['*.py']
+
+ # Custom source folders: By default rope searches the project
+ # for finding source folders (folders that should be searched
+ # for finding modules). You can add paths to that list. Note
+ # that rope guesses project source folders correctly most of the
+ # time; use this if you have any problems.
+ # The folders should be relative to project root and use '/' for
+ # separating folders regardless of the platform rope is running on.
+ # 'src/my_source_folder' for instance.
+ #prefs.add('source_folders', 'src')
+
+ # You can extend python path for looking up modules
+ #prefs.add('python_path', '~/python/')
+
+ # Should rope save object information or not.
+ prefs['save_objectdb'] = True
+ prefs['compress_objectdb'] = False
+
+ # If `True`, rope analyzes each module when it is being saved.
+ prefs['automatic_soa'] = True
+ # The depth of calls to follow in static object analysis
+ prefs['soa_followed_calls'] = 0
+
+ # If `False` when running modules or unit tests "dynamic object
+ # analysis" is turned off. This makes them much faster.
+ prefs['perform_doa'] = True
+
+ # Rope can check the validity of its object DB when running.
+ prefs['validate_objectdb'] = True
+
+ # How many undos to hold?
+ prefs['max_history_items'] = 32
+
+ # Shows whether to save history across sessions.
+ prefs['save_history'] = True
+ prefs['compress_history'] = False
+
+ # Set the number spaces used for indenting. According to
+ # :PEP:`8`, it is best to use 4 spaces. Since most of rope's
+ # unit-tests use 4 spaces it is more reliable, too.
+ prefs['indent_size'] = 4
+
+ # Builtin and c-extension modules that are allowed to be imported
+ # and inspected by rope.
+ prefs['extension_modules'] = []
+
+ # Add all standard c-extensions to extension_modules list.
+ prefs['import_dynload_stdmods'] = True
+
+ # If `True` modules with syntax errors are considered to be empty.
+ # The default value is `False`; When `False` syntax errors raise
+ # `rope.base.exceptions.ModuleSyntaxError` exception.
+ prefs['ignore_syntax_errors'] = False
+
+ # If `True`, rope ignores unresolvable imports. Otherwise, they
+ # appear in the importing namespace.
+ prefs['ignore_bad_imports'] = False
+
+ # If `True`, rope will insert new module imports as
+ # `from <package> import <module>` by default.
+ prefs['prefer_module_from_imports'] = False
+
+ # If `True`, rope will transform a comma list of imports into
+ # multiple separate import statements when organizing
+ # imports.
+ prefs['split_imports'] = False
+
+ # If `True`, rope will remove all top-level import statements and
+ # reinsert them at the top of the module when making changes.
+ prefs['pull_imports_to_top'] = True
+
+ # If `True`, rope will sort imports alphabetically by module name instead of
+ # alphabetically by import statement, with from imports after normal
+ # imports.
+ prefs['sort_imports_alphabetically'] = False
+
+ # Location of implementation of rope.base.oi.type_hinting.interfaces.ITypeHintingFactory
+ # In general case, you don't have to change this value, unless you're an rope expert.
+ # Change this value to inject you own implementations of interfaces
+ # listed in module rope.base.oi.type_hinting.providers.interfaces
+ # For example, you can add you own providers for Django Models, or disable the search
+ # type-hinting in a class hierarchy, etc.
+ prefs['type_hinting_factory'] = 'rope.base.oi.type_hinting.factory.default_type_hinting_factory'
+
+
+def project_opened(project):
+ """This function is called after opening the project"""
+ # Do whatever you like here!
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/README.md
----------------------------------------------------------------------
diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md
new file mode 100644
index 0000000..6d8c066
--- /dev/null
+++ b/docker/test/integration/README.md
@@ -0,0 +1,184 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Apache MiNiFi Docker System Integration Tests
+
+Apache MiNiFi includes a suite of docker-based system integration tests. These
+tests are designed to test the integration between distinct MiNiFi instances as
+well as other systems which are available in docker, such as Apache NiFi.
+
+## Test Execution Lifecycle
+
+Each test involves the following stages as part of its execution lifecycle:
+
+### Definition of flows/Flow DSL
+
+Flows are defined using a python-native domain specific language (DSL). The DSL
+supports the standard primitives which make up a NiFi/MiNiFi flow, such as
+processors, connections, and controller services. Several processors defined in
+the DSL have optional, named parameters enabling concise flow expression.
+
+By default, all relationships are set to auto-terminate. If a relationship is
+used, it is automatically taken out of the auto\_terminate list.
+
+**Example Trivial Flow:**
+
+```python
+flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
+```
+
+#### Supported Processors
+
+The following processors/parameters are supported:
+
+**GetFile**
+
+- input\_dir
+
+**PutFile**
+
+- output\_dir
+
+**LogAttribute**
+
+**ListenHTTP**
+
+- port
+- cert=None
+
+**InvokeHTTP**
+
+- url
+- method='GET'
+- ssl\_context\_service=None
+
+### Definition of an output validator
+
+The output validator is responsible for checking the state of a cluster for
+valid output conditions. Currently, the only supported output validator is the
+SingleFileOutputValidator, which looks for a single file to be written to
+/tmp/output by a flow having a given string as its contents.
+
+**Example SingleFileOutputValidator:**
+
+```python
+SingleFileOutputValidator('example output')
+```
+
+This example SingleFileOutputValidator would validate that a single file is
+written with the contents 'example output.'
+
+### Creation of a DockerTestCluster
+
+DockerTestCluster instances are used to deploy one or more flows to a simulated
+or actual multi-host docker cluster. This enables testing of interactions
+between multiple system components, such as MiNiFi flows. Before the test
+cluster is destroyed, an assertion may be performed on the results of the
+*check\_output()* method of the cluster. This invokes the validator supplied at
+construction against the output state of the system.
+
+Creation of a DockerTestCluster is simple:
+
+**Example DockerTestCluster Instantiation:**
+
+```python
+with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+ ...
+ # Perform test operations
+ ...
+ assert cluster.check_output()
+```
+
+Note that a docker cluster must be created inside of a *with* structure to
+ensure that all resources are created and destroyed cleanly.
+
+### Insertion of test input data
+
+Although arbitrary NiFi flows can ingest data from a multitude of sources, a
+MiNiFi system integration test is expected to receive input via deterministic,
+controlled channels. The primary supported method of providing input to a
+MiNiFi system integration test is to insert data into the filesystem at
+/tmp/input.
+
+To write a string to the contents of a file in /tmp/input, use the
+*put\_test\_data()* method.
+
+**Example put\_test\_data() Usage:**
+
+```python
+cluster.put_test_data('test')
+```
+
+This writes a file with a random name to /tmp/input, with the contents 'test.'
+
+To provide a resource to a container, such as a TLS certificate, use the
+*put\_test\_resource()* method to write a resource file to /tmp/resources.
+
+**Example put\_test\_resource() Usage:**
+
+```python
+cluster.put_test_resource('test-resource', 'resource contents')
+```
+
+This writes a file to /tmp/resources/test-resource with the contents 'resource
+contents.'
+
+### Deployment of one or more flows
+
+Deployment of flows to a test cluster is performed using the *deploy\_flow()*
+method of a cluster. Each flow is deployed as a separate docker service having
+its own DNS name. If a name is not provided upon deployment, a random name will
+be used.
+
+**Example deploy\_flow() Usage:**
+
+```python
+cluster.deploy_flow(flow, name='test-flow')
+```
+
+### Execution of one or more flows
+
+Flows are executed immediately upon deployment and according to schedule
+properties defined in the flow.yml. As such, to minimize test latency it is
+important to ensure that test inputs are added to the test cluster before flows
+are deployed. Filesystem events are monitored using event APIs, ensuring that
+flows are executed immediately upon input availability and output is validated
+immediately after it is written to disk.
+
+### Output validation
+
+As soon as data is written to /tmp/output, the OutputValidator (defined
+according to the documentation above) is executed on the output. The
+*check\_output()* cluster method waits for up to 5 seconds for valid output.
+
+### Cluster teardown/cleanup
+
+The deployment of a test cluster involves creating one or more docker
+containers and networks, as well as temporary files/directories on the host
+system. All resources are cleaned up automatically as long as clusters are
+created within a *with* block.
+
+```python
+
+# Using the with block ensures that all cluster resources are cleaned up after
+# the test cluster is no longer needed.
+
+with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+ ...
+ # Perform test operations
+ ...
+ assert cluster.check_output()
+```
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
new file mode 100644
index 0000000..557b9a8
--- /dev/null
+++ b/docker/test/integration/minifi/__init__.py
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+import logging
+
+import yaml
+import docker
+
+
+class Cluster(object):
+ """
+ Base Cluster class. This is intended to be a generic interface
+ to different types of clusters. Clusters could be Kubernetes clusters,
+ Docker swarms, or cloud compute/container services.
+ """
+
+ def deploy_flow(self, flow):
+ """
+ Deploys a flow to the cluster.
+ """
+
+ def __enter__(self):
+ """
+ Allocate ephemeral cluster resources.
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """
+ Clean up ephemeral cluster resources.
+ """
+
+class SingleNodeDockerCluster(Cluster):
+ """
+ A "cluster" which consists of a single docker node. Useful for
+ testing or use-cases which do not span multiple compute nodes.
+ """
+
+ def __init__(self):
+ self.network = None
+ self.containers = []
+ self.tmp_files = []
+
+ # Get docker client
+ self.client = docker.from_env()
+
+ def deploy_flow(self, flow, name=None, vols={}):
+ """
+ Compiles the flow to YAML and maps it into the container using
+ the docker volumes API.
+ """
+
+ logging.info('Deploying flow...')
+
+ if name is None:
+ name = 'minifi-' + str(uuid.uuid4())
+ logging.info('Flow name was not provided; using generated name \'%s\'', name)
+
+ minifi_version = os.environ['MINIFI_VERSION']
+ self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + minifi_version
+
+ # Write flow config
+ tmp_flow_file_name = '/tmp/.minifi-flow.' + str(uuid.uuid4())
+ self.tmp_files.append(tmp_flow_file_name)
+
+ yaml = flow_yaml(flow)
+
+ logging.info('Using generated flow config yml:\n%s', yaml)
+
+ with open(tmp_flow_file_name, 'w') as tmp_flow_file:
+ tmp_flow_file.write(yaml)
+
+ conf_file = tmp_flow_file_name
+
+ local_vols = {}
+ local_vols[conf_file] = {'bind': self.minifi_root + '/conf/config.yml', 'mode': 'ro'}
+ local_vols.update(vols)
+
+ logging.info('Creating and running docker container for flow...')
+
+ # Create network if necessary
+ if self.network is None:
+ net_name = 'minifi-' + str(uuid.uuid4())
+ logging.info('Creating network: %s', net_name)
+ self.network = self.client.networks.create(net_name)
+
+ container = self.client.containers.run(
+ 'apacheminificpp:' + minifi_version,
+ detach=True,
+ name=name,
+ network=self.network.name,
+ volumes=local_vols)
+
+ logging.info('Started container \'%s\'', container.name)
+ self.containers.append(container)
+
+ def __enter__(self):
+ """
+ Allocate ephemeral cluster resources.
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """
+ Clean up ephemeral cluster resources
+ """
+
+ # Clean up containers
+ for container in self.containers:
+ logging.info('Cleaning up container: %s', container.name)
+ container.remove(v=True, force=True)
+
+ # Clean up network
+ if self.network is not None:
+ logging.info('Cleaning up network network: %s', self.network.name)
+ self.network.remove()
+
+ # Clean up tmp files
+ for tmp_file in self.tmp_files:
+ os.remove(tmp_file)
+
+
+class Processor(object):
+
+ def __init__(self,
+ clazz,
+ properties={},
+ schedule={},
+ name=None,
+ auto_terminate=[]):
+ self.connections = {}
+ self.uuid = uuid.uuid4()
+
+ if name is None:
+ self.name = str(self.uuid)
+
+ self.clazz = clazz
+ self.properties = properties
+ self.auto_terminate = auto_terminate
+
+ self.out_proc = self
+
+ self.schedule = {
+ 'scheduling strategy': 'EVENT_DRIVEN',
+ 'scheduling period': '1 sec',
+ 'penalization period': '30 sec',
+ 'yield period': '1 sec',
+ 'run duration nanos': 0
+ }
+ self.schedule.update(schedule)
+
+ def connect(self, connections):
+ for rel in connections:
+
+ # Ensure that rel is not auto-terminated
+ if rel in self.auto_terminate:
+ del self.auto_terminate[self.auto_terminate.index(rel)]
+
+ # Add to set of output connections for this rel
+ if not rel in self.connections:
+ self.connections[rel] = []
+ self.connections[rel].append(connections[rel])
+
+ return self
+
+ def __rshift__(self, other):
+ """
+ Right shift operator to support flow DSL, for example:
+
+ GetFile('/input') >> LogAttribute() >> PutFile('/output')
+
+ """
+
+ if (isinstance(other, tuple)):
+ if (isinstance(other[0], tuple)):
+ for rel_tuple in other:
+ rel = {}
+ rel[rel_tuple[0]] = rel_tuple[1]
+ self.out_proc.connect(rel)
+ else:
+ rel = {}
+ rel[other[0]] = other[1]
+ self.out_proc.connect(rel)
+ else:
+ self.out_proc.connect({'success': other})
+ self.out_proc = other
+
+ return self
+
+
+def InvokeHTTP(url, method='GET'):
+ return Processor('InvokeHTTP',
+ properties={'Remote URL': url,
+ 'HTTP Method': method},
+ auto_terminate=['success',
+ 'response',
+ 'retry',
+ 'failure',
+ 'no retry'])
+
+
+def ListenHTTP(port):
+ return Processor('ListenHTTP',
+ properties={'Listening Port': port},
+ auto_terminate=['success'])
+
+
+def LogAttribute():
+ return Processor('LogAttribute',
+ auto_terminate=['success'])
+
+
+def GetFile(input_dir):
+ return Processor('GetFile',
+ properties={'Input Directory': input_dir},
+ schedule={'scheduling period': '0 sec'},
+ auto_terminate=['success'])
+
+
+def PutFile(output_dir):
+ return Processor('PutFile',
+ properties={'Output Directory': output_dir},
+ auto_terminate=['success', 'failure'])
+
+
+def flow_yaml(processor, root=None, visited=[]):
+
+ if root is None:
+ res = {
+ 'Flow Controller': {
+ 'name': 'MiNiFi Flow'
+ },
+ 'Processors': [],
+ 'Connections': [],
+ 'Remote Processing Groups': []
+ }
+ else:
+ res = root
+
+ visited.append(processor)
+
+ if hasattr(processor, 'name'):
+ proc_name = processor.name
+ else:
+ proc_name = str(processor.uuid)
+
+ res['Processors'].append({
+ 'name': proc_name,
+ 'id': str(processor.uuid),
+ 'class': 'org.apache.nifi.processors.standard.' + processor.clazz,
+ 'scheduling strategy': processor.schedule['scheduling strategy'],
+ 'scheduling period': processor.schedule['scheduling period'],
+ 'penalization period': processor.schedule['penalization period'],
+ 'yield period': processor.schedule['yield period'],
+ 'run duration nanos': processor.schedule['run duration nanos'],
+ 'Properties': processor.properties,
+ 'auto-terminated relationships list': processor.auto_terminate
+ })
+
+ for conn_name in processor.connections:
+ conn_procs = processor.connections[conn_name]
+
+ if isinstance(conn_procs, list):
+ for proc in conn_procs:
+ res['Connections'].append({
+ 'name': str(uuid.uuid4()),
+ 'source id': str(processor.uuid),
+ 'source relationship name': conn_name,
+ 'destination id': str(proc.uuid)
+ })
+ if proc not in visited:
+ flow_yaml(proc, res, visited)
+ else:
+ res['Connections'].append({
+ 'name': str(uuid.uuid4()),
+ 'source id': str(processor.uuid),
+ 'source relationship name': conn_name,
+ 'destination id': str(conn_procs.uuid)
+ })
+ if conn_procs not in visited:
+ flow_yaml(conn_procs, res, visited)
+
+ if root is None:
+ return yaml.dump(res, default_flow_style=False)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
new file mode 100644
index 0000000..a7a3030
--- /dev/null
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+import shutil
+import uuid
+
+from threading import Event
+
+from os import listdir
+from os.path import isfile, join
+
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+
+from minifi import SingleNodeDockerCluster
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class DockerTestCluster(SingleNodeDockerCluster):
+
+ def __init__(self, output_validator):
+
+ # Create test input/output directories
+ test_cluster_id = str(uuid.uuid4())
+
+ self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
+ self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
+
+ logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
+ os.makedirs(self.tmp_test_input_dir, mode=0777)
+ logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir)
+ os.makedirs(self.tmp_test_output_dir, mode=0777)
+
+ # Point output validator to ephemeral output dir
+ self.output_validator = output_validator
+ output_validator.set_output_dir(self.tmp_test_output_dir)
+
+ # Start observing output dir
+ self.done_event = Event()
+ event_handler = OutputEventHandler(output_validator, self.done_event)
+ self.observer = Observer()
+ self.observer.schedule(event_handler, self.tmp_test_output_dir)
+ self.observer.start()
+
+ super(DockerTestCluster, self).__init__()
+
+ def deploy_flow(self, flow, name=None):
+ """
+ Performs a standard container flow deployment with the addition
+ of volumes supporting test input/output directories.
+ """
+
+ vols = {}
+ vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
+ vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+
+ super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
+
+ def put_test_data(self, contents):
+ """
+ Creates a randomly-named file in the test input dir and writes
+ the given content to it.
+ """
+
+ test_file_name = join(self.tmp_test_input_dir, str(uuid.uuid4()))
+ logging.info('Writing %d bytes of content to test file: %s', len(contents), test_file_name)
+
+ with open(test_file_name, 'w') as test_input_file:
+ test_input_file.write(contents)
+
+ def wait_for_output(self, timeout_seconds):
+ logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
+ self.done_event.wait(timeout_seconds)
+ self.observer.stop()
+ self.observer.join()
+
+ def log_minifi_output(self):
+
+ for container in self.containers:
+ container = self.client.containers.get(container.id)
+ logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs())
+ if container.status == 'running':
+ app_logs = container.exec_run('cat ' + self.minifi_root + '/minifi-app.log')
+ logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, app_logs)
+ else:
+ logging.info(container.status)
+ logging.info('Could not cat app logs for container \'%s\' because it is not running',
+ container.name)
+ stats = container.stats(decode=True, stream=False)
+ logging.info('Container stats:\n%s', repr(stats))
+
+ def check_output(self):
+ """
+ Wait for flow output, validate it, and log minifi output.
+ """
+ self.wait_for_output(5)
+ self.log_minifi_output()
+
+ return self.output_validator.validate()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """
+ Clean up ephemeral test resources.
+ """
+
+ logging.info('Removing tmp test input dir: %s', self.tmp_test_input_dir)
+ shutil.rmtree(self.tmp_test_input_dir)
+ logging.info('Removing tmp test output dir: %s', self.tmp_test_output_dir)
+ shutil.rmtree(self.tmp_test_output_dir)
+
+ super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
+
+
+class OutputEventHandler(FileSystemEventHandler):
+
+ def __init__(self, validator, done_event):
+ self.validator = validator
+ self.done_event = done_event
+
+ def on_created(self, event):
+ logging.info('Output file created: ' + event.src_path)
+ self.check(event)
+
+ def on_modified(self, event):
+ logging.info('Output file modified: ' + event.src_path)
+ self.check(event)
+
+ def check(self, event):
+ if self.validator.validate():
+ logging.info('Output file is valid')
+ self.done_event.set()
+ else:
+ logging.info('Output file is invalid')
+
+
+class OutputValidator(object):
+ """
+ Base output validator class. Validators must implement
+ method validate, which returns a boolean.
+ """
+
+ def validate(self):
+ """
+ Return True if output is valid; False otherwise.
+ """
+
+class SingleFileOutputValidator(OutputValidator):
+ """
+ Validates the content of a single file in the given directory.
+ """
+
+ def __init__(self, expected_content):
+ self.valid = False
+ self.expected_content = expected_content
+
+ def set_output_dir(self, output_dir):
+ self.output_dir = output_dir
+
+ def validate(self):
+
+ if self.valid:
+ return True
+
+ listing = listdir(self.output_dir)
+
+ if len(listing) > 0:
+ out_file_name = listing[0]
+
+ with open(join(self.output_dir, out_file_name), 'r') as out_file:
+ contents = out_file.read()
+
+ if contents == self.expected_content:
+ self.valid = True
+ return True
+
+ return False
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/test_filesystem_ops.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
new file mode 100644
index 0000000..7a6f212
--- /dev/null
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
def test_get_put():
    """
    Run a GetFile -> LogAttribute -> PutFile flow and assert that the
    cluster's output check passes for the test data 'test'.
    """

    source = GetFile('/tmp/input')
    flow = source >> LogAttribute() >> PutFile('/tmp/output')

    validator = SingleFileOutputValidator('test')

    with DockerTestCluster(validator) as cluster:
        cluster.put_test_data('test')
        cluster.deploy_flow(flow)

        assert cluster.check_output()
+
+
def test_file_exists_failure():
    """
    Put the same input to '/tmp' twice and assert that the output check
    passes for the flow wired to the second put's 'failure' relationship.
    """

    # First put should succeed
    first_put = GetFile('/tmp/input') >> PutFile('/tmp')

    # Second put should fail (file exists)
    second_put = first_put >> PutFile('/tmp')

    on_success = ('success', LogAttribute())
    on_failure = ('failure', LogAttribute() >> PutFile('/tmp/output'))

    flow = second_put >> (on_success, on_failure)

    validator = SingleFileOutputValidator('test')

    with DockerTestCluster(validator) as cluster:
        cluster.put_test_data('test')
        cluster.deploy_flow(flow)

        assert cluster.check_output()
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/17fe045b/docker/test/integration/test_http.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
new file mode 100644
index 0000000..72c80bd
--- /dev/null
+++ b/docker/test/integration/test_http.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
def test_invoke_listen():
    """
    Deploy a ListenHTTP flow and an InvokeHTTP flow that POSTs to it,
    then assert that the cluster's output check passes.
    """

    sender = GetFile('/tmp/input') >> LogAttribute()
    invoke_flow = sender >> InvokeHTTP(
        'http://minifi-listen:8080/contentListener', method='POST')

    receiver = ListenHTTP(8080) >> LogAttribute()
    listen_flow = receiver >> PutFile('/tmp/output')

    validator = SingleFileOutputValidator('test')

    with DockerTestCluster(validator) as cluster:
        cluster.put_test_data('test')

        # The listener is deployed first, under the name that appears as
        # the host in the InvokeHTTP URL.
        cluster.deploy_flow(listen_flow, name='minifi-listen')
        cluster.deploy_flow(invoke_flow, name='minifi-invoke')

        assert cluster.check_output()