Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/29 17:17:50 UTC

nifi-minifi-cpp git commit: MINIFI-398 Added test framework support for s2s and added s2s integration test between minifi-cpp and nifi.

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master aaeb929b2 -> 7c24ed7e6


MINIFI-398 Added test framework support for s2s and added s2s integration test between minifi-cpp and nifi.

This closes #136.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/7c24ed7e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/7c24ed7e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/7c24ed7e

Branch: refs/heads/master
Commit: 7c24ed7e60b4e553aadba913e83760ac80f95d33
Parents: aaeb929
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Thu Sep 14 13:47:20 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Sep 29 13:17:24 2017 -0400

----------------------------------------------------------------------
 docker/test/integration/README.md               |  33 +
 docker/test/integration/minifi/__init__.py      | 806 +++++++++++++++----
 docker/test/integration/minifi/test/__init__.py |  49 +-
 docker/test/integration/test_filesystem_ops.py  |  11 +-
 docker/test/integration/test_http.py            |   6 +-
 docker/test/integration/test_s2s.py             |  40 +
 6 files changed, 783 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/README.md
----------------------------------------------------------------------
diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md
index 6d8c066..c4fb9f4 100644
--- a/docker/test/integration/README.md
+++ b/docker/test/integration/README.md
@@ -64,6 +64,32 @@ The following processors/parameters are supported:
 - method='GET'
 - ssl\_context\_service=None
 
+#### Remote Process Groups
+
+Remote process groups and input ports are supported.
+
+**Example InputPort/RemoteProcessGroup:**
+
+```python
+port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
+```
+
+InputPorts may be used as inputs or outputs in the flow DSL:
+
+```python
+recv_flow = (port
+             >> LogAttribute()
+             >> PutFile('/tmp/output'))
+
+send_flow = (GetFile('/tmp/input')
+             >> LogAttribute()
+             >> port)
+```
+
+These example flows could be deployed as separate NiFi/MiNiFi instances, where
+the send\_flow sends data to the recv\_flow over the site-to-site
+protocol.
+
 ### Definition of an output validator
 
 The output validator is responsible for checking the state of a cluster for
@@ -148,6 +174,13 @@ be used.
 cluster.deploy_flow(flow, name='test-flow')
 ```
 
+The deploy\_flow function defaults to the MiNiFi - C++ engine, but other engines,
+such as NiFi, may be used:
+
+```python
+cluster.deploy_flow(flow, engine='nifi')
+```
+
 ### Execution of one or more flows
 
 Flows are executed immediately upon deployment and according to schedule

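For reference, here is a hedged sketch of what the InputPort/RemoteProcessGroup
DSL described in the README compiles to: the minifi_flow_yaml function added in
this commit emits a `Remote Processing Groups` section in config.yml roughly
like the following (the ids, and the group name when none is given, are UUIDs
generated at runtime; the values shown are illustrative, and key ordering may
differ in the actual yaml.dump output):

```yaml
Remote Processing Groups:
- name: 7f3c0e2a-...       # defaults to the group's generated UUID
  id: 7f3c0e2a-...
  url: http://nifi:8080/nifi
  timeout: 30 sec
  yield period: 10 sec
  Input Ports:
  - id: 9a1b4c6d-...
    name: from-minifi
    max concurrent tasks: 1
    Properties: {}
```
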
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 2d83126..36a1b4f 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -12,13 +12,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import os
-import uuid
+import gzip
 import logging
+import tarfile
+import uuid
+import xml.etree.cElementTree as elementTree
+from xml.etree.cElementTree import Element
+from StringIO import StringIO
+from io import BytesIO
+from textwrap import dedent
 
-import yaml
 import docker
+import os
+import yaml
+from copy import copy
 
 
 class Cluster(object):
@@ -52,66 +59,181 @@ class SingleNodeDockerCluster(Cluster):
     """
 
     def __init__(self):
+        self.minifi_version = os.environ['MINIFI_VERSION']
+        self.nifi_version = '1.4.0-SNAPSHOT'
+        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
+        self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
         self.network = None
         self.containers = []
+        self.images = []
         self.tmp_files = []
 
         # Get docker client
         self.client = docker.from_env()
 
-    def deploy_flow(self, flow, name=None, vols=None):
+    def deploy_flow(self,
+                    flow,
+                    name=None,
+                    vols=None,
+                    engine='minifi-cpp'):
         """
-        Compiles the flow to YAML and maps it into the container using
-        the docker volumes API.
+        Compiles the flow to a valid config file and overlays it into a new image.
         """
 
         if vols is None:
             vols = {}
 
-        logging.info('Deploying flow...')
+        logging.info('Deploying %s flow...', engine)
 
         if name is None:
-            name = 'minifi-' + str(uuid.uuid4())
+            name = engine + '-' + str(uuid.uuid4())
             logging.info('Flow name was not provided; using generated name \'%s\'', name)
 
-        minifi_version = os.environ['MINIFI_VERSION']
-        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + minifi_version
+        # Create network if necessary
+        if self.network is None:
+            net_name = 'nifi-' + str(uuid.uuid4())
+            logging.info('Creating network: %s', net_name)
+            self.network = self.client.networks.create(net_name)
+
+        if engine == 'nifi':
+            self.deploy_nifi_flow(flow, name, vols)
+        elif engine == 'minifi-cpp':
+            self.deploy_minifi_cpp_flow(flow, name, vols)
+        else:
+            raise Exception('invalid flow engine: \'%s\'' % engine)
+
+    def deploy_minifi_cpp_flow(self, flow, name, vols):
 
-        # Write flow config
-        tmp_flow_file_name = '/tmp/.minifi-flow.' + str(uuid.uuid4())
-        self.tmp_files.append(tmp_flow_file_name)
+        # Build configured image
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                ADD config.yml {minifi_root}/conf/config.yml
+                RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
+                USER minificpp
+                """.format(name=name,
+                           base_image='apacheminificpp:' + self.minifi_version,
+                           minifi_root=self.minifi_root))
 
-        yaml = flow_yaml(flow)
+        test_flow_yaml = minifi_flow_yaml(flow)
+        logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
 
-        logging.info('Using generated flow config yml:\n%s', yaml)
+        conf_file_buffer = StringIO()
 
-        with open(tmp_flow_file_name, 'w') as tmp_flow_file:
-            tmp_flow_file.write(yaml)
+        try:
+            conf_file_buffer.write(test_flow_yaml)
+            conf_file_len = conf_file_buffer.tell()
+            conf_file_buffer.seek(0)
 
-        conf_file = tmp_flow_file_name
+            context_files = [
+                {
+                    'name': 'config.yml',
+                    'size': conf_file_len,
+                    'file_obj': conf_file_buffer
+                }
+            ]
 
-        local_vols = {}
-        local_vols[conf_file] = {'bind': self.minifi_root + '/conf/config.yml', 'mode': 'ro'}
-        local_vols.update(vols)
+            configured_image = self.build_image(dockerfile, context_files)
+
+        finally:
+            conf_file_buffer.close()
 
         logging.info('Creating and running docker container for flow...')
 
-        # Create network if necessary
-        if self.network is None:
-            net_name = 'minifi-' + str(uuid.uuid4())
-            logging.info('Creating network: %s', net_name)
-            self.network = self.client.networks.create(net_name)
+        container = self.client.containers.run(
+                configured_image,
+                detach=True,
+                name=name,
+                network=self.network.name,
+                volumes=vols)
+
+        logging.info('Started container \'%s\'', container.name)
+
+        self.containers.append(container)
+
+    def deploy_nifi_flow(self, flow, name, vols):
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
+                RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
+                RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\\1={name}/' {nifi_root}/conf/nifi.properties
+                RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\\1=5000/' {nifi_root}/conf/nifi.properties
+                USER nifi
+                """.format(name=name,
+                           base_image='apachenifi:' + self.nifi_version,
+                           nifi_root=self.nifi_root))
+
+        test_flow_xml = nifi_flow_xml(flow, self.nifi_version)
+        logging.info('Using generated flow config xml:\n%s', test_flow_xml)
+
+        conf_file_buffer = BytesIO()
+
+        try:
+            with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer:
+                conf_gz_file_buffer.write(test_flow_xml)
+            conf_file_len = conf_file_buffer.tell()
+            conf_file_buffer.seek(0)
+
+            context_files = [
+                {
+                    'name': 'flow.xml.gz',
+                    'size': conf_file_len,
+                    'file_obj': conf_file_buffer
+                }
+            ]
+
+            configured_image = self.build_image(dockerfile, context_files)
+
+        finally:
+            conf_file_buffer.close()
+
+        logging.info('Creating and running docker container for flow...')
 
         container = self.client.containers.run(
-                'apacheminificpp:' + minifi_version,
+                configured_image,
                 detach=True,
                 name=name,
                 network=self.network.name,
-                volumes=local_vols)
+                volumes=vols)
 
         logging.info('Started container \'%s\'', container.name)
+
         self.containers.append(container)
 
+    def build_image(self, dockerfile, context_files):
+        conf_dockerfile_buffer = StringIO()
+        docker_context_buffer = BytesIO()
+
+        try:
+            # Overlay conf onto base nifi image
+            conf_dockerfile_buffer.write(dockerfile)
+            conf_dockerfile_buffer.seek(0)
+
+            with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
+                dockerfile_info = tarfile.TarInfo('Dockerfile')
+                dockerfile_info.size = conf_dockerfile_buffer.len
+                docker_context.addfile(dockerfile_info,
+                                       fileobj=conf_dockerfile_buffer)
+
+                for context_file in context_files:
+                    file_info = tarfile.TarInfo(context_file['name'])
+                    file_info.size = context_file['size']
+                    docker_context.addfile(file_info,
+                                           fileobj=context_file['file_obj'])
+            docker_context_buffer.seek(0)
+
+            logging.info('Creating configured image...')
+            configured_image = self.client.images.build(fileobj=docker_context_buffer,
+                                                        custom_context=True,
+                                                        rm=True,
+                                                        forcerm=True)
+            self.images.append(configured_image)
+
+        finally:
+            conf_dockerfile_buffer.close()
+            docker_context_buffer.close()
+
+        return configured_image
+
     def __enter__(self):
         """
         Allocate ephemeral cluster resources.
@@ -128,6 +250,11 @@ class SingleNodeDockerCluster(Cluster):
             logging.info('Cleaning up container: %s', container.name)
             container.remove(v=True, force=True)
 
+        # Clean up images
+        for image in self.images:
+            logging.info('Cleaning up image: %s', image.id)
+            self.client.images.remove(image.id, force=True)
+
         # Clean up network
         if self.network is not None:
             logging.info('Cleaning up network: %s', self.network.name)
@@ -138,49 +265,26 @@ class SingleNodeDockerCluster(Cluster):
             os.remove(tmp_file)
 
 
-class Processor(object):
+class Connectable(object):
     def __init__(self,
-                 clazz,
-                 properties=None,
-                 schedule=None,
                  name=None,
-                 auto_terminate=None,
-                 controller_services=None):
-
-        if controller_services is None:
-            controller_services = []
+                 auto_terminate=None):
 
-        if auto_terminate is None:
-            auto_terminate = []
-
-        if schedule is None:
-            schedule = {}
-
-        if properties is None:
-            properties = {}
-
-        self.connections = {}
         self.uuid = uuid.uuid4()
 
         if name is None:
             self.name = str(self.uuid)
+        else:
+            self.name = name
 
-        self.clazz = clazz
-        self.properties = properties
-        self.auto_terminate = auto_terminate
-        self.controller_services = controller_services
+        if auto_terminate is None:
+            self.auto_terminate = []
+        else:
+            self.auto_terminate = auto_terminate
 
+        self.connections = {}
         self.out_proc = self
 
-        self.schedule = {
-            'scheduling strategy': 'EVENT_DRIVEN',
-            'scheduling period': '1 sec',
-            'penalization period': '30 sec',
-            'yield period': '1 sec',
-            'run duration nanos': 0
-        }
-        self.schedule.update(schedule)
-
     def connect(self, connections):
         for rel in connections:
 
@@ -189,7 +293,7 @@ class Processor(object):
                 del self.auto_terminate[self.auto_terminate.index(rel)]
 
             # Add to set of output connections for this rel
-            if not rel in self.connections:
+            if rel not in self.connections:
                 self.connections[rel] = []
             self.connections[rel].append(connections[rel])
 
@@ -203,69 +307,154 @@ class Processor(object):
 
         """
 
+        connected = copy(self)
+        connected.connections = copy(self.connections)
+
+        if self.out_proc is self:
+            connected.out_proc = connected
+        else:
+            connected.out_proc = copy(connected.out_proc)
+
         if isinstance(other, tuple):
             if isinstance(other[0], tuple):
                 for rel_tuple in other:
                     rel = {rel_tuple[0]: rel_tuple[1]}
-                    self.out_proc.connect(rel)
+                    connected.out_proc.connect(rel)
             else:
                 rel = {other[0]: other[1]}
-                self.out_proc.connect(rel)
+                connected.out_proc.connect(rel)
         else:
-            self.out_proc.connect({'success': other})
-            self.out_proc = other
+            connected.out_proc.connect({'success': other})
+            connected.out_proc = other
 
-        return self
+        return connected
 
 
-def InvokeHTTP(url,
-               method='GET',
-               ssl_context_service=None):
-    properties = {'Remote URL': url, 'HTTP Method': method}
+class Processor(Connectable):
+    def __init__(self,
+                 clazz,
+                 properties=None,
+                 schedule=None,
+                 name=None,
+                 controller_services=None,
+                 auto_terminate=None):
 
-    controller_services = []
+        super(Processor, self).__init__(name=name,
+                                        auto_terminate=auto_terminate)
 
-    if ssl_context_service is not None:
-        properties['SSL Context Service'] = ssl_context_service.name
-        controller_services.append(ssl_context_service)
+        if controller_services is None:
+            controller_services = []
 
-    return Processor('InvokeHTTP',
-                     properties=properties,
-                     controller_services=controller_services,
-                     auto_terminate=['success',
-                                     'response',
-                                     'retry',
-                                     'failure',
-                                     'no retry'])
+        if schedule is None:
+            schedule = {}
+
+        if properties is None:
+            properties = {}
+
+
+        self.clazz = clazz
+        self.properties = properties
+        self.controller_services = controller_services
+
+        self.schedule = {
+            'scheduling strategy': 'EVENT_DRIVEN',
+            'scheduling period': '1 sec',
+            'penalization period': '30 sec',
+            'yield period': '1 sec',
+            'run duration nanos': 0
+        }
+        self.schedule.update(schedule)
+
+    def nifi_property_key(self, key):
+        """
+        Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as
+        the internal key.
+        """
+        return key
 
 
-def ListenHTTP(port, cert=None):
-    properties = {'Listening Port': port}
+class InvokeHTTP(Processor):
+    def __init__(self, url,
+                 method='GET',
+                 ssl_context_service=None):
+        properties = {'Remote URL': url, 'HTTP Method': method}
 
-    if cert is not None:
-        properties['SSL Certificate'] = cert
+        controller_services = []
 
-    return Processor('ListenHTTP',
-                     properties=properties,
-                     auto_terminate=['success'])
+        if ssl_context_service is not None:
+            properties['SSL Context Service'] = ssl_context_service.name
+            controller_services.append(ssl_context_service)
 
+        super(InvokeHTTP, self).__init__('InvokeHTTP',
+                                         properties=properties,
+                                         controller_services=controller_services,
+                                         auto_terminate=['success',
+                                                         'response',
+                                                         'retry',
+                                                         'failure',
+                                                         'no retry'])
 
-def LogAttribute():
-    return Processor('LogAttribute',
-                     auto_terminate=['success'])
 
+class ListenHTTP(Processor):
+    def __init__(self, port, cert=None):
+        properties = {'Listening Port': port}
 
-def GetFile(input_dir):
-    return Processor('GetFile',
-                     properties={'Input Directory': input_dir},
-                     schedule={'scheduling period': '0 sec'},
-                     auto_terminate=['success'])
+        if cert is not None:
+            properties['SSL Certificate'] = cert
+
+        super(ListenHTTP, self).__init__('ListenHTTP',
+                                         properties=properties,
+                                         auto_terminate=['success'])
+
+
+class LogAttribute(Processor):
+    def __init__(self):
+        super(LogAttribute, self).__init__('LogAttribute',
+                                           auto_terminate=['success'])
+
+
+class GetFile(Processor):
+    def __init__(self, input_dir):
+        super(GetFile, self).__init__('GetFile',
+                                      properties={'Input Directory': input_dir},
+                                      schedule={'scheduling period': '0 sec'},
+                                      auto_terminate=['success'])
+
+
+class PutFile(Processor):
+    def __init__(self, output_dir):
+        super(PutFile, self).__init__('PutFile',
+                                      properties={'Output Directory': output_dir},
+                                      auto_terminate=['success', 'failure'])
+
+    def nifi_property_key(self, key):
+        if key == 'Output Directory':
+            return 'Directory'
+        else:
+            return key
 
 
-def PutFile(output_dir):
-    return Processor('PutFile',
-                     properties={'Output Directory': output_dir},
-                     auto_terminate=['success', 'failure'])
+class InputPort(Connectable):
+    def __init__(self, name=None, remote_process_group=None):
+        super(InputPort, self).__init__(name=name)
+
+        self.remote_process_group = remote_process_group
+
+
+class RemoteProcessGroup(object):
+    def __init__(self, url,
+                 name=None):
+
+        self.uuid = uuid.uuid4()
+
+        if name is None:
+            self.name = str(self.uuid)
+        else:
+            self.name = name
+
+        self.url = url
 
 
 class ControllerService(object):
@@ -301,7 +490,7 @@ class SSLContextService(ControllerService):
             self.properties['CA Certificate'] = ca_cert
 
 
-def flow_yaml(processor, root=None, visited=None):
+def minifi_flow_yaml(connectable, root=None, visited=None):
     if visited is None:
         visited = []
 
@@ -318,60 +507,403 @@ def flow_yaml(processor, root=None, visited=None):
     else:
         res = root
 
-    visited.append(processor)
+    visited.append(connectable)
 
-    if hasattr(processor, 'name'):
-        proc_name = processor.name
+    if hasattr(connectable, 'name'):
+        connectable_name = connectable.name
     else:
-        proc_name = str(processor.uuid)
-
-    res['Processors'].append({
-        'name': proc_name,
-        'id': str(processor.uuid),
-        'class': 'org.apache.nifi.processors.standard.' + processor.clazz,
-        'scheduling strategy': processor.schedule['scheduling strategy'],
-        'scheduling period': processor.schedule['scheduling period'],
-        'penalization period': processor.schedule['penalization period'],
-        'yield period': processor.schedule['yield period'],
-        'run duration nanos': processor.schedule['run duration nanos'],
-        'Properties': processor.properties,
-        'auto-terminated relationships list': processor.auto_terminate
-    })
-
-    for svc in processor.controller_services:
-        if svc in visited:
-            continue
-
-        visited.append(svc)
-        res['Controller Services'].append({
-            'name': svc.name,
-            'id': svc.id,
-            'class': svc.service_class,
-            'Properties': svc.properties
+        connectable_name = str(connectable.uuid)
+
+    if isinstance(connectable, InputPort):
+        group = connectable.remote_process_group
+        res_group = None
+
+        for res_group_candidate in res['Remote Processing Groups']:
+            assert isinstance(res_group_candidate, dict)
+            if res_group_candidate['id'] == str(group.uuid):
+                res_group = res_group_candidate
+
+        if res_group is None:
+            res_group = {
+                'name': group.name,
+                'id': str(group.uuid),
+                'url': group.url,
+                'timeout': '30 sec',
+                'yield period': '10 sec',
+                'Input Ports': []
+            }
+
+            res['Remote Processing Groups'].append(res_group)
+
+        res_group['Input Ports'].append({
+            'id': str(connectable.uuid),
+            'name': connectable.name,
+            'max concurrent tasks': 1,
+            'Properties': {}
+        })
+
+    if isinstance(connectable, Processor):
+        res['Processors'].append({
+            'name': connectable_name,
+            'id': str(connectable.uuid),
+            'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
+            'scheduling strategy': connectable.schedule['scheduling strategy'],
+            'scheduling period': connectable.schedule['scheduling period'],
+            'penalization period': connectable.schedule['penalization period'],
+            'yield period': connectable.schedule['yield period'],
+            'run duration nanos': connectable.schedule['run duration nanos'],
+            'Properties': connectable.properties,
+            'auto-terminated relationships list': connectable.auto_terminate
         })
 
-    for conn_name in processor.connections:
-        conn_procs = processor.connections[conn_name]
+        for svc in connectable.controller_services:
+            if svc in visited:
+                continue
+
+            visited.append(svc)
+            res['Controller Services'].append({
+                'name': svc.name,
+                'id': svc.id,
+                'class': svc.service_class,
+                'Properties': svc.properties
+            })
+
+    for conn_name in connectable.connections:
+        conn_procs = connectable.connections[conn_name]
 
         if isinstance(conn_procs, list):
             for proc in conn_procs:
                 res['Connections'].append({
                     'name': str(uuid.uuid4()),
-                    'source id': str(processor.uuid),
+                    'source id': str(connectable.uuid),
                     'source relationship name': conn_name,
                     'destination id': str(proc.uuid)
                 })
                 if proc not in visited:
-                    flow_yaml(proc, res, visited)
+                    minifi_flow_yaml(proc, res, visited)
         else:
             res['Connections'].append({
                 'name': str(uuid.uuid4()),
-                'source id': str(processor.uuid),
+                'source id': str(connectable.uuid),
                 'source relationship name': conn_name,
                 'destination id': str(conn_procs.uuid)
             })
             if conn_procs not in visited:
-                flow_yaml(conn_procs, res, visited)
+                minifi_flow_yaml(conn_procs, res, visited)
 
     if root is None:
         return yaml.dump(res, default_flow_style=False)
+
+
+def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None):
+    if visited is None:
+        visited = []
+
+    position = Element('position')
+    position.set('x', '0.0')
+    position.set('y', '0.0')
+
+    comment = Element('comment')
+    styles = Element('styles')
+    bend_points = Element('bendPoints')
+    label_index = Element('labelIndex')
+    label_index.text = '1'
+    z_index = Element('zIndex')
+    z_index.text = '0'
+
+    if root is None:
+        res = Element('flowController')
+        max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
+        max_timer_driven_thread_count.text = '10'
+        res.append(max_timer_driven_thread_count)
+        max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
+        max_event_driven_thread_count.text = '5'
+        res.append(max_event_driven_thread_count)
+        root_group = Element('rootGroup')
+        root_group_id = Element('id')
+        root_group_id_text = str(uuid.uuid4())
+        root_group_id.text = root_group_id_text
+        root_group.append(root_group_id)
+        root_group_name = Element('name')
+        root_group_name.text = root_group_id_text
+        root_group.append(root_group_name)
+        res.append(root_group)
+        root_group.append(position)
+        root_group.append(comment)
+        res.append(Element('controllerServices'))
+        res.append(Element('reportingTasks'))
+        res.set('encoding-version', '1.2')
+    else:
+        res = root
+
+    visited.append(connectable)
+
+    if hasattr(connectable, 'name'):
+        connectable_name_text = connectable.name
+    else:
+        connectable_name_text = str(connectable.uuid)
+
+    if isinstance(connectable, InputPort):
+        input_port = Element('inputPort')
+
+        input_port_id = Element('id')
+        input_port_id.text = str(connectable.uuid)
+        input_port.append(input_port_id)
+
+        input_port_name = Element('name')
+        input_port_name.text = connectable_name_text
+        input_port.append(input_port_name)
+
+        input_port.append(position)
+        input_port.append(comment)
+
+        input_port_scheduled_state = Element('scheduledState')
+        input_port_scheduled_state.text = 'RUNNING'
+        input_port.append(input_port_scheduled_state)
+
+        input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
+        input_port_max_concurrent_tasks.text = '1'
+        input_port.append(input_port_max_concurrent_tasks)
+
+        res.iterfind('rootGroup').next().append(input_port)
+
+    if isinstance(connectable, Processor):
+        conn_destination = Element('processor')
+
+        proc_id = Element('id')
+        proc_id.text = str(connectable.uuid)
+        conn_destination.append(proc_id)
+
+        proc_name = Element('name')
+        proc_name.text = connectable_name_text
+        conn_destination.append(proc_name)
+
+        conn_destination.append(position)
+        conn_destination.append(styles)
+        conn_destination.append(comment)
+
+        proc_class = Element('class')
+        proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
+        conn_destination.append(proc_class)
+
+        proc_bundle = Element('bundle')
+        proc_bundle_group = Element('group')
+        proc_bundle_group.text = 'org.apache.nifi'
+        proc_bundle.append(proc_bundle_group)
+        proc_bundle_artifact = Element('artifact')
+        proc_bundle_artifact.text = 'nifi-standard-nar'
+        proc_bundle.append(proc_bundle_artifact)
+        proc_bundle_version = Element('version')
+        proc_bundle_version.text = nifi_version
+        proc_bundle.append(proc_bundle_version)
+        conn_destination.append(proc_bundle)
+
+        proc_max_concurrent_tasks = Element('maxConcurrentTasks')
+        proc_max_concurrent_tasks.text = '1'
+        conn_destination.append(proc_max_concurrent_tasks)
+
+        proc_scheduling_period = Element('schedulingPeriod')
+        proc_scheduling_period.text = connectable.schedule['scheduling period']
+        conn_destination.append(proc_scheduling_period)
+
+        proc_penalization_period = Element('penalizationPeriod')
+        proc_penalization_period.text = connectable.schedule['penalization period']
+        conn_destination.append(proc_penalization_period)
+
+        proc_yield_period = Element('yieldPeriod')
+        proc_yield_period.text = connectable.schedule['yield period']
+        conn_destination.append(proc_yield_period)
+
+        proc_bulletin_level = Element('bulletinLevel')
+        proc_bulletin_level.text = 'WARN'
+        conn_destination.append(proc_bulletin_level)
+
+        proc_loss_tolerant = Element('lossTolerant')
+        proc_loss_tolerant.text = 'false'
+        conn_destination.append(proc_loss_tolerant)
+
+        proc_scheduled_state = Element('scheduledState')
+        proc_scheduled_state.text = 'RUNNING'
+        conn_destination.append(proc_scheduled_state)
+
+        proc_scheduling_strategy = Element('schedulingStrategy')
+        proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
+        conn_destination.append(proc_scheduling_strategy)
+
+        proc_execution_node = Element('executionNode')
+        proc_execution_node.text = 'ALL'
+        conn_destination.append(proc_execution_node)
+
+        proc_run_duration_nanos = Element('runDurationNanos')
+        proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
+        conn_destination.append(proc_run_duration_nanos)
+
+        for property_key, property_value in connectable.properties.iteritems():
+            proc_property = Element('property')
+            proc_property_name = Element('name')
+            proc_property_name.text = connectable.nifi_property_key(property_key)
+            proc_property.append(proc_property_name)
+            proc_property_value = Element('value')
+            proc_property_value.text = property_value
+            proc_property.append(proc_property_value)
+            conn_destination.append(proc_property)
+
+        for auto_terminate_rel in connectable.auto_terminate:
+            proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
+            proc_auto_terminated_relationship.text = auto_terminate_rel
+            conn_destination.append(proc_auto_terminated_relationship)
+
+        res.iterfind('rootGroup').next().append(conn_destination)
+
+        for svc in connectable.controller_services:
+            if svc in visited:
+                continue
+
+            visited.append(svc)
+            controller_service = Element('controllerService')
+
+            controller_service_id = Element('id')
+            controller_service_id.text = str(svc.id)
+            controller_service.append(controller_service_id)
+
+            controller_service_name = Element('name')
+            controller_service_name.text = svc.name
+            controller_service.append(controller_service_name)
+
+            controller_service.append(comment)
+
+            controller_service_class = Element('class')
+            controller_service_class.text = svc.service_class
+            controller_service.append(controller_service_class)
+
+            controller_service_bundle = Element('bundle')
+            controller_service_bundle_group = Element('group')
+            controller_service_bundle_group.text = svc.group
+            controller_service_bundle.append(controller_service_bundle_group)
+            controller_service_bundle_artifact = Element('artifact')
+            controller_service_bundle_artifact.text = svc.artifact
+            controller_service_bundle.append(controller_service_bundle_artifact)
+            controller_service_bundle_version = Element('version')
+            controller_service_bundle_version.text = nifi_version
+            controller_service_bundle.append(controller_service_bundle_version)
+            controller_service.append(controller_service_bundle)
+
+            controller_enabled = Element('enabled')
+            controller_enabled.text = 'true'
+            controller_service.append(controller_enabled)
+
+            for property_key, property_value in svc.properties.iteritems():
+                controller_service_property = Element('property')
+                controller_service_property_name = Element('name')
+                controller_service_property_name.text = property_key
+                controller_service_property.append(controller_service_property_name)
+                controller_service_property_value = Element('value')
+                controller_service_property_value.text = property_value
+                controller_service_property.append(controller_service_property_value)
+                controller_service.append(controller_service_property)
+
+            res.iterfind('rootGroup').next().append(controller_service)
+
+    for conn_name in connectable.connections:
+        conn_destinations = connectable.connections[conn_name]
+
+        if isinstance(conn_destinations, list):
+            for conn_destination in conn_destinations:
+                connection = nifi_flow_xml_connection(res,
+                                                      bend_points,
+                                                      conn_name,
+                                                      connectable,
+                                                      label_index,
+                                                      conn_destination,
+                                                      z_index)
+
+                res.iterfind('rootGroup').next().append(connection)
+
+                if conn_destination not in visited:
+                    nifi_flow_xml(conn_destination, nifi_version, res, visited)
+        else:
+            connection = nifi_flow_xml_connection(res,
+                                                  bend_points,
+                                                  conn_name,
+                                                  connectable,
+                                                  label_index,
+                                                  conn_destinations,
+                                                  z_index)
+
+            res.iterfind('rootGroup').next().append(connection)
+
+            if conn_destinations not in visited:
+                nifi_flow_xml(conn_destinations, nifi_version, res, visited)
+
+    if root is None:
+        return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
+                + "\n"
+                + elementTree.tostring(res, encoding='utf-8'))
+
+
+def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index):
+    connection = Element('connection')
+
+    connection_id = Element('id')
+    connection_id.text = str(uuid.uuid4())
+    connection.append(connection_id)
+
+    connection_name = Element('name')
+    connection.append(connection_name)
+
+    connection.append(bend_points)
+    connection.append(label_index)
+    connection.append(z_index)
+
+    connection_source_id = Element('sourceId')
+    connection_source_id.text = str(connectable.uuid)
+    connection.append(connection_source_id)
+
+    connection_source_group_id = Element('sourceGroupId')
+    connection_source_group_id.text = res.iterfind('rootGroup/id').next().text
+    connection.append(connection_source_group_id)
+
+    connection_source_type = Element('sourceType')
+    if isinstance(connectable, Processor):
+        connection_source_type.text = 'PROCESSOR'
+    elif isinstance(connectable, InputPort):
+        connection_source_type.text = 'INPUT_PORT'
+    else:
+        raise Exception('Unexpected source type: %s' % type(connectable))
+    connection.append(connection_source_type)
+
+    connection_destination_id = Element('destinationId')
+    connection_destination_id.text = str(destination.uuid)
+    connection.append(connection_destination_id)
+
+    connection_destination_group_id = Element('destinationGroupId')
+    connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text
+    connection.append(connection_destination_group_id)
+
+    connection_destination_type = Element('destinationType')
+    if isinstance(destination, Processor):
+        connection_destination_type.text = 'PROCESSOR'
+    elif isinstance(destination, InputPort):
+        connection_destination_type.text = 'INPUT_PORT'
+    else:
+        raise Exception('Unexpected destination type: %s' % type(destination))
+    connection.append(connection_destination_type)
+
+    connection_relationship = Element('relationship')
+    if not isinstance(connectable, InputPort):
+        connection_relationship.text = conn_name
+    connection.append(connection_relationship)
+
+    connection_max_work_queue_size = Element('maxWorkQueueSize')
+    connection_max_work_queue_size.text = '10000'
+    connection.append(connection_max_work_queue_size)
+
+    connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
+    connection_max_work_queue_data_size.text = '1 GB'
+    connection.append(connection_max_work_queue_data_size)
+
+    connection_flow_file_expiration = Element('flowFileExpiration')
+    connection_flow_file_expiration.text = '0 sec'
+    connection.append(connection_flow_file_expiration)
+
+    return connection

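A note on the image-build technique introduced above: build_image packs the
Dockerfile and any config files into an in-memory tar stream and submits it to
the Docker daemon as a custom build context, so no temporary files are written
to disk. A minimal self-contained sketch of the same approach (assuming
docker-py 2.x, where client.images.build returns the built Image object):

```python
import tarfile
from io import BytesIO

import docker


def build_image_from_memory(client, dockerfile_str):
    """Build an image from an in-memory build context (no files on disk)."""
    dockerfile_bytes = dockerfile_str.encode('utf-8')
    context = BytesIO()

    # Pack the Dockerfile into a tar archive held entirely in memory.
    with tarfile.open(mode='w', fileobj=context) as tar:
        info = tarfile.TarInfo('Dockerfile')
        info.size = len(dockerfile_bytes)
        tar.addfile(info, fileobj=BytesIO(dockerfile_bytes))

    context.seek(0)

    # custom_context tells docker-py the fileobj is a tar context rather than
    # a bare Dockerfile; rm/forcerm clean up intermediate containers.
    return client.images.build(fileobj=context,
                               custom_context=True,
                               rm=True,
                               forcerm=True)


# Usage sketch (assumes a reachable Docker daemon):
#   client = docker.from_env()
#   image = build_image_from_memory(client, 'FROM alpine:3.6\nRUN echo hi\n')
```
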
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 8c0aee9..dbb3888 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -36,15 +36,14 @@ def put_file_contents(contents, file_abs_path):
 
 
 class DockerTestCluster(SingleNodeDockerCluster):
-
     def __init__(self, output_validator):
 
         # Create test input/output directories
         test_cluster_id = str(uuid.uuid4())
 
-        self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
-        self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
-        self.tmp_test_resources_dir = '/tmp/.minifi-test-resources.' + test_cluster_id
+        self.tmp_test_output_dir = '/tmp/.nifi-test-output.' + test_cluster_id
+        self.tmp_test_input_dir = '/tmp/.nifi-test-input.' + test_cluster_id
+        self.tmp_test_resources_dir = '/tmp/.nifi-test-resources.' + test_cluster_id
 
         logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
         os.makedirs(self.tmp_test_input_dir, mode=0777)
@@ -66,17 +65,27 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
         super(DockerTestCluster, self).__init__()
 
-    def deploy_flow(self, flow, name=None, vols=None):
+    def deploy_flow(self,
+                    flow,
+                    name=None,
+                    vols=None,
+                    engine='minifi-cpp'):
         """
         Performs a standard container flow deployment with the addition
         of volumes supporting test input/output directories.
         """
 
-        vols = {self.tmp_test_input_dir: {'bind': '/tmp/input', 'mode': 'rw'},
-                self.tmp_test_output_dir: {'bind': '/tmp/output', 'mode': 'rw'},
-                self.tmp_test_resources_dir: {'bind': '/tmp/resources', 'mode': 'rw'}}
+        if vols is None:
+            vols = {}
+
+        vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
+        vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+        vols[self.tmp_test_resources_dir] = {'bind': '/tmp/resources', 'mode': 'rw'}
 
-        super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
+        super(DockerTestCluster, self).deploy_flow(flow,
+                                                   vols=vols,
+                                                   name=name,
+                                                   engine=engine)
 
     def put_test_data(self, contents):
         """
@@ -103,14 +112,22 @@ class DockerTestCluster(SingleNodeDockerCluster):
         self.observer.stop()
         self.observer.join()
 
-    def log_minifi_output(self):
+    def log_nifi_output(self):
 
         for container in self.containers:
             container = self.client.containers.get(container.id)
             logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs())
             if container.status == 'running':
-                app_logs = container.exec_run('cat ' + self.minifi_root + '/minifi-app.log')
-                logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, app_logs)
+                minifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.minifi_root + '/minifi-app.log '
+                                                                                                '&& cat ' +
+                                                     self.minifi_root + '/minifi-app.log\'')
+                if len(minifi_app_logs) > 0:
+                    logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, minifi_app_logs)
+                nifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.nifi_root + '/logs/nifi-app.log '
+                                                                                            '&& cat ' +
+                                                   self.nifi_root + '/logs/nifi-app.log\'')
+                if len(nifi_app_logs) > 0:
+                    logging.info('NiFi app logs for container \'%s\':\n%s', container.name, nifi_app_logs)
             else:
                 logging.info(container.status)
                 logging.info('Could not cat app logs for container \'%s\' because it is not running',
@@ -118,12 +135,12 @@ class DockerTestCluster(SingleNodeDockerCluster):
             stats = container.stats(decode=True, stream=False)
             logging.info('Container stats:\n%s', repr(stats))
 
-    def check_output(self):
+    def check_output(self, timeout=5):
         """
         Wait for flow output, validate it, and log MiNiFi/NiFi output.
         """
-        self.wait_for_output(5)
-        self.log_minifi_output()
+        self.wait_for_output(timeout)
+        self.log_nifi_output()
 
         return self.output_validator.validate()
 
@@ -143,7 +160,6 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
 
 class OutputEventHandler(FileSystemEventHandler):
-
     def __init__(self, validator, done_event):
         self.validator = validator
         self.done_event = done_event
@@ -175,6 +191,7 @@ class OutputValidator(object):
         Return True if output is valid; False otherwise.
         """
 
+
 class SingleFileOutputValidator(OutputValidator):
     """
     Validates the content of a single file in the given directory.

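Since check_output() ultimately calls output_validator.validate(), new checks
can be added by subclassing OutputValidator, as SingleFileOutputValidator does.
A hypothetical sketch (the output_dir attribute and its wiring are assumptions
for illustration; they are not part of this commit):

```python
import os

from minifi.test import OutputValidator


class FileCountValidator(OutputValidator):
    """Hypothetical validator: passes once the output directory contains
    the expected number of files."""

    def __init__(self, expected_count):
        self.expected_count = expected_count
        self.output_dir = None  # assumption: set by the test harness

    def validate(self):
        # Return True if output is valid; False otherwise.
        if self.output_dir is None:
            return False
        return len(os.listdir(self.output_dir)) == self.expected_count
```
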
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/test_filesystem_ops.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
index 7a6f212..3ae4ff5 100644
--- a/docker/test/integration/test_filesystem_ops.py
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -25,7 +25,6 @@ def test_get_put():
     flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
 
     with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-
         cluster.put_test_data('test')
         cluster.deploy_flow(flow)
 
@@ -37,17 +36,17 @@ def test_file_exists_failure():
     Verify that putting to a file that already exists fails.
     """
 
-    flow = (GetFile('/tmp/input') >>
+    flow = (GetFile('/tmp/input')
 
             # First put should succeed
-            PutFile('/tmp') >>
+            >> PutFile('/tmp')
 
             # Second put should fail (file exists)
-            PutFile('/tmp') >> (('success', LogAttribute()),
-                                ('failure', LogAttribute() >> PutFile('/tmp/output'))))
+            >> PutFile('/tmp')
+            >> (('success', LogAttribute()),
+                ('failure', LogAttribute() >> PutFile('/tmp/output'))))
 
     with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-
         cluster.put_test_data('test')
         cluster.deploy_flow(flow)
 

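The relationship routing exercised in test_file_exists_failure is handled by
Connectable.__rshift__ in this commit, which accepts three forms on its
right-hand side: a bare connectable (routed via 'success'), a single
(relationship, destination) tuple, or a tuple of tuples for fan-out. A short
sketch of all three, using the processors from this commit:

```python
from minifi import GetFile, LogAttribute, PutFile

# 1. Bare connectable: connects via 'success' and advances the chain.
flow = GetFile('/tmp/input') >> LogAttribute()

# 2. Single (relationship, destination) tuple: routes one named relationship.
flow = GetFile('/tmp/input') >> ('success', LogAttribute())

# 3. Tuple of tuples: fan out different relationships to different branches.
flow = (PutFile('/tmp')
        >> (('success', LogAttribute()),
            ('failure', LogAttribute() >> PutFile('/tmp/output'))))
```
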
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/test_http.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
index 72c80bd..6b6a9f5 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -22,13 +22,13 @@ def test_invoke_listen():
     Verify sending using InvokeHTTP to a receiver using ListenHTTP.
     """
 
-    invoke_flow = (GetFile('/tmp/input') >> LogAttribute() >>
-                   InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
+    invoke_flow = (GetFile('/tmp/input')
+                   >> LogAttribute()
+                   >> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
 
     listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
 
     with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-
         cluster.put_test_data('test')
         cluster.deploy_flow(listen_flow, name='minifi-listen')
         cluster.deploy_flow(invoke_flow, name='minifi-invoke')

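For an HTTPS variant of this test, InvokeHTTP accepts an ssl_context_service
and ListenHTTP a cert, per the classes in this commit. A hedged sketch using
the same star imports as test_http.py (the port, certificate paths, and the
SSLContextService constructor arguments are assumptions for illustration):

```python
ssl = SSLContextService(ca_cert='/tmp/resources/ca.pem')  # hypothetical path

invoke_flow = (GetFile('/tmp/input')
               >> LogAttribute()
               >> InvokeHTTP('https://minifi-listen:4430/contentListener',
                             method='POST',
                             ssl_context_service=ssl))

listen_flow = (ListenHTTP(4430, cert='/tmp/resources/server.pem')
               >> LogAttribute()
               >> PutFile('/tmp/output'))
```
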
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/test_s2s.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_s2s.py b/docker/test/integration/test_s2s.py
new file mode 100644
index 0000000..2d391d9
--- /dev/null
+++ b/docker/test/integration/test_s2s.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_minifi_to_nifi():
+    """
+    Verify sending data from a MiNiFi - C++ instance to NiFi using the S2S protocol.
+    """
+
+    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
+
+    recv_flow = (port
+                 >> LogAttribute()
+                 >> PutFile('/tmp/output'))
+
+    send_flow = (GetFile('/tmp/input')
+                 >> LogAttribute()
+                 >> port)
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
+        cluster.deploy_flow(send_flow)
+
+        assert cluster.check_output(60)