You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/14 17:39:15 UTC
nifi-minifi-cpp git commit: MINIFI-374 Created automated tests for
HTTPS integration cases
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 84f50b517 -> 6283a447e
MINIFI-374 Created automated tests for HTTPS integration cases
This closes #135.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6283a447
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6283a447
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6283a447
Branch: refs/heads/master
Commit: 6283a447efea55473f4cedb860f0415ec729c00a
Parents: 84f50b5
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Tue Sep 12 14:10:32 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Sep 14 13:38:31 2017 -0400
----------------------------------------------------------------------
.gitignore | 1 +
docker/DockerVerify.sh | 19 ++-
docker/test/integration/.gitignore | 1 +
docker/test/integration/minifi/__init__.py | 127 +++++++++++++++----
docker/test/integration/minifi/test/__init__.py | 42 ++++--
docker/test/integration/test_https.py | 103 +++++++++++++++
6 files changed, 254 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c28b765..0d62996 100644
--- a/.gitignore
+++ b/.gitignore
@@ -58,3 +58,4 @@ docker/minificppsource
.cproject
.settings
*.pyc
+/cmake-build-*
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/DockerVerify.sh
----------------------------------------------------------------------
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index be62c1f..f7014fd 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -31,8 +31,23 @@ pip install --upgrade pip setuptools
# Install test dependencies
echo "Installing test dependencies..." 1>&2
-pip install --upgrade pytest docker PyYAML watchdog
+
+# hint include/library paths if homebrew is in use
+if brew list 2> /dev/null | grep openssl > /dev/null 2>&1; then
+ echo "Using homebrew paths for openssl" 1>&2
+ export LDFLAGS="-L$(brew --prefix openssl)/lib"
+ export CFLAGS="-I$(brew --prefix openssl)/include"
+ export SWIG_FEATURES="-cpperraswarn -includeall -I$(brew --prefix openssl)/include"
+fi
+
+pip install --upgrade \
+ pytest \
+ docker \
+ PyYAML \
+ m2crypto \
+ watchdog
export MINIFI_VERSION=0.3.0
export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
-pytest -s -v "${docker_dir}"/test/integration
+
+exec pytest -s -v "${docker_dir}"/test/integration
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/.gitignore
----------------------------------------------------------------------
diff --git a/docker/test/integration/.gitignore b/docker/test/integration/.gitignore
index bee8a64..bdf3349 100644
--- a/docker/test/integration/.gitignore
+++ b/docker/test/integration/.gitignore
@@ -1 +1,2 @@
__pycache__
+/.ropeproject
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 557b9a8..2d83126 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -28,7 +28,7 @@ class Cluster(object):
Docker swarms, or cloud compute/container services.
"""
- def deploy_flow(self, flow):
+ def deploy_flow(self, flow, name=None, vols=None):
"""
Deploys a flow to the cluster.
"""
@@ -44,6 +44,7 @@ class Cluster(object):
Clean up ephemeral cluster resources.
"""
+
class SingleNodeDockerCluster(Cluster):
"""
A "cluster" which consists of a single docker node. Useful for
@@ -58,12 +59,15 @@ class SingleNodeDockerCluster(Cluster):
# Get docker client
self.client = docker.from_env()
- def deploy_flow(self, flow, name=None, vols={}):
+ def deploy_flow(self, flow, name=None, vols=None):
"""
Compiles the flow to YAML and maps it into the container using
the docker volumes API.
"""
+ if vols is None:
+ vols = {}
+
logging.info('Deploying flow...')
if name is None:
@@ -99,11 +103,11 @@ class SingleNodeDockerCluster(Cluster):
self.network = self.client.networks.create(net_name)
container = self.client.containers.run(
- 'apacheminificpp:' + minifi_version,
- detach=True,
- name=name,
- network=self.network.name,
- volumes=local_vols)
+ 'apacheminificpp:' + minifi_version,
+ detach=True,
+ name=name,
+ network=self.network.name,
+ volumes=local_vols)
logging.info('Started container \'%s\'', container.name)
self.containers.append(container)
@@ -135,13 +139,26 @@ class SingleNodeDockerCluster(Cluster):
class Processor(object):
-
def __init__(self,
clazz,
- properties={},
- schedule={},
+ properties=None,
+ schedule=None,
name=None,
- auto_terminate=[]):
+ auto_terminate=None,
+ controller_services=None):
+
+ if controller_services is None:
+ controller_services = []
+
+ if auto_terminate is None:
+ auto_terminate = []
+
+ if schedule is None:
+ schedule = {}
+
+ if properties is None:
+ properties = {}
+
self.connections = {}
self.uuid = uuid.uuid4()
@@ -151,6 +168,7 @@ class Processor(object):
self.clazz = clazz
self.properties = properties
self.auto_terminate = auto_terminate
+ self.controller_services = controller_services
self.out_proc = self
@@ -185,15 +203,13 @@ class Processor(object):
"""
- if (isinstance(other, tuple)):
- if (isinstance(other[0], tuple)):
+ if isinstance(other, tuple):
+ if isinstance(other[0], tuple):
for rel_tuple in other:
- rel = {}
- rel[rel_tuple[0]] = rel_tuple[1]
+ rel = {rel_tuple[0]: rel_tuple[1]}
self.out_proc.connect(rel)
else:
- rel = {}
- rel[other[0]] = other[1]
+ rel = {other[0]: other[1]}
self.out_proc.connect(rel)
else:
self.out_proc.connect({'success': other})
@@ -202,10 +218,20 @@ class Processor(object):
return self
-def InvokeHTTP(url, method='GET'):
+def InvokeHTTP(url,
+ method='GET',
+ ssl_context_service=None):
+ properties = {'Remote URL': url, 'HTTP Method': method}
+
+ controller_services = []
+
+ if ssl_context_service is not None:
+ properties['SSL Context Service'] = ssl_context_service.name
+ controller_services.append(ssl_context_service)
+
return Processor('InvokeHTTP',
- properties={'Remote URL': url,
- 'HTTP Method': method},
+ properties=properties,
+ controller_services=controller_services,
auto_terminate=['success',
'response',
'retry',
@@ -213,9 +239,14 @@ def InvokeHTTP(url, method='GET'):
'no retry'])
-def ListenHTTP(port):
+def ListenHTTP(port, cert=None):
+ properties = {'Listening Port': port}
+
+ if cert is not None:
+ properties['SSL Certificate'] = cert
+
return Processor('ListenHTTP',
- properties={'Listening Port': port},
+ properties=properties,
auto_terminate=['success'])
@@ -237,7 +268,42 @@ def PutFile(output_dir):
auto_terminate=['success', 'failure'])
-def flow_yaml(processor, root=None, visited=[]):
+class ControllerService(object):
+ def __init__(self, name=None, properties=None):
+
+ self.id = str(uuid.uuid4())
+
+ if name is None:
+ self.name = str(uuid.uuid4())
+ logging.info('Controller service name was not provided; using generated name \'%s\'', self.name)
+ else:
+ self.name = name
+
+ if properties is None:
+ properties = {}
+
+ self.properties = properties
+
+
+class SSLContextService(ControllerService):
+ def __init__(self, name=None, cert=None, key=None, ca_cert=None):
+ super(SSLContextService, self).__init__(name=name)
+
+ self.service_class = 'SSLContextService'
+
+ if cert is not None:
+ self.properties['Client Certificate'] = cert
+
+ if key is not None:
+ self.properties['Private Key'] = key
+
+ if ca_cert is not None:
+ self.properties['CA Certificate'] = ca_cert
+
+
+def flow_yaml(processor, root=None, visited=None):
+ if visited is None:
+ visited = []
if root is None:
res = {
@@ -246,7 +312,8 @@ def flow_yaml(processor, root=None, visited=[]):
},
'Processors': [],
'Connections': [],
- 'Remote Processing Groups': []
+ 'Remote Processing Groups': [],
+ 'Controller Services': []
}
else:
res = root
@@ -271,6 +338,18 @@ def flow_yaml(processor, root=None, visited=[]):
'auto-terminated relationships list': processor.auto_terminate
})
+ for svc in processor.controller_services:
+ if svc in visited:
+ continue
+
+ visited.append(svc)
+ res['Controller Services'].append({
+ 'name': svc.name,
+ 'id': svc.id,
+ 'class': svc.service_class,
+ 'Properties': svc.properties
+ })
+
for conn_name in processor.connections:
conn_procs = processor.connections[conn_name]
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index a7a3030..8c0aee9 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -13,24 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import logging
import shutil
import uuid
-
from threading import Event
+import os
from os import listdir
-from os.path import isfile, join
-
-from watchdog.observers import Observer
+from os.path import join
from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
from minifi import SingleNodeDockerCluster
logging.basicConfig(level=logging.DEBUG)
+def put_file_contents(contents, file_abs_path):
+ logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
+ with open(file_abs_path, 'w') as test_input_file:
+ test_input_file.write(contents)
+
+
class DockerTestCluster(SingleNodeDockerCluster):
def __init__(self, output_validator):
@@ -40,11 +44,14 @@ class DockerTestCluster(SingleNodeDockerCluster):
self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
+ self.tmp_test_resources_dir = '/tmp/.minifi-test-resources.' + test_cluster_id
logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
os.makedirs(self.tmp_test_input_dir, mode=0777)
logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir)
os.makedirs(self.tmp_test_output_dir, mode=0777)
+ logging.info('Creating tmp test resource dir: %s', self.tmp_test_resources_dir)
+ os.makedirs(self.tmp_test_resources_dir, mode=0777)
# Point output validator to ephemeral output dir
self.output_validator = output_validator
@@ -59,15 +66,15 @@ class DockerTestCluster(SingleNodeDockerCluster):
super(DockerTestCluster, self).__init__()
- def deploy_flow(self, flow, name=None):
+ def deploy_flow(self, flow, name=None, vols=None):
"""
Performs a standard container flow deployment with the addition
of volumes supporting test input/output directories.
"""
- vols = {}
- vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
- vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+ vols = {self.tmp_test_input_dir: {'bind': '/tmp/input', 'mode': 'rw'},
+ self.tmp_test_output_dir: {'bind': '/tmp/output', 'mode': 'rw'},
+ self.tmp_test_resources_dir: {'bind': '/tmp/resources', 'mode': 'rw'}}
super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
@@ -77,11 +84,18 @@ class DockerTestCluster(SingleNodeDockerCluster):
the given content to it.
"""
- test_file_name = join(self.tmp_test_input_dir, str(uuid.uuid4()))
- logging.info('Writing %d bytes of content to test file: %s', len(contents), test_file_name)
+ file_name = str(uuid.uuid4())
+ file_abs_path = join(self.tmp_test_input_dir, file_name)
+ put_file_contents(contents, file_abs_path)
+
+ def put_test_resource(self, file_name, contents):
+ """
+ Creates a resource file in the test resource dir and writes
+ the given content to it.
+ """
- with open(test_file_name, 'w') as test_input_file:
- test_input_file.write(contents)
+ file_abs_path = join(self.tmp_test_resources_dir, file_name)
+ put_file_contents(contents, file_abs_path)
def wait_for_output(self, timeout_seconds):
logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
@@ -122,6 +136,8 @@ class DockerTestCluster(SingleNodeDockerCluster):
shutil.rmtree(self.tmp_test_input_dir)
logging.info('Removing tmp test output dir: %s', self.tmp_test_output_dir)
shutil.rmtree(self.tmp_test_output_dir)
+ logging.info('Removing tmp test resources dir: %s', self.tmp_test_output_dir)
+ shutil.rmtree(self.tmp_test_resources_dir)
super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/test_https.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_https.py b/docker/test/integration/test_https.py
new file mode 100644
index 0000000..9912370
--- /dev/null
+++ b/docker/test/integration/test_https.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+from M2Crypto import X509, EVP, RSA, ASN1
+
+from minifi import *
+from minifi.test import *
+
+
+def callback():
+ pass
+
+
+def test_invoke_listen_https_one_way():
+ """
+ Verify sending using InvokeHTTP to a receiver using ListenHTTP (with TLS).
+ """
+
+ cert, key = gen_cert()
+
+ # TODO define SSLContextService class & generate config yml for services
+ crt_file = '/tmp/resources/test-crt.pem'
+
+ invoke_flow = (GetFile('/tmp/input')
+ >> LogAttribute()
+ >> InvokeHTTP('https://minifi-listen:4430/contentListener',
+ method='POST',
+ ssl_context_service=SSLContextService(ca_cert=crt_file)))
+
+ listen_flow = (ListenHTTP(4430, cert=crt_file)
+ >> LogAttribute()
+ >> PutFile('/tmp/output'))
+
+ with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+ cluster.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, callback))
+ cluster.put_test_data('test')
+ cluster.deploy_flow(listen_flow, name='minifi-listen')
+ cluster.deploy_flow(invoke_flow, name='minifi-invoke')
+
+ assert cluster.check_output()
+
+
+def gen_cert():
+ """
+ Generate a self-signed TLS certificate and its key for testing
+ """
+
+ req, key = gen_req()
+ pub_key = req.get_pubkey()
+ subject = req.get_subject()
+ cert = X509.X509()
+ # noinspection PyTypeChecker
+ cert.set_serial_number(1)
+ cert.set_version(2)
+ cert.set_subject(subject)
+ t = long(time.time())
+ now = ASN1.ASN1_UTCTIME()
+ now.set_time(t)
+ now_plus_year = ASN1.ASN1_UTCTIME()
+ now_plus_year.set_time(t + 60 * 60 * 24 * 365)
+ cert.set_not_before(now)
+ cert.set_not_after(now_plus_year)
+ issuer = X509.X509_Name()
+ issuer.C = 'US'
+ issuer.CN = 'minifi-listen'
+ cert.set_issuer(issuer)
+ cert.set_pubkey(pub_key)
+ cert.sign(key, 'sha256')
+
+ return cert, key
+
+
+def gen_req():
+ """
+ Generate TLS certificate request for testing
+ """
+
+ logging.info('Generating test certificate request')
+ key = EVP.PKey()
+ req = X509.Request()
+ rsa = RSA.gen_key(1024, 65537, callback)
+ key.assign_rsa(rsa)
+ req.set_pubkey(key)
+ name = req.get_subject()
+ name.C = 'US'
+ name.CN = 'minifi-listen'
+ req.sign(key, 'sha256')
+
+ return req, key