You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/29 17:17:50 UTC
nifi-minifi-cpp git commit: MINIFI-398 Added test framework support
for s2s and added s2s integration test between minifi-cpp and nifi.
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master aaeb929b2 -> 7c24ed7e6
MINIFI-398 Added test framework support for s2s and added s2s integration test between minifi-cpp and nifi.
This closes #136.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/7c24ed7e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/7c24ed7e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/7c24ed7e
Branch: refs/heads/master
Commit: 7c24ed7e60b4e553aadba913e83760ac80f95d33
Parents: aaeb929
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Thu Sep 14 13:47:20 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Sep 29 13:17:24 2017 -0400
----------------------------------------------------------------------
docker/test/integration/README.md | 33 +
docker/test/integration/minifi/__init__.py | 806 +++++++++++++++----
docker/test/integration/minifi/test/__init__.py | 49 +-
docker/test/integration/test_filesystem_ops.py | 11 +-
docker/test/integration/test_http.py | 6 +-
docker/test/integration/test_s2s.py | 40 +
6 files changed, 783 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/README.md
----------------------------------------------------------------------
diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md
index 6d8c066..c4fb9f4 100644
--- a/docker/test/integration/README.md
+++ b/docker/test/integration/README.md
@@ -64,6 +64,32 @@ The following processors/parameters are supported:
- method='GET'
- ssl\_context\_service=None
+#### Remote Process Groups
+
+Remote process groups and input ports are supported.
+
+**Example InputPort/RemoteProcessGroup:**
+
+```python
+port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
+```
+
+InputPorts may be used as inputs or outputs in the flow DSL:
+
+```python
+recv_flow = (port
+ >> LogAttribute()
+ >> PutFile('/tmp/output'))
+
+send_flow = (GetFile('/tmp/input')
+ >> LogAttribute()
+ >> port)
+```
+
+These example flows could be deployed as separate NiFi/MiNiFi instances where
+the send\_flow would send data to the recv\_flow using the site-to-site
+protocol.
+
### Definition of an output validator
The output validator is responsible for checking the state of a cluster for
@@ -148,6 +174,13 @@ be used.
cluster.deploy_flow(flow, name='test-flow')
```
+The deploy\_flow function defaults to a MiNiFi - C++ engine, but other engines,
+such as NiFi, may be used:
+
+```python
+cluster.deploy_flow(flow, engine='nifi')
+```
+
### Execution of one or more flows
Flows are executed immediately upon deployment and according to schedule
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/minifi/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 2d83126..36a1b4f 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -12,13 +12,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import os
-import uuid
+import gzip
import logging
+import tarfile
+import uuid
+import xml.etree.cElementTree as elementTree
+from xml.etree.cElementTree import Element
+from StringIO import StringIO
+from io import BytesIO
+from textwrap import dedent
-import yaml
import docker
+import os
+import yaml
+from copy import copy
class Cluster(object):
@@ -52,66 +59,181 @@ class SingleNodeDockerCluster(Cluster):
"""
def __init__(self):
+ self.minifi_version = os.environ['MINIFI_VERSION']
+ self.nifi_version = '1.4.0-SNAPSHOT'
+ self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
+ self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
self.network = None
self.containers = []
+ self.images = []
self.tmp_files = []
# Get docker client
self.client = docker.from_env()
- def deploy_flow(self, flow, name=None, vols=None):
+ def deploy_flow(self,
+ flow,
+ name=None,
+ vols=None,
+ engine='minifi-cpp'):
"""
- Compiles the flow to YAML and maps it into the container using
- the docker volumes API.
+ Compiles the flow to a valid config file and overlays it into a new image.
"""
if vols is None:
vols = {}
- logging.info('Deploying flow...')
+ logging.info('Deploying %s flow...', engine)
if name is None:
- name = 'minifi-' + str(uuid.uuid4())
+ name = engine + '-' + str(uuid.uuid4())
logging.info('Flow name was not provided; using generated name \'%s\'', name)
- minifi_version = os.environ['MINIFI_VERSION']
- self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + minifi_version
+ # Create network if necessary
+ if self.network is None:
+ net_name = 'nifi-' + str(uuid.uuid4())
+ logging.info('Creating network: %s', net_name)
+ self.network = self.client.networks.create(net_name)
+
+ if engine == 'nifi':
+ self.deploy_nifi_flow(flow, name, vols)
+ elif engine == 'minifi-cpp':
+ self.deploy_minifi_cpp_flow(flow, name, vols)
+ else:
+ raise Exception('invalid flow engine: \'%s\'' % engine)
+
+ def deploy_minifi_cpp_flow(self, flow, name, vols):
- # Write flow config
- tmp_flow_file_name = '/tmp/.minifi-flow.' + str(uuid.uuid4())
- self.tmp_files.append(tmp_flow_file_name)
+ # Build configured image
+ dockerfile = dedent("""FROM {base_image}
+ USER root
+ ADD config.yml {minifi_root}/conf/config.yml
+ RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
+ USER minificpp
+ """.format(name=name,
+ base_image='apacheminificpp:' + self.minifi_version,
+ minifi_root=self.minifi_root))
- yaml = flow_yaml(flow)
+ test_flow_yaml = minifi_flow_yaml(flow)
+ logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
- logging.info('Using generated flow config yml:\n%s', yaml)
+ conf_file_buffer = StringIO()
- with open(tmp_flow_file_name, 'w') as tmp_flow_file:
- tmp_flow_file.write(yaml)
+ try:
+ conf_file_buffer.write(test_flow_yaml)
+ conf_file_len = conf_file_buffer.tell()
+ conf_file_buffer.seek(0)
- conf_file = tmp_flow_file_name
+ context_files = [
+ {
+ 'name': 'config.yml',
+ 'size': conf_file_len,
+ 'file_obj': conf_file_buffer
+ }
+ ]
- local_vols = {}
- local_vols[conf_file] = {'bind': self.minifi_root + '/conf/config.yml', 'mode': 'ro'}
- local_vols.update(vols)
+ configured_image = self.build_image(dockerfile, context_files)
+
+ finally:
+ conf_file_buffer.close()
logging.info('Creating and running docker container for flow...')
- # Create network if necessary
- if self.network is None:
- net_name = 'minifi-' + str(uuid.uuid4())
- logging.info('Creating network: %s', net_name)
- self.network = self.client.networks.create(net_name)
+ container = self.client.containers.run(
+ configured_image,
+ detach=True,
+ name=name,
+ network=self.network.name,
+ volumes=vols)
+
+ logging.info('Started container \'%s\'', container.name)
+
+ self.containers.append(container)
+
+ def deploy_nifi_flow(self, flow, name, vols):
+ dockerfile = dedent("""FROM {base_image}
+ USER root
+ ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
+ RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
+ RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\\1={name}/' {nifi_root}/conf/nifi.properties
+ RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\\1=5000/' {nifi_root}/conf/nifi.properties
+ USER nifi
+ """.format(name=name,
+ base_image='apachenifi:' + self.nifi_version,
+ nifi_root=self.nifi_root))
+
+ test_flow_xml = nifi_flow_xml(flow, self.nifi_version)
+ logging.info('Using generated flow config xml:\n%s', test_flow_xml)
+
+ conf_file_buffer = BytesIO()
+
+ try:
+ with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer:
+ conf_gz_file_buffer.write(test_flow_xml)
+ conf_file_len = conf_file_buffer.tell()
+ conf_file_buffer.seek(0)
+
+ context_files = [
+ {
+ 'name': 'flow.xml.gz',
+ 'size': conf_file_len,
+ 'file_obj': conf_file_buffer
+ }
+ ]
+
+ configured_image = self.build_image(dockerfile, context_files)
+
+ finally:
+ conf_file_buffer.close()
+
+ logging.info('Creating and running docker container for flow...')
container = self.client.containers.run(
- 'apacheminificpp:' + minifi_version,
+ configured_image,
detach=True,
name=name,
network=self.network.name,
- volumes=local_vols)
+ volumes=vols)
logging.info('Started container \'%s\'', container.name)
+
self.containers.append(container)
+ def build_image(self, dockerfile, context_files):
+ conf_dockerfile_buffer = StringIO()
+ docker_context_buffer = BytesIO()
+
+ try:
+ # Overlay conf onto base nifi image
+ conf_dockerfile_buffer.write(dockerfile)
+ conf_dockerfile_buffer.seek(0)
+
+ with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
+ dockerfile_info = tarfile.TarInfo('Dockerfile')
+ dockerfile_info.size = conf_dockerfile_buffer.len
+ docker_context.addfile(dockerfile_info,
+ fileobj=conf_dockerfile_buffer)
+
+ for context_file in context_files:
+ file_info = tarfile.TarInfo(context_file['name'])
+ file_info.size = context_file['size']
+ docker_context.addfile(file_info,
+ fileobj=context_file['file_obj'])
+ docker_context_buffer.seek(0)
+
+ logging.info('Creating configured image...')
+ configured_image = self.client.images.build(fileobj=docker_context_buffer,
+ custom_context=True,
+ rm=True,
+ forcerm=True)
+ self.images.append(configured_image)
+
+ finally:
+ conf_dockerfile_buffer.close()
+ docker_context_buffer.close()
+
+ return configured_image
+
def __enter__(self):
"""
Allocate ephemeral cluster resources.
@@ -128,6 +250,11 @@ class SingleNodeDockerCluster(Cluster):
logging.info('Cleaning up container: %s', container.name)
container.remove(v=True, force=True)
+ # Clean up images
+ for image in self.images:
+ logging.info('Cleaning up image: %s', image.id)
+ self.client.images.remove(image.id, force=True)
+
# Clean up network
if self.network is not None:
logging.info('Cleaning up network network: %s', self.network.name)
@@ -138,49 +265,26 @@ class SingleNodeDockerCluster(Cluster):
os.remove(tmp_file)
-class Processor(object):
+class Connectable(object):
def __init__(self,
- clazz,
- properties=None,
- schedule=None,
name=None,
- auto_terminate=None,
- controller_services=None):
-
- if controller_services is None:
- controller_services = []
+ auto_terminate=None):
- if auto_terminate is None:
- auto_terminate = []
-
- if schedule is None:
- schedule = {}
-
- if properties is None:
- properties = {}
-
- self.connections = {}
self.uuid = uuid.uuid4()
if name is None:
self.name = str(self.uuid)
+ else:
+ self.name = name
- self.clazz = clazz
- self.properties = properties
- self.auto_terminate = auto_terminate
- self.controller_services = controller_services
+ if auto_terminate is None:
+ self.auto_terminate = []
+ else:
+ self.auto_terminate = auto_terminate
+ self.connections = {}
self.out_proc = self
- self.schedule = {
- 'scheduling strategy': 'EVENT_DRIVEN',
- 'scheduling period': '1 sec',
- 'penalization period': '30 sec',
- 'yield period': '1 sec',
- 'run duration nanos': 0
- }
- self.schedule.update(schedule)
-
def connect(self, connections):
for rel in connections:
@@ -189,7 +293,7 @@ class Processor(object):
del self.auto_terminate[self.auto_terminate.index(rel)]
# Add to set of output connections for this rel
- if not rel in self.connections:
+ if rel not in self.connections:
self.connections[rel] = []
self.connections[rel].append(connections[rel])
@@ -203,69 +307,154 @@ class Processor(object):
"""
+ connected = copy(self)
+ connected.connections = copy(self.connections)
+
+ if self.out_proc is self:
+ connected.out_proc = connected
+ else:
+ connected.out_proc = copy(connected.out_proc)
+
if isinstance(other, tuple):
if isinstance(other[0], tuple):
for rel_tuple in other:
rel = {rel_tuple[0]: rel_tuple[1]}
- self.out_proc.connect(rel)
+ connected.out_proc.connect(rel)
else:
rel = {other[0]: other[1]}
- self.out_proc.connect(rel)
+ connected.out_proc.connect(rel)
else:
- self.out_proc.connect({'success': other})
- self.out_proc = other
+ connected.out_proc.connect({'success': other})
+ connected.out_proc = other
- return self
+ return connected
-def InvokeHTTP(url,
- method='GET',
- ssl_context_service=None):
- properties = {'Remote URL': url, 'HTTP Method': method}
+class Processor(Connectable):
+ def __init__(self,
+ clazz,
+ properties=None,
+ schedule=None,
+ name=None,
+ controller_services=None,
+ auto_terminate=None):
- controller_services = []
+ super(Processor, self).__init__(name=name,
+ auto_terminate=auto_terminate)
- if ssl_context_service is not None:
- properties['SSL Context Service'] = ssl_context_service.name
- controller_services.append(ssl_context_service)
+ if controller_services is None:
+ controller_services = []
- return Processor('InvokeHTTP',
- properties=properties,
- controller_services=controller_services,
- auto_terminate=['success',
- 'response',
- 'retry',
- 'failure',
- 'no retry'])
+ if schedule is None:
+ schedule = {}
+
+ if properties is None:
+ properties = {}
+
+ if name is None:
+ pass
+
+ self.clazz = clazz
+ self.properties = properties
+ self.controller_services = controller_services
+
+ self.schedule = {
+ 'scheduling strategy': 'EVENT_DRIVEN',
+ 'scheduling period': '1 sec',
+ 'penalization period': '30 sec',
+ 'yield period': '1 sec',
+ 'run duration nanos': 0
+ }
+ self.schedule.update(schedule)
+
+ def nifi_property_key(self, key):
+ """
+ Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as
+ the internal key.
+ """
+ return key
-def ListenHTTP(port, cert=None):
- properties = {'Listening Port': port}
+class InvokeHTTP(Processor):
+ def __init__(self, url,
+ method='GET',
+ ssl_context_service=None):
+ properties = {'Remote URL': url, 'HTTP Method': method}
- if cert is not None:
- properties['SSL Certificate'] = cert
+ controller_services = []
- return Processor('ListenHTTP',
- properties=properties,
- auto_terminate=['success'])
+ if ssl_context_service is not None:
+ properties['SSL Context Service'] = ssl_context_service.name
+ controller_services.append(ssl_context_service)
+ super(InvokeHTTP, self).__init__('InvokeHTTP',
+ properties=properties,
+ controller_services=controller_services,
+ auto_terminate=['success',
+ 'response',
+ 'retry',
+ 'failure',
+ 'no retry'])
-def LogAttribute():
- return Processor('LogAttribute',
- auto_terminate=['success'])
+class ListenHTTP(Processor):
+ def __init__(self, port, cert=None):
+ properties = {'Listening Port': port}
-def GetFile(input_dir):
- return Processor('GetFile',
- properties={'Input Directory': input_dir},
- schedule={'scheduling period': '0 sec'},
- auto_terminate=['success'])
+ if cert is not None:
+ properties['SSL Certificate'] = cert
+
+ super(ListenHTTP, self).__init__('ListenHTTP',
+ properties=properties,
+ auto_terminate=['success'])
+
+
+class LogAttribute(Processor):
+ def __init__(self, ):
+ super(LogAttribute, self).__init__('LogAttribute',
+ auto_terminate=['success'])
+
+
+class GetFile(Processor):
+ def __init__(self, input_dir):
+ super(GetFile, self).__init__('GetFile',
+ properties={'Input Directory': input_dir},
+ schedule={'scheduling period': '0 sec'},
+ auto_terminate=['success'])
+
+
+class PutFile(Processor):
+ def __init__(self, output_dir):
+ super(PutFile, self).__init__('PutFile',
+ properties={'Output Directory': output_dir},
+ auto_terminate=['success', 'failure'])
+
+ def nifi_property_key(self, key):
+ if key == 'Output Directory':
+ return 'Directory'
+ else:
+ return key
-def PutFile(output_dir):
- return Processor('PutFile',
- properties={'Output Directory': output_dir},
- auto_terminate=['success', 'failure'])
+class InputPort(Connectable):
+ def __init__(self, name=None, remote_process_group=None):
+ super(InputPort, self).__init__(name=name)
+
+ self.remote_process_group = remote_process_group
+
+
+class RemoteProcessGroup(object):
+ def __init__(self, url,
+ name=None):
+
+ self.uuid = uuid.uuid4()
+
+ if name is None:
+ self.name = str(self.uuid)
+ else:
+ self.name = name
+
+ self.url = url
class ControllerService(object):
@@ -301,7 +490,7 @@ class SSLContextService(ControllerService):
self.properties['CA Certificate'] = ca_cert
-def flow_yaml(processor, root=None, visited=None):
+def minifi_flow_yaml(connectable, root=None, visited=None):
if visited is None:
visited = []
@@ -318,60 +507,403 @@ def flow_yaml(processor, root=None, visited=None):
else:
res = root
- visited.append(processor)
+ visited.append(connectable)
- if hasattr(processor, 'name'):
- proc_name = processor.name
+ if hasattr(connectable, 'name'):
+ connectable_name = connectable.name
else:
- proc_name = str(processor.uuid)
-
- res['Processors'].append({
- 'name': proc_name,
- 'id': str(processor.uuid),
- 'class': 'org.apache.nifi.processors.standard.' + processor.clazz,
- 'scheduling strategy': processor.schedule['scheduling strategy'],
- 'scheduling period': processor.schedule['scheduling period'],
- 'penalization period': processor.schedule['penalization period'],
- 'yield period': processor.schedule['yield period'],
- 'run duration nanos': processor.schedule['run duration nanos'],
- 'Properties': processor.properties,
- 'auto-terminated relationships list': processor.auto_terminate
- })
-
- for svc in processor.controller_services:
- if svc in visited:
- continue
-
- visited.append(svc)
- res['Controller Services'].append({
- 'name': svc.name,
- 'id': svc.id,
- 'class': svc.service_class,
- 'Properties': svc.properties
+ connectable_name = str(connectable.uuid)
+
+ if isinstance(connectable, InputPort):
+ group = connectable.remote_process_group
+ res_group = None
+
+ for res_group_candidate in res['Remote Processing Groups']:
+ assert isinstance(res_group_candidate, dict)
+ if res_group_candidate['id'] == str(group.uuid):
+ res_group = res_group_candidate
+
+ if res_group is None:
+ res_group = {
+ 'name': group.name,
+ 'id': str(group.uuid),
+ 'url': group.url,
+ 'timeout': '30 sec',
+ 'yield period': '10 sec',
+ 'Input Ports': []
+ }
+
+ res['Remote Processing Groups'].append(res_group)
+
+ res_group['Input Ports'].append({
+ 'id': str(connectable.uuid),
+ 'name': connectable.name,
+ 'max concurrent tasks': 1,
+ 'Properties': {}
+ })
+
+ if isinstance(connectable, Processor):
+ res['Processors'].append({
+ 'name': connectable_name,
+ 'id': str(connectable.uuid),
+ 'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
+ 'scheduling strategy': connectable.schedule['scheduling strategy'],
+ 'scheduling period': connectable.schedule['scheduling period'],
+ 'penalization period': connectable.schedule['penalization period'],
+ 'yield period': connectable.schedule['yield period'],
+ 'run duration nanos': connectable.schedule['run duration nanos'],
+ 'Properties': connectable.properties,
+ 'auto-terminated relationships list': connectable.auto_terminate
})
- for conn_name in processor.connections:
- conn_procs = processor.connections[conn_name]
+ for svc in connectable.controller_services:
+ if svc in visited:
+ continue
+
+ visited.append(svc)
+ res['Controller Services'].append({
+ 'name': svc.name,
+ 'id': svc.id,
+ 'class': svc.service_class,
+ 'Properties': svc.properties
+ })
+
+ for conn_name in connectable.connections:
+ conn_procs = connectable.connections[conn_name]
if isinstance(conn_procs, list):
for proc in conn_procs:
res['Connections'].append({
'name': str(uuid.uuid4()),
- 'source id': str(processor.uuid),
+ 'source id': str(connectable.uuid),
'source relationship name': conn_name,
'destination id': str(proc.uuid)
})
if proc not in visited:
- flow_yaml(proc, res, visited)
+ minifi_flow_yaml(proc, res, visited)
else:
res['Connections'].append({
'name': str(uuid.uuid4()),
- 'source id': str(processor.uuid),
+ 'source id': str(connectable.uuid),
'source relationship name': conn_name,
'destination id': str(conn_procs.uuid)
})
if conn_procs not in visited:
- flow_yaml(conn_procs, res, visited)
+ minifi_flow_yaml(conn_procs, res, visited)
if root is None:
return yaml.dump(res, default_flow_style=False)
+
+
+def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None):
+ if visited is None:
+ visited = []
+
+ position = Element('position')
+ position.set('x', '0.0')
+ position.set('y', '0.0')
+
+ comment = Element('comment')
+ styles = Element('styles')
+ bend_points = Element('bendPoints')
+ label_index = Element('labelIndex')
+ label_index.text = '1'
+ z_index = Element('zIndex')
+ z_index.text = '0'
+
+ if root is None:
+ res = Element('flowController')
+ max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
+ max_timer_driven_thread_count.text = '10'
+ res.append(max_timer_driven_thread_count)
+ max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
+ max_event_driven_thread_count.text = '5'
+ res.append(max_event_driven_thread_count)
+ root_group = Element('rootGroup')
+ root_group_id = Element('id')
+ root_group_id_text = str(uuid.uuid4())
+ root_group_id.text = root_group_id_text
+ root_group.append(root_group_id)
+ root_group_name = Element('name')
+ root_group_name.text = root_group_id_text
+ root_group.append(root_group_name)
+ res.append(root_group)
+ root_group.append(position)
+ root_group.append(comment)
+ res.append(Element('controllerServices'))
+ res.append(Element('reportingTasks'))
+ res.set('encoding-version', '1.2')
+ else:
+ res = root
+
+ visited.append(connectable)
+
+ if hasattr(connectable, 'name'):
+ connectable_name_text = connectable.name
+ else:
+ connectable_name_text = str(connectable.uuid)
+
+ if isinstance(connectable, InputPort):
+ input_port = Element('inputPort')
+
+ input_port_id = Element('id')
+ input_port_id.text = str(connectable.uuid)
+ input_port.append(input_port_id)
+
+ input_port_name = Element('name')
+ input_port_name.text = connectable_name_text
+ input_port.append(input_port_name)
+
+ input_port.append(position)
+ input_port.append(comment)
+
+ input_port_scheduled_state = Element('scheduledState')
+ input_port_scheduled_state.text = 'RUNNING'
+ input_port.append(input_port_scheduled_state)
+
+ input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
+ input_port_max_concurrent_tasks.text = '1'
+ input_port.append(input_port_max_concurrent_tasks)
+
+ res.iterfind('rootGroup').next().append(input_port)
+
+ if isinstance(connectable, Processor):
+ conn_destination = Element('processor')
+
+ proc_id = Element('id')
+ proc_id.text = str(connectable.uuid)
+ conn_destination.append(proc_id)
+
+ proc_name = Element('name')
+ proc_name.text = connectable_name_text
+ conn_destination.append(proc_name)
+
+ conn_destination.append(position)
+ conn_destination.append(styles)
+ conn_destination.append(comment)
+
+ proc_class = Element('class')
+ proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
+ conn_destination.append(proc_class)
+
+ proc_bundle = Element('bundle')
+ proc_bundle_group = Element('group')
+ proc_bundle_group.text = 'org.apache.nifi'
+ proc_bundle.append(proc_bundle_group)
+ proc_bundle_artifact = Element('artifact')
+ proc_bundle_artifact.text = 'nifi-standard-nar'
+ proc_bundle.append(proc_bundle_artifact)
+ proc_bundle_version = Element('version')
+ proc_bundle_version.text = nifi_version
+ proc_bundle.append(proc_bundle_version)
+ conn_destination.append(proc_bundle)
+
+ proc_max_concurrent_tasks = Element('maxConcurrentTasks')
+ proc_max_concurrent_tasks.text = '1'
+ conn_destination.append(proc_max_concurrent_tasks)
+
+ proc_scheduling_period = Element('schedulingPeriod')
+ proc_scheduling_period.text = connectable.schedule['scheduling period']
+ conn_destination.append(proc_scheduling_period)
+
+ proc_penalization_period = Element('penalizationPeriod')
+ proc_penalization_period.text = connectable.schedule['penalization period']
+ conn_destination.append(proc_penalization_period)
+
+ proc_yield_period = Element('yieldPeriod')
+ proc_yield_period.text = connectable.schedule['yield period']
+ conn_destination.append(proc_yield_period)
+
+ proc_bulletin_level = Element('bulletinLevel')
+ proc_bulletin_level.text = 'WARN'
+ conn_destination.append(proc_bulletin_level)
+
+ proc_loss_tolerant = Element('lossTolerant')
+ proc_loss_tolerant.text = 'false'
+ conn_destination.append(proc_loss_tolerant)
+
+ proc_scheduled_state = Element('scheduledState')
+ proc_scheduled_state.text = 'RUNNING'
+ conn_destination.append(proc_scheduled_state)
+
+ proc_scheduling_strategy = Element('schedulingStrategy')
+ proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
+ conn_destination.append(proc_scheduling_strategy)
+
+ proc_execution_node = Element('executionNode')
+ proc_execution_node.text = 'ALL'
+ conn_destination.append(proc_execution_node)
+
+ proc_run_duration_nanos = Element('runDurationNanos')
+ proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
+ conn_destination.append(proc_run_duration_nanos)
+
+ for property_key, property_value in connectable.properties.iteritems():
+ proc_property = Element('property')
+ proc_property_name = Element('name')
+ proc_property_name.text = connectable.nifi_property_key(property_key)
+ proc_property.append(proc_property_name)
+ proc_property_value = Element('value')
+ proc_property_value.text = property_value
+ proc_property.append(proc_property_value)
+ conn_destination.append(proc_property)
+
+ for auto_terminate_rel in connectable.auto_terminate:
+ proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
+ proc_auto_terminated_relationship.text = auto_terminate_rel
+ conn_destination.append(proc_auto_terminated_relationship)
+
+ res.iterfind('rootGroup').next().append(conn_destination)
+
+ for svc in connectable.controller_services:
+ if svc in visited:
+ continue
+
+ visited.append(svc)
+ controller_service = Element('controllerService')
+
+ controller_service_id = Element('id')
+ controller_service_id.text = str(svc.id)
+ controller_service.append(controller_service_id)
+
+ controller_service_name = Element('name')
+ controller_service_name.text = svc.name
+ controller_service.append(controller_service_name)
+
+ controller_service.append(comment)
+
+ controller_service_class = Element('class')
+ controller_service_class.text = svc.service_class,
+ controller_service.append(controller_service_class)
+
+ controller_service_bundle = Element('bundle')
+ controller_service_bundle_group = Element('group')
+ controller_service_bundle_group.text = svc.group
+ controller_service_bundle.append(controller_service_bundle_group)
+ controller_service_bundle_artifact = Element('artifact')
+ controller_service_bundle_artifact.text = svc.artifact
+ controller_service_bundle.append(controller_service_bundle_artifact)
+ controller_service_bundle_version = Element('version')
+ controller_service_bundle_version.text = nifi_version
+ controller_service_bundle.append(controller_service_bundle_version)
+ controller_service.append(controller_service_bundle)
+
+ controller_enabled = Element('enabled')
+ controller_enabled.text = 'true',
+ controller_service.append(controller_enabled)
+
+ for property_key, property_value in svc.properties:
+ controller_service_property = Element('property')
+ controller_service_property_name = Element('name')
+ controller_service_property_name.text = property_key
+ controller_service_property.append(controller_service_property_name)
+ controller_service_property_value = Element('value')
+ controller_service_property_value.text = property_value
+ controller_service_property.append(controller_service_property_value)
+ controller_service.append(controller_service_property)
+
+ res.iterfind('rootGroup').next().append(controller_service)
+
+ for conn_name in connectable.connections:
+ conn_destinations = connectable.connections[conn_name]
+
+ if isinstance(conn_destinations, list):
+ for conn_destination in conn_destinations:
+ connection = nifi_flow_xml_connection(res,
+ bend_points,
+ conn_name,
+ connectable,
+ label_index,
+ conn_destination,
+ z_index)
+
+ res.iterfind('rootGroup').next().append(connection)
+
+ if conn_destination not in visited:
+ nifi_flow_xml(conn_destination, nifi_version, res, visited)
+ else:
+ connection = nifi_flow_xml_connection(res,
+ bend_points,
+ conn_name,
+ connectable,
+ label_index,
+ conn_destinations,
+ z_index)
+
+ res.iterfind('rootGroup').next().append(connection)
+
+ if conn_destinations not in visited:
+ nifi_flow_xml(conn_destinations, nifi_version, res, visited)
+
+ if root is None:
+ return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
+ + "\n"
+ + elementTree.tostring(res, encoding='utf-8'))
+
+
+def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index):
+ connection = Element('connection')
+
+ connection_id = Element('id')
+ connection_id.text = str(uuid.uuid4())
+ connection.append(connection_id)
+
+ connection_name = Element('name')
+ connection.append(connection_name)
+
+ connection.append(bend_points)
+ connection.append(label_index)
+ connection.append(z_index)
+
+ connection_source_id = Element('sourceId')
+ connection_source_id.text = str(connectable.uuid)
+ connection.append(connection_source_id)
+
+ connection_source_group_id = Element('sourceGroupId')
+ connection_source_group_id.text = res.iterfind('rootGroup/id').next().text
+ connection.append(connection_source_group_id)
+
+ connection_source_type = Element('sourceType')
+ if isinstance(connectable, Processor):
+ connection_source_type.text = 'PROCESSOR'
+ elif isinstance(connectable, InputPort):
+ connection_source_type.text = 'INPUT_PORT'
+ else:
+ raise Exception('Unexpected source type: %s' % type(connectable))
+ connection.append(connection_source_type)
+
+ connection_destination_id = Element('destinationId')
+ connection_destination_id.text = str(destination.uuid)
+ connection.append(connection_destination_id)
+
+ connection_destination_group_id = Element('destinationGroupId')
+ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text
+ connection.append(connection_destination_group_id)
+
+ connection_destination_type = Element('destinationType')
+ if isinstance(destination, Processor):
+ connection_destination_type.text = 'PROCESSOR'
+ elif isinstance(destination, InputPort):
+ connection_destination_type.text = 'INPUT_PORT'
+ else:
+ raise Exception('Unexpected destination type: %s' % type(destination))
+ connection.append(connection_destination_type)
+
+ connection_relationship = Element('relationship')
+ if not isinstance(connectable, InputPort):
+ connection_relationship.text = conn_name
+ connection.append(connection_relationship)
+
+ connection_max_work_queue_size = Element('maxWorkQueueSize')
+ connection_max_work_queue_size.text = '10000'
+ connection.append(connection_max_work_queue_size)
+
+ connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
+ connection_max_work_queue_data_size.text = '1 GB'
+ connection.append(connection_max_work_queue_data_size)
+
+ connection_flow_file_expiration = Element('flowFileExpiration')
+ connection_flow_file_expiration.text = '0 sec'
+ connection.append(connection_flow_file_expiration)
+
+ return connection
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/minifi/test/__init__.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 8c0aee9..dbb3888 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -36,15 +36,14 @@ def put_file_contents(contents, file_abs_path):
class DockerTestCluster(SingleNodeDockerCluster):
-
def __init__(self, output_validator):
# Create test input/output directories
test_cluster_id = str(uuid.uuid4())
- self.tmp_test_output_dir = '/tmp/.minifi-test-output.' + test_cluster_id
- self.tmp_test_input_dir = '/tmp/.minifi-test-input.' + test_cluster_id
- self.tmp_test_resources_dir = '/tmp/.minifi-test-resources.' + test_cluster_id
+ self.tmp_test_output_dir = '/tmp/.nifi-test-output.' + test_cluster_id
+ self.tmp_test_input_dir = '/tmp/.nifi-test-input.' + test_cluster_id
+ self.tmp_test_resources_dir = '/tmp/.nifi-test-resources.' + test_cluster_id
logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
os.makedirs(self.tmp_test_input_dir, mode=0777)
@@ -66,17 +65,27 @@ class DockerTestCluster(SingleNodeDockerCluster):
super(DockerTestCluster, self).__init__()
- def deploy_flow(self, flow, name=None, vols=None):
+ def deploy_flow(self,
+ flow,
+ name=None,
+ vols=None,
+ engine='minifi-cpp'):
"""
Performs a standard container flow deployment with the addition
of volumes supporting test input/output directories.
"""
- vols = {self.tmp_test_input_dir: {'bind': '/tmp/input', 'mode': 'rw'},
- self.tmp_test_output_dir: {'bind': '/tmp/output', 'mode': 'rw'},
- self.tmp_test_resources_dir: {'bind': '/tmp/resources', 'mode': 'rw'}}
+ if vols is None:
+ vols = {}
+
+ vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
+ vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
+ vols[self.tmp_test_resources_dir] = {'bind': '/tmp/resources', 'mode': 'rw'}
- super(DockerTestCluster, self).deploy_flow(flow, vols=vols, name=name)
+ super(DockerTestCluster, self).deploy_flow(flow,
+ vols=vols,
+ name=name,
+ engine=engine)
def put_test_data(self, contents):
"""
@@ -103,14 +112,22 @@ class DockerTestCluster(SingleNodeDockerCluster):
self.observer.stop()
self.observer.join()
- def log_minifi_output(self):
+ def log_nifi_output(self):
for container in self.containers:
container = self.client.containers.get(container.id)
logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs())
if container.status == 'running':
- app_logs = container.exec_run('cat ' + self.minifi_root + '/minifi-app.log')
- logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, app_logs)
+ minifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.minifi_root + '/minifi-app.log '
+ '&& cat ' +
+ self.minifi_root + '/minifi-app.log\'')
+ if len(minifi_app_logs) > 0:
+ logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, minifi_app_logs)
+ nifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.nifi_root + '/logs/nifi-app.log '
+ '&& cat ' +
+ self.nifi_root + '/logs/nifi-app.log\'')
+ if len(nifi_app_logs) > 0:
+ logging.info('NiFi app logs for container \'%s\':\n%s', container.name, nifi_app_logs)
else:
logging.info(container.status)
logging.info('Could not cat app logs for container \'%s\' because it is not running',
@@ -118,12 +135,12 @@ class DockerTestCluster(SingleNodeDockerCluster):
stats = container.stats(decode=True, stream=False)
logging.info('Container stats:\n%s', repr(stats))
- def check_output(self):
+ def check_output(self, timeout=5):
"""
Wait for flow output, validate it, and log minifi output.
"""
- self.wait_for_output(5)
- self.log_minifi_output()
+ self.wait_for_output(timeout)
+ self.log_nifi_output()
return self.output_validator.validate()
@@ -143,7 +160,6 @@ class DockerTestCluster(SingleNodeDockerCluster):
class OutputEventHandler(FileSystemEventHandler):
-
def __init__(self, validator, done_event):
self.validator = validator
self.done_event = done_event
@@ -175,6 +191,7 @@ class OutputValidator(object):
Return True if output is valid; False otherwise.
"""
+
class SingleFileOutputValidator(OutputValidator):
"""
Validates the content of a single file in the given directory.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/test_filesystem_ops.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
index 7a6f212..3ae4ff5 100644
--- a/docker/test/integration/test_filesystem_ops.py
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -25,7 +25,6 @@ def test_get_put():
flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-
cluster.put_test_data('test')
cluster.deploy_flow(flow)
@@ -37,17 +36,17 @@ def test_file_exists_failure():
Verify that putting to a file that already exists fails.
"""
- flow = (GetFile('/tmp/input') >>
+ flow = (GetFile('/tmp/input')
# First put should succeed
- PutFile('/tmp') >>
+ >> PutFile('/tmp')
# Second put should fail (file exists)
- PutFile('/tmp') >> (('success', LogAttribute()),
- ('failure', LogAttribute() >> PutFile('/tmp/output'))))
+ >> PutFile('/tmp')
+ >> (('success', LogAttribute()),
+ ('failure', LogAttribute() >> PutFile('/tmp/output'))))
with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-
cluster.put_test_data('test')
cluster.deploy_flow(flow)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/test_http.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
index 72c80bd..6b6a9f5 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -22,13 +22,13 @@ def test_invoke_listen():
Verify sending using InvokeHTTP to a receiver using ListenHTTP.
"""
- invoke_flow = (GetFile('/tmp/input') >> LogAttribute() >>
- InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
+ invoke_flow = (GetFile('/tmp/input')
+ >> LogAttribute()
+ >> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-
cluster.put_test_data('test')
cluster.deploy_flow(listen_flow, name='minifi-listen')
cluster.deploy_flow(invoke_flow, name='minifi-invoke')
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c24ed7e/docker/test/integration/test_s2s.py
----------------------------------------------------------------------
diff --git a/docker/test/integration/test_s2s.py b/docker/test/integration/test_s2s.py
new file mode 100644
index 0000000..2d391d9
--- /dev/null
+++ b/docker/test/integration/test_s2s.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_minifi_to_nifi():
+ """
+ Verify sending data from a MiNiFi - C++ instance to NiFi using the S2S protocol.
+ """
+
+ port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
+
+ recv_flow = (port
+ >> LogAttribute()
+ >> PutFile('/tmp/output'))
+
+ send_flow = (GetFile('/tmp/input')
+ >> LogAttribute()
+ >> port)
+
+ with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+ cluster.put_test_data('test')
+ cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
+ cluster.deploy_flow(send_flow)
+
+ assert cluster.check_output(60)