You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/14 17:39:15 UTC

nifi-minifi-cpp git commit: MINIFI-374 Created automated tests for HTTPS integration cases

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 84f50b517 -> 6283a447e


MINIFI-374 Created automated tests for HTTPS integration cases

This closes #135.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6283a447
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6283a447
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6283a447

Branch: refs/heads/master
Commit: 6283a447efea55473f4cedb860f0415ec729c00a
Parents: 84f50b5
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Tue Sep 12 14:10:32 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Sep 14 13:38:31 2017 -0400

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 docker/DockerVerify.sh                          |  19 ++-
 docker/test/integration/.gitignore              |   1 +
 docker/test/integration/minifi/__init__.py      | 127 +++++++++++++++----
 docker/test/integration/minifi/test/__init__.py |  42 ++++--
 docker/test/integration/test_https.py           | 103 +++++++++++++++
 6 files changed, 254 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c28b765..0d62996 100644
--- a/.gitignore
+++ b/.gitignore
@@ -58,3 +58,4 @@ docker/minificppsource
 .cproject
 .settings
 *.pyc
+/cmake-build-*

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/DockerVerify.sh
----------------------------------------------------------------------
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index be62c1f..f7014fd 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -31,8 +31,23 @@ pip install --upgrade pip setuptools
 
 # Install test dependencies
 echo "Installing test dependencies..." 1>&2
-pip install --upgrade pytest docker PyYAML watchdog
+
+# hint include/library paths if homebrew is in use
+if brew list 2> /dev/null | grep openssl > /dev/null 2>&1; then
+  echo "Using homebrew paths for openssl" 1>&2
+  export LDFLAGS="-L$(brew --prefix openssl)/lib"
+  export CFLAGS="-I$(brew --prefix openssl)/include"
+  export SWIG_FEATURES="-cpperraswarn -includeall -I$(brew --prefix openssl)/include"
+fi
+
+pip install --upgrade \
+            pytest \
+            docker \
+            PyYAML \
+            m2crypto \
+            watchdog
 
 export MINIFI_VERSION=0.3.0
 export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
-pytest -s -v "${docker_dir}"/test/integration
+
+exec pytest -s -v "${docker_dir}"/test/integration

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/.gitignore
----------------------------------------------------------------------
diff --git a/docker/test/integration/.gitignore b/docker/test/integration/.gitignore
index bee8a64..bdf3349 100644
--- a/docker/test/integration/.gitignore
+++ b/docker/test/integration/.gitignore
@@ -1 +1,2 @@
 __pycache__
+/.ropeproject

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 557b9a8..2d83126 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -28,7 +28,7 @@ class Cluster(object):
     Docker swarms, or cloud compute/container services.
     """
 
-    def deploy_flow(self, flow):
+    def deploy_flow(self, flow, name=None, vols=None):
         """
         Deploys a flow to the cluster.
         """
@@ -44,6 +44,7 @@ class Cluster(object):
         Clean up ephemeral cluster resources.
         """
 
+
 class SingleNodeDockerCluster(Cluster):
     """
     A "cluster" which consists of a single docker node. Useful for
@@ -58,12 +59,15 @@ class SingleNodeDockerCluster(Cluster):
         # Get docker client
         self.client = docker.from_env()
 
-    def deploy_flow(self, flow, name=None, vols={}):
+    def deploy_flow(self, flow, name=None, vols=None):
         """
         Compiles the flow to YAML and maps it into the container using
         the docker volumes API.
         """
 
+        if vols is None:
+            vols = {}
+
         logging.info('Deploying flow...')
 
         if name is None:
@@ -99,11 +103,11 @@ class SingleNodeDockerCluster(Cluster):
             self.network = self.client.networks.create(net_name)
 
         container = self.client.containers.run(
-            'apacheminificpp:' + minifi_version,
-            detach=True,
-            name=name,
-            network=self.network.name,
-            volumes=local_vols)
+                'apacheminificpp:' + minifi_version,
+                detach=True,
+                name=name,
+                network=self.network.name,
+                volumes=local_vols)
 
         logging.info('Started container \'%s\'', container.name)
         self.containers.append(container)
@@ -135,13 +139,26 @@ class SingleNodeDockerCluster(Cluster):
 
 
 class Processor(object):
-
     def __init__(self,
                  clazz,
-                 properties={},
-                 schedule={},
+                 properties=None,
+                 schedule=None,
                  name=None,
-                 auto_terminate=[]):
+                 auto_terminate=None,
+                 controller_services=None):
+
+        if controller_services is None:
+            controller_services = []
+
+        if auto_terminate is None:
+            auto_terminate = []
+
+        if schedule is None:
+            schedule = {}
+
+        if properties is None:
+            properties = {}
+
         self.connections = {}
         self.uuid = uuid.uuid4()
 
@@ -151,6 +168,7 @@ class Processor(object):
         self.clazz = clazz
         self.properties = properties
         self.auto_terminate = auto_terminate
+        self.controller_services = controller_services
 
         self.out_proc = self
 
@@ -185,15 +203,13 @@ class Processor(object):
 
         """
 
-        if (isinstance(other, tuple)):
-            if (isinstance(other[0], tuple)):
+        if isinstance(other, tuple):
+            if isinstance(other[0], tuple):
                 for rel_tuple in other:
-                    rel = {}
-                    rel[rel_tuple[0]] = rel_tuple[1]
+                    rel = {rel_tuple[0]: rel_tuple[1]}
                     self.out_proc.connect(rel)
             else:
-                rel = {}
-                rel[other[0]] = other[1]
+                rel = {other[0]: other[1]}
                 self.out_proc.connect(rel)
         else:
             self.out_proc.connect({'success': other})
@@ -202,10 +218,20 @@ class Processor(object):
         return self
 
 
-def InvokeHTTP(url, method='GET'):
+def InvokeHTTP(url,
+               method='GET',
+               ssl_context_service=None):
+    properties = {'Remote URL': url, 'HTTP Method': method}
+
+    controller_services = []
+
+    if ssl_context_service is not None:
+        properties['SSL Context Service'] = ssl_context_service.name
+        controller_services.append(ssl_context_service)
+
     return Processor('InvokeHTTP',
-                     properties={'Remote URL': url,
-                                 'HTTP Method': method},
+                     properties=properties,
+                     controller_services=controller_services,
                      auto_terminate=['success',
                                      'response',
                                      'retry',
@@ -213,9 +239,14 @@ def InvokeHTTP(url, method='GET'):
                                      'no retry'])
 
 
-def ListenHTTP(port):
+def ListenHTTP(port, cert=None):
+    properties = {'Listening Port': port}
+
+    if cert is not None:
+        properties['SSL Certificate'] = cert
+
     return Processor('ListenHTTP',
-                     properties={'Listening Port': port},
+                     properties=properties,
                      auto_terminate=['success'])
 
 
@@ -237,7 +268,42 @@ def PutFile(output_dir):
                      auto_terminate=['success', 'failure'])
 
 
-def flow_yaml(processor, root=None, visited=[]):
+class ControllerService(object):
+    def __init__(self, name=None, properties=None):
+
+        self.id = str(uuid.uuid4())
+
+        if name is None:
+            self.name = str(uuid.uuid4())
+            logging.info('Controller service name was not provided; using generated name \'%s\'', self.name)
+        else:
+            self.name = name
+
+        if properties is None:
+            properties = {}
+
+        self.properties = properties
+
+
+class SSLContextService(ControllerService):
+    def __init__(self, name=None, cert=None, key=None, ca_cert=None):
+        super(SSLContextService, self).__init__(name=name)
+
+        self.service_class = 'SSLContextService'
+
+        if cert is not None:
+            self.properties['Client Certificate'] = cert
+
+        if key is not None:
+            self.properties['Private Key'] = key
+
+        if ca_cert is not None:
+            self.properties['CA Certificate'] = ca_cert
+
+
+def flow_yaml(processor, root=None, visited=None):
+    if visited is None:
+        visited = []
 
     if root is None:
         res = {
@@ -246,7 +312,8 @@ def flow_yaml(processor, root=None, visited=[]):
             },
             'Processors': [],
             'Connections': [],
-            'Remote Processing Groups': []
+            'Remote Processing Groups': [],
+            'Controller Services': []
         }
     else:
         res = root
@@ -271,6 +338,18 @@ def flow_yaml(processor, root=None, visited=[]):
         'auto-terminated relationships list': processor.auto_terminate
     })
 
+    for svc in processor.controller_services:
+        if svc in visited:
+            continue
+
+        visited.append(svc)
+        res['Controller Services'].append({
+            'name': svc.name,
+            'id': svc.id,
+            'class': svc.service_class,
+            'Properties': svc.properties
+        })
+
     for conn_name in processor.connections:
         conn_procs = processor.connections[conn_name]
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index a7a3030..8c0aee9 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -13,24 +13,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import logging
 import shutil
 import uuid
-
 from threading import Event
 
+import os
 from os import listdir
-from os.path import isfile, join
-
-from watchdog.observers import Observer
+from os.path import join
 from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
 
 from minifi import SingleNodeDockerCluster
 
 logging.basicConfig(level=logging.DEBUG)
 
 
+def put_file_contents(contents, file_abs_path):
+    logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
+    with open(file_abs_path, 'w') as test_input_file:
+        test_input_file.write(contents)
+
+
 class DockerTestCluster(SingleNodeDockerCluster):
 
     def __init__(self, output_validator):
@@ -40,11 +44,14 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
         self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
         self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
+        self.tmp_test_resources_dir = '/tmp/.minifi-test-resources.' + test_cluster_id
 
         logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
         os.makedirs(self.tmp_test_input_dir, mode=0777)
         logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir)
         os.makedirs(self.tmp_test_output_dir, mode=0777)
+        logging.info('Creating tmp test resource dir: %s', self.tmp_test_resources_dir)
+        os.makedirs(self.tmp_test_resources_dir, mode=0777)
 
         # Point output validator to ephemeral output dir
         self.output_validator = output_validator
@@ -59,15 +66,15 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
         super(DockerTestCluster, self).__init__()
 
-    def deploy_flow(self, flow, name=None):
+    def deploy_flow(self, flow, name=None, vols=None):
         """
         Performs a standard container flow deployment with the addition
         of volumes supporting test input/output directories.
         """
 
-        vols = {}
-        vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
-        vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+        vols = {self.tmp_test_input_dir: {'bind': '/tmp/input', 'mode': 'rw'},
+                self.tmp_test_output_dir: {'bind': '/tmp/output', 'mode': 'rw'},
+                self.tmp_test_resources_dir: {'bind': '/tmp/resources', 'mode': 'rw'}}
 
         super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
 
@@ -77,11 +84,18 @@ class DockerTestCluster(SingleNodeDockerCluster):
         the given content to it.
         """
 
-        test_file_name = join(self.tmp_test_input_dir, str(uuid.uuid4()))
-        logging.info('Writing %d bytes of content to test file: %s', len(contents), test_file_name)
+        file_name = str(uuid.uuid4())
+        file_abs_path = join(self.tmp_test_input_dir, file_name)
+        put_file_contents(contents, file_abs_path)
+
+    def put_test_resource(self, file_name, contents):
+        """
+        Creates a resource file in the test resource dir and writes
+        the given content to it.
+        """
 
-        with open(test_file_name, 'w') as test_input_file:
-            test_input_file.write(contents)
+        file_abs_path = join(self.tmp_test_resources_dir, file_name)
+        put_file_contents(contents, file_abs_path)
 
     def wait_for_output(self, timeout_seconds):
         logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
@@ -122,6 +136,8 @@ class DockerTestCluster(SingleNodeDockerCluster):
         shutil.rmtree(self.tmp_test_input_dir)
         logging.info('Removing tmp test output dir: %s', self.tmp_test_output_dir)
         shutil.rmtree(self.tmp_test_output_dir)
+        logging.info('Removing tmp test resources dir: %s', self.tmp_test_output_dir)
+        shutil.rmtree(self.tmp_test_resources_dir)
 
         super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6283a447/docker/test/integration/test_https.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_https.py b/docker/test/integration/test_https.py
new file mode 100644
index 0000000..9912370
--- /dev/null
+++ b/docker/test/integration/test_https.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+from M2Crypto import X509, EVP, RSA, ASN1
+
+from minifi import *
+from minifi.test import *
+
+
+def callback():
+    pass
+
+
+def test_invoke_listen_https_one_way():
+    """
+    Verify sending using InvokeHTTP to a receiver using ListenHTTP (with TLS).
+    """
+
+    cert, key = gen_cert()
+
+    # TODO define SSLContextService class & generate config yml for services
+    crt_file = '/tmp/resources/test-crt.pem'
+
+    invoke_flow = (GetFile('/tmp/input')
+                   >> LogAttribute()
+                   >> InvokeHTTP('https://minifi-listen:4430/contentListener',
+                                 method='POST',
+                                 ssl_context_service=SSLContextService(ca_cert=crt_file)))
+
+    listen_flow = (ListenHTTP(4430, cert=crt_file)
+                   >> LogAttribute()
+                   >> PutFile('/tmp/output'))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, callback))
+        cluster.put_test_data('test')
+        cluster.deploy_flow(listen_flow, name='minifi-listen')
+        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
+
+        assert cluster.check_output()
+
+
+def gen_cert():
+    """
+    Generate a self-signed TLS certificate (and its private key) for testing
+    """
+
+    req, key = gen_req()
+    pub_key = req.get_pubkey()
+    subject = req.get_subject()
+    cert = X509.X509()
+    # noinspection PyTypeChecker
+    cert.set_serial_number(1)
+    cert.set_version(2)
+    cert.set_subject(subject)
+    t = long(time.time())
+    now = ASN1.ASN1_UTCTIME()
+    now.set_time(t)
+    now_plus_year = ASN1.ASN1_UTCTIME()
+    now_plus_year.set_time(t + 60 * 60 * 24 * 365)
+    cert.set_not_before(now)
+    cert.set_not_after(now_plus_year)
+    issuer = X509.X509_Name()
+    issuer.C = 'US'
+    issuer.CN = 'minifi-listen'
+    cert.set_issuer(issuer)
+    cert.set_pubkey(pub_key)
+    cert.sign(key, 'sha256')
+
+    return cert, key
+
+
+def gen_req():
+    """
+    Generate TLS certificate request for testing
+    """
+
+    logging.info('Generating test certificate request')
+    key = EVP.PKey()
+    req = X509.Request()
+    rsa = RSA.gen_key(1024, 65537, callback)
+    key.assign_rsa(rsa)
+    req.set_pubkey(key)
+    name = req.get_subject()
+    name.C = 'US'
+    name.CN = 'minifi-listen'
+    req.sign(key, 'sha256')
+
+    return req, key