Posted to commits@nifi.apache.org by fg...@apache.org on 2021/04/22 14:24:43 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1345 Add flake8 check for python files

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 016e034  MINIFICPP-1345 Add flake8 check for python files
016e034 is described below

commit 016e034b868c29bc4e7090723dd5aabbea761f7c
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Mar 10 10:46:26 2021 +0100

    MINIFICPP-1345 Add flake8 check for python files
    
    Fix flake8 issues
    Add flake8 target
    Add flake8 to CI checks
    Ignore F811 on file level in steps.py
    Use W504 instead of W503
    Fix flake8 issues of Azure tests
    Add documentation on flake8 checks
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1029
---
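The commit message above notes that W504 is used instead of W503 and that F811 is ignored at file level in steps.py. As a rough sketch only (the project's real settings live in run_flake8.sh, which this commit adds but which is not shown in full below), that policy could be expressed on the flake8 command line roughly as follows; the exact flags and paths, and whether the F811 suppression is done via configuration or an in-file noqa comment, are assumptions here:

    # replace flake8's default ignore list: W503 (line break before binary
    # operator) stays ignored, so the W504 style becomes the enforced one
    # skip F811 (redefinition of unused name) for steps.py as a whole
    flake8 --ignore=W503 \
           --per-file-ignores='docker/test/integration/steps/steps.py:F811' \
           bootstrap.py docker extensions python
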
 .github/workflows/ci.yml                           |   4 +-
 CMakeLists.txt                                     |   3 +
 README.md                                          |   3 +
 bootstrap.py                                       | 335 ++++++++++-----------
 .../integration/MiNiFi_integration_test_driver.py  |   7 +-
 docker/test/integration/environment.py             |  13 +-
 docker/test/integration/minifi/core/Connectable.py |   2 +-
 .../integration/minifi/core/ControllerService.py   |   1 +
 .../integration/minifi/core/DockerTestCluster.py   |  15 +-
 .../minifi/core/DockerTestDirectoryBindings.py     |   3 +-
 .../integration/minifi/core/FileSystemObserver.py  |   3 +-
 docker/test/integration/minifi/core/InputPort.py   |   1 +
 .../integration/minifi/core/OutputEventHandler.py  |   1 +
 docker/test/integration/minifi/core/Processor.py   |   1 -
 .../integration/minifi/core/RemoteProcessGroup.py  |   6 +-
 .../integration/minifi/core/SSLContextService.py   |   1 +
 .../test/integration/minifi/core/SSL_cert_utils.py |   3 +
 .../minifi/core/SingleNodeDockerCluster.py         |  97 +++---
 .../Minifi_flow_yaml_serializer.py                 |   1 +
 .../flow_serialization/Nifi_flow_xml_serializer.py |  48 +--
 .../minifi/processors/DeleteS3Object.py            |  18 +-
 .../minifi/processors/GenerateFlowFile.py          |   8 +-
 .../test/integration/minifi/processors/GetFile.py  |  15 +-
 .../integration/minifi/processors/HashContent.py   |  12 +-
 .../integration/minifi/processors/InvokeHTTP.py    |  40 +--
 .../integration/minifi/processors/ListenHTTP.py    |  10 +-
 .../integration/minifi/processors/LogAttribute.py  |   8 +-
 .../integration/minifi/processors/PublishKafka.py  |  20 +-
 .../test/integration/minifi/processors/PutFile.py  |   4 +-
 .../integration/minifi/processors/PutS3Object.py   |  23 +-
 .../minifi/validators/EmptyFilesOutPutValidator.py |   3 +-
 .../minifi/validators/FileOutputValidator.py       |   1 +
 .../minifi/validators/MultiFileOutputValidator.py  |  25 +-
 .../minifi/validators/NoFileOutPutValidator.py     |   3 +-
 .../minifi/validators/OutputValidator.py           |   1 -
 .../minifi/validators/SegfaultValidator.py         |   1 +
 .../minifi/validators/SingleFileOutputValidator.py |   5 +-
 docker/test/integration/steps/steps.py             |  63 ++--
 .../pythonprocessors/examples/SentimentAnalysis.py |  36 ++-
 .../pythonprocessors/google/SentimentAnalyzer.py   |  52 ++--
 extensions/pythonprocessors/h2o/ConvertDsToCsv.py  |  13 +-
 .../h2o/h2o3/mojo/ExecuteH2oMojoScoring.py         |  18 +-
 extensions/script/ExampleProcessor.py              |  14 +-
 .../test_scripts/non_transferring_processor.py     |  16 +-
 ...passthrough_processor_transfering_to_failure.py |  18 +-
 ...passthrough_processor_transfering_to_success.py |  18 +-
 .../test_scripts/stateful_processor.py             |  42 +--
 python/getFile.py                                  |  28 +-
 python/minifi/__init__.py                          |  66 ++--
 run_flake8.sh                                      |  22 ++
 50 files changed, 630 insertions(+), 521 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a9d6197..91a5931 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -211,7 +211,7 @@ jobs:
       - id: install_deps
         run: |
           sudo apt update
-          sudo apt install -y ccache libfl-dev libpcap-dev libboost-all-dev libsqliteodbc
+          sudo apt install -y ccache libfl-dev libpcap-dev libboost-all-dev libsqliteodbc flake8
           sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so
           echo "PATH=/usr/lib/ccache:$PATH" >> $GITHUB_ENV
           echo -e "127.0.0.1\t$HOSTNAME" | sudo tee -a /etc/hosts > /dev/null
@@ -219,6 +219,8 @@ jobs:
         run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DCMAKE_BUILD_TYPE=Release -DENABLE_BUSTACHE=ON -DENABLE_SQL=ON -DENABLE_PCAP=ON -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && make -j4 VERBOSE=1  && make test ARGS="--timeout 300 -j2 --output-on-failure"
       - id: shellcheck
         run: cd build && make shellcheck
+      - id: flake8_check
+        run: cd build && make flake8
   ubuntu_20_04_all_clang:
     name: "ubuntu-20.04-all-clang"
     runs-on: ubuntu-20.04
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 40df39e..6a222db 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -864,6 +864,9 @@ add_custom_target(linter
 
 add_custom_target(shellcheck
 	COMMAND ${CMAKE_SOURCE_DIR}/run_shellcheck.sh ${CMAKE_SOURCE_DIR})
+
+add_custom_target(flake8
+	COMMAND ${CMAKE_SOURCE_DIR}/run_flake8.sh ${CMAKE_SOURCE_DIR})
 endif(NOT WIN32)
 
 # Custom target to download and run Apache Release Audit Tool (RAT)
diff --git a/README.md b/README.md
index 915f2dc..43ee8e2 100644
--- a/README.md
+++ b/README.md
@@ -597,6 +597,9 @@ New contributions are expected to pass the shellcheck analysis as part of the ve
 If a shellcheck requested change is unfeasable it shall be disabled on per-line basis and will be subjected to review.
 For more information on an issue please check the [shellcheck wiki page](https://github.com/koalaman/shellcheck/wiki).
 
+Python script files shall follow the PEP8 guidelines and best practices. The project includes [flake8](https://flake8.pycqa.org/en/latest/) checks
+as part of the verification process, that is applied to all new contributions.
+
 Additionally, all new files must include a copy of the Apache License Header.
 
 For more details on how to contribute please see our [Contribution Guide](CONTRIB.md)
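
The README paragraph added above folds flake8 into the verification process. Based on the CMake target and CI step added elsewhere in this commit, the check can be run locally in either of the following ways (assuming flake8 is installed, as the CI job does via apt):

    # from an existing CMake build directory, same as the new CI step
    cd build && make flake8

    # or call the helper script added by this commit directly, passing the
    # source-tree root, mirroring how the CMake target invokes it
    ./run_flake8.sh .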
diff --git a/bootstrap.py b/bootstrap.py
index cfe76e8..6a7d72d 100755
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -18,220 +18,217 @@
 
 from __future__ import print_function
 
-MINIFI_SUBFOLDER = '/nifi/nifi-minifi-cpp/'
-APACHE_CLOSER_REPO_JSON_URL = 'https://www.apache.org/dyn/closer.cgi?as_json=1&path=/nifi/nifi-minifi-cpp'
-APACHE_MIRROR_LIST = "http://www.apache.org/mirrors/"
-
 import argparse
 import sys
-
-if sys.version_info[0] < 3:
-  from urllib2 import urlopen
-  input = raw_input
-else:
-  from urllib.request import urlopen
-
 import json
 import os.path
 import platform
 import tarfile
 
-
 from distutils.util import strtobool
 from ftplib import FTP
 
+if sys.version_info[0] < 3:
+    from urllib2 import urlopen
+    input = raw_input
+else:
+    from urllib.request import urlopen
+
+MINIFI_SUBFOLDER = '/nifi/nifi-minifi-cpp/'
+APACHE_CLOSER_REPO_JSON_URL = 'https://www.apache.org/dyn/closer.cgi?as_json=1&path=/nifi/nifi-minifi-cpp'
+APACHE_MIRROR_LIST = "http://www.apache.org/mirrors/"
+
+
 def install_package(package_name):
-  try:
-    import pip
-    if hasattr(pip, 'main'):
-      pipcode = pip.main(['install', package])
-    else:
-      pipcode = pip._internal.main(['install', package])
-    return pipcode == 0
-  except:
-    return False
+    try:
+        import pip
+        if hasattr(pip, 'main'):
+            pipcode = pip.main(['install', package_name])
+        else:
+            pipcode = pip._internal.main(['install', package_name])
+        return pipcode == 0
+    except ImportError:
+        return False
 
 
 distro_available = False
 
 try:
-  import distro
+    import distro
 
-  distro_available = True
-except:
-  distro_available = install_package("distro")
+    distro_available = True
+except ImportError:
+    distro_available = install_package("distro")
 
 
 def get_distro():
-  if is_mac():
-    return ["osx", "", "darwin"]
-  try:
-    if distro_available:
-      return distro.linux_distribution(full_distribution_name=False)
-    else:
-      return platform.linux_distribution()
-  except:
-    return ["N/A", "N/A", "N/A"]
+    if is_mac():
+        return ["osx", "", "darwin"]
+    try:
+        if distro_available:
+            return distro.linux_distribution(full_distribution_name=False)
+        else:
+            return platform.linux_distribution()
+    except Exception:
+        return ["N/A", "N/A", "N/A"]
 
 
 def is_mac():
-  return platform.system() == "Darwin"
+    return platform.system() == "Darwin"
 
 
 def mapped_distro():
-  distro_info = get_distro()
-  distro = distro_info[0].lower()
-  release = distro_info[2].lower()
-  if any(d in distro for d in ["rhel", "red hat", "centos"]):
-    return "rhel", release
-  else:
-    return distro, release
+    distro_info = get_distro()
+    distro = distro_info[0].lower()
+    release = distro_info[2].lower()
+    if any(d in distro for d in ["rhel", "red hat", "centos"]):
+        return "rhel", release
+    else:
+        return distro, release
 
 
 def find_closest_mirror():
-  try:
-    url = urlopen(APACHE_CLOSER_REPO_JSON_URL)
-    data = json.loads(url.read().decode())
+    try:
+        url = urlopen(APACHE_CLOSER_REPO_JSON_URL)
+        data = json.loads(url.read().decode())
 
-    return data['ftp'][0]
+        return data['ftp'][0]
+    except Exception:
+        print("Failed to find closest mirror, please specify one!")
+        return ""
 
-  except Exception as e:
-    print ("Failed to find closest mirror, please specify one!")
-    return ""
 
+def get_release_and_binaries_from_ftp(host, apache_dir, version=None):
+    ftp = FTP(host)
+    ftp.login()
+    ftp.cwd(apache_dir + MINIFI_SUBFOLDER)
+    # list files with ftplib
+    file_list = list(filter(lambda x: any(char.isdigit() for char in x),
+                            ftp.nlst("")))  # to filter "." and ".." - relese names contain number
+    file_list.sort(reverse=True)
+    if not version:
+        latest_release = file_list[0]
+    else:
+        if version not in file_list:
+            print("The specified version (" + version + ") doesn't exist. Please use one of the following: " + ", ".join(file_list))
+            exit(-1)
+        latest_release = version
 
-def get_release_and_binaries_from_ftp(host, apache_dir, version = None):
-  ftp = FTP(host)
-  ftp.login()
-  ftp.cwd(apache_dir + MINIFI_SUBFOLDER)
-  # list files with ftplib
-  file_list = list(filter(lambda x: any(char.isdigit() for char in x),
-                          ftp.nlst("")))  # to filter "." and ".." - relese names contain number
-  file_list.sort(reverse=True)
-  if not version:
-    latest_release = file_list[0]
-  else:
-    if version not in file_list:
-      print("The specified version (" + version + ") doesn't exist. Please use one of the following: " + ", ".join(file_list))
-      exit(-1)
-    latest_release = version
-
-  ftp.cwd("./" + latest_release)
-  binaries = list(filter(lambda x: any(char.isdigit() for char in x), ftp.nlst("")))
+    ftp.cwd("./" + latest_release)
+    binaries = list(filter(lambda x: any(char.isdigit() for char in x), ftp.nlst("")))
 
-  ftp.quit()
+    ftp.quit()
 
-  return latest_release, binaries
+    return latest_release, binaries
 
 
 def download_binary_from_ftp(host, apache_dir, release, binary):
-  successful_download = False
+    successful_download = False
 
-  try:
-    ftp = FTP(host)
-    ftp.login()
-    ftp.cwd(apache_dir + MINIFI_SUBFOLDER + release)
+    try:
+        ftp = FTP(host)
+        ftp.login()
+        ftp.cwd(apache_dir + MINIFI_SUBFOLDER + release)
 
-    print ("Downloading: ftp://" + host + "/" + MINIFI_SUBFOLDER + release + "/" + binary)
+        print("Downloading: ftp://" + host + "/" + MINIFI_SUBFOLDER + release + "/" + binary)
 
-    with open(os.path.join(os.getcwd(), binary), "wb") as targetfile:
-      ftp.retrbinary("RETR " + binary, targetfile.write)
-    successful_download = True
-  except:
-    print("Failed to download binary")
-  finally:
-    ftp.quit()
+        with open(os.path.join(os.getcwd(), binary), "wb") as targetfile:
+            ftp.retrbinary("RETR " + binary, targetfile.write)
+        successful_download = True
+    except Exception:
+        print("Failed to download binary")
+    finally:
+        ftp.quit()
 
-  return successful_download
+    return successful_download
 
 
 def main(args):
-  print(get_distro())
-  binaries = []
-
-  try:
-    local_repo = args.mirror if args.mirror else find_closest_mirror()
-
-    print(local_repo)
-
-    host, dir = local_repo.replace('ftp://', '').split('/', 1)
-    latest_release, binaries = get_release_and_binaries_from_ftp(host, dir, args.version if args.version else None)
-
-  except:
-    print("Failed to get binaries from Apache mirror")
-    return -1
-
-  matching_binaries = []
-
-  for binary in binaries:
-    distro, release = mapped_distro()
-    if release and release in binary:
-      matching_binaries.append(binary)
-    elif distro and distro in binary:
-      matching_binaries.append(binary)
-
-  if not matching_binaries:
-    print("No compatible binary found, MiNiFi needs to be compiled locally")
-    return 1
-
-  invalid_input = True
-  download = None
-  selected_binary = None
-
-  if len(matching_binaries) == 1:
-    print("A binary in Apache repo seems to match your system: " + matching_binaries[0])
-    while invalid_input:
-      try:
-        download = strtobool(input("Would you like to download? [y/n]"))
-        invalid_input = False
-        if download:
-          selected_binary = matching_binaries[0]
-      except:
-        pass
-
-  else:
-    print("The following binaries in Apache repo seem to match your system: ")
-    for i, item in enumerate(matching_binaries):
-      print(str(i + 1) + " - " + item)
-    print()
-    while invalid_input:
-      try:
-        user_input = input("Please select one to download (1 to " + str(
-          len(matching_binaries)) + ") or \"s\" to skip and compile locally\n")
-        user_input.lower()
-        if user_input == "s":
-          invalid_input = False
-          download = False
-          break
-        idx = int(user_input) - 1
-        if (idx < 0):
-          continue
-        selected_binary = matching_binaries[idx]
-        download = True
-        invalid_input = False
-      except:
-        pass
-
-  if not download:
-    return 1
-
-  if not download_binary_from_ftp(host, dir, latest_release, selected_binary):
-    return -1
-
-  try:
-    with tarfile.open(os.path.join(os.getcwd(), selected_binary), "r:gz") as tar:
-      tar.extractall()
-  except:
-    print("Failed to extract tar file")
-    return -1
-
-  print("Successfully downloaded and extracted MiNiFi")
-  return 0
+    print(get_distro())
+    binaries = []
+
+    try:
+        local_repo = args.mirror if args.mirror else find_closest_mirror()
+
+        print(local_repo)
+
+        host, dir = local_repo.replace('ftp://', '').split('/', 1)
+        latest_release, binaries = get_release_and_binaries_from_ftp(host, dir, args.version if args.version else None)
+
+    except Exception:
+        print("Failed to get binaries from Apache mirror")
+        return -1
+
+    matching_binaries = []
+
+    for binary in binaries:
+        distro, release = mapped_distro()
+        if release and release in binary:
+            matching_binaries.append(binary)
+        elif distro and distro in binary:
+            matching_binaries.append(binary)
+
+    if not matching_binaries:
+        print("No compatible binary found, MiNiFi needs to be compiled locally")
+        return 1
+
+    invalid_input = True
+    download = None
+    selected_binary = None
+
+    if len(matching_binaries) == 1:
+        print("A binary in Apache repo seems to match your system: " + matching_binaries[0])
+        while invalid_input:
+            try:
+                download = strtobool(input("Would you like to download? [y/n]"))
+                invalid_input = False
+                if download:
+                    selected_binary = matching_binaries[0]
+            except Exception:
+                pass
+
+    else:
+        print("The following binaries in Apache repo seem to match your system: ")
+        for i, item in enumerate(matching_binaries):
+            print(str(i + 1) + " - " + item)
+        print()
+        while invalid_input:
+            try:
+                user_input = input("Please select one to download (1 to " + str(len(matching_binaries)) + ") or \"s\" to skip and compile locally\n")
+                user_input.lower()
+                if user_input == "s":
+                    invalid_input = False
+                    download = False
+                    break
+                idx = int(user_input) - 1
+                if (idx < 0):
+                    continue
+                selected_binary = matching_binaries[idx]
+                download = True
+                invalid_input = False
+            except Exception:
+                pass
+
+    if not download:
+        return 1
+
+    if not download_binary_from_ftp(host, dir, latest_release, selected_binary):
+        return -1
+
+    try:
+        with tarfile.open(os.path.join(os.getcwd(), selected_binary), "r:gz") as tar:
+            tar.extractall()
+    except Exception:
+        print("Failed to extract tar file")
+        return -1
+
+    print("Successfully downloaded and extracted MiNiFi")
+    return 0
 
 
 if __name__ == '__main__':
-  parser = argparse.ArgumentParser(description="Download latest MiNiFi release")
-  parser.add_argument("-m", "--mirror", dest="mirror", help="user-specified apache mirror")
-  parser.add_argument("-v", "--version", dest="version", help="user-specified version to be downloaded")
-  args = parser.parse_args()
-  main(args)
+    parser = argparse.ArgumentParser(description="Download latest MiNiFi release")
+    parser.add_argument("-m", "--mirror", dest="mirror", help="user-specified apache mirror")
+    parser.add_argument("-v", "--version", dest="version", help="user-specified version to be downloaded")
+    args = parser.parse_args()
+    main(args)
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 6bb5151..d8f9eec 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -1,10 +1,5 @@
-from subprocess import Popen, PIPE, STDOUT
-
 import docker
 import logging
-import os
-import shutil
-import threading
 import time
 import uuid
 
@@ -21,6 +16,7 @@ from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
 from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator
 from minifi.validators.MultiFileOutputValidator import MultiFileOutputValidator
 
+
 class MiNiFi_integration_test():
     def __init__(self, context):
         self.test_id = str(uuid.uuid4())
@@ -55,7 +51,6 @@ class MiNiFi_integration_test():
             del cluster
 
         # The cluster deleter is not reliable for cleaning up
-        docker_client = docker.from_env()
         for container_id in container_ids:
             self.delete_docker_container_by_id(container_id)
 
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
index 4a3c12f..46c68c3 100644
--- a/docker/test/integration/environment.py
+++ b/docker/test/integration/environment.py
@@ -1,14 +1,16 @@
 from behave import fixture, use_fixture
+import logging
 import sys
 sys.path.append('../minifi')
-import logging
 
-from MiNiFi_integration_test_driver import MiNiFi_integration_test
-from minifi import *
+from MiNiFi_integration_test_driver import MiNiFi_integration_test  # noqa: E402
+from minifi import *  # noqa
+
 
 def raise_exception(exception):
     raise exception
 
+
 @fixture
 def test_driver_fixture(context):
     context.test = MiNiFi_integration_test(context)
@@ -16,11 +18,14 @@ def test_driver_fixture(context):
     logging.info("Integration test teardown...")
     del context.test
 
+
 def before_scenario(context, scenario):
     use_fixture(test_driver_fixture, context)
 
+
 def after_scenario(context, scenario):
-	pass
+    pass
+
 
 def before_all(context):
     context.config.setup_logging()
diff --git a/docker/test/integration/minifi/core/Connectable.py b/docker/test/integration/minifi/core/Connectable.py
index 6af3a7d..3bebc81 100644
--- a/docker/test/integration/minifi/core/Connectable.py
+++ b/docker/test/integration/minifi/core/Connectable.py
@@ -1,5 +1,5 @@
 import uuid
-from copy import copy
+
 
 class Connectable(object):
     def __init__(self,
diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py
index 6263ff5..f46b500 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -1,6 +1,7 @@
 import uuid
 import logging
 
+
 class ControllerService(object):
     def __init__(self, name=None, properties=None):
 
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 8eceb67..6546666 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -1,15 +1,13 @@
 import json
 import logging
-import os
-import shutil
 import subprocess
 import sys
 import time
-import uuid
 
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .utils import retry_check
 
+
 class DockerTestCluster(SingleNodeDockerCluster):
     def __init__(self):
         self.segfault = False
@@ -101,13 +99,10 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
     def check_http_proxy_access(self, url):
         output = subprocess.check_output(["docker", "exec", "http-proxy", "cat", "/var/log/squid/access.log"]).decode(self.get_stdout_encoding())
-        print(output)
-        print(output.count("TCP_DENIED/407"))
-        print(output.count("TCP_MISS"))
-        return url in output and \
-            ((output.count("TCP_DENIED/407") != 0 and \
-              output.count("TCP_MISS") == output.count("TCP_DENIED/407")) or \
-             output.count("TCP_DENIED/407") == 0 and "TCP_MISS" in output)
+        return url in output \
+            and ((output.count("TCP_DENIED/407") != 0
+                  and output.count("TCP_MISS") == output.count("TCP_DENIED/407"))
+                 or output.count("TCP_DENIED/407") == 0 and "TCP_MISS" in output)
 
     @retry_check()
     def check_s3_server_object_data(self, test_data):
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index b0d0397..c1dd3c9 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -2,6 +2,7 @@ import logging
 import os
 import shutil
 
+
 class DockerTestDirectoryBindings:
     def __init__(self):
         self.data_directories = {}
@@ -19,7 +20,7 @@ class DockerTestDirectoryBindings:
         [self.create_directory(directory) for directory in self.data_directories[test_id].values()]
 
         # Add resources
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
         shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
 
     def get_data_directories(self, test_id):
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py b/docker/test/integration/minifi/core/FileSystemObserver.py
index 70ead37..f73c5a3 100644
--- a/docker/test/integration/minifi/core/FileSystemObserver.py
+++ b/docker/test/integration/minifi/core/FileSystemObserver.py
@@ -2,11 +2,10 @@ import logging
 import time
 from threading import Event
 
-from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
 
 from .OutputEventHandler import OutputEventHandler
-from ..validators.FileOutputValidator import FileOutputValidator
+
 
 class FileSystemObserver(object):
     def __init__(self, test_output_dir):
diff --git a/docker/test/integration/minifi/core/InputPort.py b/docker/test/integration/minifi/core/InputPort.py
index 1f00b7f..cb6b5f1 100644
--- a/docker/test/integration/minifi/core/InputPort.py
+++ b/docker/test/integration/minifi/core/InputPort.py
@@ -1,5 +1,6 @@
 from .Connectable import Connectable
 
+
 class InputPort(Connectable):
     def __init__(self, name=None, remote_process_group=None):
         super(InputPort, self).__init__(name=name)
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py
index f505695..d6986fe 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -2,6 +2,7 @@ import logging
 
 from watchdog.events import FileSystemEventHandler
 
+
 class OutputEventHandler(FileSystemEventHandler):
     def __init__(self, done_event):
         self.done_event = done_event
diff --git a/docker/test/integration/minifi/core/Processor.py b/docker/test/integration/minifi/core/Processor.py
index ddc2a9d..8b7eb10 100644
--- a/docker/test/integration/minifi/core/Processor.py
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -1,6 +1,5 @@
 from .Connectable import Connectable
 
-import logging
 
 class Processor(Connectable):
     def __init__(self,
diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py b/docker/test/integration/minifi/core/RemoteProcessGroup.py
index 6132ad4..c4cafc1 100644
--- a/docker/test/integration/minifi/core/RemoteProcessGroup.py
+++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py
@@ -1,5 +1,6 @@
 import uuid
 
+
 class RemoteProcessGroup(object):
     def __init__(self, url, name=None):
         self.uuid = uuid.uuid4()
@@ -11,9 +12,8 @@ class RemoteProcessGroup(object):
 
         self.url = url
 
-
     def get_name(self):
-    	return self.name
+        return self.name
 
     def get_uuid(self):
-    	return self.uuid
+        return self.uuid
diff --git a/docker/test/integration/minifi/core/SSLContextService.py b/docker/test/integration/minifi/core/SSLContextService.py
index 5866508..8e8dc40 100644
--- a/docker/test/integration/minifi/core/SSLContextService.py
+++ b/docker/test/integration/minifi/core/SSLContextService.py
@@ -1,5 +1,6 @@
 from .ControllerService import ControllerService
 
+
 class SSLContextService(ControllerService):
     def __init__(self, name=None, cert=None, key=None, ca_cert=None):
         super(SSLContextService, self).__init__(name=name)
diff --git a/docker/test/integration/minifi/core/SSL_cert_utils.py b/docker/test/integration/minifi/core/SSL_cert_utils.py
index c2461c3..81bae9b 100644
--- a/docker/test/integration/minifi/core/SSL_cert_utils.py
+++ b/docker/test/integration/minifi/core/SSL_cert_utils.py
@@ -3,6 +3,7 @@ import logging
 
 from M2Crypto import X509, EVP, RSA, ASN1
 
+
 def gen_cert():
     """
     Generate TLS certificate request for testing
@@ -32,9 +33,11 @@ def gen_cert():
 
     return cert, key
 
+
 def rsa_gen_key_callback():
     pass
 
+
 def gen_req():
     """
     Generate TLS certificate request for testing
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 083340f..e33b57e 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -3,7 +3,6 @@ import docker
 import logging
 import os
 import tarfile
-import time
 import uuid
 
 from collections import OrderedDict
@@ -14,6 +13,7 @@ from .Cluster import Cluster
 from ..flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
 from ..flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
 
+
 class SingleNodeDockerCluster(Cluster):
     """
     A "cluster" which consists of a single docker node. Useful for
@@ -125,8 +125,7 @@ class SingleNodeDockerCluster(Cluster):
                 ADD config.yml {minifi_root}/conf/config.yml
                 RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
                 USER minificpp
-                """.format(name=self.name,hostname=self.name,
-                           base_image='apacheminificpp:' + self.minifi_version,
+                """.format(base_image='apacheminificpp:' + self.minifi_version,
                            minifi_root=self.minifi_root))
 
         serializer = Minifi_flow_yaml_serializer()
@@ -154,11 +153,11 @@ class SingleNodeDockerCluster(Cluster):
             conf_file_buffer.close()
 
         container = self.client.containers.run(
-                configured_image[0],
-                detach=True,
-                name=self.name,
-                network=self.network.name,
-                volumes=self.vols)
+            configured_image[0],
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            volumes=self.vols)
         self.network.reload()
 
         logging.info('Started container \'%s\'', container.name)
@@ -205,12 +204,12 @@ class SingleNodeDockerCluster(Cluster):
         logging.info('Creating and running docker container for flow...')
 
         container = self.client.containers.run(
-                configured_image[0],
-                detach=True,
-                name=self.name,
-                hostname=self.name,
-                network=self.network.name,
-                volumes=self.vols)
+            configured_image[0],
+            detach=True,
+            name=self.name,
+            hostname=self.name,
+            network=self.network.name,
+            volumes=self.vols)
 
         logging.info('Started container \'%s\'', container.name)
 
@@ -219,24 +218,22 @@ class SingleNodeDockerCluster(Cluster):
     def deploy_kafka_broker(self):
         logging.info('Creating and running docker containers for kafka broker...')
         zookeeper = self.client.containers.run(
-                    self.client.images.pull("wurstmeister/zookeeper:latest"),
-                    detach=True,
-                    name='zookeeper',
-                    network=self.network.name,
-                    ports={'2181/tcp': 2181},
-                    )
+            self.client.images.pull("wurstmeister/zookeeper:latest"),
+            detach=True,
+            name='zookeeper',
+            network=self.network.name,
+            ports={'2181/tcp': 2181})
         self.containers[zookeeper.name] = zookeeper
 
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
         broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
         broker = self.client.containers.run(
-                    broker_image[0],
-                    detach=True,
-                    name='kafka-broker',
-                    network=self.network.name,
-                    ports={'9092/tcp': 9092},
-                    environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
-                    )
+            broker_image[0],
+            detach=True,
+            name='kafka-broker',
+            network=self.network.name,
+            ports={'9092/tcp': 9092},
+            environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"])
         self.containers[broker.name] = broker
 
         dockerfile = dedent("""FROM {base_image}
@@ -245,11 +242,10 @@ class SingleNodeDockerCluster(Cluster):
                 """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
         configured_image = self.build_image(dockerfile, [])
         consumer = self.client.containers.run(
-                    configured_image[0],
-                    detach=True,
-                    name='kafka-consumer',
-                    network=self.network.name,
-                    )
+            configured_image[0],
+            detach=True,
+            name='kafka-consumer',
+            network=self.network.name)
         self.containers[consumer.name] = consumer
 
     def deploy_http_proxy(self):
@@ -266,33 +262,30 @@ class SingleNodeDockerCluster(Cluster):
                 """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
         configured_image = self.build_image(dockerfile, [])
         consumer = self.client.containers.run(
-                    configured_image[0],
-                    detach=True,
-                    name='http-proxy',
-                    network=self.network.name,
-                    ports={'3128/tcp': 3128},
-                    )
+            configured_image[0],
+            detach=True,
+            name='http-proxy',
+            network=self.network.name,
+            ports={'3128/tcp': 3128})
         self.containers[consumer.name] = consumer
 
     def deploy_s3_server(self):
         server = self.client.containers.run(
-                    "adobe/s3mock:2.1.28",
-                    detach=True,
-                    name='s3-server',
-                    network=self.network.name,
-                    ports={'9090/tcp': 9090, '9191/tcp': 9191},
-                    environment=["initialBuckets=test_bucket"],
-                    )
+            "adobe/s3mock:2.1.28",
+            detach=True,
+            name='s3-server',
+            network=self.network.name,
+            ports={'9090/tcp': 9090, '9191/tcp': 9191},
+            environment=["initialBuckets=test_bucket"])
         self.containers[server.name] = server
 
     def deploy_azure_storage_server(self):
         server = self.client.containers.run(
-                    "mcr.microsoft.com/azure-storage/azurite",
-                    detach=True,
-                    name='azure-storage-server',
-                    network=self.network.name,
-                    ports={'10000/tcp': 10000, '10001/tcp': 10001},
-                    )
+            "mcr.microsoft.com/azure-storage/azurite",
+            detach=True,
+            name='azure-storage-server',
+            network=self.network.name,
+            ports={'10000/tcp': 10000, '10001/tcp': 10001})
         self.containers[server.name] = server
 
     def build_image(self, dockerfile, context_files):
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
index c7a9e0d..9220825 100644
--- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -4,6 +4,7 @@ import yaml
 from ..core.Processor import Processor
 from ..core.InputPort import InputPort
 
+
 class Minifi_flow_yaml_serializer:
     def serialize(self, connectable, root=None, visited=None):
         if visited is None:
diff --git a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
index 22fa098..0f3d62c 100644
--- a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
+++ b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
@@ -6,6 +6,7 @@ from xml.etree.cElementTree import Element
 from ..core.Processor import Processor
 from ..core.InputPort import InputPort
 
+
 class Nifi_flow_xml_serializer:
     def serialize(self, connectable, nifi_version=None, root=None, visited=None):
         if visited is None:
@@ -76,7 +77,7 @@ class Nifi_flow_xml_serializer:
             input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
             input_port_max_concurrent_tasks.text = '1'
             input_port.append(input_port_max_concurrent_tasks)
-            next( res.iterfind('rootGroup') ).append(input_port)
+            next(res.iterfind('rootGroup')).append(input_port)
 
         if isinstance(connectable, Processor):
             conn_destination = Element('processor')
@@ -165,7 +166,7 @@ class Nifi_flow_xml_serializer:
                 proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
                 proc_auto_terminated_relationship.text = auto_terminate_rel
                 conn_destination.append(proc_auto_terminated_relationship)
-            next( res.iterfind('rootGroup') ).append(conn_destination)
+            next(res.iterfind('rootGroup')).append(conn_destination)
             """ res.iterfind('rootGroup').next().append(conn_destination) """
 
             for svc in connectable.controller_services:
@@ -214,7 +215,7 @@ class Nifi_flow_xml_serializer:
                     controller_service_property_value.text = property_value
                     controller_service_property.append(controller_service_property_value)
                     controller_service.append(controller_service_property)
-                next( res.iterfind('rootGroup') ).append(controller_service)
+                next(res.iterfind('rootGroup')).append(controller_service)
                 """ res.iterfind('rootGroup').next().append(controller_service)"""
 
         for conn_name in connectable.connections:
@@ -222,36 +223,36 @@ class Nifi_flow_xml_serializer:
 
             if isinstance(conn_destinations, list):
                 for conn_destination in conn_destinations:
-                    connection = self.build_nifi_flow_xml_connection_element(res,
-                                                          bend_points,
-                                                          conn_name,
-                                                          connectable,
-                                                          label_index,
-                                                          conn_destination,
-                                                          z_index)
-                    next( res.iterfind('rootGroup') ).append(connection)
+                    connection = self.build_nifi_flow_xml_connection_element(
+                        res,
+                        bend_points,
+                        conn_name,
+                        connectable,
+                        label_index,
+                        conn_destination,
+                        z_index)
+                    next(res.iterfind('rootGroup')).append(connection)
                     """ res.iterfind('rootGroup').next().append(connection) """
 
                     if conn_destination not in visited:
                         self.serialize(conn_destination, nifi_version, res, visited)
             else:
-                connection = self.build_nifi_flow_xml_connection_element(res,
-                                                      bend_points,
-                                                      conn_name,
-                                                      connectable,
-                                                      label_index,
-                                                      conn_destinations,
-                                                      z_index)
-                next( res.iterfind('rootGroup') ).append(connection)
+                connection = self.build_nifi_flow_xml_connection_element(
+                    res,
+                    bend_points,
+                    conn_name,
+                    connectable,
+                    label_index,
+                    conn_destinations,
+                    z_index)
+                next(res.iterfind('rootGroup')).append(connection)
                 """ res.iterfind('rootGroup').next().append(connection) """
 
                 if conn_destinations not in visited:
                     self.serialize(conn_destinations, nifi_version, res, visited)
 
         if root is None:
-            return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
-                    + "\n"
-                    + elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
+            return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>' + "\n" + elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
 
     def build_nifi_flow_xml_connection_element(self, res, bend_points, conn_name, connectable, label_index, destination, z_index):
         connection = Element('connection')
@@ -272,7 +273,7 @@ class Nifi_flow_xml_serializer:
         connection.append(connection_source_id)
 
         connection_source_group_id = Element('sourceGroupId')
-        connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text
+        connection_source_group_id.text = next(res.iterfind('rootGroup/id')).text
         """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text"""
         connection.append(connection_source_group_id)
 
@@ -321,4 +322,3 @@ class Nifi_flow_xml_serializer:
         connection.append(connection_flow_file_expiration)
 
         return connection
-
diff --git a/docker/test/integration/minifi/processors/DeleteS3Object.py b/docker/test/integration/minifi/processors/DeleteS3Object.py
index 003a9cd..edfe8fc 100644
--- a/docker/test/integration/minifi/processors/DeleteS3Object.py
+++ b/docker/test/integration/minifi/processors/DeleteS3Object.py
@@ -1,13 +1,16 @@
 from ..core.Processor import Processor
 
+
 class DeleteS3Object(Processor):
-    def __init__(self,
-        proxy_host = '',
-        proxy_port = '',
-        proxy_username = '',
-        proxy_password = ''):
-            super(DeleteS3Object, self).__init__('DeleteS3Object',
-            properties = {
+    def __init__(
+            self,
+            proxy_host='',
+            proxy_port='',
+            proxy_username='',
+            proxy_password=''):
+        super(DeleteS3Object, self).__init__(
+            'DeleteS3Object',
+            properties={
                 'Object Key': 'test_object_key',
                 'Bucket': 'test_bucket',
                 'Access Key': 'test_access_key',
@@ -19,4 +22,3 @@ class DeleteS3Object(Processor):
                 'Proxy Password': proxy_password,
             },
             auto_terminate=['success'])
-
diff --git a/docker/test/integration/minifi/processors/GenerateFlowFile.py b/docker/test/integration/minifi/processors/GenerateFlowFile.py
index 65af6cf..e9ebc5f 100644
--- a/docker/test/integration/minifi/processors/GenerateFlowFile.py
+++ b/docker/test/integration/minifi/processors/GenerateFlowFile.py
@@ -1,7 +1,9 @@
 from ..core.Processor import Processor
 
+
 class GenerateFlowFile(Processor):
     def __init__(self, schedule={'scheduling period': '2 sec'}):
-        super(GenerateFlowFile, self).__init__('GenerateFlowFile',
-			schedule=schedule,
-			auto_terminate=['success'])
+        super(GenerateFlowFile, self).__init__(
+            'GenerateFlowFile',
+            schedule=schedule,
+            auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/GetFile.py b/docker/test/integration/minifi/processors/GetFile.py
index 23a575d..70d7bb7 100644
--- a/docker/test/integration/minifi/processors/GetFile.py
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -1,8 +1,13 @@
 from ..core.Processor import Processor
 
+
 class GetFile(Processor):
-	def __init__(self, input_dir ="/tmp/input", schedule={'scheduling period': '2 sec'}):
-		super(GetFile, self).__init__('GetFile',
-			properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
-			schedule=schedule,
-			auto_terminate=['success'])
+    def __init__(self, input_dir="/tmp/input", schedule={'scheduling period': '2 sec'}):
+        super(GetFile, self).__init__(
+            'GetFile',
+            properties={
+                'Input Directory': input_dir,
+                'Keep Source File': 'true'
+            },
+            schedule=schedule,
+            auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/HashContent.py b/docker/test/integration/minifi/processors/HashContent.py
index f88f05f..9d15a51 100644
--- a/docker/test/integration/minifi/processors/HashContent.py
+++ b/docker/test/integration/minifi/processors/HashContent.py
@@ -1,8 +1,10 @@
 from ..core.Processor import Processor
 
+
 class HashContent(Processor):
-	def __init__(self, schedule={"scheduling period": "2 sec"}):
-		super(HashContent, self).__init__("HashContent",
-			properties={"Hash Attribute": "hash"},
-			schedule=schedule,
-			auto_terminate=["success", "failure"])
+    def __init__(self, schedule={"scheduling period": "2 sec"}):
+        super(HashContent, self).__init__(
+            "HashContent",
+            properties={"Hash Attribute": "hash"},
+            schedule=schedule,
+            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/InvokeHTTP.py b/docker/test/integration/minifi/processors/InvokeHTTP.py
index a19688e..43355e9 100644
--- a/docker/test/integration/minifi/processors/InvokeHTTP.py
+++ b/docker/test/integration/minifi/processors/InvokeHTTP.py
@@ -1,24 +1,28 @@
 from ..core.Processor import Processor
 
+
 class InvokeHTTP(Processor):
-    def __init__(self,
-        ssl_context_service=None,
-        schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-            properties = {
-                "Proxy Host": "",
-                "Proxy Port": "",
-                "invokehttp-proxy-username": "",
-                "invokehttp-proxy-password": "" }
+    def __init__(
+            self,
+            ssl_context_service=None,
+            schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        properties = {
+            "Proxy Host": "",
+            "Proxy Port": "",
+            "invokehttp-proxy-username": "",
+            "invokehttp-proxy-password": ""
+        }
 
-            controller_services = []
+        controller_services = []
 
-            if ssl_context_service is not None:
-                properties['SSL Context Service'] = ssl_context_service.name
-                controller_services.append(ssl_context_service)
+        if ssl_context_service is not None:
+            properties['SSL Context Service'] = ssl_context_service.name
+            controller_services.append(ssl_context_service)
 
-            super(InvokeHTTP, self).__init__('InvokeHTTP',
-                properties = properties,
-                controller_services = controller_services,
-                auto_terminate = ['success', 'response', 'retry', 'failure', 'no retry'],
-                schedule = schedule)
-            self.out_proc.connect({"failure": self})
+        super(InvokeHTTP, self).__init__(
+            'InvokeHTTP',
+            properties=properties,
+            controller_services=controller_services,
+            auto_terminate=['success', 'response', 'retry', 'failure', 'no retry'],
+            schedule=schedule)
+        self.out_proc.connect({"failure": self})
diff --git a/docker/test/integration/minifi/processors/ListenHTTP.py b/docker/test/integration/minifi/processors/ListenHTTP.py
index 7eadc92..c4efb9b 100644
--- a/docker/test/integration/minifi/processors/ListenHTTP.py
+++ b/docker/test/integration/minifi/processors/ListenHTTP.py
@@ -1,5 +1,6 @@
 from ..core.Processor import Processor
 
+
 class ListenHTTP(Processor):
     def __init__(self, cert=None, schedule=None):
         properties = {}
@@ -8,7 +9,8 @@ class ListenHTTP(Processor):
             properties['SSL Certificate'] = cert
             properties['SSL Verify Peer'] = 'no'
 
-        super(ListenHTTP, self).__init__('ListenHTTP',
-			properties=properties,
-			auto_terminate=['success'],
-			schedule=schedule)
+        super(ListenHTTP, self).__init__(
+            'ListenHTTP',
+            properties=properties,
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/LogAttribute.py b/docker/test/integration/minifi/processors/LogAttribute.py
index 1be4c38..3f824d7 100644
--- a/docker/test/integration/minifi/processors/LogAttribute.py
+++ b/docker/test/integration/minifi/processors/LogAttribute.py
@@ -1,7 +1,9 @@
 from ..core.Processor import Processor
 
+
 class LogAttribute(Processor):
     def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(LogAttribute, self).__init__('LogAttribute',
-			auto_terminate=['success'],
-			schedule=schedule)
+        super(LogAttribute, self).__init__(
+            'LogAttribute',
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafka.py b/docker/test/integration/minifi/processors/PublishKafka.py
index 698cd74..4cc1ce6 100644
--- a/docker/test/integration/minifi/processors/PublishKafka.py
+++ b/docker/test/integration/minifi/processors/PublishKafka.py
@@ -1,10 +1,18 @@
 from ..core.Processor import Processor
 
+
 class PublishKafka(Processor):
     def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PublishKafka, self).__init__('PublishKafka',
-			properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
-				'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
-				'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
-			auto_terminate=['success'],
-			schedule=schedule)
+        super(PublishKafka, self).__init__(
+            'PublishKafka',
+            properties={
+                'Client Name': 'nghiaxlee',
+                'Known Brokers': 'kafka-broker:9092',
+                'Topic Name': 'test',
+                'Batch Size': '10',
+                'Compress Codec': 'none',
+                'Delivery Guarantee': '1',
+                'Request Timeout': '10 sec',
+                'Message Timeout': '12 sec'},
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutFile.py b/docker/test/integration/minifi/processors/PutFile.py
index db307dd..9428b7b 100644
--- a/docker/test/integration/minifi/processors/PutFile.py
+++ b/docker/test/integration/minifi/processors/PutFile.py
@@ -1,8 +1,10 @@
 from ..core.Processor import Processor
 
+
 class PutFile(Processor):
     def __init__(self, output_dir="/tmp/output", schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PutFile, self).__init__('PutFile',
+        super(PutFile, self).__init__(
+            'PutFile',
             properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'},
             auto_terminate=['success', 'failure'],
             schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py b/docker/test/integration/minifi/processors/PutS3Object.py
index bd5fc84..f78dcbb 100644
--- a/docker/test/integration/minifi/processors/PutS3Object.py
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -2,14 +2,16 @@ from ..core.Processor import Processor
 
 
 class PutS3Object(Processor):
-    def __init__(self,
-        object_key='test_object_key',
-        proxy_host='',
-        proxy_port='',
-        proxy_username='',
-        proxy_password=''):
-            super(PutS3Object, self).__init__('PutS3Object',
-            properties = {
+    def __init__(
+            self,
+            object_key='test_object_key',
+            proxy_host='',
+            proxy_port='',
+            proxy_username='',
+            proxy_password=''):
+        super(PutS3Object, self).__init__(
+            'PutS3Object',
+            properties={
                 'Object Key': object_key,
                 'Bucket': 'test_bucket',
                 'Access Key': 'test_access_key',
@@ -18,5 +20,6 @@ class PutS3Object(Processor):
                 'Proxy Host': proxy_host,
                 'Proxy Port': proxy_port,
                 'Proxy Username': proxy_username,
-                'Proxy Password': proxy_password },
-            auto_terminate = ["success", "failure"])
+                'Proxy Password': proxy_password
+            },
+            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
index 689cf7d..07eac6e 100644
--- a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
+++ b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
@@ -5,6 +5,7 @@ from os import listdir
 
 from .FileOutputValidator import FileOutputValidator
 
+
 class EmptyFilesOutPutValidator(FileOutputValidator):
 
     """
@@ -22,6 +23,6 @@ class EmptyFilesOutPutValidator(FileOutputValidator):
         logging.info("Output folder: %s", full_dir)
         listing = listdir(full_dir)
         if listing:
-            self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0 for x in listing)
+            self.valid = all(os.path.getsize(os.path.join(full_dir, x)) == 0 for x in listing)
 
         return self.valid
diff --git a/docker/test/integration/minifi/validators/FileOutputValidator.py b/docker/test/integration/minifi/validators/FileOutputValidator.py
index d558c43..caaf789 100644
--- a/docker/test/integration/minifi/validators/FileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/FileOutputValidator.py
@@ -1,5 +1,6 @@
 from .OutputValidator import OutputValidator
 
+
 class FileOutputValidator(OutputValidator):
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
diff --git a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
index e97829e..05b439d 100644
--- a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
@@ -1,6 +1,5 @@
 import logging
 import os
-import subprocess
 
 from os import listdir
 from os.path import join
@@ -38,21 +37,19 @@ class MultiFileOutputValidator(FileOutputValidator):
             if not os.path.isfile(full_path):
                 return self.valid
 
-            with open(full_path, 'r') as out_file:
-                contents = out_file.read()
-                logging.info("dir %s -- name %s", full_dir, out_file_name)
-                logging.info("expected file count %d -- current file count %d", self.expected_file_count, len(self.file_timestamps))
+            logging.info("dir %s -- name %s", full_dir, out_file_name)
+            logging.info("expected file count %d -- current file count %d", self.expected_file_count, len(self.file_timestamps))
 
-                if full_path in self.file_timestamps and self.file_timestamps[full_path] != os.path.getmtime(full_path):
-                    logging.error("Last modified timestamp changed for %s", full_path)
-                    self.valid = False
-                    return self.valid
+            if full_path in self.file_timestamps and self.file_timestamps[full_path] != os.path.getmtime(full_path):
+                logging.error("Last modified timestamp changed for %s", full_path)
+                self.valid = False
+                return self.valid
 
-                self.file_timestamps[full_path] = os.path.getmtime(full_path)
-                logging.info("New file added %s", full_path)
+            self.file_timestamps[full_path] = os.path.getmtime(full_path)
+            logging.info("New file added %s", full_path)
 
-                if len(self.file_timestamps) == self.expected_file_count:
-                    self.valid = True
-                    return self.valid
+            if len(self.file_timestamps) == self.expected_file_count:
+                self.valid = True
+                return self.valid
 
         return self.valid
diff --git a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
index f60a008..7383fc0 100644
--- a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
+++ b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
@@ -1,9 +1,10 @@
 import logging
 
-from os import listdir
 
+from os import listdir
 from .FileOutputValidator import FileOutputValidator
 
+
 class NoFileOutPutValidator(FileOutputValidator):
     """
     Validates if no flowfiles were transferred
diff --git a/docker/test/integration/minifi/validators/OutputValidator.py b/docker/test/integration/minifi/validators/OutputValidator.py
index c05d5fa..9fdb4b4 100644
--- a/docker/test/integration/minifi/validators/OutputValidator.py
+++ b/docker/test/integration/minifi/validators/OutputValidator.py
@@ -9,4 +9,3 @@ class OutputValidator(object):
         Return True if output is valid; False otherwise.
         """
         raise NotImplementedError("validate function needs to be implemented for validators")
-
diff --git a/docker/test/integration/minifi/validators/SegfaultValidator.py b/docker/test/integration/minifi/validators/SegfaultValidator.py
index ee0227d..3dae0f7 100644
--- a/docker/test/integration/minifi/validators/SegfaultValidator.py
+++ b/docker/test/integration/minifi/validators/SegfaultValidator.py
@@ -1,5 +1,6 @@
 from .OutputValidator import OutputValidator
 
+
 class SegfaultValidator(OutputValidator):
     """
     Validate that a file was received.
diff --git a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
index 7466b41..4f53bf7 100644
--- a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
+++ b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
@@ -6,6 +6,7 @@ from os.path import join
 
 from .FileOutputValidator import FileOutputValidator
 
+
 class SingleFileOutputValidator(FileOutputValidator):
     """
     Validates the content of a single file in the given directory.
@@ -26,8 +27,8 @@ class SingleFileOutputValidator(FileOutputValidator):
 
         listing = listdir(full_dir)
         if listing:
-            for l in listing:
-                logging.info("name:: %s", l)
+            for listed in listing:
+                logging.info("name:: %s", listed)
             out_file_name = listing[0]
             full_path = join(full_dir, out_file_name)
             if not os.path.isfile(full_path):
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 53742a9..a78dc51 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1,41 +1,30 @@
-from MiNiFi_integration_test_driver import MiNiFi_integration_test
-
-from minifi.core.DockerTestCluster import DockerTestCluster
 from minifi.core.FileSystemObserver import FileSystemObserver
 from minifi.core.RemoteProcessGroup import RemoteProcessGroup
-from minifi.core.InputPort import InputPort
 from minifi.core.SSLContextService import SSLContextService
-from minifi.core.SSL_cert_utils import gen_cert, gen_req, rsa_gen_key_callback
+from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback
 
 from minifi.processors.PublishKafka import PublishKafka
 from minifi.processors.PutS3Object import PutS3Object
 from minifi.processors.DeleteS3Object import DeleteS3Object
 from minifi.processors.FetchS3Object import FetchS3Object
-from minifi.processors.ListS3 import ListS3
 from minifi.processors.PutAzureBlobStorage import PutAzureBlobStorage
 
-
 from behave import given, then, when
 from behave.model_describe import ModelDescriptor
-from copy import copy
-from copy import deepcopy
 from pydoc import locate
 from pytimeparse.timeparse import timeparse
 
-import os
 import logging
-import re
 import time
-import uuid
 
-# Background
 
+# Background
 @given("the content of \"{directory}\" is monitored")
 def step_impl(context, directory):
     context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
 
-# MiNiFi cluster setups
 
+# MiNiFi cluster setups
 @given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in a \"{cluster_name}\" flow")
 @given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow")
 def step_impl(context, processor_type, property, property_value, cluster_name):
@@ -51,16 +40,19 @@ def step_impl(context, processor_type, property, property_value, cluster_name):
         cluster.set_name(cluster_name)
         cluster.set_flow(processor)
 
+
 @given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\"")
 def step_impl(context, processor_type, property, property_value):
     context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow".
-        format(processor_type=processor_type, property=property, property_value=property_value, cluster_name="primary_cluster"))
+                          format(processor_type=processor_type, property=property, property_value=property_value, cluster_name="primary_cluster"))
+
 
 @given("a {processor_type} processor in the \"{cluster_name}\" flow")
 @given("a {processor_type} processor in a \"{cluster_name}\" flow")
 def step_impl(context, processor_type, cluster_name):
     context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow".
-        format(processor_type=processor_type, property=None, property_value=None, cluster_name=cluster_name))
+                          format(processor_type=processor_type, property=None, property_value=None, cluster_name=cluster_name))
+
 
 @given("a set of processors in the \"{cluster_name}\" flow")
 def step_impl(context, cluster_name):
@@ -75,6 +67,7 @@ def step_impl(context, cluster_name):
         if cluster.get_flow() is None:
             cluster.set_flow(processor)
 
+
 @given("a set of processors")
 def step_impl(context):
     rendered_table = ModelDescriptor.describe_table(context.table, "    ")
@@ -82,11 +75,13 @@ def step_impl(context):
         {table}
         """.format(cluster_name="primary_cluster", table=rendered_table))
 
+
 @given("a RemoteProcessGroup node opened on \"{address}\"")
 def step_impl(context, address):
     remote_process_group = RemoteProcessGroup(address, "RemoteProcessGroup")
     context.test.add_remote_process_group(remote_process_group)
 
+
 @given("a PutS3Object processor set up to communicate with an s3 server")
 def step_impl(context):
     # PutS3Object is never the first node of a flow, so the potential cluster-flow setup is omitted
@@ -94,6 +89,7 @@ def step_impl(context):
     put_s3.set_name("PutS3Object")
     context.test.add_node(put_s3)
 
+
 @given("a DeleteS3Object processor set up to communicate with the same s3 server")
 @given("a DeleteS3Object processor set up to communicate with an s3 server")
 def step_impl(context):
@@ -101,6 +97,7 @@ def step_impl(context):
     delete_s3.set_name("DeleteS3Object")
     context.test.add_node(delete_s3)
 
+
 @given("a FetchS3Object processor set up to communicate with the same s3 server")
 @given("a FetchS3Object processor set up to communicate with an s3 server")
 def step_impl(context):
@@ -108,12 +105,14 @@ def step_impl(context):
     fetch_s3.set_name("FetchS3Object")
     context.test.add_node(fetch_s3)
 
+
 @given("a PutAzureBlobStorage processor set up to communicate with an Azure blob storage")
 def step_impl(context):
     put_azure_blob_storage = PutAzureBlobStorage()
     put_azure_blob_storage.set_name("PutAzureBlobStorage")
     context.test.add_node(put_azure_blob_storage)
 
+
 @given("a PublishKafka processor set up to communicate with a kafka broker instance")
 def step_impl(context):
     # PublishKafka is never the first node of a flow, so the potential cluster-flow setup is omitted
@@ -121,6 +120,7 @@ def step_impl(context):
     publish_kafka.set_name("PublishKafka")
     context.test.add_node(publish_kafka)
 
+
 @given("the \"{property_name}\" of the {processor_name} processor is set to \"{property_value}\"")
 def step_impl(context, property_name, processor_name, property_value):
     processor = context.test.get_node_by_name(processor_name)
@@ -132,12 +132,14 @@ def step_impl(context, processor_name, sceduling_period):
     processor = context.test.get_node_by_name(processor_name)
     processor.set_scheduling_period(sceduling_period)
 
+
 @given("these processor properties are set")
 @given("these processor properties are set to match the http proxy")
 def step_impl(context):
     for row in context.table:
         context.test.get_node_by_name(row["processor name"]).set_property(row["property name"], row["property value"])
 
+
 @given("the \"{relationship}\" relationship of the {source_name} processor is connected to the input port on the {remote_process_group_name}")
 def step_impl(context, relationship, source_name, remote_process_group_name):
     source = context.test.get_node_by_name(source_name)
@@ -146,6 +148,7 @@ def step_impl(context, relationship, source_name, remote_process_group_name):
     context.test.add_node(input_port_node)
     source.out_proc.connect({relationship: input_port_node})
 
+
 @given("the \"{relationship}\" relationship of the {source_name} is connected to the {destination_name}")
 @given("the \"{relationship}\" relationship of the {source_name} processor is connected to the {destination_name}")
 def step_impl(context, relationship, source_name, destination_name):
@@ -153,27 +156,31 @@ def step_impl(context, relationship, source_name, destination_name):
     destination = context.test.get_node_by_name(destination_name)
     source.out_proc.connect({relationship: destination})
 
+
 @given("the processors are connected up as described here")
 def step_impl(context):
     for row in context.table:
         context.execute_steps(
             "given the \"" + row["relationship name"] + "\" relationship of the " + row["source name"] + " processor is connected to the " + row["destination name"])
 
+
 @given("the connection going to the RemoteProcessGroup has \"drop empty\" set")
 def step_impl(context):
     input_port = context.test.get_node_by_name("to_nifi")
     input_port.drop_empty_flowfiles = True
 
+
 @given("a file with the content \"{content}\" is present in \"{path}\"")
 def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
+
 @given("a file with filename \"{file_name}\" and content \"{content}\" is present in \"{path}\"")
 def step_impl(context, file_name, content, path):
     context.test.add_test_data(path, content, file_name)
 
-# NiFi setups
 
+# NiFi setups
 @given("a NiFi flow \"{cluster_name}\" receiving data from a RemoteProcessGroup \"{source_name}\" on port {port}")
 def step_impl(context, cluster_name, source_name, port):
     remote_process_group = context.test.get_remote_process_group_by_name("RemoteProcessGroup")
@@ -186,6 +193,7 @@ def step_impl(context, cluster_name, source_name, port):
     if cluster.get_flow() is None:
         cluster.set_flow(source)
 
+
 @given("in the \"{cluster_name}\" flow the \"{relationship}\" relationship of the {source_name} processor is connected to the {destination_name}")
 def step_impl(context, cluster_name, relationship, source_name, destination_name):
     cluster = context.test.acquire_cluster(cluster_name)
@@ -195,8 +203,8 @@ def step_impl(context, cluster_name, relationship, source_name, destination_name
     if cluster.get_flow() is None:
         cluster.set_flow(source)
 
-# HTTP proxy setup
 
+# HTTP proxy setup
 @given("the http proxy server \"{cluster_name}\" is set up")
 @given("a http proxy server \"{cluster_name}\" is set up accordingly")
 def step_impl(context, cluster_name):
@@ -205,8 +213,8 @@ def step_impl(context, cluster_name):
     cluster.set_engine("http-proxy")
     cluster.set_flow(None)
 
+
 # TLS
-#
 @given("an ssl context service set up for {producer_name} and {consumer_name}")
 def step_impl(context, producer_name, consumer_name):
     cert, key = gen_cert()
@@ -220,8 +228,8 @@ def step_impl(context, producer_name, consumer_name):
     consumer.set_property("SSL Certificate", crt_file)
     consumer.set_property("SSL Verify Peer", "no")
 
-# Kafka setup
 
+# Kafka setup
 @given("a kafka broker \"{cluster_name}\" is set up in correspondence with the PublishKafka")
 def step_impl(context, cluster_name):
     cluster = context.test.acquire_cluster(cluster_name)
@@ -229,8 +237,8 @@ def step_impl(context, cluster_name):
     cluster.set_engine("kafka-broker")
     cluster.set_flow(None)
 
-# s3 setup
 
+# s3 setup
 @given("a s3 server \"{cluster_name}\" is set up in correspondence with the PutS3Object")
 @given("a s3 server \"{cluster_name}\" is set up in correspondence with the DeleteS3Object")
 def step_impl(context, cluster_name):
@@ -239,8 +247,8 @@ def step_impl(context, cluster_name):
     cluster.set_engine("s3-server")
     cluster.set_flow(None)
 
-# azure storage setup
 
+# azure storage setup
 @given("an Azure storage server \"{cluster_name}\" is set up in correspondence with the PutAzureBlobStorage")
 def step_impl(context, cluster_name):
     cluster = context.test.acquire_cluster(cluster_name)
@@ -248,50 +256,61 @@ def step_impl(context, cluster_name):
     cluster.set_engine("azure-storage-server")
     cluster.set_flow(None)
 
+
 @when("the MiNiFi instance starts up")
 @when("both instances start up")
 @when("all instances start up")
 def step_impl(context):
     context.test.start()
 
+
 @when("content \"{content}\" is added to file \"{file_name}\" present in directory \"{path}\" {seconds:d} seconds later")
 def step_impl(context, content, file_name, path, seconds):
     time.sleep(seconds)
     context.test.add_test_data(path, content, file_name)
 
+
 @then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
     context.test.check_for_file_with_content_generated(content, timeparse(duration))
 
+
 @then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
 @then("{number_of_files:d} flowfile is placed in the monitored directory in {duration}")
 def step_impl(context, number_of_files, duration):
     context.test.check_for_multiple_files_generated(number_of_files, timeparse(duration))
 
+
 @then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
 def step_impl(context, duration):
     context.test.check_for_multiple_empty_files_generated(timeparse(duration))
 
+
 @then("no files are placed in the monitored directory in {duration} of running time")
 def step_impl(context, duration):
     context.test.check_for_no_files_generated(timeparse(duration))
 
+
 @then("no errors were generated on the \"{cluster_name}\" regarding \"{url}\"")
 def step_impl(context, cluster_name, url):
     context.test.check_http_proxy_access(cluster_name, url)
 
+
 @then("the object on the \"{cluster_name}\" s3 server is \"{object_data}\"")
 def step_impl(context, cluster_name, object_data):
     context.test.check_s3_server_object_data(cluster_name, object_data)
 
+
 @then("the object content type on the \"{cluster_name}\" s3 server is \"{content_type}\" and the object metadata matches use metadata")
 def step_impl(context, cluster_name, content_type):
     context.test.check_s3_server_object_metadata(cluster_name, content_type)
 
+
 @then("the object bucket on the \"{cluster_name}\" s3 server is empty")
 def step_impl(context, cluster_name):
     context.test.check_empty_s3_bucket(cluster_name)
 
+
 @then("the object on the \"{cluster_name}\" Azure storage server is \"{object_data}\"")
 def step_impl(context, cluster_name, object_data):
     context.test.check_azure_storage_server_data(cluster_name, object_data)
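
Most of the steps.py hunks above only add blank lines between the step definitions: flake8's default pycodestyle checks (E302/E305) require two blank lines around every top-level definition, which is what the inserted lines satisfy. A minimal, self-contained sketch of that convention (illustrative only, not part of the patch):

    import logging


    def first_step():
        logging.info("first")


    def second_step():  # preceded by exactly two blank lines, as E302 requires
        logging.info("second")
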
diff --git a/extensions/pythonprocessors/examples/SentimentAnalysis.py b/extensions/pythonprocessors/examples/SentimentAnalysis.py
index f92f8bc..4db2260 100644
--- a/extensions/pythonprocessors/examples/SentimentAnalysis.py
+++ b/extensions/pythonprocessors/examples/SentimentAnalysis.py
@@ -16,28 +16,32 @@
 import codecs
 from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
 
+
 def describe(processor):
     processor.setDescription("Provides a sentiment analysis of the content within the flow file")
 
+
 def onInitialize(processor):
-  processor.setSupportsDynamicProperties()
+    processor.setSupportsDynamicProperties()
+
 
 class VaderSentiment(object):
-  def __init__(self):
-    self.content = None
+    def __init__(self):
+        self.content = None
+
+    def process(self, input_stream):
+        self.content = codecs.getreader('utf-8')(input_stream).read()
+        return len(self.content)
 
-  def process(self, input_stream):
-    self.content = codecs.getreader('utf-8')(input_stream).read()
-    return len(self.content)
 
 def onTrigger(context, session):
-  flow_file = session.get()
-  if flow_file is not None:
-    sentiment = VaderSentiment()
-    session.read(flow_file,sentiment)
-    analyzer = SentimentIntensityAnalyzer()
-    vs = analyzer.polarity_scores(sentiment.content)
-    flow_file.addAttribute("positive",str(vs['pos']))
-    flow_file.addAttribute("negative",str(vs['neg']))
-    flow_file.addAttribute("neutral",str(vs['neu']))
-    session.transfer(flow_file, REL_SUCCESS)
+    flow_file = session.get()
+    if flow_file is not None:
+        sentiment = VaderSentiment()
+        session.read(flow_file, sentiment)
+        analyzer = SentimentIntensityAnalyzer()
+        vs = analyzer.polarity_scores(sentiment.content)
+        flow_file.addAttribute("positive", str(vs['pos']))
+        flow_file.addAttribute("negative", str(vs['neg']))
+        flow_file.addAttribute("neutral", str(vs['neu']))
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/pythonprocessors/google/SentimentAnalyzer.py b/extensions/pythonprocessors/google/SentimentAnalyzer.py
index 3d54af0..c7ab9c0 100644
--- a/extensions/pythonprocessors/google/SentimentAnalyzer.py
+++ b/extensions/pythonprocessors/google/SentimentAnalyzer.py
@@ -15,50 +15,50 @@
 # limitations under the License.
 """
   Install the following with pip ( or pip3 )
-  
+
   pip install google-cloud-language
-  
+
   -- the following were needed during development as we saw SSL timeout errors
   pip install requests[security]
   pip install -U httplib2
 """
-import json
-import sys
 import codecs
 from google.cloud import language
 from google.cloud.language import enums
 from google.cloud.language import types
 
+
 def describe(processor):
     processor.setDescription("Performs a sentiment Analysis of incoming flowfile content using Google Cloud.")
 
+
 def onInitialize(processor):
-    # is required, 
-    processor.addProperty("Credentials Path","Path to your Google Credentials JSON File. Must exist on agent hosts.","", True, False)
+    # is required,
+    processor.addProperty("Credentials Path", "Path to your Google Credentials JSON File. Must exist on agent hosts.", "", True, False)
+
 
 class ContentExtract(object):
-  def __init__(self):
-    self.content = None
+    def __init__(self):
+        self.content = None
 
-  def process(self, input_stream):
-    self.content = codecs.getreader('utf-8')(input_stream).read()
-    return len(self.content)
+    def process(self, input_stream):
+        self.content = codecs.getreader('utf-8')(input_stream).read()
+        return len(self.content)
 
 
 def onTrigger(context, session):
-  flow_file = session.get()
-  if flow_file is not None:
-    credentials_filename = context.getProperty("Credentials Path")
-    sentiment = ContentExtract()
-    session.read(flow_file,sentiment)
-    client = language.LanguageServiceClient.from_service_account_json(credentials_filename)
-    document = types.Document(content=sentiment.content,type=enums.Document.Type.PLAIN_TEXT)
-    
-    annotations = client.analyze_sentiment(document=document, retry = None,timeout=1.0 )
-    score = annotations.document_sentiment.score
-    magnitude = annotations.document_sentiment.magnitude
-    
-    flow_file.addAttribute("score",str(score))
-    flow_file.addAttribute("magnitude",str(magnitude))
-    session.transfer(flow_file, REL_SUCCESS)
+    flow_file = session.get()
+    if flow_file is not None:
+        credentials_filename = context.getProperty("Credentials Path")
+        sentiment = ContentExtract()
+        session.read(flow_file, sentiment)
+        client = language.LanguageServiceClient.from_service_account_json(credentials_filename)
+        document = types.Document(content=sentiment.content, type=enums.Document.Type.PLAIN_TEXT)
+
+        annotations = client.analyze_sentiment(document=document, retry=None, timeout=1.0)
+        score = annotations.document_sentiment.score
+        magnitude = annotations.document_sentiment.magnitude
 
+        flow_file.addAttribute("score", str(score))
+        flow_file.addAttribute("magnitude", str(magnitude))
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/pythonprocessors/h2o/ConvertDsToCsv.py b/extensions/pythonprocessors/h2o/ConvertDsToCsv.py
index 8526045..566f4e3 100644
--- a/extensions/pythonprocessors/h2o/ConvertDsToCsv.py
+++ b/extensions/pythonprocessors/h2o/ConvertDsToCsv.py
@@ -20,10 +20,11 @@
     pip install pandas
 """
 import codecs
-import pandas as pd
+import pandas as pd  # noqa F401
 import datatable as dt
 from io import StringIO
 
+
 def describe(processor):
     """ describe what this processor does
     """
@@ -31,24 +32,27 @@ def describe(processor):
                               supports a variety of data sources: pandas DataFrames, csv, numpy \
                               arrays, dictionary, list, raw Python objects, etc")
 
+
 def onInitialize(processor):
     """ onInitialize is where you can set properties
     """
     processor.setSupportsDynamicProperties()
 
+
 class ContentExtract(object):
     """ ContentExtract callback class is defined for reading streams of data through the session
         and has a process function that accepts the input stream
     """
     def __init__(self):
         self.content = None
-    
+
     def process(self, input_stream):
         """ Use codecs getReader to read that data
         """
         self.content = codecs.getreader('utf-8')(input_stream).read()
         return len(self.content)
 
+
 class ContentWrite(object):
     """ ContentWrite callback class is defined for writing streams of data through the session
     """
@@ -61,6 +65,7 @@ class ContentWrite(object):
         codecs.getwriter('utf-8')(output_stream).write(self.content)
         return len(self.content)
 
+
 def onTrigger(context, session):
     """ onTrigger is executed and passed processor context and session
     """
@@ -73,7 +78,7 @@ def onTrigger(context, session):
         csv_data = StringIO()
         # load str data into datatable, then convert to pandas df
         dt_frame = dt.Frame(read_cb.content)
-        pd_dframe = dt_frame.to_pandas() 
+        pd_dframe = dt_frame.to_pandas()
         # convert df to csv file like object without df index
         pd_dframe.to_csv(csv_data, index=False)
         # set the csv to the start of text stream
@@ -83,4 +88,4 @@ def onTrigger(context, session):
         # write csv str to flow file
         write_cb = ContentWrite(csv_data)
         session.write(flow_file, write_cb)
-        session.transfer(flow_file, REL_SUCCESS)
\ No newline at end of file
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py b/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py
index 9307f1a..0c6f092 100644
--- a/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py
+++ b/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py
@@ -22,7 +22,7 @@
     sudo apt-get -y update
 
     Install Java to include open source H2O-3 algorithms:
-    
+
     sudo apt-get -y install openjdk-8-jdk
 
     Install Datatable and pandas:
@@ -52,11 +52,12 @@
 """
 import h2o
 import codecs
-import pandas as pd
+import pandas as pd  # noqa: F401
 import datatable as dt
 
 mojo_model = None
 
+
 def describe(processor):
     """ describe what this processor does
     """
@@ -65,6 +66,7 @@ def describe(processor):
         the incoming flow file content. If tabular data is one row, then MOJO does real-time \
         scoring. If tabular data is multiple rows, then MOJO does batch scoring.")
 
+
 def onInitialize(processor):
     """ onInitialize is where you can set properties
         processor.addProperty(name, description, defaultValue, required, el)
@@ -72,8 +74,8 @@ def onInitialize(processor):
     processor.addProperty("MOJO Model Filepath", "Add the filepath to the MOJO Model file. For example, \
         'path/to/mojo-model/GBM_grid__1_AutoML_20200511_075150_model_180.zip'.", "", True, False)
 
-    processor.addProperty("Is First Line Header", "Add True or False for whether first line is header.", \
-        "True", True, False)
+    processor.addProperty("Is First Line Header", "Add True or False for whether first line is header.",
+                          "True", True, False)
 
     processor.addProperty("Input Schema", "If first line is not header, then you must add Input Schema for \
         incoming data.If there is more than one column name, write a comma separated list of \
@@ -87,6 +89,7 @@ def onInitialize(processor):
         is more than one column name, write a comma separated list of column names. Else, H2O-3 will include \
         them by default", "", False, False)
 
+
 def onSchedule(context):
     """ onSchedule is where you load and read properties
         this function is called 1 time when the processor is scheduled to run
@@ -97,19 +100,21 @@ def onSchedule(context):
     mojo_model_filepath = context.getProperty("MOJO Model Filepath")
     mojo_model = h2o.import_mojo(mojo_model_filepath)
 
+
 class ContentExtract(object):
     """ ContentExtract callback class is defined for reading streams of data through the session
         and has a process function that accepts the input stream
     """
     def __init__(self):
         self.content = None
-    
+
     def process(self, input_stream):
         """ Use codecs getReader to read that data
         """
         self.content = codecs.getreader('utf-8')(input_stream).read()
         return len(self.content)
 
+
 class ContentWrite(object):
     """ ContentWrite callback class is defined for writing streams of data through the session
     """
@@ -122,6 +127,7 @@ class ContentWrite(object):
         codecs.getwriter('utf-8')(output_stream).write(self.content)
         return len(self.content)
 
+
 def onTrigger(context, session):
     """ onTrigger is executed and passed processor context and session
     """
@@ -160,6 +166,6 @@ def onTrigger(context, session):
         # add one or more flow file attributes: predicted label name and associated score pair
         for i in range(len(pred_header)):
             ff_attr_name = pred_header[i] + "_pred_0"
-            flow_file.addAttribute(ff_attr_name, str(preds_pd_df.at[0,pred_header[i]]))
+            flow_file.addAttribute(ff_attr_name, str(preds_pd_df.at[0, pred_header[i]]))
             log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
         session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/script/ExampleProcessor.py b/extensions/script/ExampleProcessor.py
index 6ef7627..53c8a60 100644
--- a/extensions/script/ExampleProcessor.py
+++ b/extensions/script/ExampleProcessor.py
@@ -13,14 +13,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+
 def describe(processor):
     processor.setDescription("Adds an attribute to your flow files")
 
+
 def onInitialize(processor):
-  processor.setSupportsDynamicProperties()
+    processor.setSupportsDynamicProperties()
+
 
 def onTrigger(context, session):
-  flow_file = session.get()
-  if flow_file is not None:
-    flow_file.addAttribute("Python attribute","attributevalue")
-    session.transfer(flow_file, REL_SUCCESS)
+    flow_file = session.get()
+    if flow_file is not None:
+        flow_file.addAttribute("Python attribute", "attributevalue")
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/libminifi/test/script-tests/test_scripts/non_transferring_processor.py b/libminifi/test/script-tests/test_scripts/non_transferring_processor.py
index 6f7950e..afb83d4 100644
--- a/libminifi/test/script-tests/test_scripts/non_transferring_processor.py
+++ b/libminifi/test/script-tests/test_scripts/non_transferring_processor.py
@@ -6,9 +6,9 @@
 #  The ASF licenses this file to You under the Apache License, Version 2.0
 #  (the "License"); you may not use this file except in compliance with
 #  the License.  You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under the License is distributed on an "AS IS" BASIS,
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,12 +16,14 @@
 #  limitations under the License.
 #
 
+
 def describe(processor):
-  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+    processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
 
 def onTrigger(context, session):
-  flow_file = session.get()
-  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+    flow_file = session.get()
+    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
 
-  if flow_file is not None:
-    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+    if flow_file is not None:
+        log.info('created flow file: %s' % flow_file.getAttribute('filename'))
diff --git a/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py
index c8cd1c6..cad1d8e 100644
--- a/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py
+++ b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_failure.py
@@ -6,9 +6,9 @@
 #  The ASF licenses this file to You under the Apache License, Version 2.0
 #  (the "License"); you may not use this file except in compliance with
 #  the License.  You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under the License is distributed on an "AS IS" BASIS,
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,13 +16,15 @@
 #  limitations under the License.
 #
 
+
 def describe(processor):
-  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+    processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
 
 def onTrigger(context, session):
-  flow_file = session.get()
-  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+    flow_file = session.get()
+    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
 
-  if flow_file is not None:
-    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
-    session.transfer(flow_file, REL_FAILURE)
+    if flow_file is not None:
+        log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+        session.transfer(flow_file, REL_FAILURE)
diff --git a/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py
index 7bdb41f..5ef7f5f 100644
--- a/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py
+++ b/libminifi/test/script-tests/test_scripts/passthrough_processor_transfering_to_success.py
@@ -6,9 +6,9 @@
 #  The ASF licenses this file to You under the Apache License, Version 2.0
 #  (the "License"); you may not use this file except in compliance with
 #  the License.  You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under the License is distributed on an "AS IS" BASIS,
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,13 +16,15 @@
 #  limitations under the License.
 #
 
+
 def describe(processor):
-  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+    processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+
 
 def onTrigger(context, session):
-  flow_file = session.get()
-  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+    flow_file = session.get()
+    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
 
-  if flow_file is not None:
-    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
-    session.transfer(flow_file, REL_SUCCESS)
+    if flow_file is not None:
+        log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/libminifi/test/script-tests/test_scripts/stateful_processor.py b/libminifi/test/script-tests/test_scripts/stateful_processor.py
index 37f0d73..2cfa072 100644
--- a/libminifi/test/script-tests/test_scripts/stateful_processor.py
+++ b/libminifi/test/script-tests/test_scripts/stateful_processor.py
@@ -6,9 +6,9 @@
 #  The ASF licenses this file to You under the Apache License, Version 2.0
 #  (the "License"); you may not use this file except in compliance with
 #  the License.  You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under the License is distributed on an "AS IS" BASIS,
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,27 +16,31 @@
 #  limitations under the License.
 #
 
+
 def describe(processor):
-  processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
+    processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
 
 
 state = 0
+
+
 class WriteCallback(object):
-  def process(self, output_stream):
-    global state
-    new_content = str(state).encode('utf-8')
-    output_stream.write(new_content)
-    state = state + 1
-    return len(new_content)
+    def process(self, output_stream):
+        global state
+        new_content = str(state).encode('utf-8')
+        output_stream.write(new_content)
+        state = state + 1
+        return len(new_content)
+
 
 def onTrigger(context, session):
-  global state
-  log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
-  # flow_file = session.get()
-  flow_file = session.create()
-  flow_file.setAttribute("filename", str(state))
-  log.info('created flow file: %s' % flow_file.getAttribute('filename'))
-
-  if flow_file is not None:
-    session.write(flow_file, WriteCallback())
-    session.transfer(flow_file, REL_SUCCESS)
+    global state
+    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+    # flow_file = session.get()
+    flow_file = session.create()
+    flow_file.setAttribute("filename", str(state))
+    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+
+    if flow_file is not None:
+        session.write(flow_file, WriteCallback())
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/python/getFile.py b/python/getFile.py
index c8aff98..95c37c9 100644
--- a/python/getFile.py
+++ b/python/getFile.py
@@ -13,33 +13,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from minifi import *
+from minifi import *  # noqa F403
 
 from argparse import ArgumentParser
-from ctypes import cdll
-import ctypes
+from ctypes import cdll  # noqa F401
+import ctypes  # noqa F401
 import sys
-from _cffi_backend import callback
+from _cffi_backend import callback  # noqa F401
 
 
-class GetFilePrinterProcessor(PyProcessor):
+class GetFilePrinterProcessor(PyProcessor):  # noqa F405
     def __init__(self, minifi, flow):
-        PyProcessor.__init__(self, minifi, flow)
+        PyProcessor.__init__(self, minifi, flow)  # noqa F405
         self._callback = None
 
     def _onTriggerCallback(self):
         def onTrigger(session, context):
             flow_file = self.get(session, context)
             if flow_file:
-                if flow_file.add_attribute("python_test","value"):
+                if flow_file.add_attribute("python_test", "value"):
                     print("Add attribute succeeded")
-                if not flow_file.add_attribute("python_test","value2"):
+                if not flow_file.add_attribute("python_test", "value2"):
                     print("Cannot add the same attribute twice!")
-                print ("original file name: " + flow_file.get_attribute("filename"))
+                print("original file name: " + flow_file.get_attribute("filename"))
                 target_relationship = "success"
                 if not self.transfer(session, flow_file, target_relationship):
                     print("transfer to relationship " + target_relationship + " failed")
-        return CALLBACK(onTrigger)
+        return CALLBACK(onTrigger)  # noqa F405
 
 
 parser = ArgumentParser()
@@ -53,16 +53,16 @@ parser.add_argument("-i", "--input", dest="input_port",
                     help="NiFi Input Port")
 
 parser.add_argument("-d", "--dir", dest="dir",
-                help="GetFile Dir to monitor", metavar="FILE")
+                    help="GetFile Dir to monitor", metavar="FILE")
 
 args = parser.parse_args()
 
 """ dll_file is the path to the shared object """
-minifi = MiNiFi(dll_file=args.dll_file,url = args.nifi_instance.encode('utf-8'), port=args.input_port.encode('utf-8'))
+minifi = MiNiFi(dll_file=args.dll_file, url=args.nifi_instance.encode('utf-8'), port=args.input_port.encode('utf-8'))  # noqa F405
 
-minifi.set_property("nifi.remote.input.http.enabled","true")
+minifi.set_property("nifi.remote.input.http.enabled", "true")
 
-processor = minifi.add_processor( GetFile() )
+processor = minifi.add_processor(GetFile())  # noqa F405
 
 processor.set_property("Input Directory", args.dir)
 processor.set_property("Keep Source File", "true")
diff --git a/python/minifi/__init__.py b/python/minifi/__init__.py
index 4632347..baeed7e 100644
--- a/python/minifi/__init__.py
+++ b/python/minifi/__init__.py
@@ -18,41 +18,48 @@ import ctypes
 from abc import abstractmethod
 
 
-
 class RPG_PORT(ctypes.Structure):
     _fields_ = [('port_id', ctypes.c_char_p)]
 
+
 class NIFI_STRUCT(ctypes.Structure):
     _fields_ = [('instancePtr', ctypes.c_void_p),
-                 ('port', RPG_PORT)]
+                ('port', RPG_PORT)]
+
 
 class CFlow(ctypes.Structure):
     _fields_ = [('plan', ctypes.c_void_p)]
 
+
 class CFlowFile(ctypes.Structure):
     _fields_ = [('size', ctypes.c_int),
-                 ('in', ctypes.c_void_p),
-                 ('contentLocation', ctypes.c_char_p),
-                 ('attributes', ctypes.c_void_p),
-                 ('ffp', ctypes.c_void_p)]
+                ('in', ctypes.c_void_p),
+                ('contentLocation', ctypes.c_char_p),
+                ('attributes', ctypes.c_void_p),
+                ('ffp', ctypes.c_void_p)]
+
 
 class CAttribute(ctypes.Structure):
     _fields_ = [('key', ctypes.c_char_p),
                 ('value', ctypes.c_void_p),
                 ('value_size', ctypes.c_size_t)]
 
+
 class CProcessor(ctypes.Structure):
     _fields_ = [('processor_ptr', ctypes.c_void_p)]
 
+
 class CProcessSession(ctypes.Structure):
     _fields_ = [('process_session', ctypes.c_void_p)]
 
+
 class CProcessContext(ctypes.Structure):
     _fields_ = [('process_context', ctypes.c_void_p)]
 
 
 CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext))
 
+
 class Processor(object):
     def __init__(self, cprocessor, minifi):
         super(Processor, self).__init__()
@@ -60,7 +67,8 @@ class Processor(object):
         self._minifi = minifi
 
     def set_property(self, name, value):
-        return self._minifi.set_property( self._proc, name.encode("UTF-8"), value.encode("UTF-8")) == 0
+        return self._minifi.set_property(self._proc, name.encode("UTF-8"), value.encode("UTF-8")) == 0
+
 
 class PyProcessor(object):
     def __init__(self, minifi, flow):
@@ -104,6 +112,7 @@ class RPG(object):
     def get_instance(self):
         return self._nifi
 
+
 class FlowFile(object):
     def __init__(self, minifi, ff):
         super(FlowFile, self).__init__()
@@ -130,67 +139,64 @@ class FlowFile(object):
         return self._ff
 
 
-
 class MiNiFi(object):
     """ Proxy Connector """
     def __init__(self, dll_file, url, port):
         super(MiNiFi, self).__init__()
-        self._minifi= cdll.LoadLibrary(dll_file)
+        self._minifi = cdll.LoadLibrary(dll_file)
         """ create instance """
-        self._minifi.create_instance.argtypes = [ctypes.c_char_p , ctypes.POINTER(RPG_PORT)]
+        self._minifi.create_instance.argtypes = [ctypes.c_char_p, ctypes.POINTER(RPG_PORT)]
         self._minifi.create_instance.restype = ctypes.POINTER(NIFI_STRUCT)
         """ create new flow """
         self._minifi.create_new_flow.argtype = ctypes.POINTER(NIFI_STRUCT)
         self._minifi.create_new_flow.restype = ctypes.POINTER(CFlow)
         """ add processor """
-        self._minifi.add_processor.argtypes = [ctypes.POINTER(CFlow) , ctypes.c_char_p ]
+        self._minifi.add_processor.argtypes = [ctypes.POINTER(CFlow), ctypes.c_char_p]
         self._minifi.add_processor.restype = ctypes.POINTER(CProcessor)
         """ set processor property"""
-        self._minifi.set_property.argtypes = [ctypes.POINTER(CProcessor) , ctypes.c_char_p , ctypes.c_char_p ]
+        self._minifi.set_property.argtypes = [ctypes.POINTER(CProcessor), ctypes.c_char_p, ctypes.c_char_p]
         self._minifi.set_property.restype = ctypes.c_int
         """ set instance property"""
-        self._minifi.set_instance_property.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.c_char_p , ctypes.c_char_p ]
+        self._minifi.set_instance_property.argtypes = [ctypes.POINTER(NIFI_STRUCT), ctypes.c_char_p, ctypes.c_char_p]
         self._minifi.set_instance_property.restype = ctypes.c_int
         """ get next flow file """
-        self._minifi.get_next_flow_file.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.POINTER(CFlow) ]
+        self._minifi.get_next_flow_file.argtypes = [ctypes.POINTER(NIFI_STRUCT), ctypes.POINTER(CFlow)]
         self._minifi.get_next_flow_file.restype = ctypes.POINTER(CFlowFile)
         """ transmit flow file """
-        self._minifi.transmit_flowfile.argtypes = [ctypes.POINTER(CFlowFile) , ctypes.POINTER(NIFI_STRUCT) ]
+        self._minifi.transmit_flowfile.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(NIFI_STRUCT)]
         self._minifi.transmit_flowfile.restype = ctypes.c_int
         """ get ff """
-        self._minifi.get.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext) ]
+        self._minifi.get.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext)]
         self._minifi.get.restype = ctypes.POINTER(CFlowFile)
         """ add python processor """
-        self._minifi.add_python_processor.argtypes = [ctypes.POINTER(CFlow) , ctypes.c_void_p ]
+        self._minifi.add_python_processor.argtypes = [ctypes.POINTER(CFlow), ctypes.c_void_p]
         self._minifi.add_python_processor.restype = ctypes.POINTER(CProcessor)
         """ transfer ff """
-        self._minifi.transfer.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CFlow) , ctypes.c_char_p ]
+        self._minifi.transfer.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CFlow), ctypes.c_char_p]
         self._minifi.transfer.restype = ctypes.c_int
         """ transfer ff to relationship """
-        self._minifi.transfer_to_relationship.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(CProcessSession), ctypes.c_char_p ]
+        self._minifi.transfer_to_relationship.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(CProcessSession), ctypes.c_char_p]
         self._minifi.transfer_to_relationship.restype = ctypes.c_int
         """ add attribute to ff """
-        self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
+        self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int]
         self._minifi.add_attribute.restype = ctypes.c_int
 
         """ update (overwrite) attribute to ff """
-        self._minifi.update_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
+        self._minifi.update_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int]
         self._minifi.update_attribute.restype = None
 
         """ get attribute of ff """
-        self._minifi.get_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(CAttribute) ]
+        self._minifi.get_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(CAttribute)]
         self._minifi.get_attribute.restype = ctypes.c_int
 
         self._minifi.init_api.argtype = ctypes.c_char_p
         self._minifi.init_api.restype = ctypes.c_int
         self._minifi.init_api(dll_file.encode("UTF-8"))
 
-        self._instance = self.__open_rpg(url,port)
-        self._flow = self._minifi.create_new_flow( self._instance.get_instance() )
+        self._instance = self.__open_rpg(url, port)
+        self._flow = self._minifi.create_new_flow(self._instance.get_instance())
         self._minifi.enable_logging()
 
-
-
     def __open_rpg(self, url, port):
         rpgPort = (RPG_PORT)(port)
         rpg = self._minifi.create_instance(url, rpgPort)
@@ -203,13 +209,12 @@ class MiNiFi(object):
     def set_property(self, name, value):
         self._minifi.set_instance_property(self._instance.get_instance(), name.encode("UTF-8"), value.encode("UTF-8"))
 
-
     def add_processor(self, processor):
         proc = self._minifi.add_processor(self._flow, processor.get_name().encode("UTF-8"))
-        return Processor(proc,self._minifi)
+        return Processor(proc, self._minifi)
 
     def create_python_processor(self, module, processor):
-        m =  getattr(module, processor)(self._minifi, self._flow)
+        m = getattr(module, processor)(self._minifi, self._flow)
         proc = self._minifi.add_python_processor(self._flow, m.getTriggerCallback())
         m.setBase(proc)
         return m
@@ -220,7 +225,8 @@ class MiNiFi(object):
 
     def transmit_flowfile(self, ff):
         if ff.get_instance():
-            self._minifi.transmit_flowfile(ff.get_instance(),self._instance.get_instance())
+            self._minifi.transmit_flowfile(ff.get_instance(), self._instance.get_instance())
+
 
 class GetFile(object):
     def __init__(self):
diff --git a/run_flake8.sh b/run_flake8.sh
new file mode 100755
index 0000000..d7e5c4f
--- /dev/null
+++ b/run_flake8.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+
+directory=${1:-.}
+flake8 --exclude thirdparty,build --builtins log,REL_SUCCESS,REL_FAILURE,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}"
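
For reference, run_flake8.sh takes an optional directory argument, defaulting to the current directory, so a narrower run such as "./run_flake8.sh docker/test/integration" is possible. The --builtins flag keeps flake8 from raising F821 (undefined name) for log, REL_SUCCESS, REL_FAILURE and raw_input, names the Python processor scripts above use without importing because the script host supplies them at runtime, and --per-file-ignores="steps.py:F811" tolerates the repeated step_impl definitions that behave step modules rely on.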