Posted to dev@madlib.apache.org by kaknikhil <gi...@git.apache.org> on 2018/03/14 22:08:50 UTC

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

GitHub user kaknikhil opened a pull request:

    https://github.com/apache/madlib/pull/241

    MiniBatch Pre-Processor: Add new module minibatch_preprocessing

    JIRA: MADLIB-1200
    
    MiniBatch Preprocessor is a utility function to pre-process the input
    data for use with models that support mini-batching as an optimization.
    TODO add more description here ??
    
    The main purpose of the function is to prepare the training data for minibatching algorithms.
    1. If the dependent variable is boolean or text, perform one hot encoding.  N/A for numeric.
    2. Typecast independent variable to double precision[]
    3. Based on the buffer size, group all the dependent and independent variables in a single tuple representative of the buffer.
    
    Notes
    1. Ignore null values in independent and dependent variables
    2. Standardize the input before packing it.
    
    Other changes:
    1. Removed __ from public methods in utils_regularization.py
    Renamed __utils_ind_var_scales and __utils_ind_var_scales_grouping
    so that we can access them from within a class, more specifically
    the minibatch_preprocessing module.
    2. Added new function for regex match and refactored elastic_net.py_in to use this function
    
    Co-authored-by: Rahul Iyer <ri...@apache.org>
    Co-authored-by: Jingyi Mei <jm...@pivotal.io>
    Co-authored-by: Nandish Jayaram <nj...@apache.org>
    Co-authored-by: Orhan Kislal <ok...@pivotal.io>
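
The steps listed in the description can be sketched in plain Python. This is only an illustration under stated assumptions, not the module's implementation: `one_hot` and `pack_minibatches` are hypothetical names, rows are modelled as `(features, label)` pairs, NULL is modelled as `None`, and the standardization step is left out for brevity.

```python
import random


def one_hot(label, classes):
    """Return a 0/1 list with a single 1 at the position of `label`."""
    return [1 if label == c else 0 for c in classes]


def pack_minibatches(rows, buffer_size, seed=None):
    """Pack (features, label) rows into buffers of at most `buffer_size` rows.

    Hypothetical helper: returns the sorted class levels and a list of
    (dependent_matrix, independent_matrix) pairs, one pair per buffer.
    """
    # Ignore rows with a NULL label or a NULL anywhere in the features.
    clean = [(x, y) for x, y in rows
             if y is not None and x is not None and None not in x]
    # Class levels are sorted as strings.
    classes = sorted({str(y) for _, y in clean})
    # Shuffle so that rows land in buffers in random order.
    random.Random(seed).shuffle(clean)
    buffers = []
    for start in range(0, len(clean), buffer_size):
        chunk = clean[start:start + buffer_size]
        dep = [one_hot(str(y), classes) for _, y in chunk]
        indep = [[float(v) for v in x] for x, _ in chunk]
        buffers.append((dep, indep))
    return classes, buffers
```

With five input rows of which two contain a NULL, `pack_minibatches(rows, 2)` would yield two buffers holding three rows in total.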

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/madlib/madlib feature/minibatch_preprocessing

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/madlib/pull/241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #241
    
----
commit 7e89d4097d1d889adfa2eff3ed6217c75b519427
Author: Nikhil Kak <nk...@...>
Date:   2018-01-24T20:01:40Z

    MiniBatch Pre-Processor: Add new module minibatch_preprocessing
    
    JIRA: MADLIB-1200
    
    MiniBatch Preprocessor is a utility function to pre-process the input
    data for use with models that support mini-batching as an optimization.
    TODO add more description here ??
    
    The main purpose of the function is to prepare the training data for minibatching algorithms.
    1. If the dependent variable is boolean or text, perform one hot encoding.  N/A for numeric.
    2. Typecast independent variable to double precision[]
    3. Based on the buffer size, group all the dependent and independent variables in a single tuple representative of the buffer.
    
    Notes
    1. Ignore null values in independent and dependent variables
    2. Standardize the input before packing it.
    
    Other changes:
    1. Removed __ from public methods in utils_regularization.py
    Rename __utils_ind_var_scales and __utils_ind_var_scales_grouping
    so that we can access them from within a class, more specifically
    the minibatch_preprocessing module.
    2. Added new function for regex match and refactored elastic_net.py_in to use this function
    
    Co-authored-by: Rahul Iyer <ri...@apache.org>
    Co-authored-by: Jingyi Mei <jm...@pivotal.io>
    Co-authored-by: Nandish Jayaram <nj...@apache.org>
    Co-authored-by: Orhan Kislal <ok...@pivotal.io>

----


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175532984
  
    --- Diff: src/ports/postgres/modules/utilities/mean_std_dev_calculator.py_in ---
    @@ -0,0 +1,54 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +"""
    +@file mean_std_dev_calculator.py_in
    +
    +@brief
    +
    +@namespace utilities
    +
    +"""
    +
    +from convex.utils_regularization import utils_ind_var_scales
    +from utilities import _array_to_string
    +
    +m4_changequote(`<!', `!>')
    +
    +#TODO: use this for all the modules that calculate the std dev and mean for x
    +# mlp, pca, elastic_net
    --- End diff --
    
    Just a note: ideally this class should support grouping before we use it for other modules, although that support could probably be added as part of a different JIRA.
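
To make the grouping suggestion concrete, here is a minimal pure-Python sketch of per-group mean and standard deviation. It is an illustration only: `grouped_mean_std` is a hypothetical name, it does not use the `utils_ind_var_scales` helpers from the diff, and the choice of population standard deviation is an assumption, since the thread does not say which variant the module uses.

```python
from collections import defaultdict
from math import sqrt


def grouped_mean_std(rows):
    """Compute per-group, per-dimension mean and std dev.

    rows: iterable of (group_key, feature_vector) pairs.
    Returns {group_key: (mean_vector, std_vector)}.
    """
    by_group = defaultdict(list)
    for key, x in rows:
        by_group[key].append(x)

    stats = {}
    for key, vectors in by_group.items():
        n = len(vectors)
        dim = len(vectors[0])
        mean = [sum(v[j] for v in vectors) / n for j in range(dim)]
        # Population std dev (divide by n); an assumption for this sketch.
        std = [sqrt(sum((v[j] - mean[j]) ** 2 for v in vectors) / n)
               for j in range(dim)]
        stats[key] = (mean, std)
    return stats
```

Each group would then be standardized with its own mean and standard deviation rather than with statistics computed over the whole table.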


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/394/



---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175588969
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that gets assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    +
    +        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
    +
    +        _assert(len(num_of_dependent_cols) == 1,
    +                "Invalid dependent_varname: only one column name is allowed "
    +                "as input.")
    +
    +        if self.buffer_size is not None:
    +            _assert(self.buffer_size > 0,
    +                """minibatch_preprocessor: The buffer size has to be a positive
    +                 integer or NULL.""")
    +
    +    def _validate_other_output_tables(self):
    +        """
    +        Validate that standardization and summary table do not exist.
    +        :return:
    +        """
    +        output_tbl_valid(self.output_standardization_table, self.module_name)
    +        output_tbl_valid(self.output_summary_table, self.module_name)
    +
    +    def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
    +        # Note: Keep the null checking where clause of this query in sync with the
    +        # main create output table query.
    +        query = """
    +                SELECT COUNT(*) AS source_table_row_count,
    +                sum(CASE WHEN
    +                NOT {schema_madlib}.array_contains_null({dep_var_array})
    +                AND NOT {schema_madlib}.array_contains_null({indep_var_array})
    +                THEN 1 ELSE 0 END) AS num_rows_processed
    +                FROM {source_table}
    +        """.format(
    +        schema_madlib = self.schema_madlib,
    +        source_table = self.source_table,
    +        dep_var_array = dep_var_array,
    +        indep_var_array = indep_var_array)
    +        result = plpy.execute(query)
    +
    +        source_table_row_count = result[0]['source_table_row_count']
    +        num_rows_processed = result[0]['num_rows_processed']
    +        if not source_table_row_count or not num_rows_processed:
    +            plpy.error("Error while getting the row count of the source table "
    +                       "{0}".format(self.source_table))
    +        num_missing_rows_skipped = source_table_row_count - num_rows_processed
    +
    +        return num_rows_processed, num_missing_rows_skipped
    +
    +class MiniBatchQueryFormatter:
    +    """
    +    This class is responsible for formatting the independent and dependent
    +    variables into arrays so that they can be matrix agged by the preprocessor
    +    class.
    +    """
    +    def __init__(self, source_table):
    +        self.source_table = source_table
    +
    +    def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype):
    +        """
    +        This function returns a tuple of
    +        1. A string with transformed dependent varname depending on its type
    +        2. All the distinct dependent class levels encoded as a string
    +
    +        If dep_type == numeric , do not encode
    +                1. dependent_varname = rings
    +                    transformed_value = ARRAY[[rings1], [rings2], []]
    +                    class_level_str = ARRAY[rings = 'rings1',
    +                                            rings = 'rings2']::integer[]
    +                2. dependent_varname = ARRAY[a, b, c]
    +                    transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
    +                    class_level_str = 'NULL::TEXT'
    +        else if dep_type in ("text", "boolean"), encode:
    +                3. dependent_varname = rings (encoding)
    +                    transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
    +                                                rings2=2], []]
    +                    class_level_str = 'NULL::TEXT'
    +
    +        :param dependent_varname:
    +        :param dependent_var_dbtype:
    +        :return:
    +        """
    +        """
    +        """
    +        dep_var_class_value_str = 'NULL::TEXT'
    +        if dependent_var_dbtype in ("text", "boolean"):
    +            # for encoding, and since boolean can also be a logical expression,
    +            # there is a () for {dependent_varname} to make the query work
    +            dep_level_sql = """
    +            SELECT DISTINCT ({dependent_varname}) AS class
    +            FROM {source_table} where ({dependent_varname}) is NOT NULL
    +            """.format(dependent_varname=dependent_varname,
    +                       source_table=self.source_table)
    +            dep_levels = plpy.execute(dep_level_sql)
    +
    +            # this is string sorting
    +            dep_var_classes = sorted(
    +                ["{0}".format(l["class"]) for l in dep_levels])
    +
    +            dep_var_array_str = self._get_one_hot_encoded_str(dependent_varname,
    +                                                              dep_var_classes)
    +            dep_var_class_value_str = py_list_to_sql_string(dep_var_classes,
    +                                         array_type=dependent_var_dbtype)
    +
    +        elif "[]" in dependent_var_dbtype:
    +            dep_var_array_str = dependent_varname
    +
    +        elif is_psql_numeric_type(dependent_var_dbtype):
    +            dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname)
    +
    +        else:
    +            plpy.error("""Invalid dependent variable type. It should be text,
    +                boolean, numeric, or an array.""")
    +
    +        return dep_var_array_str, dep_var_class_value_str
    +
    +    def _get_one_hot_encoded_str(self, var_name, var_classes):
    +        one_hot_list = []
    +        for c in var_classes:
    +            one_hot_list.append("({0}) = '{1}'".format(var_name, c))
    +
    +        return 'ARRAY[{0}]::integer[]'.format(','.join(one_hot_list))
    +
    +    def get_indep_var_array_str(self, independent_varname):
    +        """
    +        we assume that all the independent features are either numeric or
    +        already encoded by the user.
    +        Supported formats
    +        1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with
    +        scalar values
    +        2. ‘x1’, where x1 is a single column in source table, with value as an
    +        array, like ARRAY[1,2,3] or {1,2,3}
    +
    +        we don't deal with a mixture of scalar and array independent variables
    +        """
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                            independent_varname)
    +        return typecasted_ind_varname
    +
    +class MiniBatchStandardizer:
    +    """
    +    This class is responsible for
    +    1. Calculating the mean and std dev for independent variables
    +    2. Format the query to standardize the input table based on the
    +       calculated mean/std dev
    +    3. Creating the output standardization table
    +    """
    +    def __init__(self, schema_madlib, source_table, dep_var_array_str,
    +                 indep_var_array_str, output_standardization_table):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.dep_var_array_str = dep_var_array_str
    +        self.indep_var_array_str = indep_var_array_str
    +        self.output_standardization_table = output_standardization_table
    +
    +        self.x_mean_str = None
    +        self.x_std_dev_str = None
    +        self.source_table_row_count = 0
    +        self.grouping_cols = "NULL"
    +        self.independent_var_dimension = None
    +        self._calculate_mean_and_std_dev_str()
    +
    +    def _calculate_mean_and_std_dev_str(self):
    +        self.independent_var_dimension, _ = _tbl_dimension_rownum(
    +                                                        self.schema_madlib,
    +                                                        self.source_table,
    +                                                        self.indep_var_array_str,
    +                                                        skip_row_count=True)
    +
    +        calculator = MeanStdDevCalculator(self.schema_madlib,
    +                                          self.source_table,
    +                                          self.indep_var_array_str,
    +                                          self.independent_var_dimension)
    +
    +        self.x_mean_str, self.x_std_dev_str = calculator.\
    +                                              get_mean_and_std_dev_for_ind_var()
    +
    +        if not self.x_mean_str or not self.x_std_dev_str:
    +            plpy.error("mean/stddev for the independent variable "
    +                       "cannot be null")
    +
    +    def get_query_for_standardizing(self):
    +        query="""
    +        SELECT
    +        {dep_var_array_str} as {dep_colname},
    +        {schema_madlib}.utils_normalize_data
    +        (
    +            {indep_var_array_str},'{x_mean_str}'::double precision[],
    +            '{x_std_dev_str}'::double precision[]
    +        ) as {ind_colname}
    +        FROM {source_table}
    +        """.format(
    +            source_table = self.source_table,
    +            schema_madlib = self.schema_madlib,
    +            dep_var_array_str = self.dep_var_array_str,
    +            indep_var_array_str = self.indep_var_array_str,
    +            dep_colname = MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            x_mean_str = self.x_mean_str,
    +            x_std_dev_str = self.x_std_dev_str)
    +        return query
    +
    +    def create_output_standardization_table(self):
    +        query = """
    +        CREATE TABLE {output_standardization_table} AS
    +        select {grouping_cols}::TEXT AS grouping_cols,
    +        '{x_mean_str}'::double precision[] AS mean,
    +        '{x_std_dev_str}'::double precision[] AS std
    +        """.format(
    +        output_standardization_table = self.output_standardization_table,
    +        grouping_cols = self.grouping_cols,
    +        x_mean_str = self.x_mean_str,
    +        x_std_dev_str = self.x_std_dev_str)
    +        plpy.execute(query)
    +
    +class MiniBatchSummarizer:
    +    @staticmethod
    +    def create_output_summary_table(source_table, output_table,
    +                                    dep_var_array_str, indep_var_array_str,
    +                                    buffer_size, class_values, num_rows_processed,
    +                                    num_missing_rows_skipped, output_summary_table):
    +        query = """
    +            CREATE TABLE {output_summary_table} AS
    +            SELECT '{source_table}'::TEXT AS source_table,
    +            '{output_table}'::TEXT AS output_table,
    +            '{dependent_varname}'::TEXT AS dependent_varname,
    +            '{independent_varname}'::TEXT AS independent_varname,
    +            {buffer_size} AS buffer_size,
    +            {class_values} AS class_values,
    +            {num_rows_processed} AS num_rows_processed,
    +            {num_missing_rows_skipped} AS num_missing_rows_skipped,
    +            {grouping_cols}::TEXT AS grouping_cols
    +        """.format(output_summary_table = output_summary_table,
    +                   source_table = source_table,
    +                   output_table = output_table,
    +                   dependent_varname = dep_var_array_str,
    +                   independent_varname = indep_var_array_str,
    +                   buffer_size = buffer_size,
    +                   class_values = class_values,
    +                   num_rows_processed = num_rows_processed,
    +                   num_missing_rows_skipped = num_missing_rows_skipped,
    +                   grouping_cols = "NULL")
    +        plpy.execute(query)
    --- End diff --
    
    I tried running the module without setting a value for `buffer_size`. In the output table, each buffer had exactly one row (due to the work-in-progress formula used in `MiniBatchBufferSizeCalculator`), but the summary table reported `buffer_size = 38.0`.
    The `buffer_size` in the output table and the summary table are in sync when I specify a value for `buffer_size`, but there seems to be an issue when it is left unspecified.
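
One possible explanation, which is an assumption and not confirmed anywhere in this thread, is that the computed default is a floating-point value. The query groups rows by `(row_number() - 1) / {buffer_size}`; in PostgreSQL, dividing two integers truncates, while dividing by a non-integer literal such as `38.0` keeps the fraction, so every row would get its own id. The sketch below mimics that expression in Python; `buffer_ids` is a hypothetical helper, not part of the module.

```python
def buffer_ids(num_rows, buffer_size):
    """Mimic (row_number() - 1) / buffer_size for row_number = 1..num_rows."""
    ids = []
    for row_number in range(1, num_rows + 1):
        if isinstance(buffer_size, int):
            # Integer operands: the quotient is truncated, so consecutive
            # rows share an id until buffer_size rows have been seen.
            ids.append((row_number - 1) // buffer_size)
        else:
            # Non-integer divisor: the quotient keeps its fraction, so
            # every row ends up with a distinct id.
            ids.append((row_number - 1) / buffer_size)
    return ids


# Number of distinct groups produced for 100 rows.
groups_with_int = len(set(buffer_ids(100, 38)))
groups_with_float = len(set(buffer_ids(100, 38.0)))
```

If this is indeed the cause, casting the computed default to an integer before it is formatted into both the main query and the summary table would keep the two in sync.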



---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175522378
  
    --- Diff: src/ports/postgres/modules/utilities/utilities.py_in ---
    @@ -794,6 +794,41 @@ def collate_plpy_result(plpy_result_rows):
     # ------------------------------------------------------------------------------
     
     
    +def validate_module_input_params(source_table, output_table, independent_varname,
    +                  dependent_varname, module_name, **kwargs):
    --- End diff --
    
    How about adding an optional param that checks for residual output tables (the summary and standardization tables)? We could take a list of suffixes to check for.
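
A rough sketch of what such a check could look like follows. Everything in it is hypothetical: `validate_output_tables` and the `table_exists` callback are illustrative names, the suffix is appended by plain string concatenation instead of the `add_postfix` helper used in the diff, and a `ValueError` stands in for `plpy.error`.

```python
def validate_output_tables(output_table, table_exists, suffixes=()):
    """Fail if the output table or any suffixed companion table exists.

    table_exists: callable taking a table name and returning True or False
    (a stand-in for the real catalog lookup).
    suffixes: optional list such as ["_summary", "_standardization"].
    """
    candidates = [output_table] + [output_table + s for s in suffixes]
    existing = [t for t in candidates if table_exists(t)]
    if existing:
        raise ValueError(
            "Output table(s) already exist: {0}".format(", ".join(existing)))
    return candidates
```

Callers that produce no companion tables would simply omit `suffixes`, so existing call sites would not need to change.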


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by jingyimei <gi...@git.apache.org>.
Github user jingyimei commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175957289
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that gets assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    +
    +        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
    +
    +        _assert(len(num_of_dependent_cols) == 1,
    +                "Invalid dependent_varname: only one column name is allowed "
    +                "as input.")
    +
    +        if self.buffer_size is not None:
    +            _assert(self.buffer_size > 0,
    +                """minibatch_preprocessor: The buffer size has to be a positive
    +                 integer or NULL.""")
    +
    +    def _validate_other_output_tables(self):
    +        """
    +        Validate that standardization and summary table do not exist.
    +        :return:
    +        """
    +        output_tbl_valid(self.output_standardization_table, self.module_name)
    +        output_tbl_valid(self.output_summary_table, self.module_name)
    +
    +    def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
    +        # Note: Keep the null checking where clause of this query in sync with the
    +        # main create output table query.
    +        query = """
    +                SELECT COUNT(*) AS source_table_row_count,
    +                sum(CASE WHEN
    +                NOT {schema_madlib}.array_contains_null({dep_var_array})
    +                AND NOT {schema_madlib}.array_contains_null({indep_var_array})
    +                THEN 1 ELSE 0 END) AS num_rows_processed
    +                FROM {source_table}
    +        """.format(
    +        schema_madlib = self.schema_madlib,
    +        source_table = self.source_table,
    +        dep_var_array = dep_var_array,
    +        indep_var_array = indep_var_array)
    +        result = plpy.execute(query)
    +
    +        source_table_row_count = result[0]['source_table_row_count']
    +        num_rows_processed = result[0]['num_rows_processed']
    +        if not source_table_row_count or not num_rows_processed:
    +            plpy.error("Error while getting the row count of the source table "
    +                       "{0}".format(self.source_table))
    +        num_missing_rows_skipped = source_table_row_count - num_rows_processed
    +
    +        return num_rows_processed, num_missing_rows_skipped
    +
    +class MiniBatchQueryFormatter:
    +    """
    +    This class is responsible for formatting the independent and dependent
    +    variables into arrays so that they can be matrix agged by the preprocessor
    +    class.
    +    """
    +    def __init__(self, source_table):
    +        self.source_table = source_table
    +
    +    def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype):
    +        """
    +        This function returns a tuple of
    +        1. A string with transformed dependent varname depending on its type
    +        2. All the distinct dependent class levels encoded as a string
    +
    +        If dep_type == numeric , do not encode
    +                1. dependent_varname = rings
    +                    transformed_value = ARRAY[[rings1], [rings2], []]
    +                    class_level_str = ARRAY[rings = 'rings1',
    +                                            rings = 'rings2']::integer[]
    +                2. dependent_varname = ARRAY[a, b, c]
    +                    transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
    +                    class_level_str = 'NULL::TEXT'
    +        else if dep_type in ("text", "boolean"), encode:
    +                3. dependent_varname = rings (encoding)
    +                    transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
    +                                                rings2=2], []]
    +                    class_level_str = 'NULL::TEXT'
    +
    +        :param dependent_varname:
    +        :param dependent_var_dbtype:
    +        :return:
    +        """
    +        """
    +        """
    +        dep_var_class_value_str = 'NULL::TEXT'
    +        if dependent_var_dbtype in ("text", "boolean"):
    +            # for encoding, and since boolean can also be a logical expression,
    +            # there is a () for {dependent_varname} to make the query work
    +            dep_level_sql = """
    +            SELECT DISTINCT ({dependent_varname}) AS class
    +            FROM {source_table} where ({dependent_varname}) is NOT NULL
    +            """.format(dependent_varname=dependent_varname,
    +                       source_table=self.source_table)
    +            dep_levels = plpy.execute(dep_level_sql)
    +
    +            # this is string sorting
    +            dep_var_classes = sorted(
    +                ["{0}".format(l["class"]) for l in dep_levels])
    +
    +            dep_var_array_str = self._get_one_hot_encoded_str(dependent_varname,
    +                                                              dep_var_classes)
    +            dep_var_class_value_str = py_list_to_sql_string(dep_var_classes,
    +                                         array_type=dependent_var_dbtype)
    +
    +        elif "[]" in dependent_var_dbtype:
    +            dep_var_array_str = dependent_varname
    +
    +        elif is_psql_numeric_type(dependent_var_dbtype):
    +            dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname)
    +
    +        else:
    +            plpy.error("""Invalid dependent variable type. It should be text,
    +                boolean, numeric, or an array.""")
    +
    +        return dep_var_array_str, dep_var_class_value_str
    +
    +    def _get_one_hot_encoded_str(self, var_name, var_classes):
    +        one_hot_list = []
    +        for c in var_classes:
    +            one_hot_list.append("({0}) = '{1}'".format(var_name, c))
    +
    +        return 'ARRAY[{0}]::integer[]'.format(','.join(one_hot_list))
    +
    +    def get_indep_var_array_str(self, independent_varname):
    +        """
    +        we assume that all the independent features are either numeric or
    +        already encoded by the user.
    +        Supported formats
    +        1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with
    +        scalar values
    +        2. ‘x1’, where x1 is a single column in source table, with value as an
    +        array, like ARRAY[1,2,3] or {1,2,3}
    +
    +        we don't deal with a mixture of scalar and array independent variables
    +        """
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                            independent_varname)
    +        return typecasted_ind_varname
    +
    +class MiniBatchStandardizer:
    +    """
    +    This class is responsible for
    +    1. Calculating the mean and std dev for independent variables
    +    2. Format the query to standardize the input table based on the
    +       calculated mean/std dev
    +    3. Creating the output standardization table
    +    """
    +    def __init__(self, schema_madlib, source_table, dep_var_array_str,
    +                 indep_var_array_str, output_standardization_table):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.dep_var_array_str = dep_var_array_str
    +        self.indep_var_array_str = indep_var_array_str
    +        self.output_standardization_table = output_standardization_table
    +
    +        self.x_mean_str = None
    +        self.x_std_dev_str = None
    +        self.source_table_row_count = 0
    +        self.grouping_cols = "NULL"
    +        self.independent_var_dimension = None
    +        self._calculate_mean_and_std_dev_str()
    +
    +    def _calculate_mean_and_std_dev_str(self):
    +        self.independent_var_dimension, _ = _tbl_dimension_rownum(
    +                                                        self.schema_madlib,
    +                                                        self.source_table,
    +                                                        self.indep_var_array_str,
    +                                                        skip_row_count=True)
    +
    +        calculator = MeanStdDevCalculator(self.schema_madlib,
    +                                          self.source_table,
    +                                          self.indep_var_array_str,
    +                                          self.independent_var_dimension)
    +
    +        self.x_mean_str, self.x_std_dev_str = calculator.\
    +                                              get_mean_and_std_dev_for_ind_var()
    +
    +        if not self.x_mean_str or not self.x_std_dev_str:
    +            plpy.error("mean/stddev for the independent variable "
    +                       "cannot be null")
    +
    +    def get_query_for_standardizing(self):
    +        query="""
    +        SELECT
    +        {dep_var_array_str} as {dep_colname},
    +        {schema_madlib}.utils_normalize_data
    +        (
    +            {indep_var_array_str},'{x_mean_str}'::double precision[],
    +            '{x_std_dev_str}'::double precision[]
    +        ) as {ind_colname}
    +        FROM {source_table}
    +        """.format(
    +            source_table = self.source_table,
    +            schema_madlib = self.schema_madlib,
    +            dep_var_array_str = self.dep_var_array_str,
    +            indep_var_array_str = self.indep_var_array_str,
    +            dep_colname = MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            x_mean_str = self.x_mean_str,
    +            x_std_dev_str = self.x_std_dev_str)
    +        return query
    +
    +    def create_output_standardization_table(self):
    +        query = """
    +        CREATE TABLE {output_standardization_table} AS
    +        select {grouping_cols}::TEXT AS grouping_cols,
    +        '{x_mean_str}'::double precision[] AS mean,
    +        '{x_std_dev_str}'::double precision[] AS std
    +        """.format(
    +        output_standardization_table = self.output_standardization_table,
    +        grouping_cols = self.grouping_cols,
    +        x_mean_str = self.x_mean_str,
    +        x_std_dev_str = self.x_std_dev_str)
    +        plpy.execute(query)
    +
    +class MiniBatchSummarizer:
    +    @staticmethod
    +    def create_output_summary_table(source_table, output_table,
    +                                    dep_var_array_str, indep_var_array_str,
    +                                    buffer_size, class_values, num_rows_processed,
    +                                    num_missing_rows_skipped, output_summary_table):
    +        query = """
    +            CREATE TABLE {output_summary_table} AS
    +            SELECT '{source_table}'::TEXT AS source_table,
    +            '{output_table}'::TEXT AS output_table,
    +            '{dependent_varname}'::TEXT AS dependent_varname,
    +            '{independent_varname}'::TEXT AS independent_varname,
    +            {buffer_size} AS buffer_size,
    +            {class_values} AS class_values,
    +            {num_rows_processed} AS num_rows_processed,
    +            {num_missing_rows_skipped} AS num_missing_rows_skipped,
    +            {grouping_cols}::TEXT AS grouping_cols
    +        """.format(output_summary_table = output_summary_table,
    +                   source_table = source_table,
    +                   output_table = output_table,
    +                   dependent_varname = dep_var_array_str,
    +                   independent_varname = indep_var_array_str,
    +                   buffer_size = buffer_size,
    +                   class_values = class_values,
    +                   num_rows_processed = num_rows_processed,
    +                   num_missing_rows_skipped = num_missing_rows_skipped,
    +                   grouping_cols = "NULL")
    +        plpy.execute(query)
    +
    +class MiniBatchBufferSizeCalculator:
    +    """
    +    This class is responsible for calculating the buffer size.
    +    This is a work in progress, final formula might change.
    +    """
    +    @staticmethod
    +    def calculate_default_buffer_size(buffer_size,
    +                                      num_rows_processed, independent_var_dimension):
    +        if buffer_size is not None:
    +            return buffer_size
    +        num_of_segments = get_seg_number()
    +
    +        default_buffer_size = min(75000000.0/independent_var_dimension,
    +                                    float(num_rows_processed)/num_of_segments)
    +        return ceil(default_buffer_size)
    +
    +class MiniBatchDocumentation:
    +    @staticmethod
    +    def minibatch_preprocessor_help(schema_madlib, message):
    +        method = "minibatch_preprocessor"
    +        summary = """
    +        ----------------------------------------------------------------
    +                            SUMMARY
    +        ----------------------------------------------------------------
    +        MiniBatch Preprocessor is a utility function to pre process the input
    +        data for use with models that support mini-batching as an optimization
    +
    +        #TODO add more here
    --- End diff --
    
    Yes. This is a placeholder for adding a more detailed description later.
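
For readers following the diff quoted above, the packing step of the main CREATE TABLE query can be sketched in plain Python. This is only an illustration of the idea (drop rows containing NULL, shuffle, then group by `(row_number - 1) / buffer_size`), not the module's code; `pack_rows` and its `seed` parameter are hypothetical names, and Python lists stand in for the arrays that `matrix_agg` would combine.

```python
import random
from collections import defaultdict


def pack_rows(rows, buffer_size, seed=None):
    """Group (dep, indep) pairs into buffers of at most buffer_size rows.

    Hypothetical helper: mirrors the shape of the SQL in the diff,
    not its implementation.
    """
    if buffer_size <= 0:
        raise ValueError("buffer_size must be a positive integer")

    # Skip rows with a NULL (None) in either array, like the WHERE clause.
    kept = [(d, x) for d, x in rows if None not in d and None not in x]

    # Shuffle to mimic ORDER BY random().
    random.Random(seed).shuffle(kept)

    buffers = defaultdict(lambda: ([], []))
    for zero_based_row_number, (d, x) in enumerate(kept):
        # Integer division assigns consecutive rows to the same buffer id.
        buffer_id = zero_based_row_number // buffer_size
        buffers[buffer_id][0].append(d)
        buffers[buffer_id][1].append(x)
    return dict(buffers)


rows = [([i], [float(i), float(i) + 1.0]) for i in range(5)]
rows.append(([1], [None, 2.0]))  # skipped: contains a NULL
packed = pack_rows(rows, buffer_size=2, seed=0)
```

With five valid rows and a buffer size of 2, the sketch yields three buffers; the last one holds the single leftover row.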


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/375/



---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/370/



---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/393/



---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/373/



---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by kaknikhil <gi...@git.apache.org>.
Github user kaknikhil commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r174625748
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that get assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    --- End diff --
    
    This code is also duplicated in get_indep_var_array_str(). We kept the duplication because it looked cleaner.
    Suggestions are welcome. 
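
One possible way to remove the duplication is a small module-level helper that both `_validate_minibatch_preprocessor_params` and `get_indep_var_array_str` call. The sketch below is a suggestion only; the name `as_double_precision_array` is hypothetical and does not exist in the PR.

```python
def as_double_precision_array(expr):
    """Return the SQL expression `expr` cast to double precision[].

    Hypothetical helper: same format string as the two call sites
    in the diff, kept in one place.
    """
    return "{0}::double precision[]".format(expr)


# Both call sites would then build the same string from the user input.
typecasted_ind_varname = as_double_precision_array("ARRAY[x1, x2, x3]")
```

Whether this is cleaner than the two inline format calls is a judgment call; it mainly guarantees that validation and query formatting cannot drift apart.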


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by jingyimei <gi...@git.apache.org>.
Github user jingyimei commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    @njayaram2 
    ```
    Another issue I found but forgot to mention in the review:
    The __id__ column has double values instead of integers. For instance, I found values such as 0.20000000000000000000 for that column in the output table.
    This issue also happens only when the module is used without specifying a value for the buffer_size param.
    ```
    
    This is caused by the same math.ceil bug mentioned above. We will take care of this.
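
A minimal sketch of one likely fix, assuming the cause is that `math.ceil` returns a float under Python 2: formatting a value such as `4.0` into `(row_number() ... - 1) / {buffer_size}` would presumably make the database perform non-integer division, which matches the fractional `__id__` values reported. Casting the result to `int` avoids that. Passing `num_of_segments` as a parameter is an assumption made here so the sketch runs outside the database; the diff obtains it from `get_seg_number()`.

```python
from math import ceil


def calculate_default_buffer_size(buffer_size, num_rows_processed,
                                  independent_var_dimension,
                                  num_of_segments):
    """Return the user-supplied buffer size, or an integer default."""
    if buffer_size is not None:
        return buffer_size
    default_buffer_size = min(75000000.0 / independent_var_dimension,
                              float(num_rows_processed) / num_of_segments)
    # int() guarantees an integer on both Python 2 (where ceil returns a
    # float) and Python 3 (where it already returns an int).
    return int(ceil(default_buffer_size))


size = calculate_default_buffer_size(None, 10, 4, 3)
sql_fragment = "(row_number() OVER (ORDER BY random()) - 1) / {0}".format(size)
```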


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by jingyimei <gi...@git.apache.org>.
Github user jingyimei commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175957056
  
    --- Diff: src/ports/postgres/modules/utilities/mean_std_dev_calculator.py_in ---
    @@ -0,0 +1,54 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +"""
    +@file mean_std_dev_calculator.py_in
    +
    +@brief
    +
    +@namespace utilities
    +
    +"""
    +
    +from convex.utils_regularization import utils_ind_var_scales
    +from utilities import _array_to_string
    +
    +m4_changequote(`<!', `!>')
    +
    +#TODO: use this for all the modules that calculate the std dev and mean for x
    +# mlp, pca, elastic_net
    --- End diff --
    
    Yes, the TODO comment is left here for future reference. The idea is that whenever we update any of these modules, we can refactor the code to call this class.


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175548350
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that get assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    +
    +        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
    +
    +        _assert(len(num_of_dependent_cols) == 1,
    +                "Invalid dependent_varname: only one column name is allowed "
    +                "as input.")
    +
    +        if self.buffer_size is not None:
    +            _assert(self.buffer_size > 0,
    +                """minibatch_preprocessor: The buffer size has to be a positive
    +                 integer or NULL.""")
    +
    +    def _validate_other_output_tables(self):
    +        """
    +        Validate that standardization and summary table do not exist.
    +        :return:
    +        """
    +        output_tbl_valid(self.output_standardization_table, self.module_name)
    +        output_tbl_valid(self.output_summary_table, self.module_name)
    +
    +    def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
    +        # Note: Keep the null checking where clause of this query in sync with the
    +        # main create output table query.
    +        query = """
    +                SELECT COUNT(*) AS source_table_row_count,
    +                sum(CASE WHEN
    +                NOT {schema_madlib}.array_contains_null({dep_var_array})
    +                AND NOT {schema_madlib}.array_contains_null({indep_var_array})
    +                THEN 1 ELSE 0 END) AS num_rows_processed
    +                FROM {source_table}
    +        """.format(
    +        schema_madlib = self.schema_madlib,
    +        source_table = self.source_table,
    +        dep_var_array = dep_var_array,
    +        indep_var_array = indep_var_array)
    +        result = plpy.execute(query)
    +
    +        source_table_row_count = result[0]['source_table_row_count']
    +        num_rows_processed = result[0]['num_rows_processed']
    +        if not source_table_row_count or not num_rows_processed:
    +            plpy.error("Error while getting the row count of the source table"
    +                       "{0}".format(self.source_table))
    +        num_missing_rows_skipped = source_table_row_count - num_rows_processed
    +
    +        return num_rows_processed, num_missing_rows_skipped
    +
    +class MiniBatchQueryFormatter:
    +    """
    +    This class is responsible for formatting the independent and dependent
    +    variables into arrays so that they can be matrix agged by the preprocessor
    +    class.
    +    """
    +    def __init__(self, source_table):
    +        self.source_table = source_table
    +
    +    def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype):
    +        """
    +        This function returns a tuple of
    +        1. A string with transformed dependent varname depending on it's type
    +        2. All the distinct dependent class levels encoded as a string
    +
    +        If dep_type == numeric , do not encode
    +                1. dependent_varname = rings
    +                    transformed_value = ARRAY[[rings1], [rings2], []]
    +                    class_level_str = ARRAY[rings = 'rings1',
    +                                            rings = 'rings2']::integer[]
    +                2. dependent_varname = ARRAY[a, b, c]
    +                    transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
    +                    class_level_str = 'NULL::TEXT'
    +        else if dep_type in ("text", "boolean"), encode:
    +                3. dependent_varname = rings (encoding)
    +                    transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
    +                                                rings2=2], []]
    +                    class_level_str = 'NULL::TEXT'
    +
    +        :param dependent_varname:
    +        :param dependent_var_dbtype:
    +        :return:
    +        """
    +        """
    +        """
    +        dep_var_class_value_str = 'NULL::TEXT'
    +        if dependent_var_dbtype in ("text", "boolean"):
    +            # for encoding, and since boolean can also be a logical expression,
    +            # there is a () for {dependent_varname} to make the query work
    +            dep_level_sql = """
    +            SELECT DISTINCT ({dependent_varname}) AS class
    +            FROM {source_table} where ({dependent_varname}) is NOT NULL
    +            """.format(dependent_varname=dependent_varname,
    +                       source_table=self.source_table)
    +            dep_levels = plpy.execute(dep_level_sql)
    +
    +            # this is string sorting
    +            dep_var_classes = sorted(
    +                ["{0}".format(l["class"]) for l in dep_levels])
    +
    +            dep_var_array_str = self._get_one_hot_encoded_str(dependent_varname,
    +                                                              dep_var_classes)
    +            dep_var_class_value_str = py_list_to_sql_string(dep_var_classes,
    +                                         array_type=dependent_var_dbtype)
    +
    +        elif "[]" in dependent_var_dbtype:
    +            dep_var_array_str = dependent_varname
    +
    +        elif is_psql_numeric_type(dependent_var_dbtype):
    +            dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname)
    +
    +        else:
    +            plpy.error("""Invalid dependent variable type. It should be text,
    +                boolean, numeric, or an array.""")
    +
    +        return dep_var_array_str, dep_var_class_value_str
    +
    +    def _get_one_hot_encoded_str(self, var_name, var_classes):
    +        one_hot_list = []
    +        for c in var_classes:
    +            one_hot_list.append("({0}) = '{1}'".format(var_name, c))
    +
    +        return 'ARRAY[{0}]::integer[]'.format(','.join(one_hot_list))
    +
    +    def get_indep_var_array_str(self, independent_varname):
    +        """
    +        we assume that all the independent features are either numeric or
    +        already encoded by the user.
    +        Supported formats
    +        1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with
    +        scalar values
    +        2. ‘x1’, where x1 is a single column in source table, with value as an
    +        array, like ARRAY[1,2,3] or {1,2,3}
    +
    +        we don't deal with a mixture of scalar and array independent variables
    +        """
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                            independent_varname)
    +        return typecasted_ind_varname
    +
    +class MiniBatchStandardizer:
    +    """
    +    This class is responsible for
    +    1. Calculating the mean and std dev for independent variables
    +    2. Format the query to standardize the input table based on the
    +       calculated mean/std dev
    +    3. Creating the output standardization table
    +    """
    +    def __init__(self, schema_madlib, source_table, dep_var_array_str,
    +                 indep_var_array_str, output_standardization_table):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.dep_var_array_str = dep_var_array_str
    +        self.indep_var_array_str = indep_var_array_str
    +        self.output_standardization_table = output_standardization_table
    +
    +        self.x_mean_str = None
    +        self.x_std_dev_str = None
    +        self.source_table_row_count = 0
    +        self.grouping_cols = "NULL"
    +        self.independent_var_dimension = None
    +        self._calculate_mean_and_std_dev_str()
    +
    +    def _calculate_mean_and_std_dev_str(self):
    +        self.independent_var_dimension, _ = _tbl_dimension_rownum(
    +                                                        self.schema_madlib,
    +                                                        self.source_table,
    +                                                        self.indep_var_array_str,
    +                                                        skip_row_count=True)
    +
    +        calculator = MeanStdDevCalculator(self.schema_madlib,
    +                                          self.source_table,
    +                                          self.indep_var_array_str,
    +                                          self.independent_var_dimension)
    +
    +        self.x_mean_str, self.x_std_dev_str = calculator.\
    +                                              get_mean_and_std_dev_for_ind_var()
    +
    +        if not self.x_mean_str or not self.x_std_dev_str:
    +            plpy.error("mean/stddev for the independent variable"
    +                       "cannot be null")
    +
    +    def get_query_for_standardizing(self):
    +        query="""
    +        SELECT
    +        {dep_var_array_str} as {dep_colname},
    +        {schema_madlib}.utils_normalize_data
    +        (
    +            {indep_var_array_str},'{x_mean_str}'::double precision[],
    +            '{x_std_dev_str}'::double precision[]
    +        ) as {ind_colname}
    +        FROM {source_table}
    +        """.format(
    +            source_table = self.source_table,
    +            schema_madlib = self.schema_madlib,
    +            dep_var_array_str = self.dep_var_array_str,
    +            indep_var_array_str = self.indep_var_array_str,
    +            dep_colname = MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            x_mean_str = self.x_mean_str,
    +            x_std_dev_str = self.x_std_dev_str)
    +        return query
    +
    +    def create_output_standardization_table(self):
    +        query = """
    +        CREATE TABLE {output_standardization_table} AS
    +        select {grouping_cols}::TEXT AS grouping_cols,
    +        '{x_mean_str}'::double precision[] AS mean,
    +        '{x_std_dev_str}'::double precision[] AS std
    +        """.format(
    +        output_standardization_table = self.output_standardization_table,
    +        grouping_cols = self.grouping_cols,
    +        x_mean_str = self.x_mean_str,
    +        x_std_dev_str = self.x_std_dev_str)
    +        plpy.execute(query)
    +
    +class MiniBatchSummarizer:
    +    @staticmethod
    +    def create_output_summary_table(source_table, output_table,
    +                                    dep_var_array_str, indep_var_array_str,
    +                                    buffer_size, class_values, num_rows_processed,
    +                                    num_missing_rows_skipped, output_summary_table):
    +        query = """
    +            CREATE TABLE {output_summary_table} AS
    +            SELECT '{source_table}'::TEXT AS source_table,
    +            '{output_table}'::TEXT AS output_table,
    +            '{dependent_varname}'::TEXT AS dependent_varname,
    +            '{independent_varname}'::TEXT AS independent_varname,
    +            {buffer_size} AS buffer_size,
    +            {class_values} AS class_values,
    +            {num_rows_processed} AS num_rows_processed,
    +            {num_missing_rows_skipped} AS num_missing_rows_skipped,
    +            {grouping_cols}::TEXT AS grouping_cols
    +        """.format(output_summary_table = output_summary_table,
    +                   source_table = source_table,
    +                   output_table = output_table,
    +                   dependent_varname = dep_var_array_str,
    +                   independent_varname = indep_var_array_str,
    +                   buffer_size = buffer_size,
    +                   class_values = class_values,
    +                   num_rows_processed = num_rows_processed,
    +                   num_missing_rows_skipped = num_missing_rows_skipped,
    +                   grouping_cols = "NULL")
    +        plpy.execute(query)
    +
    +class MiniBatchBufferSizeCalculator:
    +    """
    +    This class is responsible for calculating the buffer size.
    +    This is a work in progress, final formula might change.
    +    """
    +    @staticmethod
    +    def calculate_default_buffer_size(buffer_size,
    +                                      num_rows_processed, independent_var_dimension):
    +        if buffer_size is not None:
    +            return buffer_size
    +        num_of_segments = get_seg_number()
    +
    +        default_buffer_size = min(75000000.0/independent_var_dimension,
    +                                    float(num_rows_processed)/num_of_segments)
    +        return ceil(default_buffer_size)
    +
    +class MiniBatchDocumentation:
    +    @staticmethod
    +    def minibatch_preprocessor_help(schema_madlib, message):
    +        method = "minibatch_preprocessor"
    +        summary = """
    +        ----------------------------------------------------------------
    +                            SUMMARY
    +        ----------------------------------------------------------------
    +        MiniBatch Preprocessor is a utility function to pre process the input
    +        data for use with models that support mini-batching as an optimization
    +
    +        #TODO add more here
    +
    +        For more details on function usage:
    +        SELECT {schema_madlib}.{method}('usage')
    +
    +        For a small example on using the function:
    +        SELECT {schema_madlib}.{method}('example')
    +        """.format(**locals())
    +
    +        usage = """
    +        ---------------------------------------------------------------------------
    +                                        USAGE
    +        ---------------------------------------------------------------------------
    +        SELECT {schema_madlib}.{method}(
    +            source_table,          -- TEXT. Name of the table containing input data.  Can also be a view
    +            output_table ,         -- TEXT. Name of the output table for mini-batching
    +            dependent_varname,     -- TEXT. Name of the dependent variable column
    +            independent_varname,   -- TEXT. Name of the independent variable column
    +            buffer_size            -- INTEGER. Number of source input rows to pack into batch
    +        );
    +
    +
    +        ---------------------------------------------------------------------------
    +                                        OUTPUT
    +        ---------------------------------------------------------------------------
    +        The output table produced by MiniBatch Preprocessor contains the following columns:
    +
    +        id					    -- INTEGER.  Unique id for packed table.
    +        dependent_varname 		-- FLOAT8[]. Packed array of dependent variables.
    +        independent_varname		-- FLOAT8[]. Packed array of independent variables.
    +
    +        ---------------------------------------------------------------------------
    +        The algorithm also creates a summary table named <output_table>_summary
    +        that has the following columns:
    +
    +        source_table    		  -- Source table name.
    +        output_table			  -- Output table name from preprocessor.
    +        dependent_varname   	  -- Dependent variable from the original table.
    +        independent_varname 	  -- Independent variables from the original table.
    +        buffer_size			      -- Buffer size used in preprocessing step.
    +        class_values			  -- Class values of the dependent variable (‘NULL’(as TEXT type) for non categorical vars).
    +        num_rows_processed  	  -- The total number of rows that were used in the computation.
    +        num_missing_rows_skipped  -- The total number of rows that were skipped because of NULL values in them.
    +        grouping_cols   		  -- NULL if no grouping_col was specified during training,
    +                                  -- and a comma separated list of grouping column names if not.
    +
    --- End diff --
    
    It would be nice to align the comments and make them multi-line if they run too long.
    Please also consider keeping the total line length consistent.
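
One way to act on this suggestion is to generate the column listing instead of aligning it by hand. The sketch below is only an illustration: `format_columns` is a hypothetical helper that is not part of the PR, and the 80-character width and 8-space indent are assumptions. It pads every column name to the same width and wraps long descriptions onto continuation lines.

```python
import textwrap


def format_columns(entries, width=80, indent=8):
    """Align the '-- description' comments and wrap long ones.

    Hypothetical helper for illustration; not part of the MADlib code base.
    """
    name_width = max(len(name) for name, _ in entries)
    # Each line is: indent + padded name + "  -- " (5 characters) + text.
    text_width = width - indent - name_width - 5
    lines = []
    for name, desc in entries:
        for i, chunk in enumerate(textwrap.wrap(desc, text_width)):
            # Only the first line of an entry carries the column name.
            label = name if i == 0 else ""
            lines.append(" " * indent + label.ljust(name_width) + "  -- " + chunk)
    return "\n".join(lines)


entries = [
    ("source_table", "Source table name."),
    ("num_missing_rows_skipped",
     "The total number of rows that were skipped because of NULL values "
     "in them."),
    ("grouping_cols",
     "NULL if no grouping_col was specified during training, and a comma "
     "separated list of grouping column names if not."),
]
text = format_columns(entries)
```

The resulting `text` could be interpolated into the usage string, which keeps the alignment stable when columns are added or renamed.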


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175593796
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that gets assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    --- End diff --
    
    This is linked to the last comment on this PR. We could use an optional param in `validate_module_input_params` to validate other output tables too (a list of suffixes such as `['_summary', '_standardization']`).
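
A minimal sketch of that suggestion, not the MADlib implementation: the helper names `output_table_names` and `validate_output_tables`, the `table_exists` predicate, and the use of `ValueError` are all hypothetical. Plain string concatenation stands in for `add_postfix`, and the predicate stands in for a catalog lookup, so the example runs outside the database.

```python
def output_table_names(output_table, suffixes=None):
    """Return the main output table plus one derived name per suffix."""
    # Hypothetical helper: simple concatenation, used here in place of add_postfix.
    return [output_table] + [output_table + s for s in (suffixes or [])]


def validate_output_tables(output_table, module_name, table_exists,
                           suffixes=None):
    """Raise ValueError if the output table or any suffixed table exists.

    table_exists is a caller-supplied predicate (an assumption of this
    sketch) that reports whether a table name is already taken.
    """
    for name in output_table_names(output_table, suffixes):
        if table_exists(name):
            raise ValueError(
                "{0}: output table '{1}' already exists".format(
                    module_name, name))


# Usage: pretend that only the summary table is left over from an earlier run.
existing = {"out_summary"}
try:
    validate_output_tables("out", "minibatch_preprocessor",
                           existing.__contains__,
                           suffixes=["_summary", "_standardization"])
except ValueError as err:
    print(err)
```

With an optional `suffixes` argument defaulting to no extra tables, existing callers would keep their current behaviour, and a separate `_validate_other_output_tables` step would no longer be needed.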


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/388/



---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175548591
  
    --- Diff: src/ports/postgres/modules/utilities/test/minibatch_preprocessing.sql_in ---
    @@ -0,0 +1,223 @@
    +/* ----------------------------------------------------------------------- *//**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + *
    + *//* ----------------------------------------------------------------------- */
    +DROP TABLE IF EXISTS minibatch_preprocessing_input;
    +CREATE TABLE minibatch_preprocessing_input(
    +    sex TEXT,
    +    id SERIAL NOT NULL,
    +    length DOUBLE PRECISION,
    +    diameter DOUBLE PRECISION,
    +    height DOUBLE PRECISION,
    +    whole DOUBLE PRECISION,
    +    shucked DOUBLE PRECISION,
    +    viscera DOUBLE PRECISION,
    +    shell DOUBLE PRECISION,
    +    rings INTEGER);
    +
    +-- Test empty input table
    +-- Error out if out table already exists
    +
    +INSERT INTO minibatch_preprocessing_input(id,sex,length,diameter,height,whole,shucked,viscera,shell,rings) VALUES
    +(1040,'F',0.66,0.475,0.18,1.3695,0.641,0.294,0.335,6),
    +(3160,'F',0.34,0.255,0.085,0.204,0.097,0.021,0.05,6),
    +(3984,'F',0.585,0.45,0.125,0.874,0.3545,0.2075,0.225,6),
    +(2551,'I',0.28,0.22,0.08,0.1315,0.066,0.024,0.03,5),
    +(1246,'I',0.385,0.28,0.09,0.228,0.1025,0.042,0.0655,5),
    +(519,'M',0.325,0.23,0.09,0.147,0.06,0.034,0.045,4),
    +(2382,'M',0.155,0.115,0.025,0.024,0.009,0.005,0.0075,5),
    +(698,'M',0.28,0.205,0.1,0.1165,0.0545,0.0285,0.03,5),
    +(2381,'M',0.175,0.135,0.04,0.0305,0.011,0.0075,0.01,5),
    +(516,'M',0.27,0.195,0.08,0.1,0.0385,0.0195,0.03,6);
    +
    +-- no of rows = 10, buffer_size = 4, so assert that count = ceil(10/4) = 3
    +\set expected_row_count 3
    +DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
    +SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out',  'length>0.2',  'ARRAY[diameter,height,whole,shucked,viscera,shell]', 4);
    +SELECT assert
    +        (
    +        row_count = :expected_row_count, 'Row count validation failed for minibatch_preprocessing_out.
    +        Expected:' || :expected_row_count || ' Actual: ' || row_count
    +        ) from (select count(*) as row_count from minibatch_preprocessing_out) s;
    +
    +\set expected_dep_row_count '\'' 2,4,4 '\''
    +\set expected_dep_col_count '\'' 2,2,2 '\''
    +\set expected_indep_row_count '\'' 2,4,4 '\''
    +\set expected_indep_col_count '\'' 6,6,6 '\''
    +
    +-- assert dimensions for both dependent and independent variable
    +SELECT assert
    +        (
    +        str_dep_row_count = :expected_dep_row_count, 'Dependent variable row count failed. Actual: ' || str_dep_row_count || ' Expected:' || :expected_dep_row_count
    +        ) from
    +        (
    +        select array_to_string(array_agg(row_count order by row_count asc), ',') as str_dep_row_count from (select array_upper(dependent_varname,1) as row_count from minibatch_preprocessing_out order by row_count asc) s
    +        ) s;
    +
    +SELECT assert
    +        (
    +        str_dep_col_count = :expected_dep_col_count, 'Dependent variable col count failed. Actual: ' || str_dep_col_count || ' Expected:' || :expected_dep_col_count
    +        ) from
    +        (
    +        select array_to_string(array_agg(col_count order by col_count asc), ',') as str_dep_col_count from (select array_upper(dependent_varname,2) as col_count from minibatch_preprocessing_out order by col_count asc) s
    +        ) s;
    +
    +SELECT assert
    +        (
    +        str_indep_row_count = :expected_indep_row_count, 'Independent variable row count failed. Actual: ' || str_indep_row_count || ' Expected:' || :expected_indep_row_count
    +        ) from
    +        (
    +        select array_to_string(array_agg(row_count order by row_count asc), ',') as str_indep_row_count from (select array_upper(independent_varname, 1) as row_count from minibatch_preprocessing_out order by row_count asc) s
    +        ) s;
    +
    +SELECT assert
    +        (
    +        str_indep_col_count = :expected_indep_col_count, 'Independent variable col count failed. Actual: ' || str_indep_col_count || ' Expected:' || :expected_indep_col_count
    +        ) from
    +        (
    +        select array_to_string(array_agg(col_count order by col_count asc), ',') as str_indep_col_count from (select array_upper(independent_varname,2) as col_count from minibatch_preprocessing_out order by col_count asc) s
    +        ) s;
    +
    +SELECT assert
    +        (
    +        source_table        = 'minibatch_preprocessing_input' AND
    +        output_table        = 'minibatch_preprocessing_out' AND
    +        dependent_varname   = 'length>0.2' AND
    +        independent_varname = 'ARRAY[diameter,height,whole,shucked,viscera,shell]' AND
    +        buffer_size         = 4 AND
    +        class_values        = '{f,t}' AND -- we sort the class values in python
    +        num_rows_processed  = 10 AND
    +        num_missing_rows_skipped    = 0 AND
    +        grouping_cols       is NULL,
    +        'Summary Validation failed. Actual:' || __to_char(summary)
    +        ) from (select * from minibatch_preprocessing_out_summary) summary;
    +
    +
    +-- Test null values in x and y
    +\set expected_row_count 1
    +DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
    +
    +TRUNCATE TABLE minibatch_preprocessing_input;
    +INSERT INTO minibatch_preprocessing_input(id,sex,length,diameter,height,whole,shucked,viscera,shell,rings) VALUES
    +(1040,'F',0.66,0.475,0.18,NULL,0.641,0.294,0.335,6),
    +(3160,'F',0.34,0.35,0.085,0.204,0.097,0.021,0.05,6),
    +(3984,NULL,0.585,0.45,0.25,0.874,0.3545,0.2075,0.225,5),
    +(861,'M',0.595,0.475,NULL,1.1405,0.547,0.231,0.271,6),
    +(932,NULL,0.445,0.335,0.11,0.4355,0.2025,0.1095,0.1195,6),
    +(698,'F',0.445,0.335,0.11,0.4355,0.2025,0.1095,0.1195,6),
    +(922,NULL,0.445,0.335,0.11,NULL,0.2025,0.1095,0.1195,6);
    +SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', 'sex', 'ARRAY[length,diameter,height,whole,shucked,viscera,shell]', 2);
    +SELECT assert
    +        (
    +        row_count = :expected_row_count, 'Row count validation failed for minibatch_preprocessing_out.
    +        Expected:' || :expected_row_count || ' Actual: ' || row_count
    +        ) from (select count(*) as row_count from minibatch_preprocessing_out) s;
    +SELECT assert
    +        (num_rows_processed = 2 AND num_missing_rows_skipped = 5,
    +         'Rows processed/skipped validation failed for minibatch_preprocessing_out_summary.
    +         Actual num_rows_processed:' || num_rows_processed || ', Actual num_missing_rows_skipped: ' || num_missing_rows_skipped
    +        ) from (select * from minibatch_preprocessing_out_summary) s;
    +
    +-- Test standardization
    +DROP TABLE IF EXISTS minibatch_preprocessing_input;
    +DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
    +CREATE TABLE minibatch_preprocessing_input(x1 INTEGER ,x2 INTEGER ,y TEXT);
    +INSERT INTO minibatch_preprocessing_input(x1,x2,y) VALUES
    +(2,10,'y1'),
    +(4,30,'y2');
    +SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', 'y', 'ARRAY[x1,x2]', 2);
    +
    +-- since the order is not deterministic, we assert for all possible orders
    +\set expected_normalized_independent_var1 '\'' {{-1, -1},{1, 1}} '\''
    +\set expected_normalized_independent_var2 '\'' {{1, 1},{-1, -1}} '\''
    +
    +SELECT assert
    +(
    +    independent_varname = :expected_normalized_independent_var1 OR
    +    independent_varname = :expected_normalized_independent_var2,
    +    'Standardization check failed. Actual: ' || independent_varname
    +) from
    +(
    +    select __to_char(independent_varname) as independent_varname from minibatch_preprocessing_out
    +) s;
    +
    +
    +-- Test that the standardization table gets created.
    +\set expected_row_count 1
    +SELECT assert
    +(
    +  row_count = :expected_row_count, 'Row count validation failed for minibatch_preprocessing_out_standardization.
    +        Expected:' || :expected_row_count || ' Actual: ' || row_count
    +) from
    +(
    +  select count(*) as row_count from minibatch_preprocessing_out_standardization
    +) s;
    +
    +-- Test that the summary table gets created.
    +\set expected_row_count 1
    +SELECT assert
    +(
    +  row_count = :expected_row_count, 'Row count validation failed for minibatch_preprocessing_out_summary.
    +        Expected:' || :expected_row_count || ' Actual: ' || row_count
    +) from
    +(
    +  select count(*) as row_count from minibatch_preprocessing_out_summary
    +) s;
    +
    +-- Test for array values in indep column
    +DROP TABLE IF EXISTS minibatch_preprocessing_input;
    +DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
    +CREATE TABLE minibatch_preprocessing_input(
    +    id INTEGER,
    +    sex TEXT,
    +    attributes double precision[],
    +    rings INTEGER);
    +TRUNCATE TABLE minibatch_preprocessing_input;
    +INSERT INTO minibatch_preprocessing_input(id,sex,attributes) VALUES
    +(1040,'F',ARRAY[0.66,0.475,0.18,NULL,0.641,0.294,0.335]),
    +(3160,'F',ARRAY[0.34,0.35,0.085,0.204,0.097,0.021,0.05]),
    +(3984,NULL,ARRAY[0.585,0.45,0.25,0.874,0.3545,0.2075,0.225]),
    +(861,'M',ARRAY[0.595,0.475,NULL,1.1405,0.547,0.231,0.271]),
    +(932,NULL,ARRAY[0.445,0.335,0.11,0.4355,0.2025,0.1095,0.1195]),
    +(NULL,'F',ARRAY[0.445,0.335,0.11,0.4355,0.2025,0.1095,0.1195]),
    +(922,NULL,ARRAY[0.445,0.335,0.11,NULL,0.2025,0.1095,0.1195]);
    +SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', 'sex', 'attributes', 1);
    +SELECT assert
    +        (
    +        row_count = 2, 'Row count validation failed for minibatch_preprocessing_out.
    +        Expected:' || 2 || ' Actual: ' || row_count
    +        ) from (select count(*) as row_count from minibatch_preprocessing_out) s;
    +
    +-- Test for array values in dep column
    +DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
    +SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', 'attributes', 'ARRAY[id]', 1);
    +SELECT assert
    +        (
    +        row_count = 3, 'Row count validation failed array values in dependent variable.
    +        Expected:' || 3 || ' Actual: ' || row_count
    +        ) from (select count(*) as row_count from minibatch_preprocessing_out) s;
    +
    +-- Test for null buffer size
    +DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
    +SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', 'attributes', 'ARRAY[id]');
    +SELECT assert
    +        (
    +        row_count = 3, 'Row count validation failed for null buffer size.
    +        Expected:' || 3 || ' Actual: ' || row_count
    +        ) from (select count(*) as row_count from minibatch_preprocessing_out) s;
    --- End diff --
    
    Please end this and other files with a new line.
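
One way to check this before pushing, as a small sketch rather than anything that is part of the PR: the function name `ends_with_newline` is hypothetical, and treating an empty file as acceptable is an assumption of this example.

```python
import os
import tempfile


def ends_with_newline(path):
    """Return True if the file is empty or its last byte is a newline."""
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        if f.tell() == 0:
            # Assumption of this sketch: an empty file is not flagged.
            return True
        f.seek(-1, os.SEEK_END)
        return f.read(1) == b"\n"


# Usage: write a file without a trailing newline and check it.
with tempfile.NamedTemporaryFile(suffix=".sql_in", delete=False) as tmp:
    tmp.write(b"SELECT 1;")
print(ends_with_newline(tmp.name))
os.remove(tmp.name)
```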


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175585050
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that gets assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    +
    +        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
    +
    +        _assert(len(num_of_dependent_cols) == 1,
    +                "Invalid dependent_varname: only one column name is allowed "
    +                "as input.")
    +
    +        if self.buffer_size is not None:
    +            _assert(self.buffer_size > 0,
    +                """minibatch_preprocessor: The buffer size has to be a positive
    +                 integer or NULL.""")
    +
    +    def _validate_other_output_tables(self):
    +        """
    +        Validate that standardization and summary table do not exist.
    +        :return:
    +        """
    +        output_tbl_valid(self.output_standardization_table, self.module_name)
    +        output_tbl_valid(self.output_summary_table, self.module_name)
    +
    +    def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
    +        # Note: Keep the null checking where clause of this query in sync with the
    +        # main create output table query.
    +        query = """
    +                SELECT COUNT(*) AS source_table_row_count,
    +                sum(CASE WHEN
    +                NOT {schema_madlib}.array_contains_null({dep_var_array})
    +                AND NOT {schema_madlib}.array_contains_null({indep_var_array})
    +                THEN 1 ELSE 0 END) AS num_rows_processed
    +                FROM {source_table}
    +        """.format(
    +        schema_madlib = self.schema_madlib,
    +        source_table = self.source_table,
    +        dep_var_array = dep_var_array,
    +        indep_var_array = indep_var_array)
    +        result = plpy.execute(query)
    +
    +        source_table_row_count = result[0]['source_table_row_count']
    +        num_rows_processed = result[0]['num_rows_processed']
    +        if not source_table_row_count or not num_rows_processed:
    +            plpy.error("Error while getting the row count of the source table "
    +                       "{0}".format(self.source_table))
    +        num_missing_rows_skipped = source_table_row_count - num_rows_processed
    +
    +        return num_rows_processed, num_missing_rows_skipped
    +
    +class MiniBatchQueryFormatter:
    +    """
    +    This class is responsible for formatting the independent and dependent
    +    variables into arrays so that they can be matrix agged by the preprocessor
    +    class.
    +    """
    +    def __init__(self, source_table):
    +        self.source_table = source_table
    +
    +    def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype):
    +        """
    +        This function returns a tuple of
    +        1. A string with transformed dependent varname depending on its type
    +        2. All the distinct dependent class levels encoded as a string
    +
    +        If dep_type == numeric , do not encode
    +                1. dependent_varname = rings
    +                    transformed_value = ARRAY[[rings1], [rings2], []]
    +                    class_level_str = ARRAY[rings = 'rings1',
    +                                            rings = 'rings2']::integer[]
    +                2. dependent_varname = ARRAY[a, b, c]
    +                    transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
    +                    class_level_str = 'NULL::TEXT'
    +        else if dep_type in ("text", "boolean"), encode:
    +                3. dependent_varname = rings (encoding)
    +                    transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
    +                                                rings2=2], []]
    +                    class_level_str = 'NULL::TEXT'
    +
    +        :param dependent_varname:
    +        :param dependent_var_dbtype:
    +        :return:
    +        """
    +        """
    +        """
    +        dep_var_class_value_str = 'NULL::TEXT'
    +        if dependent_var_dbtype in ("text", "boolean"):
    +            # for encoding, and since boolean can also be a logical expression,
    +            # there is a () for {dependent_varname} to make the query work
    +            dep_level_sql = """
    +            SELECT DISTINCT ({dependent_varname}) AS class
    +            FROM {source_table} where ({dependent_varname}) is NOT NULL
    +            """.format(dependent_varname=dependent_varname,
    +                       source_table=self.source_table)
    +            dep_levels = plpy.execute(dep_level_sql)
    +
    +            # this is string sorting
    +            dep_var_classes = sorted(
    +                ["{0}".format(l["class"]) for l in dep_levels])
    +
    +            dep_var_array_str = self._get_one_hot_encoded_str(dependent_varname,
    +                                                              dep_var_classes)
    +            dep_var_class_value_str = py_list_to_sql_string(dep_var_classes,
    +                                         array_type=dependent_var_dbtype)
    +
    +        elif "[]" in dependent_var_dbtype:
    +            dep_var_array_str = dependent_varname
    +
    +        elif is_psql_numeric_type(dependent_var_dbtype):
    +            dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname)
    +
    +        else:
    +            plpy.error("""Invalid dependent variable type. It should be text,
    +                boolean, numeric, or an array.""")
    +
    +        return dep_var_array_str, dep_var_class_value_str
    +
    +    def _get_one_hot_encoded_str(self, var_name, var_classes):
    +        one_hot_list = []
    +        for c in var_classes:
    +            one_hot_list.append("({0}) = '{1}'".format(var_name, c))
    +
    +        return 'ARRAY[{0}]::integer[]'.format(','.join(one_hot_list))
    +
    +    def get_indep_var_array_str(self, independent_varname):
    +        """
    +        we assume that all the independent features are either numeric or
    +        already encoded by the user.
    +        Supported formats
    +        1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with
    +        scalar values
    +        2. ‘x1’, where x1 is a single column in source table, with value as an
    +        array, like ARRAY[1,2,3] or {1,2,3}
    +
    +        we don't deal with a mixture of scalar and array independent variables
    +        """
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                            independent_varname)
    +        return typecasted_ind_varname
    +
    +class MiniBatchStandardizer:
    +    """
    +    This class is responsible for
    +    1. Calculating the mean and std dev for independent variables
    +    2. Formatting the query to standardize the input table based on the
    +       calculated mean/std dev
    +    3. Creating the output standardization table
    +    """
    +    def __init__(self, schema_madlib, source_table, dep_var_array_str,
    +                 indep_var_array_str, output_standardization_table):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.dep_var_array_str = dep_var_array_str
    +        self.indep_var_array_str = indep_var_array_str
    +        self.output_standardization_table = output_standardization_table
    +
    +        self.x_mean_str = None
    +        self.x_std_dev_str = None
    +        self.source_table_row_count = 0
    +        self.grouping_cols = "NULL"
    +        self.independent_var_dimension = None
    +        self._calculate_mean_and_std_dev_str()
    +
    +    def _calculate_mean_and_std_dev_str(self):
    +        self.independent_var_dimension, _ = _tbl_dimension_rownum(
    +                                                        self.schema_madlib,
    +                                                        self.source_table,
    +                                                        self.indep_var_array_str,
    +                                                        skip_row_count=True)
    +
    +        calculator = MeanStdDevCalculator(self.schema_madlib,
    +                                          self.source_table,
    +                                          self.indep_var_array_str,
    +                                          self.independent_var_dimension)
    +
    +        self.x_mean_str, self.x_std_dev_str = calculator.\
    +                                              get_mean_and_std_dev_for_ind_var()
    +
    +        if not self.x_mean_str or not self.x_std_dev_str:
    +            plpy.error("mean/stddev for the independent variable "
    +                       "cannot be null")
    +
    +    def get_query_for_standardizing(self):
    +        query="""
    +        SELECT
    +        {dep_var_array_str} as {dep_colname},
    +        {schema_madlib}.utils_normalize_data
    +        (
    +            {indep_var_array_str},'{x_mean_str}'::double precision[],
    +            '{x_std_dev_str}'::double precision[]
    +        ) as {ind_colname}
    +        FROM {source_table}
    +        """.format(
    +            source_table = self.source_table,
    +            schema_madlib = self.schema_madlib,
    +            dep_var_array_str = self.dep_var_array_str,
    +            indep_var_array_str = self.indep_var_array_str,
    +            dep_colname = MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            x_mean_str = self.x_mean_str,
    +            x_std_dev_str = self.x_std_dev_str)
    +        return query
    +
    +    def create_output_standardization_table(self):
    +        query = """
    +        CREATE TABLE {output_standardization_table} AS
    +        select {grouping_cols}::TEXT AS grouping_cols,
    +        '{x_mean_str}'::double precision[] AS mean,
    +        '{x_std_dev_str}'::double precision[] AS std
    +        """.format(
    +        output_standardization_table = self.output_standardization_table,
    +        grouping_cols = self.grouping_cols,
    +        x_mean_str = self.x_mean_str,
    +        x_std_dev_str = self.x_std_dev_str)
    +        plpy.execute(query)
    +
    +class MiniBatchSummarizer:
    +    @staticmethod
    +    def create_output_summary_table(source_table, output_table,
    +                                    dep_var_array_str, indep_var_array_str,
    +                                    buffer_size, class_values, num_rows_processed,
    +                                    num_missing_rows_skipped, output_summary_table):
    +        query = """
    +            CREATE TABLE {output_summary_table} AS
    +            SELECT '{source_table}'::TEXT AS source_table,
    +            '{output_table}'::TEXT AS output_table,
    +            '{dependent_varname}'::TEXT AS dependent_varname,
    +            '{independent_varname}'::TEXT AS independent_varname,
    +            {buffer_size} AS buffer_size,
    +            {class_values} AS class_values,
    +            {num_rows_processed} AS num_rows_processed,
    +            {num_missing_rows_skipped} AS num_missing_rows_skipped,
    +            {grouping_cols}::TEXT AS grouping_cols
    +        """.format(output_summary_table = output_summary_table,
    +                   source_table = source_table,
    +                   output_table = output_table,
    +                   dependent_varname = dep_var_array_str,
    +                   independent_varname = indep_var_array_str,
    +                   buffer_size = buffer_size,
    +                   class_values = class_values,
    +                   num_rows_processed = num_rows_processed,
    +                   num_missing_rows_skipped = num_missing_rows_skipped,
    +                   grouping_cols = "NULL")
    +        plpy.execute(query)
    +
    +class MiniBatchBufferSizeCalculator:
    +    """
    +    This class is responsible for calculating the buffer size.
    +    This is a work in progress, final formula might change.
    +    """
    +    @staticmethod
    +    def calculate_default_buffer_size(buffer_size,
    +                                      num_rows_processed, independent_var_dimension):
    +        if buffer_size is not None:
    +            return buffer_size
    +        num_of_segments = get_seg_number()
    +
    +        default_buffer_size = min(75000000.0/independent_var_dimension,
    +                                    float(num_rows_processed)/num_of_segments)
    +        return ceil(default_buffer_size)
    +
    +class MiniBatchDocumentation:
    +    @staticmethod
    +    def minibatch_preprocessor_help(schema_madlib, message):
    +        method = "minibatch_preprocessor"
    +        summary = """
    +        ----------------------------------------------------------------
    +                            SUMMARY
    +        ----------------------------------------------------------------
    +        MiniBatch Preprocessor is a utility function to pre process the input
    +        data for use with models that support mini-batching as an optimization
    +
    +        #TODO add more here
    --- End diff --
    
    Is the comment still required?


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/374/



---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    Another issue I found but forgot to mention in the review:
    The `__id__` column has double values instead of integers. For instance, I found values such as `0.20000000000000000000` for that column in the output table.
    This issue, too, happens only when the module is used without specifying a value for the `buffer_size` parameter.
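
A likely cause, offered as an assumption rather than a confirmed diagnosis: under Python 2, `math.ceil` returns a float, so the default buffer size is formatted into the SQL as, say, `5.0`, and `(row_number() - 1) / 5.0` becomes a numeric division. The sketch below reproduces the default-size formula from the diff and casts the result to an integer. `default_buffer_size` and `row_id_expr` are hypothetical standalone names, not the module's methods.

```python
from math import ceil


def default_buffer_size(num_rows_processed, num_of_segments,
                        independent_var_dimension):
    # Same formula as calculate_default_buffer_size in the diff.
    size = min(75000000.0 / independent_var_dimension,
               float(num_rows_processed) / num_of_segments)
    # Cast so the value renders as "4" rather than "4.0" in the SQL text.
    return int(ceil(size))


def row_id_expr(buffer_size):
    # Fragment of the packing query that derives the buffer id.
    return "(row_number() OVER (ORDER BY random()) - 1) / {0}".format(
        buffer_size)
```

With an integer divisor, PostgreSQL performs integer division on the bigint `row_number()`, so `__id__` stays integral.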


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/389/



---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by njayaram2 <gi...@git.apache.org>.
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175531202
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that gets assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    +
    +        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
    +
    +        _assert(len(num_of_dependent_cols) == 1,
    +                "Invalid dependent_varname: only one column name is allowed "
    +                "as input.")
    +
    +        if self.buffer_size is not None:
    +            _assert(self.buffer_size > 0,
    +                """minibatch_preprocessor: The buffer size has to be a positive
    +                 integer or NULL.""")
    +
    +    def _validate_other_output_tables(self):
    +        """
    +        Validate that standardization and summary table do not exist.
    +        :return:
    +        """
    +        output_tbl_valid(self.output_standardization_table, self.module_name)
    +        output_tbl_valid(self.output_summary_table, self.module_name)
    +
    +    def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
    +        # Note: Keep the null checking where clause of this query in sync with the
    +        # main create output table query.
    +        query = """
    +                SELECT COUNT(*) AS source_table_row_count,
    +                sum(CASE WHEN
    +                NOT {schema_madlib}.array_contains_null({dep_var_array})
    +                AND NOT {schema_madlib}.array_contains_null({indep_var_array})
    +                THEN 1 ELSE 0 END) AS num_rows_processed
    +                FROM {source_table}
    +        """.format(
    +        schema_madlib = self.schema_madlib,
    +        source_table = self.source_table,
    +        dep_var_array = dep_var_array,
    +        indep_var_array = indep_var_array)
    +        result = plpy.execute(query)
    +
    +        source_table_row_count = result[0]['source_table_row_count']
    +        num_rows_processed = result[0]['num_rows_processed']
    +        if not source_table_row_count or not num_rows_processed:
    +            plpy.error("Error while getting the row count of the source table "
    +                       "{0}".format(self.source_table))
    +        num_missing_rows_skipped = source_table_row_count - num_rows_processed
    +
    +        return num_rows_processed, num_missing_rows_skipped
    +
    +class MiniBatchQueryFormatter:
    +    """
    +    This class is responsible for formatting the independent and dependent
    +    variables into arrays so that they can be matrix agged by the preprocessor
    +    class.
    +    """
    +    def __init__(self, source_table):
    +        self.source_table = source_table
    +
    +    def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype):
    +        """
    +        This function returns a tuple of
    +        1. A string with transformed dependent varname depending on its type
    +        2. All the distinct dependent class levels encoded as a string
    +
    +        If dep_type == numeric , do not encode
    +                1. dependent_varname = rings
    +                    transformed_value = ARRAY[[rings1], [rings2], []]
    +                    class_level_str = ARRAY[rings = 'rings1',
    +                                            rings = 'rings2']::integer[]
    +                2. dependent_varname = ARRAY[a, b, c]
    +                    transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
    +                    class_level_str = 'NULL::TEXT'
    +        else if dep_type in ("text", "boolean"), encode:
    +                3. dependent_varname = rings (encoding)
    +                    transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
    +                                                rings2=2], []]
    +                    class_level_str = 'NULL::TEXT'
    +
    +        :param dependent_varname:
    +        :param dependent_var_dbtype:
    +        :return:
    +        """
    +        """
    +        """
    --- End diff --
    
    Missing return value, and an extra pair of `"""` in the docstring.
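
One way the docstring could be completed, sketched as a standalone function that mirrors the branches visible in the diff. This is an illustration under assumptions, not the module's code: `is_numeric_type` and its type list are stand-ins for `is_psql_numeric_type`, the class levels are passed in rather than queried from the source table, and a `ValueError` replaces `plpy.error`.

```python
def one_hot_encoded_str(var_name, var_classes):
    """Return a SQL array expression with one 0/1 entry per class level."""
    comparisons = ["({0}) = '{1}'".format(var_name, c) for c in var_classes]
    return 'ARRAY[{0}]::integer[]'.format(','.join(comparisons))


def is_numeric_type(dbtype):
    # Hypothetical stand-in for utilities.is_psql_numeric_type.
    return dbtype in ('smallint', 'integer', 'bigint', 'real',
                      'double precision', 'numeric')


def dep_var_array_and_classes(dependent_varname, dependent_var_dbtype,
                              class_levels=None):
    """
    :param dependent_varname: column name or expression of the dependent
        variable
    :param dependent_var_dbtype: database type of that expression
    :param class_levels: distinct class levels (text/boolean only)
    :return: tuple (dep_var_array_str, sorted class levels or None)
    """
    if dependent_var_dbtype in ('text', 'boolean'):
        # String sorting, as in the diff.
        levels = sorted(str(level) for level in class_levels)
        return one_hot_encoded_str(dependent_varname, levels), levels
    if '[]' in dependent_var_dbtype:
        # Already an array: use the expression as-is.
        return dependent_varname, None
    if is_numeric_type(dependent_var_dbtype):
        # Scalar numeric: wrap in a one-element array.
        return 'ARRAY[{0}]'.format(dependent_varname), None
    raise ValueError("Invalid dependent variable type. It should be text, "
                     "boolean, numeric, or an array.")
```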


---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/madlib/pull/241


---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/390/



---

[GitHub] madlib issue #241: MiniBatch Pre-Processor: Add new module minibatch_preproc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/madlib/pull/241
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/madlib-pr-build/369/



---

[GitHub] madlib pull request #241: MiniBatch Pre-Processor: Add new module minibatch_...

Posted by jingyimei <gi...@git.apache.org>.
Github user jingyimei commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/241#discussion_r175957501
  
    --- Diff: src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in ---
    @@ -0,0 +1,559 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +
    +"""
    +@file minibatch_preprocessing.py_in
    +
    +"""
    +from math import ceil
    +import plpy
    +
    +from utilities import add_postfix
    +from utilities import _assert
    +from utilities import get_seg_number
    +from utilities import is_platform_pg
    +from utilities import is_psql_numeric_type
    +from utilities import is_string_formatted_as_array_expression
    +from utilities import py_list_to_sql_string
    +from utilities import split_quoted_delimited_str
    +from utilities import _string_to_array
    +from utilities import validate_module_input_params
    +from mean_std_dev_calculator import MeanStdDevCalculator
    +from validate_args import get_expr_type
    +from validate_args import output_tbl_valid
    +from validate_args import _tbl_dimension_rownum
    +
    +m4_changequote(`<!', `!>')
    +
    +# These are readonly variables, do not modify
    +MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname"
    +MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname"
    +
    +class MiniBatchPreProcessor:
    +    """
    +    This class is responsible for executing the main logic of mini batch
    +    preprocessing, which packs multiple rows of selected columns from the
    +    source table into one row based on the buffer size
    +    """
    +    def __init__(self, schema_madlib, source_table, output_table,
    +                  dependent_varname, independent_varname, buffer_size, **kwargs):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.output_table = output_table
    +        self.dependent_varname = dependent_varname
    +        self.independent_varname = independent_varname
    +        self.buffer_size = buffer_size
    +
    +        self.module_name = "minibatch_preprocessor"
    +        self.output_standardization_table = add_postfix(self.output_table,
    +                                                   "_standardization")
    +        self.output_summary_table = add_postfix(self.output_table, "_summary")
    +        self._validate_minibatch_preprocessor_params()
    +
    +    def minibatch_preprocessor(self):
    +        # Get array expressions for both dep and indep variables from the
    +        # MiniBatchQueryFormatter class
    +        dependent_var_dbtype = get_expr_type(self.dependent_varname,
    +                                             self.source_table)
    +        qry_formatter = MiniBatchQueryFormatter(self.source_table)
    +        dep_var_array_str, dep_var_classes_str = qry_formatter.\
    +            get_dep_var_array_and_classes(self.dependent_varname,
    +                                          dependent_var_dbtype)
    +        indep_var_array_str = qry_formatter.get_indep_var_array_str(
    +                                              self.independent_varname)
    +
    +        standardizer = MiniBatchStandardizer(self.schema_madlib,
    +                                             self.source_table,
    +                                             dep_var_array_str,
    +                                             indep_var_array_str,
    +                                             self.output_standardization_table)
    +        standardize_query = standardizer.get_query_for_standardizing()
    +
    +        num_rows_processed, num_missing_rows_skipped = self.\
    +                                                _get_skipped_rows_processed_count(
    +                                                dep_var_array_str,
    +                                                indep_var_array_str)
    +        calculated_buffer_size = MiniBatchBufferSizeCalculator.\
    +                                         calculate_default_buffer_size(
    +                                         self.buffer_size,
    +                                         num_rows_processed,
    +                                         standardizer.independent_var_dimension)
    +        """
    +        This query does the following:
    +        1. Standardize the independent variables in the input table
    +           (see MiniBatchStandardizer for more details)
    +        2. Filter out rows with null values either in dependent/independent
    +           variables
    +        3. Converts the input dependent/independent variables into arrays
    +          (see MiniBatchQueryFormatter for more details)
    +        4. Based on the buffer size, pack the dependent/independent arrays into
    +           matrices
    +
    +        Notes
    +        1. we are ignoring null in x because
    +             a. matrix_agg does not support null
    +             b. __utils_normalize_data returns null if any element of the array
    +                contains NULL
    +        2. Please keep the null checking where clause of this query in sync with
    +        the query in _get_skipped_rows_processed_count. We are doing this null
    +        check in two places to prevent another pass of the entire dataset.
    +        """
    +
    +        # This ID is the unique row id that gets assigned to each row after preprocessing
    +        unique_row_id = "__id__"
    +        sql = """
    +            CREATE TABLE {output_table} AS
    +            SELECT {row_id},
    +                   {schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
    +                   {schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
    +            FROM (
    +                SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size}
    +                            as {row_id}, * FROM
    +                (
    +                    {standardize_query}
    +                 ) sub_query_1
    +                 WHERE NOT {schema_madlib}.array_contains_null({dep_colname})
    +                 AND NOT {schema_madlib}.array_contains_null({ind_colname})
    +            ) sub_query_2
    +            GROUP BY {row_id}
    +            {distributed_by_clause}
    +            """.format(
    +            schema_madlib=self.schema_madlib,
    +            source_table=self.source_table,
    +            output_table=self.output_table,
    +            dependent_varname=self.dependent_varname,
    +            independent_varname=self.independent_varname,
    +            buffer_size = calculated_buffer_size,
    +            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            row_id = unique_row_id,
    +            distributed_by_clause = '' if is_platform_pg() else 'DISTRIBUTED RANDOMLY',
    +            **locals())
    +        plpy.execute(sql)
    +
    +
    +        standardizer.create_output_standardization_table()
    +        MiniBatchSummarizer.create_output_summary_table(
    +            self.source_table,
    +            self.output_table,
    +            self.dependent_varname,
    +            self.independent_varname,
    +            calculated_buffer_size,
    +            dep_var_classes_str,
    +            num_rows_processed,
    +            num_missing_rows_skipped,
    +            self.output_summary_table)
    +
    +    def _validate_minibatch_preprocessor_params(self):
    +        # Test if the independent variable can be typecasted to a double precision
    +        # array and let postgres validate the expression
    +
    +        # Note that this will not fail for 2d arrays but the standardizer will
    +        # fail because utils_normalize_data will throw an error
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                    self.independent_varname)
    +        validate_module_input_params(self.source_table, self.output_table,
    +                                     typecasted_ind_varname,
    +                                     self.dependent_varname, self.module_name)
    +
    +        self._validate_other_output_tables()
    +
    +        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
    +
    +        _assert(len(num_of_dependent_cols) == 1,
    +                "Invalid dependent_varname: only one column name is allowed "
    +                "as input.")
    +
    +        if self.buffer_size is not None:
    +            _assert(self.buffer_size > 0,
    +                """minibatch_preprocessor: The buffer size has to be a positive
    +                 integer or NULL.""")
    +
    +    def _validate_other_output_tables(self):
    +        """
    +        Validate that standardization and summary table do not exist.
    +        :return:
    +        """
    +        output_tbl_valid(self.output_standardization_table, self.module_name)
    +        output_tbl_valid(self.output_summary_table, self.module_name)
    +
    +    def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array):
    +        # Note: Keep the null checking where clause of this query in sync with the
    +        # main create output table query.
    +        query = """
    +                SELECT COUNT(*) AS source_table_row_count,
    +                sum(CASE WHEN
    +                NOT {schema_madlib}.array_contains_null({dep_var_array})
    +                AND NOT {schema_madlib}.array_contains_null({indep_var_array})
    +                THEN 1 ELSE 0 END) AS num_rows_processed
    +                FROM {source_table}
    +        """.format(
    +        schema_madlib = self.schema_madlib,
    +        source_table = self.source_table,
    +        dep_var_array = dep_var_array,
    +        indep_var_array = indep_var_array)
    +        result = plpy.execute(query)
    +
    +        source_table_row_count = result[0]['source_table_row_count']
    +        num_rows_processed = result[0]['num_rows_processed']
    +        if not source_table_row_count or not num_rows_processed:
    +            plpy.error("Error while getting the row count of the source table "
    +                       "{0}".format(self.source_table))
    +        num_missing_rows_skipped = source_table_row_count - num_rows_processed
    +
    +        return num_rows_processed, num_missing_rows_skipped
    +
    +class MiniBatchQueryFormatter:
    +    """
    +    This class is responsible for formatting the independent and dependent
    +    variables into arrays so that they can be matrix agged by the preprocessor
    +    class.
    +    """
    +    def __init__(self, source_table):
    +        self.source_table = source_table
    +
    +    def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype):
    +        """
    +        This function returns a tuple of
    +        1. A string with the transformed dependent varname, depending on its type
    +        2. All the distinct dependent class levels encoded as a string
    +
    +        If dep_type is numeric or an array, do not encode:
    +                1. dependent_varname = rings
    +                    transformed_value = ARRAY[[rings1], [rings2], []]
    +                    class_level_str = 'NULL::TEXT'
    +                2. dependent_varname = ARRAY[a, b, c]
    +                    transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
    +                    class_level_str = 'NULL::TEXT'
    +        else if dep_type in ("text", "boolean"), encode:
    +                3. dependent_varname = rings (encoding)
    +                    transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
    +                                                rings2=2], []]
    +                    class_level_str = the sorted distinct levels of rings,
    +                                      formatted as a SQL array string
    +
    +        :param dependent_varname:
    +        :param dependent_var_dbtype:
    +        :return:
    +        """
    +        dep_var_class_value_str = 'NULL::TEXT'
    +        if dependent_var_dbtype in ("text", "boolean"):
    +            # for encoding, and since boolean can also be a logical expression,
    +            # there is a () for {dependent_varname} to make the query work
    +            dep_level_sql = """
    +            SELECT DISTINCT ({dependent_varname}) AS class
    +            FROM {source_table} where ({dependent_varname}) is NOT NULL
    +            """.format(dependent_varname=dependent_varname,
    +                       source_table=self.source_table)
    +            dep_levels = plpy.execute(dep_level_sql)
    +
    +            # this is string sorting
    +            dep_var_classes = sorted(
    +                ["{0}".format(l["class"]) for l in dep_levels])
    +
    +            dep_var_array_str = self._get_one_hot_encoded_str(dependent_varname,
    +                                                              dep_var_classes)
    +            dep_var_class_value_str = py_list_to_sql_string(dep_var_classes,
    +                                         array_type=dependent_var_dbtype)
    +
    +        elif "[]" in dependent_var_dbtype:
    +            dep_var_array_str = dependent_varname
    +
    +        elif is_psql_numeric_type(dependent_var_dbtype):
    +            dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname)
    +
    +        else:
    +            plpy.error("""Invalid dependent variable type. It should be text,
    +                boolean, numeric, or an array.""")
    +
    +        return dep_var_array_str, dep_var_class_value_str
    +
    +    def _get_one_hot_encoded_str(self, var_name, var_classes):
    +        one_hot_list = []
    +        for c in var_classes:
    +            one_hot_list.append("({0}) = '{1}'".format(var_name, c))
    +
    +        return 'ARRAY[{0}]::integer[]'.format(','.join(one_hot_list))
    +
    +    def get_indep_var_array_str(self, independent_varname):
    +        """
    +        We assume that all the independent features are either numeric or
    +        already encoded by the user.
    +        Supported formats:
    +        1. 'ARRAY[x1,x2,x3]', where x1, x2, x3 are columns in the source table
    +        with scalar values
    +        2. 'x1', where x1 is a single column in the source table whose value is
    +        an array, like ARRAY[1,2,3] or {1,2,3}
    +
    +        We do not handle a mixture of scalar and array independent variables.
    +        """
    +        typecasted_ind_varname = "{0}::double precision[]".format(
    +                                                            independent_varname)
    +        return typecasted_ind_varname
    +
    +class MiniBatchStandardizer:
    +    """
    +    This class is responsible for
    +    1. Calculating the mean and std dev for independent variables
    +    2. Formatting the query to standardize the input table based on the
    +       calculated mean/std dev
    +    3. Creating the output standardization table
    +    """
    +    def __init__(self, schema_madlib, source_table, dep_var_array_str,
    +                 indep_var_array_str, output_standardization_table):
    +        self.schema_madlib = schema_madlib
    +        self.source_table = source_table
    +        self.dep_var_array_str = dep_var_array_str
    +        self.indep_var_array_str = indep_var_array_str
    +        self.output_standardization_table = output_standardization_table
    +
    +        self.x_mean_str = None
    +        self.x_std_dev_str = None
    +        self.source_table_row_count = 0
    +        self.grouping_cols = "NULL"
    +        self.independent_var_dimension = None
    +        self._calculate_mean_and_std_dev_str()
    +
    +    def _calculate_mean_and_std_dev_str(self):
    +        self.independent_var_dimension, _ = _tbl_dimension_rownum(
    +                                                        self.schema_madlib,
    +                                                        self.source_table,
    +                                                        self.indep_var_array_str,
    +                                                        skip_row_count=True)
    +
    +        calculator = MeanStdDevCalculator(self.schema_madlib,
    +                                          self.source_table,
    +                                          self.indep_var_array_str,
    +                                          self.independent_var_dimension)
    +
    +        self.x_mean_str, self.x_std_dev_str = calculator.\
    +                                              get_mean_and_std_dev_for_ind_var()
    +
    +        if not self.x_mean_str or not self.x_std_dev_str:
    +            plpy.error("mean/stddev for the independent variable "
    +                       "cannot be null")
    +
    +    def get_query_for_standardizing(self):
    +        query="""
    +        SELECT
    +        {dep_var_array_str} as {dep_colname},
    +        {schema_madlib}.utils_normalize_data
    +        (
    +            {indep_var_array_str},'{x_mean_str}'::double precision[],
    +            '{x_std_dev_str}'::double precision[]
    +        ) as {ind_colname}
    +        FROM {source_table}
    +        """.format(
    +            source_table = self.source_table,
    +            schema_madlib = self.schema_madlib,
    +            dep_var_array_str = self.dep_var_array_str,
    +            indep_var_array_str = self.indep_var_array_str,
    +            dep_colname = MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
    +            ind_colname = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
    +            x_mean_str = self.x_mean_str,
    +            x_std_dev_str = self.x_std_dev_str)
    +        return query
    +
    +    def create_output_standardization_table(self):
    +        query = """
    +        CREATE TABLE {output_standardization_table} AS
    +        select {grouping_cols}::TEXT AS grouping_cols,
    +        '{x_mean_str}'::double precision[] AS mean,
    +        '{x_std_dev_str}'::double precision[] AS std
    +        """.format(
    +        output_standardization_table = self.output_standardization_table,
    +        grouping_cols = self.grouping_cols,
    +        x_mean_str = self.x_mean_str,
    +        x_std_dev_str = self.x_std_dev_str)
    +        plpy.execute(query)
    +
    +class MiniBatchSummarizer:
    +    @staticmethod
    +    def create_output_summary_table(source_table, output_table,
    +                                    dep_var_array_str, indep_var_array_str,
    +                                    buffer_size, class_values, num_rows_processed,
    +                                    num_missing_rows_skipped, output_summary_table):
    +        query = """
    +            CREATE TABLE {output_summary_table} AS
    +            SELECT '{source_table}'::TEXT AS source_table,
    +            '{output_table}'::TEXT AS output_table,
    +            '{dependent_varname}'::TEXT AS dependent_varname,
    +            '{independent_varname}'::TEXT AS independent_varname,
    +            {buffer_size} AS buffer_size,
    +            {class_values} AS class_values,
    +            {num_rows_processed} AS num_rows_processed,
    +            {num_missing_rows_skipped} AS num_missing_rows_skipped,
    +            {grouping_cols}::TEXT AS grouping_cols
    +        """.format(output_summary_table = output_summary_table,
    +                   source_table = source_table,
    +                   output_table = output_table,
    +                   dependent_varname = dep_var_array_str,
    +                   independent_varname = indep_var_array_str,
    +                   buffer_size = buffer_size,
    +                   class_values = class_values,
    +                   num_rows_processed = num_rows_processed,
    +                   num_missing_rows_skipped = num_missing_rows_skipped,
    +                   grouping_cols = "NULL")
    +        plpy.execute(query)
    --- End diff --
    
    Good catch. We assumed that math.ceil returns an int, but it returns a float, hence the bug. Our unit tests did not catch this because comparing an int to a float of equal value succeeds. We need to assert that the variable is an instance of int.
    We will improve the unit test and fix the bug.
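
    A minimal sketch of the fix and the stricter assertion described above. The helper names rows_per_buffer and check_buffer_size are hypothetical and are not taken from the patch; the sketch only assumes that the buffer size comes from a math.ceil call. In Python 2 math.ceil returns a float (in Python 3 it returns an int), so an explicit int() gives the same type on both, and the test helper checks the type in addition to the value.

```python
import math


def rows_per_buffer(num_rows, num_buffers):
    """Hypothetical helper: rows each buffer must hold to cover num_rows."""
    # math.ceil returns a float on Python 2 (an int on Python 3), so the
    # explicit int() makes the return type the same on both.
    return int(math.ceil(float(num_rows) / num_buffers))


def check_buffer_size(actual, expected):
    """Hypothetical test helper: compare the type as well as the value."""
    # 3 == 3.0 is True in Python, so an equality check alone would accept
    # a float result; the isinstance check is what rejects it.
    assert isinstance(actual, int), "expected int, got %s" % type(actual).__name__
    assert actual == expected


check_buffer_size(rows_per_buffer(10, 4), 3)
```

    With this helper, a float such as 3.0 fails the check even though it compares equal to 3, which is the case the existing unit tests let through.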


---