Posted to commits@nifi.apache.org by ph...@apache.org on 2020/05/29 14:41:33 UTC

[nifi-minifi-cpp] branch master updated: Added py processor to make predictions with h2o3 model

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f38a65  Added py processor to make predictions with h2o3 model
7f38a65 is described below

commit 7f38a656bee4316184cb3240a6f89275d250490f
Author: james94 <ja...@gmail.com>
AuthorDate: Tue May 12 19:23:46 2020 -0700

    Added py processor to make predictions with h2o3 model
    
    Replaced previous py processors that use Driverless AI models to make predictions
    with a new py processor that uses open source ALv2 h2o3 library to make predictions.
    This new py processor uses the h2o3 MOJO model to do batch scoring or real-time
    scoring. To run this processor, the user only needs the filepath of the h2o3 MOJO
    model, passed in through the 'MOJO Model Filepath' minifi processor property.
    The other required property 'Is First Line Header' is set to True by default. There
    are 3 optional properties: 'Input Schema', 'Use Output Header', 'Output Schema'.
    This processor makes use of h2o's open source datatable library licensed under
    Mozilla Public License 2.0. Datatable is used to load in the flow file content.
    Then that datatable frame is converted to a numpy array. Then the numpy array and
    input column names are passed to H2OFrame(np, col_names) to create h2oframe. Then
    the h2o3 MOJO model is used to make predictions on the h2oframe and the result is
    stored into a new h2oframe. That frame is then converted to pandas frame, which is
    stored into the outgoing flow file content. Thus, the predictions are stored in
    the outgoing flow file content.
    
    Added to comments install Java to use H2O3 algos
    
    In the comments section of the code, I added new recommendations.
    The first is to make all packages available on your machine by
    running an update. The next is to install Java since it is
    required to use H2O-3 algorithms.
    
    This closes #781.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 .../h2o/dai/msp/H2oMojoPwScoring.py                | 121 ---------------
 .../h2o/dai/psp/H2oPspScoreBatches.py              |  98 ------------
 .../h2o/dai/psp/H2oPspScoreRealTime.py             | 108 --------------
 .../h2o/h2o3/mojo/ExecuteH2oMojoScoring.py         | 165 +++++++++++++++++++++
 4 files changed, 165 insertions(+), 327 deletions(-)
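
The scoring flow described in the commit message (load the flow file content,
optionally name the columns from 'Input Schema', predict, optionally rename the
prediction columns from 'Output Schema') can be sketched without H2O-3 installed.
This is only an illustration under stated assumptions: the stdlib csv module
stands in for datatable and H2OFrame, and parse_schema, load_rows, score and
fake_predict are hypothetical names that do not appear in the commit.

```python
import csv
import io


def parse_schema(schema):
    """Split a comma separated property value into clean column names."""
    return [name.strip() for name in schema.split(",") if name.strip()]


def load_rows(content, is_first_line_header="True", input_schema=""):
    """Return (column_names, rows) for the tabular text of a flow file."""
    rows = [row for row in csv.reader(io.StringIO(content)) if row]
    if is_first_line_header == "True":
        return rows[0], rows[1:]
    # Without a header line, the column names come from 'Input Schema'.
    return parse_schema(input_schema), rows


def score(content, predict, is_first_line_header="True", input_schema="",
          use_output_header="False", output_schema=""):
    """Score every row and return (prediction_names, prediction_rows)."""
    names, rows = load_rows(content, is_first_line_header, input_schema)
    # In the real processor this step would be mojo_model.predict(h2o_frame).
    pred_names, pred_rows = predict(names, rows)
    # Assumption: an empty 'Output Schema' keeps the model's own column names.
    if use_output_header == "True" and parse_schema(output_schema):
        pred_names = parse_schema(output_schema)
    return pred_names, pred_rows


def fake_predict(names, rows):
    """Hypothetical stand-in model: 'predicts' the sum of each row."""
    return ["predict"], [[sum(float(value) for value in row)] for row in rows]
```

Property values are compared as the strings "True" and "False" because that is
how the processor in this commit reads them from context.getProperty. One row of
input corresponds to real-time scoring and several rows to batch scoring; the
code path is the same in both cases.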

diff --git a/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py b/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py
deleted file mode 100644
index 0679018..0000000
--- a/extensions/pythonprocessors/h2o/dai/msp/H2oMojoPwScoring.py
+++ /dev/null
@@ -1,121 +0,0 @@
-#!/usr/bin/env python
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-    Install the following with pip
-
-    -- after downloading the mojo scoring pipeline from Driverless AI,
-       the following packages were needed for helping to make predictions
-
-    pip install datatable pandas scipy
-
-    -- after downloading the mojo2 py runtime from Driverless AI depending on
-       your OS, you need to install the appropriate package:
-
-    # Install the MOJO2 Py runtime on Mac OS X
-    pip install path/to/daimojo-2.2.0-cp36-cp36m-macosx_10_7_x86_64.whl
-    
-    # Install the MOJO2 Py runtime on Linux x86
-    pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_x86_64.whl
-    
-    # Install the MOJO2 Py runtime on Linux PPC
-    pip install path/to/daimojo-2.2.0-cp36-cp36m-linux_ppc64le.whl
-"""
-import codecs
-import pandas as pd
-import datatable as dt
-from collections import Counter
-from scipy.special._ufuncs import expit
-import daimojo.model
-
-def describe(processor):
-    """ describe what this processor does
-    """
-    processor.setDescription("Executes H2O's MOJO Scoring Pipeline in C++ Runtime Python Wrapper \
-        to do batch scoring or real time scoring for one or more predicted label(s) on the tabular \
-        test data in the incoming flow file content. If tabular data is one row, then MOJO does \
-        real time scoring. If tabular data is multiple rows, then MOJO does batch scoring.")
-
-def onInitialize(processor):
-    """ onInitialize is where you can set properties
-    """
-    processor.addProperty("MOJO Pipeline Filepath", "Add the filepath to the MOJO pipeline file. For example, \
-        'path/to/mojo-pipeline/pipeline.mojo'.", "", True, False)
-
-class ContentExtract(object):
-    """ ContentExtract callback class is defined for reading streams of data through the session
-        and has a process function that accepts the input stream
-    """
-    def __init__(self):
-        self.content = None
-    
-    def process(self, input_stream):
-        """ Use codecs getReader to read that data
-        """
-        self.content = codecs.getreader('utf-8')(input_stream).read()
-        return len(self.content)
-
-class ContentWrite(object):
-    """ ContentWrite callback class is defined for writing streams of data through the session
-    """
-    def __init__(self, data):
-        self.content = data
-
-    def process(self, output_stream):
-        """ Use codecs getWriter to write data encoded to the stream
-        """
-        codecs.getwriter('utf-8')(output_stream).write(self.content)
-        return len(self.content)
-
-def onTrigger(context, session):
-    """ onTrigger is executed and passed processor context and session
-    """
-    flow_file = session.get()
-    # lambda compares two lists: does header equal expected header
-    compare = lambda header, exp_header: Counter(header) == Counter(exp_header)
-    if flow_file is not None:
-        # read test data of flow file content into read_cb.content
-        read_cb = ContentExtract()
-        session.read(flow_file, read_cb)
-        # instantiate H2O's MOJO Scoring Pipeline Scorer
-        mojo_pipeline_filepath = context.getProperty("MOJO Pipeline Filepath")
-        m_scorer = daimojo.model(mojo_pipeline_filepath)
-        # add flow file attribute for creation time of mojo
-        flow_file.addAttribute("mojo_creation_time", m_scorer.created_time)
-        # add flow file attribute for uuid of mojo
-        flow_file.addAttribute("mojo_uuid", m_scorer.uuid)
-        # get list of predicted label(s) for prediction header
-        pred_header = m_scorer.output_names
-        # load tabular data str of 1 or more rows into datatable frame
-        test_dt_frame = dt.Frame(read_cb.content)
-        # does test dt frame column names (header) equal m_scorer feature_names (exp_header)
-        if compare(test_dt_frame.names, m_scorer.feature_names) == False:
-            test_dt_frame.names = tuple(m_scorer.feature_names)
-        # do scoring on test data in the test_dt_frame, return dt frame with predicted label(s)
-        preds_dt_frame = m_scorer.predict(test_dt_frame)
-        # convert preds_dt_frame to pandas dataframe
-        preds_df = preds_dt_frame.to_pandas()
-        # convert pandas df to str without df index, then write to flow file
-        preds_df_str = preds_df.to_string(index=False)
-        write_cb = ContentWrite(preds_df_str)
-        session.write(flow_file, write_cb)
-        # add flow file attribute: number of rows to know how many rows were scored
-        flow_file.addAttribute("num_rows_scored", str(preds_dt_frame.nrows))
-        # add one or more flow file attributes: predicted label name and associated score pair
-        for i in range(len(pred_header)):
-            ff_attr_name = pred_header[i] + "_pred_0"
-            flow_file.addAttribute(ff_attr_name, str(preds_df.at[0,pred_header[i]]))
-            log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
-        session.transfer(flow_file, REL_SUCCESS)
\ No newline at end of file
diff --git a/extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreBatches.py b/extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreBatches.py
deleted file mode 100644
index 8a737fb..0000000
--- a/extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreBatches.py
+++ /dev/null
@@ -1,98 +0,0 @@
-#!/usr/bin/env python
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-    Install the following with pip
-
-    -- after downloading the python scoring pipeline from Driverless AI,
-       the following was needed for executing the Scorer to make predictions
-    pip install -r requirements.txt
-
-        This requirements.txt file includes pip packages that are available on
-        the internet for install, but there are some packages that only come with
-        the python scoring pipeline download, which include:
-    h2oaicore-1.8.4.1-cp36-cp36m-linux_x86_64.whl
-    scoring_h2oai_experiment_6a77d0a4_6a25_11ea_becf_0242ac110002-1.0.0-py3-none-any.whl
-"""
-import codecs
-import pandas as pd
-import datatable as dt
-from scipy.special._ufuncs import expit
-from scoring_h2oai_experiment_6a77d0a4_6a25_11ea_becf_0242ac110002 import Scorer
-
-def describe(processor):
-    """ describe what this processor does
-    """
-    processor.setDescription("Executes H2O's Python Scoring Pipeline to do batch \
-        scoring for one or more predicted label(s) on the tabular test data in the \
-        incoming flow file content.")
-
-def onInitialize(processor):
-    """ onInitialize is where you can set properties
-    """
-    processor.setSupportsDynamicProperties()
-
-class ContentExtract(object):
-    """ ContentExtract callback class is defined for reading streams of data through the session
-        and has a process function that accepts the input stream
-    """
-    def __init__(self):
-        self.content = None
-    
-    def process(self, input_stream):
-        """ Use codecs getReader to read that data
-        """
-        self.content = codecs.getreader('utf-8')(input_stream).read()
-        return len(self.content)
-
-class ContentWrite(object):
-    """ ContentWrite callback class is defined for writing streams of data through the session
-    """
-    def __init__(self, data):
-        self.content = data
-
-    def process(self, output_stream):
-        """ Use codecs getWriter to write data encoded to the stream
-        """
-        codecs.getwriter('utf-8')(output_stream).write(self.content)
-        return len(self.content)
-
-def onTrigger(context, session):
-    """ onTrigger is executed and passed processor context and session
-    """
-    flow_file = session.get()
-    if flow_file is not None:
-        read_cb = ContentExtract()
-        # read flow file tabular data content into read_cb.content data member
-        session.read(flow_file, read_cb)
-        # instantiate H2O's python scoring pipeline scorer
-        scorer = Scorer()
-        # load tabular data str into datatable
-        test_dt_frame = dt.Frame(read_cb.content)
-        # do batch scoring on test data in datatable frame, return pandas df with predicted labels
-        batch_scores_df = scorer.score_batch(test_dt_frame)
-        # convert df to str without df index, then write to flow file
-        batch_scores_df_str = batch_scores_df.to_string(index=False)
-        write_cb = ContentWrite(batch_scores_df_str)
-        session.write(flow_file, write_cb)
-        # add flow file attribute: number of rows in the frame to know how many rows were scored
-        flow_file.addAttribute("num_rows_scored", str(test_dt_frame.nrows))
-        # add flow file attribute: the score of the first row in the frame to see it's pred label
-        pred_header = batch_scores_df.columns
-        for i in range(len(pred_header)):
-            ff_attr_name = pred_header[i] + "_pred_0"
-            flow_file.addAttribute(ff_attr_name, str(batch_scores_df.at[0,pred_header[i]]))
-            log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
-        session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreRealTime.py b/extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreRealTime.py
deleted file mode 100644
index 7dec8c1..0000000
--- a/extensions/pythonprocessors/h2o/dai/psp/H2oPspScoreRealTime.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/env python
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-    Install the following with pip
-
-    -- after downloading the python scoring pipeline from Driverless AI,
-       the following was needed for executing the Scorer to make predictions
-    pip install -r requirements.txt
-
-        This requirements.txt file includes pip packages that are available on
-        the internet for install, but there are some packages that only come with
-        the python scoring pipeline download, which include:
-    h2oaicore-1.8.4.1-cp36-cp36m-linux_x86_64.whl
-    scoring_h2oai_experiment_6a77d0a4_6a25_11ea_becf_0242ac110002-1.0.0-py3-none-any.whl
-"""
-import codecs
-import pandas as pd
-import datatable as dt
-from scipy.special._ufuncs import expit
-from scoring_h2oai_experiment_6a77d0a4_6a25_11ea_becf_0242ac110002 import Scorer
-
-def describe(processor):
-    """ describe what this processor does
-    """
-    processor.setDescription("Executes H2O's Python Scoring Pipeline to do real time scoring for \
-        one or more predicted label(s) on the list test data in the incoming flow file content.")
-
-def onInitialize(processor):
-    """ onInitialize is where you can set properties
-    """
-    processor.addProperty("Predicted Label(s)", "Add One or more predicted label names for the prediction \
-        header. If there is only one predicted label name, then write it in directly. If there is more than \
-        one predicted label name, then write a comma separated list of predicted label names.", "", True, False)
-
-class ContentExtract(object):
-    """ ContentExtract callback class is defined for reading streams of data through the session
-        and has a process function that accepts the input stream
-    """
-    def __init__(self):
-        self.content = None
-    
-    def process(self, input_stream):
-        """ Use codecs getReader to read that data
-        """
-        self.content = codecs.getreader('utf-8')(input_stream).read()
-        return len(self.content)
-
-class ContentWrite(object):
-    """ ContentWrite callback class is defined for writing streams of data through the session
-    """
-    def __init__(self, data):
-        self.content = data
-
-    def process(self, output_stream):
-        """ Use codecs getWriter to write data encoded to the stream
-        """
-        codecs.getwriter('utf-8')(output_stream).write(self.content)
-        return len(self.content)
-
-def onTrigger(context, session):
-    """ onTrigger is executed and passed processor context and session
-    """
-    flow_file = session.get()
-    if flow_file is not None:
-        # read test data of flow file content into read_cb.content
-        read_cb = ContentExtract()
-        session.read(flow_file, read_cb)
-        # instantiate H2O's Python Scoring Pipeline Scorer
-        scorer = Scorer()
-        # get predicted label(s) for prediction header: comma separated labels if more than one
-        pred_header_str = context.getProperty("Predicted Label(s)")
-        pred_header_list = pred_header_str.split(",")
-        # load tabular data str into datatable, convert to pd df, then to list of lists
-        test_dt_frame = dt.Frame(read_cb.content)
-        test_pd_df = test_dt_frame.to_pandas()
-        # grab first list since there is only 1 list in the list of lists 
-        test_list = test_pd_df.values.tolist()[0]
-        log.info("test_list = {}".format(test_list))
-        log.info("len(test_list) = {}".format(len(test_list)))
-        # do real time scoring on test data in the list, return list with predicted label(s)
-        preds_list = scorer.score(test_list)
-        # convert pred list to a comma-separated string followed by \n for line end
-        preds_list_str = ','.join(map(str, preds_list)) + '\n'
-        # concatenate prediction header and list string to pred table string
-        preds_str = pred_header_str + '\n' + preds_list_str
-        write_cb = ContentWrite(preds_str)
-        session.write(flow_file, write_cb)
-        # add flow file attribute: number of lists to know how many lists were scored
-        flow_file.addAttribute("num_lists_scored", str(len(test_list)))
-        # add one or more flow file attributes: predicted label name and associated score pair
-        for i in range(len(pred_header_list)):
-            ff_attr_name = pred_header_list[i] + "_pred"
-            flow_file.addAttribute(ff_attr_name, str(preds_list[i]))
-            log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
-        session.transfer(flow_file, REL_SUCCESS)
\ No newline at end of file
diff --git a/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py b/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py
new file mode 100644
index 0000000..9307f1a
--- /dev/null
+++ b/extensions/pythonprocessors/h2o/h2o3/mojo/ExecuteH2oMojoScoring.py
@@ -0,0 +1,165 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+    -- after downloading the mojo model from h2o3, the following packages
+       are needed to execute the model to do batch or real-time scoring
+
+    Make all packages available on your machine:
+
+    sudo apt-get -y update
+
+    Install Java to include open source H2O-3 algorithms:
+    
+    sudo apt-get -y install openjdk-8-jdk
+
+    Install Datatable and pandas:
+
+    pip install datatable
+    pip install pandas
+
+    Option 1: Install H2O-3 with conda
+
+    conda create -n h2o3-nifi-minifi python=3.6
+    conda activate h2o3-nifi-minifi
+    conda config --append channels conda-forge
+    conda install -y -c h2oai h2o
+
+    Option 2: Install H2O-3 with pip
+
+    pip install requests
+    pip install tabulate
+    pip install "colorama>=0.3.8"
+    pip install future
+    pip uninstall h2o
+    If on Mac OS X, must include --user:
+        pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o --user
+    else:
+        pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o
+
+"""
+import h2o
+import codecs
+import pandas as pd
+import datatable as dt
+
+mojo_model = None
+
+def describe(processor):
+    """ describe what this processor does
+    """
+    processor.setDescription("Executes H2O-3's MOJO Model in Python to do batch scoring or \
+        real-time scoring for one or more predicted label(s) on the tabular test data in \
+        the incoming flow file content. If tabular data is one row, then MOJO does real-time \
+        scoring. If tabular data is multiple rows, then MOJO does batch scoring.")
+
+def onInitialize(processor):
+    """ onInitialize is where you can set properties
+        processor.addProperty(name, description, defaultValue, required, el)
+    """
+    processor.addProperty("MOJO Model Filepath", "Add the filepath to the MOJO Model file. For example, \
+        'path/to/mojo-model/GBM_grid__1_AutoML_20200511_075150_model_180.zip'.", "", True, False)
+
+    processor.addProperty("Is First Line Header", "Add True or False for whether first line is header.", \
+        "True", True, False)
+
+    processor.addProperty("Input Schema", "If first line is not header, then you must add Input Schema for \
+        incoming data. If there is more than one column name, write a comma separated list of \
+        column names. Else, you do not need to add an Input Schema.", "", False, False)
+
+    processor.addProperty("Use Output Header", "Add True or False for whether you want to use an output \
+        header for your predictions.", "False", False, False)
+
+    processor.addProperty("Output Schema", "To set Output Schema, 'Use Output Header' must be set to 'True'. \
+        If you want more descriptive column names for your predictions, then add an Output Schema. If there \
+        is more than one column name, write a comma separated list of column names. Else, H2O-3 will include \
+        them by default.", "", False, False)
+
+def onSchedule(context):
+    """ onSchedule is where you load and read properties
+        this function is called 1 time when the processor is scheduled to run
+    """
+    global mojo_model
+    h2o.init()
+    # instantiate H2O-3's MOJO Model
+    mojo_model_filepath = context.getProperty("MOJO Model Filepath")
+    mojo_model = h2o.import_mojo(mojo_model_filepath)
+
+class ContentExtract(object):
+    """ ContentExtract callback class is defined for reading streams of data through the session
+        and has a process function that accepts the input stream
+    """
+    def __init__(self):
+        self.content = None
+    
+    def process(self, input_stream):
+        """ Use codecs getReader to read that data
+        """
+        self.content = codecs.getreader('utf-8')(input_stream).read()
+        return len(self.content)
+
+class ContentWrite(object):
+    """ ContentWrite callback class is defined for writing streams of data through the session
+    """
+    def __init__(self, data):
+        self.content = data
+
+    def process(self, output_stream):
+        """ Use codecs getWriter to write data encoded to the stream
+        """
+        codecs.getwriter('utf-8')(output_stream).write(self.content)
+        return len(self.content)
+
+def onTrigger(context, session):
+    """ onTrigger is executed and passed processor context and session
+    """
+    global mojo_model
+    flow_file = session.get()
+    if flow_file is not None:
+        # read test data of flow file content into read_cb.content
+        read_cb = ContentExtract()
+        session.read(flow_file, read_cb)
+        # add flow file attribute for mojo model id
+        # flow_file.addAttribute("mojo_model_id", mojo_model.model_id)
+        # load tabular data str of 1 or more rows into datatable frame
+        test_dt_frame = dt.Frame(read_cb.content)
+        test_h2o_frame = h2o.H2OFrame(python_obj=test_dt_frame.to_numpy(), column_names=list(test_dt_frame.names))
+        # if the first line is not a header, take the column names from the 'Input Schema' property
+        first_line_header = context.getProperty("Is First Line Header")
+        if first_line_header == "False":
+            input_schema = context.getProperty("Input Schema")
+            test_h2o_frame.names = list(input_schema.split(","))
+        # do scoring on test data in the test_h2o_frame, return h2o frame with predicted label(s)
+        preds_h2o_frame = mojo_model.predict(test_h2o_frame)
+        use_output_header = context.getProperty("Use Output Header")
+        if use_output_header == "True":
+            output_schema = context.getProperty("Output Schema")
+            preds_h2o_frame.names = list(output_schema.split(","))
+        # convert preds_h2o_frame to pandas dataframe, use_pandas=True by default
+        preds_pd_df = h2o.as_list(preds_h2o_frame)
+        # convert pandas df to str without df index, then write to flow file
+        preds_pd_df_str = preds_pd_df.to_string(index=False)
+        write_cb = ContentWrite(preds_pd_df_str)
+        session.write(flow_file, write_cb)
+        # get list of predicted label(s) for prediction header
+        pred_header = preds_h2o_frame.names
+        # add flow file attribute: number of rows to know how many rows were scored
+        flow_file.addAttribute("num_rows_scored", str(preds_h2o_frame.nrows))
+        # add one or more flow file attributes: predicted label name and associated score pair
+        for i in range(len(pred_header)):
+            ff_attr_name = pred_header[i] + "_pred_0"
+            flow_file.addAttribute(ff_attr_name, str(preds_pd_df.at[0,pred_header[i]]))
+            log.info("getAttribute({}): {}".format(ff_attr_name, flow_file.getAttribute(ff_attr_name)))
+        session.transfer(flow_file, REL_SUCCESS)
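
The read and write callbacks and the "<label>_pred_0" attribute naming used by
the processor above can be exercised outside MiNiFi. The sketch below repeats the
two callback classes from the diff and feeds them io.BytesIO objects, on the
assumption that the streams handed over by session.read and session.write behave
like binary file objects. first_row_attributes is a hypothetical helper that
mirrors the final loop in onTrigger; it is not part of the commit.

```python
import codecs
import io


class ContentExtract(object):
    """Read callback: decodes the incoming stream as UTF-8 text."""
    def __init__(self):
        self.content = None

    def process(self, input_stream):
        self.content = codecs.getreader('utf-8')(input_stream).read()
        return len(self.content)


class ContentWrite(object):
    """Write callback: encodes the given text as UTF-8 into the stream."""
    def __init__(self, data):
        self.content = data

    def process(self, output_stream):
        codecs.getwriter('utf-8')(output_stream).write(self.content)
        return len(self.content)


def first_row_attributes(pred_header, first_row):
    """Map each prediction column to a '<name>_pred_0' attribute string."""
    return {name + "_pred_0": str(value)
            for name, value in zip(pred_header, first_row)}


# Round trip through in-memory streams standing in for flow file content.
read_cb = ContentExtract()
chars_read = read_cb.process(io.BytesIO(b"a,b\n1,2\n"))

sink = io.BytesIO()
ContentWrite("predict\n0.5\n").process(sink)

attrs = first_row_attributes(["predict", "p0"], [1, 0.25])
```

Both callbacks return a character count rather than a byte count, so the two
values differ for non-ASCII content.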