You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nk...@apache.org on 2018/06/27 17:58:13 UTC

[2/5] madlib git commit: MLP+Minibatch Preprocessing: Support special characters

MLP+Minibatch Preprocessing: Support special characters

JIRA: MADLIB-1237
JIRA: MADLIB-1238

This commit enables special character support for column names and
column values in MLP and the minibatch preprocessor. We decided to use
the following strategy for supporting special characters:

A module that needs to support special characters must call
quote_literal() on all the column values that need to be escaped and
quoted; the resulting list can then be passed to the
py_list_to_sql_string function.

We also created a function called get_distinct_col_levels, which
calls quote_literal and returns a list of escaped column levels. The
output of this function can then be safely passed to
py_list_to_sql_string with long_format set to True.

Co-Authored-by: Jingyi Mei <jm...@pivotal.io>
Co-Authored-by: Rahul Iyer <ri...@apache.org>
Co-Authored-by: Arvind Sridhar <as...@pivotal.io>


Project: http://git-wip-us.apache.org/repos/asf/madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/madlib/commit/4713b24e
Tree: http://git-wip-us.apache.org/repos/asf/madlib/tree/4713b24e
Diff: http://git-wip-us.apache.org/repos/asf/madlib/diff/4713b24e

Branch: refs/heads/test
Commit: 4713b24eac1c27ba09cd1152e502b02bb1e13da4
Parents: 8e34f68
Author: Jingyi Mei <jm...@pivotal.io>
Authored: Wed May 23 16:29:54 2018 -0700
Committer: Nikhil Kak <nk...@pivotal.io>
Committed: Wed Jun 20 12:03:28 2018 -0700

----------------------------------------------------------------------
 src/ports/postgres/modules/convex/mlp_igd.py_in |  86 ++---
 .../postgres/modules/convex/test/mlp.sql_in     | 187 +++++++++-
 .../convex/test/unit_tests/test_mlp_igd.py_in   |   3 +-
 .../postgres/modules/internal/__init__.py_in    |   0
 .../postgres/modules/internal/db_utils.py_in    |  74 ++++
 .../utilities/minibatch_preprocessing.py_in     | 363 ++++++++-----------
 .../utilities/minibatch_preprocessing.sql_in    |   6 +-
 .../test/minibatch_preprocessing.sql_in         |  48 ++-
 .../test_minibatch_preprocessing.py_in          | 361 +++++++++---------
 .../test/unit_tests/test_utilities.py_in        |  32 +-
 .../postgres/modules/utilities/utilities.py_in  |  75 +++-
 11 files changed, 780 insertions(+), 455 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/convex/mlp_igd.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/convex/mlp_igd.py_in b/src/ports/postgres/modules/convex/mlp_igd.py_in
index 2cfa12f..1ee282f 100644
--- a/src/ports/postgres/modules/convex/mlp_igd.py_in
+++ b/src/ports/postgres/modules/convex/mlp_igd.py_in
@@ -25,13 +25,15 @@
 """
 import math
 import plpy
-from random import random
 
 from convex.utils_regularization import utils_ind_var_scales
 from convex.utils_regularization import utils_ind_var_scales_grouping
 from convex.utils_regularization import __utils_normalize_data
 from convex.utils_regularization import __utils_normalize_data_grouping
 
+from internal.db_utils import get_distinct_col_levels
+from internal.db_utils import get_one_hot_encoded_expr
+from internal.db_utils import quote_literal_python_list
 from utilities.control import MinWarning
 from utilities.in_mem_group_control import GroupIterationController
 from utilities.utilities import _array_to_string
@@ -41,7 +43,9 @@ from utilities.utilities import _string_to_array_with_quotes
 from utilities.utilities import add_postfix
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import get_grouping_col_str
-from utilities.utilities import is_psql_numeric_type
+from utilities.utilities import is_valid_psql_type
+from utilities.utilities import NUMERIC, INTEGER, TEXT, BOOLEAN
+from utilities.utilities import INCLUDE_ARRAY, ONLY_ARRAY
 from utilities.utilities import py_list_to_sql_string as PY2SQL
 from utilities.utilities import strip_end_quotes, split_quoted_delimited_str
 from utilities.utilities import unique_string
@@ -53,6 +57,7 @@ from utilities.validate_args import input_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import output_tbl_valid
 from utilities.validate_args import table_exists
+from utilities.validate_args import quote_ident
 from utilities.minibatch_validation import is_var_one_hot_encoded_for_minibatch
 
 
@@ -137,7 +142,7 @@ def mlp(schema_madlib, source_table, output_table, independent_varname,
                                             dim=2)
         if is_classification:
             if pp_summary_dict["class_values"]:
-                classes = pp_summary_dict["class_values"]
+                classes = quote_literal_python_list(pp_summary_dict["class_values"])
                 num_output_nodes = len(classes)
             else:
                 # Assume that the dependent variable is already one-hot-encoded
@@ -172,21 +177,14 @@ def mlp(schema_madlib, source_table, output_table, independent_varname,
                 num_output_nodes = get_col_dimension(tbl_data_scaled,
                                                      dependent_varname)
             else:
-                labels = plpy.execute("SELECT DISTINCT {0} FROM {1}".
-                                      format(dependent_varname_backup, source_table))
-                num_output_nodes = len(labels)
-                for label_obj in labels:
-                    label = _format_label(label_obj[dependent_varname_backup])
-                    classes.append(label)
-                classes.sort()
-                level_vals_str = ','.join(["{0}={1}".format(
-                                           col_dep_var_norm_new, str(c))
-                                           for c in classes])
-                # dependent_varname should be replaced with one-hot encoded varname
-                dependent_varname = "ARRAY[{0}]::integer[]".format(level_vals_str)
+                classes = get_distinct_col_levels(
+                    source_table, dependent_varname_backup, dependent_vartype)
+                num_output_nodes = len(classes)
+                dependent_varname = get_one_hot_encoded_expr(dependent_varname,
+                                                             classes)
         else:
             if "[]" not in dependent_vartype:
-                dependent_varname = "ARRAY[" + col_dep_var_norm_new + "]"
+                dependent_varname = "ARRAY[{0}]".format(col_dep_var_norm_new)
             num_output_nodes = get_col_dimension(tbl_data_scaled,
                                                  dependent_varname, dim=1)
 
@@ -479,15 +477,13 @@ def _create_summary_table(args):
         batch_size = args['batch_size']
         n_epochs = args['n_epochs']
         minibatch_summary_col_vals = """
-                '{source_table}',
-                '{independent_varname}',
-                '{dependent_varname}',
+                $__madlib__${source_table}$__madlib__$::TEXT,
+                $__madlib__${independent_varname}$__madlib__$::TEXT,
+                $__madlib__${dependent_varname}$__madlib__$::TEXT,
                 {batch_size},
                 {n_epochs},
             """.format(**locals())
 
-    classes_str = PY2SQL([strip_end_quotes(cl, "'") for cl in args['classes']],
-                         array_type=classes_type)
     summary_table_creation_query = """
         CREATE TABLE {summary_table}(
             source_table TEXT,
@@ -513,8 +509,8 @@ def _create_summary_table(args):
     summary_table_update_query = """
         INSERT INTO {summary_table} VALUES(
             '{source_table}',
-            '{independent_varname}',
-            '{dependent_varname_backup}',
+            $__madlib__${independent_varname}$__madlib__$::TEXT,
+            $__madlib__${dependent_varname_backup}$__madlib__$::TEXT,
             {minibatch_summary_col_vals}
             '{dependent_vartype}',
             {tolerance},
@@ -528,7 +524,8 @@ def _create_summary_table(args):
             {classes_str},
             '{weights}',
             '{grouping_text}'
-        )""".format(classes_str=classes_str,
+        )""".format(classes_str=PY2SQL(args['classes'], array_type=classes_type,
+                                       long_format=True),
                     minibatch_summary_col_vals=minibatch_summary_col_vals,
                     **args)
     plpy.execute(summary_table_creation_query)
@@ -678,33 +675,27 @@ def _validate_warm_start(output_table, summary_table, standardization_table,
 def _validate_dependent_var(source_table, dependent_varname,
                             is_classification, is_minibatch_enabled):
     expr_type = get_expr_type(dependent_varname, source_table)
-    int_types = ['integer', 'smallint', 'bigint']
-    text_types = ['text', 'varchar', 'character varying', 'char', 'character']
-    boolean_types = ['boolean']
-    classification_types = int_types + boolean_types + text_types
+    classification_types = INTEGER | BOOLEAN | TEXT
 
     if is_minibatch_enabled:
-        # With pre-processed data, dep type is always an array
-        _assert("[]" in expr_type,
-                "Dependent variable column should refer to an array.")
         # The dependent variable is always a double precision array in
         # preprocessed data (so check for numeric types)
         # strip out '[]' from expr_type
-        _assert(is_psql_numeric_type(expr_type[:-2]),
-                "Dependent variable column should be of numeric type.")
+        _assert(is_valid_psql_type(expr_type, NUMERIC | ONLY_ARRAY),
+                "Dependent variable column should be a numeric array.")
+
         if is_classification:
             is_var_one_hot_encoded_for_minibatch(source_table,dependent_varname)
     else:
         if is_classification:
-            _assert(("[]" in expr_type \
-                     and is_psql_numeric_type(expr_type[:-2]) \
-                     and not _get_dep_var_second_dim(dependent_varname, source_table) \
-                    ) \
-                    or expr_type in classification_types,
+            _assert((is_valid_psql_type(expr_type, NUMERIC | ONLY_ARRAY)
+                    and not _get_dep_var_second_dim(dependent_varname, source_table)
+                    )
+                    or is_valid_psql_type(expr_type, classification_types),
                     "Dependent variable column should either be a numeric 1-D"
-                    " array, or be of type: {0}".format(classification_types))
+                    " array, or be of type: {0}".format(','.join(classification_types)))
         else:
-            _assert("[]" in expr_type or is_psql_numeric_type(expr_type),
+            _assert(is_valid_psql_type(expr_type, NUMERIC | INCLUDE_ARRAY),
                     "Dependent variable column should be of numeric type.")
 
 def _get_dep_var_second_dim(dependent_varname, source_table):
@@ -735,9 +726,6 @@ def _validate_params_based_on_minibatch(source_table, independent_varname,
         float_types = ['double precision', 'real']
         _assert(get_expr_type(weights, source_table) in int_types + float_types,
                 "MLP error: Weights should be a numeric type")
-        # Validate independent variable
-        _assert("[]" in get_expr_type(independent_varname, source_table),
-                "Independent variable column should refer to an array")
         _assert(array_col_has_same_dimension(source_table, independent_varname),
                 "Independent variable column should refer to arrays of the same length")
 
@@ -761,6 +749,9 @@ def _validate_args(source_table, output_table, summary_table,
             "({source_table})!".format(
                 independent_varname=independent_varname,
                 source_table=source_table))
+    # independent variable should either be a 1D or 2D(minibatch) array
+    _assert("[]" in get_expr_type(independent_varname, source_table),
+            "Independent variable column should refer to an array")
 
     _assert(is_var_valid(source_table, dependent_varname),
             "MLP error: invalid dependent_varname "
@@ -1044,10 +1035,13 @@ def mlp_predict(schema_madlib, model_table, data_table, id_col_name,
         else:
             intermediate_col = unique_string()
             if classes:
+                # we cannot call sql quote_ident on the class value because
+                # aliasing does not support quote_ident. Hence calling our
+                # python implementation of quote_ident
                 score_format = ',\n'.join([
-                    'CAST({interim}[{j}] as DOUBLE PRECISION) as "estimated_prob_{c_str}"'.
-                    format(j=i + 1, c_str=str(c).strip(' "'),
-                           interim=intermediate_col)
+                    'CAST({interim}[{j}] as DOUBLE PRECISION) as "{c_str}"'.
+                    format(j=i + 1, c_str=quote_ident("estimated_prob_{0}".
+                        format(str(c))).strip(' "'), interim=intermediate_col)
                     for i, c in enumerate(classes)])
             else:
                 # Case when the training step did not have to one-hot encode

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/convex/test/mlp.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/convex/test/mlp.sql_in b/src/ports/postgres/modules/convex/test/mlp.sql_in
index 16d1637..da8931f 100644
--- a/src/ports/postgres/modules/convex/test/mlp.sql_in
+++ b/src/ports/postgres/modules/convex/test/mlp.sql_in
@@ -233,7 +233,7 @@ INSERT INTO iris_data_batch_standardization VALUES
 (ARRAY[5.74893617021,3.02482269504,3.6865248227,1.18014184397],
     ARRAY[0.785472439601,0.396287027644,1.68671151195,0.750245336531]);
 
--- Create preprocessed data that can be used with minibatch MLP:
+-- Create grouping preprocessed data that can be used with minibatch MLP:
 DROP TABLE IF EXISTS iris_data_batch_grp, iris_data_batch_grp_summary, iris_data_batch_grp_standardization;
 CREATE TABLE iris_data_batch_grp(
     grp text,
@@ -270,7 +270,7 @@ CREATE TABLE iris_data_batch_grp_summary(
 -- the preprocessed table.
 INSERT INTO iris_data_batch_grp_summary VALUES
 ('iris_data_does_not_exist', 'iris_data_batch_grp', 'class::TEXT', 'attributes',
-    'text', 30, ARRAY[1,2,3], 141, 0, 'grp');
+    'text', 30, ARRAY['foo','bar','baaz'] , 141, 0, 'grp');
 -- Create the corresponding standardization table for preprocessed data
 CREATE TABLE iris_data_batch_grp_standardization(
     grp text,
@@ -318,7 +318,7 @@ SELECT mlp_classification(
     ARRAY[5],   -- Number of units per layer
     'learning_rate_init=0.1,
     learning_rate_policy=constant,
-    n_iterations=5,
+    n_iterations=1,
     n_tries=3,
     tolerance=0,
     n_epochs=20',
@@ -327,6 +327,28 @@ SELECT mlp_classification(
     False,
     False
 );
+SELECT assert
+        (
+        source_table        = 'iris_data_batch' AND
+        independent_varname = 'independent_varname' AND
+        dependent_varname   = 'dependent_varname' AND
+        original_source_table = 'iris_data_does_not_exist' AND
+        original_independent_varname = 'attributes' AND
+        original_dependent_varname = 'class::TEXT' AND
+        original_dependent_vartype = 'text' AND
+        tolerance         = 0 AND
+        learning_rate_init = 0.1 AND
+        learning_rate_policy = 'constant' AND
+        n_iterations = 1 AND
+        n_tries = 3 AND
+        layer_sizes = ARRAY[4,5,3] AND
+        activation = 'sigmoid' AND
+        is_classification = 't' AND
+        classes     = '{1,2,3}' AND
+        weights  = '1' AND
+        grouping_col    = NULL,
+        'Summary Validation failed for special chars. Actual:' || __to_char(summary)
+        ) from (select * from mlp_class_batch_summary order by classes) summary;
 
 DROP TABLE IF EXISTS mlp_prediction_batch_output, mlp_prediction_batch_output_summary, mlp_prediction_batch_output_standardization;
 SELECT mlp_predict(
@@ -336,6 +358,8 @@ SELECT mlp_predict(
     'mlp_prediction_batch_output',
     'output');
 
+
+
 -- minibatch with grouping and without warm_start
 DROP TABLE IF EXISTS mlp_class_batch, mlp_class_batch_summary, mlp_class_batch_standardization;
 SELECT mlp_classification(
@@ -1096,3 +1120,160 @@ FROM
 JOIN pg_catalog.pg_namespace n
 ON n.oid=c.relnamespace
 WHERE c.relname = 'lin_housing_wi_batch_standardization' AND c.relkind='r' AND nspname=current_schema();
+
+------------------------ Test special characters --------------------------------
+DROP TABLE IF EXISTS iris_data_special_char;
+CREATE TABLE iris_data_special_char(
+    id INTEGER,
+    "se$$''x" TEXT,
+    "len$$'%*Ж!#'()gth" DOUBLE PRECISION[],
+    "rinЖ!#'gs" INTEGER);
+
+INSERT INTO iris_data_special_char VALUES
+(1, 'M''M',ARRAY[0.66, 0.5],6),
+(2, '''M''M''',ARRAY[0.66, 0.5],6),
+(3, 'M|$$M',ARRAY[0.66, 0.5],6),
+(4, 'M,M',ARRAY[0.66, 0.5],6),
+(5, 'M@[}(:*;M',ARRAY[0.66, 0.5],6),
+(6, 'M"M',ARRAY[0.66, 0.5],6),
+(7, 'MЖM',ARRAY[0.66, 0.5],6);
+
+
+DROP TABLE IF EXISTS mlp_model, mlp_model_summary, mlp_model_standardization;
+-- Set seed so results are reproducible
+SELECT setseed(0);
+SELECT mlp_classification(
+    'iris_data_special_char',      -- Source table
+    'mlp_model_special_char',      -- Destination table
+    '"len$$''%*Ж!#''()gth"',     -- Input features
+    '"se$$''''x"',     -- Dependent variable
+    ARRAY[5],         -- Number of units per layer
+    'learning_rate_init=0.003,
+    n_iterations=10,
+    tolerance=0',     -- Optimizer params
+    'tanh',           -- Activation function
+    NULL,             -- Default weight (1)
+    FALSE,            -- No warm start
+    FALSE             -- Not verbose
+);
+
+SELECT assert
+        (
+        source_table        = 'iris_data_special_char' AND
+        independent_varname = '"len$$''%*Ж!#''()gth"' AND
+        dependent_varname   = '"se$$''''x"' AND
+        tolerance         = 0 AND
+        learning_rate_init = 0.003 AND
+        learning_rate_policy = 'constant' AND
+        n_iterations = 10 AND
+        n_tries = 1 AND
+        layer_sizes = ARRAY[2,5,7] AND
+        activation = 'tanh' AND
+        is_classification = 't' AND
+        classes        = $__madlib__${ 'M'M', M\"M, M'M, "M,M", "M@[}(:*;M", "M|$$M", MЖM }$__madlib__$ AND
+        weights  = '1' AND
+        grouping_col       = NULL,
+        'Summary Validation failed for special chars. Actual:' || __to_char(summary)
+        ) from (select * from mlp_model_special_char_summary order by classes) summary;
+
+
+DROP TABLE IF EXISTS mlp_prediction_batch_output, mlp_prediction_batch_output_summary, mlp_prediction_batch_output_standardization;
+SELECT mlp_predict(
+    'mlp_model_special_char',
+    'iris_data_special_char',
+    'id',
+    'mlp_prediction_batch_output',
+    'output');
+
+
+-- Test special chars with minibatch enabled
+DROP TABLE IF EXISTS iris_data_special_char_batch, iris_data_special_char_batch_summary, iris_data_special_char_batch_standardization;
+CREATE TABLE iris_data_special_char_batch(
+    __id__ integer,
+    dependent_varname double precision[],
+    independent_varname double precision[]
+);
+COPY iris_data_special_char_batch (__id__, dependent_varname, independent_varname) FROM STDIN NULL '?' DELIMITER '|';
+0 | {{0,0,0,1,0,0,0},{0,1,0,0,0,0,0},{0,0,1,0,0,0,0},{0,0,0,0,1,0,0}} | {{0,0},{0,0},{0,0},{0,0}}
+1 | {{0,0,0,0,0,0,1},{1,0,0,0,0,0,0},{0,0,0,0,0,1,0}} | {{0,0},{0,0},{0,0}}
+\.
+
+-- Create the corresponding summary table for preprocessed data
+CREATE TABLE iris_data_special_char_batch_summary(
+    source_table text,
+    output_table text,
+    dependent_varname text,
+    independent_varname text,
+    dependent_vartype text,
+    buffer_size integer,
+    class_values text[],
+    num_rows_processed integer,
+    num_rows_skipped integer,
+    grouping_cols text
+);
+-- The availability of the original source table should not be a condition for
+-- MLP to work correctly. It should work fine even the original source table is
+-- deleted (this basically ensures that all the necessary info is captured in
+-- the summary table). So name the original source table as
+-- 'iris_data_does_not_exist' instead of the original 'iris_data', to mimic the
+-- scenario where the original source table is deleted and MLP is trained with
+-- the preprocessed table.
+INSERT INTO iris_data_special_char_batch_summary VALUES
+('minibatch_preprocessing_input', 'minibatch_preprocessing_out', '"se$$''x"', '"len$$''%*Ж!#''()gth"',
+    'text', 4, $__madlib__${'M'M',"M\"M",M'M,"M,M","M@[}(:*;M",M|$$M,MЖM}$__madlib__$, 7, 0, '');
+-- Create the corresponding standardization table for preprocessed data
+CREATE TABLE iris_data_special_char_batch_standardization(
+    mean double precision[],
+    std double precision[]
+);
+INSERT INTO iris_data_special_char_batch_standardization VALUES
+(ARRAY[0.66,0.5], ARRAY[1,1]);
+
+DROP TABLE IF EXISTS mlp_class_batch, mlp_class_batch_summary, mlp_class_batch_standardization;
+SELECT mlp_classification(
+    'iris_data_special_char_batch',    -- Source table
+    'mlp_class_special_char_batch',    -- Desination table
+    'independent_varname',   -- Input features
+    'dependent_varname',        -- Label
+    ARRAY[5],   -- Number of units per layer
+    'learning_rate_init=0.1,
+    learning_rate_policy=constant,
+    n_iterations=1,
+    n_tries=1,
+    tolerance=0,
+    n_epochs=20',
+    'sigmoid',
+    '',
+    False,
+    False
+);
+SELECT assert
+        (
+        source_table        = 'iris_data_special_char_batch' AND
+        independent_varname = 'independent_varname' AND
+        dependent_varname   = 'dependent_varname' AND
+        original_source_table = 'minibatch_preprocessing_input' AND
+        original_independent_varname = '"len$$''%*Ж!#''()gth"' AND
+        original_dependent_varname = '"se$$''x"' AND
+        original_dependent_vartype = 'text' AND
+        tolerance         = 0 AND
+        learning_rate_init = 0.1 AND
+        learning_rate_policy = 'constant' AND
+        n_iterations = 1 AND
+        n_tries = 1 AND
+        layer_sizes = ARRAY[2,5,7] AND
+        activation = 'sigmoid' AND
+        is_classification = 't' AND
+        classes     = $__madlib__${ 'M'M', M\"M, M'M, "M,M", "M@[}(:*;M", "M|$$M", MЖM }$__madlib__$ AND
+        weights  = '1' AND
+        grouping_col    = NULL,
+        'Summary Validation failed for special chars. Actual:' || __to_char(summary)
+        ) from (select * from mlp_class_special_char_batch_summary order by classes) summary;
+
+DROP TABLE IF EXISTS mlp_prediction_batch_output, mlp_prediction_batch_output_summary, mlp_prediction_batch_output_standardization;
+SELECT mlp_predict(
+    'mlp_class_special_char_batch',
+    'iris_data_special_char',
+    'id',
+    'mlp_prediction_batch_output',
+    'output');

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/convex/test/unit_tests/test_mlp_igd.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/convex/test/unit_tests/test_mlp_igd.py_in b/src/ports/postgres/modules/convex/test/unit_tests/test_mlp_igd.py_in
index 986687e..2883855 100644
--- a/src/ports/postgres/modules/convex/test/unit_tests/test_mlp_igd.py_in
+++ b/src/ports/postgres/modules/convex/test/unit_tests/test_mlp_igd.py_in
@@ -125,10 +125,11 @@ class MLPMiniBatchTestCase(unittest.TestCase):
         self.plpy_mock_execute.return_value = [{'independent_varname': 'value',
                                                 'dependent_varname': 'value',
                                                 'class_values': 'regression',
+                                                'grouping_cols': 'value',
                                                 'foo': 'bar'}]
         self.module = self.subject.MLPMinibatchPreProcessor("input")
         self.assertTrue(self.module.preprocessed_summary_dict)
-        self.assertEqual(4, len(self.module.preprocessed_summary_dict))
+        self.assertEqual(5, len(self.module.preprocessed_summary_dict))
 
     def test_check_if_minibatch_enabled_returns_bool(self):
         self.plpy_mock_execute.return_value =  [{'n_x': 1, 'n_y': 2, 'n_z': None}]

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/internal/__init__.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/internal/__init__.py_in b/src/ports/postgres/modules/internal/__init__.py_in
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/internal/db_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/internal/db_utils.py_in b/src/ports/postgres/modules/internal/db_utils.py_in
new file mode 100644
index 0000000..e82ba91
--- /dev/null
+++ b/src/ports/postgres/modules/internal/db_utils.py_in
@@ -0,0 +1,74 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import plpy
+from utilities.utilities import is_psql_char_type
+from utilities.validate_args import get_expr_type
+
+m4_changequote(`<!', `!>')
+
+QUOTE_DELIMITER="$__madlib__$"
+
+def get_distinct_col_levels(source_table, col_name, col_type=None):
+	"""
+	Add description here
+	:return:
+	"""
+	if not col_type:
+		col_type = get_expr_type(col_name, source_table)
+
+	if is_psql_char_type(col_type):
+		dep_var_text_patched = "quote_literal({0})".format(col_name)
+	else:
+		dep_var_text_patched = col_name
+
+	levels = plpy.execute("""
+                SELECT DISTINCT {dep_var_text_patched} AS levels
+                FROM {source_table}
+                WHERE ({col_name}) is NOT NULL
+                """.format(**locals()))
+
+	levels = sorted(l["levels"] for l in levels)
+	return levels
+
+def get_one_hot_encoded_expr(col_name, col_levels):
+	"""
+	All the values in col_levels should have been quoted and escaped with
+	the sql function `quote_literal`.
+	:param col_name:
+	:param col_levels:
+	:return:
+	"""
+	one_hot_encoded_expr = ["({0}) = {1}".format(col_name, c)
+					for c in col_levels]
+	return 'ARRAY[{0}]::INTEGER[]'.format(', '.join(one_hot_encoded_expr))
+
+def quote_literal_python_list(values):
+	"""
+	This function will sql quote all the values inside a python list
+	:param values:
+	:return:
+	"""
+	quoted_values=[]
+	for value in values:
+		quoted_values.append(plpy.execute(""
+		"select quote_literal({0}{1}{0}) as quoted_value".
+		format(QUOTE_DELIMITER, value))[0]['quoted_value'])
+
+	return quoted_values

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in b/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
index 89eea6e..cbcd9b7 100644
--- a/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
+++ b/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
@@ -25,24 +25,24 @@
 from math import ceil
 import plpy
 
-from control import MinWarning
+from internal.db_utils import get_distinct_col_levels
+from internal.db_utils import get_one_hot_encoded_expr
 from utilities import add_postfix
 from utilities import _assert
 from utilities import get_seg_number
 from utilities import is_platform_pg
 from utilities import is_psql_boolean_type
 from utilities import is_psql_char_type
-from utilities import is_psql_numeric_type
 from utilities import is_psql_int_type
-from utilities import is_string_formatted_as_array_expression
+from utilities import is_valid_psql_type
 from utilities import py_list_to_sql_string
 from utilities import split_quoted_delimited_str
 from utilities import unique_string
-from utilities import _string_to_array
 from utilities import validate_module_input_params
+from utilities import NUMERIC, INTEGER, TEXT, BOOLEAN, INCLUDE_ARRAY
+
 from mean_std_dev_calculator import MeanStdDevCalculator
 from validate_args import get_expr_type
-from validate_args import output_tbl_valid
 from validate_args import _tbl_dimension_rownum
 
 m4_changequote(`<!', `!>')
@@ -58,10 +58,9 @@ class MiniBatchPreProcessor:
     preprocessing, which packs multiple rows of selected columns from the
     source table into one row based on the buffer size
     """
-    @MinWarning("error")
     def __init__(self, schema_madlib, source_table, output_table,
-                  dependent_varname, independent_varname, grouping_cols,
-                  buffer_size, one_hot_encode_int_dep_var=False, **kwargs):
+                 dependent_varname, independent_varname, grouping_cols,
+                 buffer_size, one_hot_encode_int_dep_var=False, **kwargs):
         self.schema_madlib = schema_madlib
         self.source_table = source_table
         self.output_table = output_table
@@ -69,119 +68,50 @@ class MiniBatchPreProcessor:
         self.independent_varname = independent_varname
         self.buffer_size = buffer_size
         self.grouping_cols = grouping_cols
-        self.one_hot_encode_int_dep_var = one_hot_encode_int_dep_var
 
         self.module_name = "minibatch_preprocessor"
         self.output_standardization_table = add_postfix(self.output_table,
                                                         "_standardization")
         self.output_summary_table = add_postfix(self.output_table, "_summary")
+        self.dependent_vartype = get_expr_type(self.dependent_varname,
+                                                self.source_table)
+
+        self.to_one_hot_encode = self.should_one_hot_encode(one_hot_encode_int_dep_var)
+        if self.to_one_hot_encode:
+            self.dependent_levels = get_distinct_col_levels(
+            self.source_table, self.dependent_varname, self.dependent_vartype)
+        else:
+            self.dependent_levels = None
         self._validate_minibatch_preprocessor_params()
 
-    @MinWarning("error")
     def minibatch_preprocessor(self):
         # Get array expressions for both dep and indep variables from the
         # MiniBatchQueryFormatter class
-        qry_formatter = MiniBatchQueryFormatter(self.source_table)
-        dependent_var_dbtype = get_expr_type(self.dependent_varname,
-                                             self.source_table)
-
-        dep_var_array_str, dep_var_classes_str = qry_formatter.\
-            get_dep_var_array_and_classes(self.dependent_varname,
-                                          dependent_var_dbtype,
-                                          self.one_hot_encode_int_dep_var)
-        indep_var_array_str = qry_formatter.get_indep_var_array_str(
-                                              self.independent_varname)
+        dep_var_array_expr = self.get_dep_var_array_expr()
+        indep_var_array_expr = self.get_indep_var_array_expr()
 
         standardizer = MiniBatchStandardizer(self.schema_madlib,
                                              self.source_table,
-                                             dep_var_array_str,
-                                             indep_var_array_str,
+                                             dep_var_array_expr,
+                                             indep_var_array_expr,
                                              self.grouping_cols,
                                              self.output_standardization_table)
 
         total_num_rows_processed, avg_num_rows_processed, \
         num_missing_rows_skipped = self._get_skipped_rows_processed_count(
-                                            dep_var_array_str,
-                                            indep_var_array_str)
+                                            dep_var_array_expr,
+                                            indep_var_array_expr)
         calculated_buffer_size = MiniBatchBufferSizeCalculator.\
             calculate_default_buffer_size(self.buffer_size,
                                           avg_num_rows_processed,
                                           standardizer.independent_var_dimension)
-        """
-        This query does the following:
-        1. Standardize the independent variables in the input table
-           (see MiniBatchStandardizer for more details)
-        2. Filter out rows with null values either in dependent/independent
-           variables
-        3. Converts the input dependent/independent variables into arrays
-          (see MiniBatchQueryFormatter for more details)
-        4. Based on the buffer size, pack the dependent/independent arrays into
-           matrices
-
-        Notes
-        1. we are ignoring null in x because
-             a. matrix_agg does not support null
-             b. __utils_normalize_data returns null if any element of the array
-                contains NULL
-        2. Please keep the null checking where clause of this query in sync with
-        the query in _get_skipped_rows_processed_count. We are doing this null
-        check in two places to prevent another pass of the entire dataset.
-        """
-
-        # This ID is the unique row id that get assigned to each row after
-        # preprocessing
-        unique_row_id = "__id__"
-        standardize_query = standardizer.get_query_for_standardizing()
-        if self.grouping_cols:
-            partition_by = 'PARTITION BY {0}'.format(self.grouping_cols)
-            grouping_cols_select_col = self.grouping_cols + ','
-            grouping_cols_group_by = ',' + self.grouping_cols
-        else:
-            partition_by = ''
-            grouping_cols_select_col = ''
-            grouping_cols_group_by = ''
-
-        sql = """
-            CREATE TABLE {self.output_table} AS
-            SELECT {row_id},
-                   {grouping_cols_select_col}
-                   {self.schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
-                   {self.schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
-            FROM (
-                SELECT (row_number() OVER ({partition_by} ORDER BY random()) - 1)
-                        / {buffer_size}
-                            as {row_id}, * FROM
-                (
-                    {standardize_query}
-                 ) sub_query_1
-                 WHERE NOT {self.schema_madlib}.array_contains_null({dep_colname})
-                 AND NOT {self.schema_madlib}.array_contains_null({ind_colname})
-            ) sub_query_2
-            GROUP BY {row_id} {grouping_cols_group_by}
-            {distributed_by_clause}
-            """.format(buffer_size=calculated_buffer_size,
-                       dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
-                       ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
-                       row_id=unique_row_id,
-                       distributed_by_clause='' if is_platform_pg() else
-                                             'DISTRIBUTED RANDOMLY',
-                       **locals())
-        plpy.execute(sql)
+        self.create_output_table(standardizer, calculated_buffer_size)
 
         standardizer.create_output_standardization_table()
         standardizer.drop_standardized_table()
-        MiniBatchSummarizer.create_output_summary_table(
-            self.output_summary_table,
-            self.source_table,
-            self.output_table,
-            self.dependent_varname,
-            self.independent_varname,
-            dependent_var_dbtype,
-            calculated_buffer_size,
-            dep_var_classes_str,
-            total_num_rows_processed,
-            num_missing_rows_skipped,
-            self.grouping_cols)
+        self.create_output_summary_table(calculated_buffer_size,
+                                        total_num_rows_processed,
+                                        num_missing_rows_skipped)
 
     def _validate_minibatch_preprocessor_params(self):
         # Test if the independent variable can be typecasted to a double
@@ -189,8 +119,7 @@ class MiniBatchPreProcessor:
 
         # Note that this will not fail for 2d arrays but the standardizer will
         # fail because utils_normalize_data will throw an error
-        typecasted_ind_varname = "{0}::double precision[]".format(
-                                                    self.independent_varname)
+        typecasted_ind_varname = self.get_indep_var_array_expr()
         validate_module_input_params(self.source_table, self.output_table,
                                      typecasted_ind_varname,
                                      self.dependent_varname, self.module_name,
@@ -198,9 +127,14 @@ class MiniBatchPreProcessor:
                                      [self.output_summary_table,
                                       self.output_standardization_table])
 
-        num_of_dependent_cols = split_quoted_delimited_str(
-                                                        self.dependent_varname)
 
+
+        num_of_dependent_cols = split_quoted_delimited_str(self.dependent_varname)
+        valid_types = NUMERIC | TEXT | BOOLEAN
+        _assert(is_valid_psql_type(self.dependent_vartype,
+                                    valid_types | INCLUDE_ARRAY),
+                "Invalid dependent variable type should be one of {0}".
+                format(','.join(valid_types)))
         _assert(len(num_of_dependent_cols) == 1,
                 "Invalid dependent_varname: only one column name is allowed "
                 "as input.")
@@ -236,9 +170,9 @@ class MiniBatchPreProcessor:
                                        if self.grouping_cols else '')
         result = plpy.execute(query)
 
-        ## SUM and AVG both return float, and we have to cast them into int fo
-        ## summary table. For avg_num_rows_processed we need to ceil first so
-        ## that the minimum won't be 0
+        # SUM and AVG both return float, and we have to cast them into int for
+        # summary table. For avg_num_rows_processed we need to ceil first so
+        # that the minimum won't be 0
         source_table_row_count = int(result[0]['source_table_row_count'])
         total_num_rows_processed = int(result[0]['total_num_rows_processed'])
         avg_num_rows_processed = int(ceil(result[0]['avg_num_rows_processed']))
@@ -252,19 +186,13 @@ class MiniBatchPreProcessor:
         return (total_num_rows_processed, avg_num_rows_processed,
                 num_missing_rows_skipped)
 
+    def should_one_hot_encode(self, one_hot_encode_int_dep_var):
+        return (is_psql_char_type(self.dependent_vartype) or
+                is_psql_boolean_type(self.dependent_vartype) or
+                (is_psql_int_type(self.dependent_vartype) and
+                    one_hot_encode_int_dep_var))
 
-class MiniBatchQueryFormatter:
-    """ This class is responsible for formatting the independent and dependent
-    variables into arrays so that they can be matrix agged by the preprocessor
-    class.
-    """
-    def __init__(self, source_table):
-        self.source_table = source_table
-
-    def get_dep_var_array_and_classes(self,
-                                      dependent_varname,
-                                      dependent_var_dbtype,
-                                      to_one_hot_encode_int=False):
+    def get_dep_var_array_expr(self):
         """
         :param dependent_varname: Name of the dependent variable
         :param to_one_hot_encode_int: Boolean to determine if dependent
@@ -277,61 +205,23 @@ class MiniBatchQueryFormatter:
 
             If dep_type == numeric , do not encode
                     1. dependent_varname = rings
-                        transformed_value = ARRAY[[rings1], [rings2], []]
-                        class_level_str = ARRAY[rings = 'rings1',
-                                                rings = 'rings2']::integer[]
+                        transformed_value = ARRAY[rings]
                     2. dependent_varname = ARRAY[a, b, c]
-                        transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []]
-                        class_level_str = 'NULL::TEXT'
+                        transformed_value = ARRAY[a, b, c]
             else if dep_type in ("text", "boolean"), encode:
                     3. dependent_varname = rings (encoding)
-                        transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1,
-                                                    rings2=2], []]
-                        class_level_str = 'NULL::TEXT'
+                        transformed_value = ARRAY[rings=1, rings=2, rings=3]
         """
-        dep_var_class_value_str = 'NULL::TEXT'
-        is_dep_var_int_type = is_psql_int_type(dependent_var_dbtype)
-        to_one_hot_encode = (is_psql_char_type(dependent_var_dbtype) or
-                             is_psql_boolean_type(dependent_var_dbtype) or
-                                (to_one_hot_encode_int and
-                                    is_dep_var_int_type))
-        if to_one_hot_encode:
-            # for encoding, since dependent_varname can also be a logical
-            # expression, there is a () around it
-            dep_level_sql = """
-                SELECT DISTINCT ({dependent_varname}) AS class
-                FROM {source_table}
-                WHERE ({dependent_varname}) is NOT NULL
-                """.format(dependent_varname=dependent_varname,
-                           source_table=self.source_table)
-            dep_levels = plpy.execute(dep_level_sql)
-            dep_var_classes = sorted(l["class"] for l in dep_levels)
-            dep_var_array_str = \
-                self._get_one_hot_encoded_str(dependent_varname,
-                                              dep_var_classes,
-                                              to_quote=not is_dep_var_int_type)
-            dep_var_class_value_str = \
-                py_list_to_sql_string(dep_var_classes,
-                                      array_type=dependent_var_dbtype)
-        elif "[]" in dependent_var_dbtype:
-            dep_var_array_str = dependent_varname
-        elif is_psql_numeric_type(dependent_var_dbtype):
-            dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname)
-        else:
-            plpy.error("""Invalid dependent variable type. It should be character,
-                boolean, numeric, or array.""")
-
-        return dep_var_array_str, dep_var_class_value_str
+        if "[]" == self.dependent_vartype[-2:]:
+            return self.dependent_varname
 
-    def _get_one_hot_encoded_str(self, var_name, var_classes, to_quote=True):
-        def add_quote(c):
-            return "'{0}'".format(c) if to_quote else c
-
-        one_hot_list = ["({0}) = {1}".format(var_name, add_quote(c))
-                        for c in var_classes]
-        return 'ARRAY[{0}]::INTEGER[]'.format(', '.join(one_hot_list))
+        if self.to_one_hot_encode:
+            return get_one_hot_encoded_expr(self.dependent_varname,
+                                            self.dependent_levels)
+        else:
+            return "ARRAY[({0})]".format(self.dependent_varname)
 
-    def get_indep_var_array_str(self, independent_varname):
+    def get_indep_var_array_expr(self):
         """ we assume that all the independent features are either numeric or
         already encoded by the user.
         Supported formats
@@ -342,9 +232,106 @@ class MiniBatchQueryFormatter:
 
         we don't deal with a mixture of scalar and array independent variables
         """
-        return "({0})::DOUBLE PRECISION[]".format(independent_varname)
+        return "({0})::DOUBLE PRECISION[]".format(self.independent_varname)
+
+    def create_output_table(self, standardizer, calculated_buffer_size):
+        """
+        This query does the following:
+        1. Standardize the independent variables in the input table
+           (see MiniBatchStandardizer for more details)
+        2. Filter out rows with null values either in dependent/independent
+           variables
+        3. Converts the input dependent/independent variables into arrays
+          (see get_dep_var_array_expr and get_indep_var_array_expr)
+        4. Based on the buffer size, pack the dependent/independent arrays into
+           matrices
+
+        Notes
+        1. we are ignoring null in x because
+             a. matrix_agg does not support null
+             b. __utils_normalize_data returns null if any element of the array
+                contains NULL
+        2. Please keep the null checking where clause of this query in sync with
+        the query in _get_skipped_rows_processed_count. We are doing this null
+        check in two places to prevent another pass of the entire dataset.
+        """
+
+        # This ID is the unique row id that gets assigned to each row after
+        # preprocessing
+        unique_row_id = "__id__"
+        standardize_query = standardizer.get_query_for_standardizing()
+        if self.grouping_cols:
+            partition_by = 'PARTITION BY {0}'.format(self.grouping_cols)
+            grouping_cols_select_col = self.grouping_cols + ','
+            grouping_cols_group_by = ',' + self.grouping_cols
+        else:
+            partition_by = ''
+            grouping_cols_select_col = ''
+            grouping_cols_group_by = ''
+
+        sql = """
+            CREATE TABLE {self.output_table} AS
+            SELECT {row_id},
+                   {grouping_cols_select_col}
+                   {self.schema_madlib}.matrix_agg({dep_colname}) as {dep_colname},
+                   {self.schema_madlib}.matrix_agg({ind_colname}) as {ind_colname}
+            FROM (
+                SELECT (row_number() OVER ({partition_by} ORDER BY random()) - 1)
+                        / {buffer_size}
+                            as {row_id}, * FROM
+                (
+                    {standardize_query}
+                 ) sub_query_1
+                 WHERE NOT {self.schema_madlib}.array_contains_null({dep_colname})
+                 AND NOT {self.schema_madlib}.array_contains_null({ind_colname})
+            ) sub_query_2
+            GROUP BY {row_id} {grouping_cols_group_by}
+            {distributed_by_clause}
+            """.format(buffer_size=calculated_buffer_size,
+                       dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
+                       ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
+                       row_id=unique_row_id,
+                       distributed_by_clause='' if is_platform_pg() else
+                       'DISTRIBUTED RANDOMLY',
+                       **locals())
+        plpy.execute(sql)
 
 
+    def create_output_summary_table(self,
+                                    buffer_size,
+                                    total_num_rows_processed,
+                                    num_missing_rows_skipped):
+        # 1. All the string columns are surrounded by "$__madlib__$" to take care of
+        #    special characters in the column name.
+        # 2. We have to typecast all the string column names to ::TEXT because
+        #    otherwise there is a warning from psql
+        #    WARNING: column "independent_varname" has type
+        #    "unknown"
+        # class_level_str = ARRAY[rings = '1', rings = '2']::integer[]
+        class_level_str='NULL::TEXT'
+        if self.dependent_levels:
+            class_level_str=py_list_to_sql_string(
+                self.dependent_levels, array_type=self.dependent_vartype,
+                long_format=True)
+
+        query = """
+            CREATE TABLE {self.output_summary_table} AS
+            SELECT
+                $__madlib__${self.source_table}$__madlib__$::TEXT AS source_table,
+                $__madlib__${self.output_table}$__madlib__$::TEXT AS output_table,
+                $__madlib__${self.dependent_varname}$__madlib__$::TEXT AS dependent_varname,
+                $__madlib__${self.independent_varname}$__madlib__$::TEXT AS independent_varname,
+                $__madlib__${self.dependent_vartype}$__madlib__$::TEXT AS dependent_vartype,
+                {buffer_size} AS buffer_size,
+                {class_level_str} AS class_values,
+                {total_num_rows_processed} AS num_rows_processed,
+                {num_missing_rows_skipped} AS num_missing_rows_skipped,
+                {grouping_cols_str}::TEXT AS grouping_cols
+        """.format(grouping_cols_str="$__madlib__$" + self.grouping_cols + "$__madlib__$"
+                    if self.grouping_cols else "NULL",
+                   **locals())
+        plpy.execute(query)
+
 class MiniBatchStandardizer:
     """
     This class is responsible for
@@ -419,7 +406,6 @@ class MiniBatchStandardizer:
                      ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
                      self=self)
 
-
     def _get_query_for_standardizing_with_grouping(self):
         return """
         CREATE TEMP TABLE {self.standardized_table} AS
@@ -432,9 +418,9 @@ class MiniBatchStandardizer:
             ) AS {ind_colname},
             {self.source_table}.{self.grouping_cols}
         FROM
-          {self.source_table} 
-          INNER JOIN 
-          {self.x_mean_table} AS __x__ 
+          {self.source_table}
+          INNER JOIN
+          {self.x_mean_table} AS __x__
           ON  {self.source_table}.{self.grouping_cols} = __x__.{self.grouping_cols}
         """.format(
             self=self,
@@ -459,45 +445,6 @@ class MiniBatchStandardizer:
         plpy.execute("DROP TABLE IF EXISTS {0}".format(self.standardized_table))
 
 
-class MiniBatchSummarizer:
-    @staticmethod
-    def create_output_summary_table(output_summary_table,
-                                    source_table,
-                                    output_table,
-                                    dep_var_array_str,
-                                    indep_var_array_str,
-                                    dependent_var_dbtype,
-                                    buffer_size,
-                                    class_values,
-                                    total_num_rows_processed,
-                                    num_missing_rows_skipped,
-                                    grouping_cols):
-        # 1. All the string columns are surrounded by "$$" to take care of
-        #    special characters in the column name.
-        # 2. We have to typecast all the string column names to ::TEXT because
-        #    otherwise there is a warning from psql
-        #    WARNING: column "independent_varname" has type "unknown"
-        query = """
-            CREATE TABLE {output_summary_table} AS
-            SELECT
-                $${source_table}$$::TEXT AS source_table,
-                $${output_table}$$::TEXT AS output_table,
-                $${dependent_varname}$$::TEXT AS dependent_varname,
-                $${independent_varname}$$::TEXT AS independent_varname,
-                $${dependent_var_dbtype}$$::TEXT AS dependent_vartype,
-                {buffer_size} AS buffer_size,
-                {class_values} AS class_values,
-                {total_num_rows_processed} AS num_rows_processed,
-                {num_missing_rows_skipped} AS num_missing_rows_skipped,
-                {grouping_cols_str}::TEXT AS grouping_cols
-        """.format(dependent_varname=dep_var_array_str,
-                   independent_varname=indep_var_array_str,
-                   grouping_cols_str="$$" + grouping_cols + "$$"
-                                     if grouping_cols else "NULL",
-                   **locals())
-        plpy.execute(query)
-
-
 class MiniBatchBufferSizeCalculator:
     """
     This class is responsible for calculating the buffer size.

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/utilities/minibatch_preprocessing.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/minibatch_preprocessing.sql_in b/src/ports/postgres/modules/utilities/minibatch_preprocessing.sql_in
index 4a08702..75adcc9 100644
--- a/src/ports/postgres/modules/utilities/minibatch_preprocessing.sql_in
+++ b/src/ports/postgres/modules/utilities/minibatch_preprocessing.sql_in
@@ -550,8 +550,10 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.minibatch_preprocessor(
     one_hot_encode_int_dep_var  BOOLEAN
 ) RETURNS VOID AS $$
     PythonFunctionBodyOnly(utilities, minibatch_preprocessing)
-    minibatch_preprocessor_obj = minibatch_preprocessing.MiniBatchPreProcessor(**globals())
-    minibatch_preprocessor_obj.minibatch_preprocessor()
+    from utilities.control import MinWarning
+    with MinWarning('error'):
+        minibatch_preprocessor_obj = minibatch_preprocessing.MiniBatchPreProcessor(**globals())
+        minibatch_preprocessor_obj.minibatch_preprocessor()
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/utilities/test/minibatch_preprocessing.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/minibatch_preprocessing.sql_in b/src/ports/postgres/modules/utilities/test/minibatch_preprocessing.sql_in
index 97ed51f..b6b2996 100644
--- a/src/ports/postgres/modules/utilities/test/minibatch_preprocessing.sql_in
+++ b/src/ports/postgres/modules/utilities/test/minibatch_preprocessing.sql_in
@@ -292,27 +292,59 @@ SELECT assert
         || ind_var_rows
         ) from (select max(array_upper(o.dependent_varname, 1)) as dep_var_rows, max(array_upper(o.independent_varname, 1)) as ind_var_rows , s1.buffer_size from minibatch_preprocessing_out o, minibatch_preprocessing_out_summary s1 group by buffer_size) s;
 
--- Test special characters in independent_var, dependent_var and grouping_cols
+-- Test default buffer size calculator
 DROP TABLE IF EXISTS minibatch_preprocessing_input;
 CREATE TABLE minibatch_preprocessing_input(
-    "se''x" TEXT,
-    "len'%*()gth" DOUBLE PRECISION[],
-    "rin!#'gs" INTEGER);
+    sex TEXT,
+    length DOUBLE PRECISION[],
+    rings INTEGER);
 
 INSERT INTO minibatch_preprocessing_input VALUES
 ('F',ARRAY[0.66, 0.5],6);
 DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
-SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', '"se''''x"', '"len''%*()gth"', '"rin!#''gs"');
+SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', 'sex', 'length', 'rings');
 SELECT assert
         (
         source_table        = 'minibatch_preprocessing_input' AND
         output_table        = 'minibatch_preprocessing_out' AND
-        dependent_varname   = '"se''''x"' AND
-        independent_varname = '"len''%*()gth"' AND
+        dependent_varname   = 'sex' AND
+        independent_varname = 'length' AND
         buffer_size         = 1 AND
         class_values        = '{F}' AND -- we sort the class values in python
         num_rows_processed  = 1 AND
         num_missing_rows_skipped    = 0 AND
-        grouping_cols       = '"rin!#''gs"',
+        grouping_cols       = 'rings',
         'Summary Validation failed for special chars. Actual:' || __to_char(summary)
         ) from (select * from minibatch_preprocessing_out_summary) summary;
+
+
+-- Test special characters and unicode characters in independent_var, dependent_var and grouping_cols, both for column name and column value
+DROP TABLE IF EXISTS minibatch_preprocessing_input;
+CREATE TABLE minibatch_preprocessing_input(
+    "se$$''x" TEXT,
+    "len$$'%*()gth" DOUBLE PRECISION[],
+    "rin$$Ж!#'gs" INTEGER);
+
+INSERT INTO minibatch_preprocessing_input VALUES
+('M''M',ARRAY[0.66, 0.5],6),
+('''M''M''',ARRAY[0.66, 0.5],6),
+('M|$$M',ARRAY[0.66, 0.5],6),
+('M,M',ARRAY[0.66, 0.5],6),
+('M@[}(:*;M',ARRAY[0.66, 0.5],6),
+('M"M',ARRAY[0.66, 0.5],6),
+('MЖM',ARRAY[0.66, 0.5],6);
+DROP TABLE IF EXISTS minibatch_preprocessing_out, minibatch_preprocessing_out_standardization, minibatch_preprocessing_out_summary;
+SELECT minibatch_preprocessor('minibatch_preprocessing_input', 'minibatch_preprocessing_out', '"se$$''''x"', '"len$$''%*()gth"', '"rin$$Ж!#''gs"', 4);
+SELECT assert(
+        source_table        = 'minibatch_preprocessing_input' AND
+        output_table        = 'minibatch_preprocessing_out' AND
+        dependent_varname   = '"se$$''''x"' AND
+        independent_varname = '"len$$''%*()gth"' AND
+        buffer_size         = 4 AND
+        class_values        = $__madlib__${ 'M'M', M\"M, M'M, "M,M", "M@[}(:*;M", "M|$$M", MЖM }$__madlib__$ AND
+        -- we sort the class values in python
+        num_rows_processed  = 7 AND
+        num_missing_rows_skipped    = 0 AND
+        grouping_cols       = '"rin$$Ж!#''gs"',
+        'Summary Validation failed for special chars. Actual:' || __to_char(summary)
+        ) from (select * from minibatch_preprocessing_out_summary order by class_values) summary;

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in b/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
index 75cc044..b262964 100644
--- a/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
+++ b/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
@@ -19,8 +19,11 @@
 
 import sys
 from os import path
-# Add utilites module to the pythonpath.
-sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))
+
+# Add modules to the pythonpath.
+sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))))
+# Add modules/utilities to the pythonpath.
+sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))))
 
 import unittest
 from mock import *
@@ -29,125 +32,13 @@ import plpy_mock as plpy
 m4_changequote(`<!', `!>')
 
 
-# Commenting out MiniBatchPreProcessing test cases till we have a solution for
-# mocking out the MinWarning decorator.
-
-# class MiniBatchPreProcessingTestCase(unittest.TestCase):
-#     def setUp(self):
-#         self.plpy_mock = Mock(spec='error')
-#         patches = {
-#             'plpy': plpy,
-#             'mean_std_dev_calculator': Mock()
-#         }
-
-#         # we need to use MagicMock() instead of Mock() for the plpy.execute mock
-#         # to be able to iterate on the return value
-#         self.plpy_mock_execute = MagicMock()
-#         plpy.execute = self.plpy_mock_execute
-
-#         self.module_patcher = patch.dict('sys.modules', patches)
-#         self.module_patcher.start()
-
-#         self.default_schema_madlib = "madlib"
-#         self.default_source_table = "source"
-#         self.default_output_table = "output"
-#         self.default_dep_var = "depvar"
-#         self.default_ind_var = "indvar"
-#         self.grouping_cols = None
-#         self.default_buffer_size = 5
-
-#         import minibatch_preprocessing
-#         self.module = minibatch_preprocessing
-#         self.module.validate_module_input_params = Mock()
-#         self.output_tbl_valid_mock = Mock()
-#         self.module.output_tbl_valid = self.output_tbl_valid_mock
-
-#         self.minibatch_query_formatter = self.module.MiniBatchQueryFormatter
-#         self.minibatch_query_formatter.get_dep_var_array_and_classes = Mock(
-#                                         return_value=("anything1", "anything2"))
-#         self.minibatch_query_formatter.get_indep_var_array_str = Mock(
-#                                         return_value="anything3")
-
-#         self.module.MiniBatchStandardizer = Mock()
-#         self.module.MiniBatchSummarizer = Mock()
-#         self.module.get_expr_type = MagicMock(return_value="anytype")
-
-#     def tearDown(self):
-#         self.module_patcher.stop()
-
-#     def test_minibatch_preprocessor_executes_query(self):
-#         preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
-#                                                              "input",
-#                                                              "out",
-#                                                              self.default_dep_var,
-#                                                              self.default_ind_var,
-#                                                              self.grouping_cols,
-#                                                              self.default_buffer_size)
-#         self.plpy_mock_execute.side_effect = [[{"source_table_row_count":5 ,
-#                                                 "total_num_rows_processed":3,
-#                                                 "avg_num_rows_processed": 2}], ""]
-#         preprocessor_obj.minibatch_preprocessor()
-#         self.assertEqual(2, self.plpy_mock_execute.call_count)
-#         self.assertEqual(self.default_buffer_size, preprocessor_obj.buffer_size)
-
-#     def test_minibatch_preprocessor_null_buffer_size_executes_query(self):
-#         preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
-#                                                              "input",
-#                                                              "out",
-#                                                              self.default_dep_var,
-#                                                              self.default_ind_var,
-#                                                              self.grouping_cols,
-#                                                              None)
-#         self.plpy_mock_execute.side_effect = [[{"source_table_row_count":5 ,
-#                                                 "total_num_rows_processed":3,
-#                                                 "avg_num_rows_processed": 2}], ""]
-#         self.module.MiniBatchBufferSizeCalculator.calculate_default_buffer_size = Mock()
-#         preprocessor_obj.minibatch_preprocessor()
-#         self.assertEqual(2, self.plpy_mock_execute.call_count)
-
-#     def test_minibatch_preprocessor_multiple_dep_var_raises_exception(self):
-#             with self.assertRaises(plpy.PLPYException):
-#                 self.module.MiniBatchPreProcessor(self.default_schema_madlib,
-#                                                   self.default_source_table,
-#                                                   self.default_output_table,
-#                                                   "y1,y2",
-#                                                   self.default_ind_var,
-#                                                   self.grouping_cols,
-#                                                   self.default_buffer_size)
-
-#     def test_minibatch_preprocessor_buffer_size_zero_fails(self):
-#         with self.assertRaises(plpy.PLPYException):
-#             self.module.MiniBatchPreProcessor(self.default_schema_madlib,
-#                                               self.default_source_table,
-#                                               self.default_output_table,
-#                                               self.default_dep_var,
-#                                               self.default_ind_var,
-#                                               self.grouping_cols,
-#                                               0)
-
-#     def test_minibatch_preprocessor_buffer_size_one_passes(self):
-#         #not sure how to assert that an exception has not been raised
-#         preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
-#                                                              self.default_source_table,
-#                                                              self.default_output_table,
-#                                                              self.default_dep_var,
-#                                                              self.default_ind_var,
-#                                                              self.grouping_cols,
-#                                                              1)
-#         preprocessor_obj.minibatch_preprocessor()
-#         self.assertEqual(1, drop_table_mock.call_count)
-
-
-class MiniBatchQueryFormatterTestCase(unittest.TestCase):
+class MiniBatchPreProcessingTestCase(unittest.TestCase):
     def setUp(self):
-        self.default_source_table = "source"
-        self.default_dep_var = "depvar"
-        self.default_ind_var = "indvar"
+        self.plpy_mock = Mock(spec='error')
         patches = {
             'plpy': plpy,
-            'mean_std_dev_calculator': Mock()
+            'utilities.mean_std_dev_calculator': Mock(),
         }
-
         # we need to use MagicMock() instead of Mock() for the plpy.execute mock
         # to be able to iterate on the return value
         self.plpy_mock_execute = MagicMock()
@@ -156,67 +47,183 @@ class MiniBatchQueryFormatterTestCase(unittest.TestCase):
         self.module_patcher = patch.dict('sys.modules', patches)
         self.module_patcher.start()
 
-        import minibatch_preprocessing
-        self.module = minibatch_preprocessing
-        self.subject = self.module.MiniBatchQueryFormatter(self.default_source_table)
+        self.default_schema_madlib = "madlib"
+        self.default_source_table = "source"
+        self.default_output_table = "output"
+        self.default_dep_var = "depvar"
+        self.default_ind_var = "indvar"
+        self.grouping_cols = None
+        self.default_buffer_size = 5
+
+        import utilities.minibatch_preprocessing
+        self.module = utilities.minibatch_preprocessing
+        self.module.validate_module_input_params = Mock()
+        self.output_tbl_valid_mock = Mock()
+        self.module.output_tbl_valid = self.output_tbl_valid_mock
+
+        self.module.MiniBatchStandardizer = Mock()
+        self.module.get_expr_type = MagicMock(return_value="character")
+        self.module.get_distinct_col_levels = Mock(return_value = [0,22,100])
+        self.subject = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                self.default_source_table,
+                                                self.default_output_table,
+                                                self.default_dep_var,
+                                                self.default_ind_var,
+                                                self.grouping_cols,
+                                                self.default_buffer_size)
 
     def tearDown(self):
         self.module_patcher.stop()
 
-    def test_get_dep_var_array_str_int_type(self):
-        self.plpy_mock_execute.return_value = [{"class":100},{"class":0},
-                                               {"class":22}]
-
-        dep_var_array_str, _ = self.subject.get_dep_var_array_and_classes(
-            self.default_dep_var, 'integer', to_one_hot_encode_int=True)
-
+    def test_minibatch_preprocessor_executes_query(self):
+        preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                             "input",
+                                                             "out",
+                                                             self.default_dep_var,
+                                                             self.default_ind_var,
+                                                             self.grouping_cols,
+                                                             self.default_buffer_size)
+        preprocessor_obj.minibatch_preprocessor()
+
+    def test_minibatch_preprocessor_null_buffer_size_executes_query(self):
+        preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                             "input",
+                                                             "out",
+                                                             self.default_dep_var,
+                                                             self.default_ind_var,
+                                                             self.grouping_cols,
+                                                             None)
+        self.module.MiniBatchBufferSizeCalculator.calculate_default_buffer_size = Mock()
+        preprocessor_obj.minibatch_preprocessor()
+
+    def test_minibatch_preprocessor_multiple_dep_var_raises_exception(self):
+            with self.assertRaises(plpy.PLPYException):
+                self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                  self.default_source_table,
+                                                  self.default_output_table,
+                                                  "y1,y2",
+                                                  self.default_ind_var,
+                                                  self.grouping_cols,
+                                                  self.default_buffer_size)
+
+    def test_minibatch_preprocessor_buffer_size_zero_fails(self):
+        with self.assertRaises(plpy.PLPYException):
+            self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                              self.default_source_table,
+                                              self.default_output_table,
+                                              self.default_dep_var,
+                                              self.default_ind_var,
+                                              self.grouping_cols,
+                                              0)
+
+    def test_minibatch_preprocessor_buffer_size_one_passes(self):
+        # No explicit assertion is needed: the test fails if an exception is raised.
+        preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                             self.default_source_table,
+                                                             self.default_output_table,
+                                                             self.default_dep_var,
+                                                             self.default_ind_var,
+                                                             self.grouping_cols,
+                                                             1)
+        preprocessor_obj.minibatch_preprocessor()
+
+    def test_get_dep_var_array_expr_array_type(self):
+        self.module.get_expr_type = MagicMock(return_value="integer[]")
+        obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                self.default_source_table,
+                                                self.default_output_table,
+                                                self.default_dep_var,
+                                                self.default_ind_var,
+                                                self.grouping_cols,
+                                                self.default_buffer_size)
+        dep_var_array_expr = obj.get_dep_var_array_expr()
+
+        self.assertEqual(self.default_dep_var, dep_var_array_expr)
+
+    def test_get_dep_var_array_expr_numeric_type(self):
+        self.module.get_expr_type = MagicMock(return_value="integer")
+        self.module.get_distinct_col_levels = Mock(return_value = [0,22,100])
+        obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                     self.default_source_table,
+                                                     self.default_output_table,
+                                                     self.default_dep_var,
+                                                     self.default_ind_var,
+                                                     self.grouping_cols,
+                                                     self.default_buffer_size,
+                                                     one_hot_encode_int_dep_var=True)
+        dep_var_array_expr = obj.get_dep_var_array_expr()
         self.assertEqual("array[({0}) = 0, ({0}) = 22, ({0}) = 100]::integer[]".
                          format(self.default_dep_var),
-                         dep_var_array_str.lower())
-
-    def test_get_dep_var_array_str_text_type(self):
-        self.plpy_mock_execute.return_value = [{"class":'100'},{"class":'0'},
-                                               {"class":'22'}]
-
-        # if dependent_var_dbtype = 'text' then sorting is string sorting and
-        # not by actual value
-        dep_var_array_str, _ = self.subject.get_dep_var_array_and_classes\
-                                                (self.default_dep_var, 'text')
-        self.assertEqual("array[({0}) = '0', ({0}) = '100', ({0}) = '22']::integer[]".
-                         format(self.default_dep_var),
-                         dep_var_array_str.lower())
-
-    def test_get_dep_var_array_str_boolean_type(self):
-        self.plpy_mock_execute.return_value = [{"class":3}]
-
-        dep_var_array_str, _ = self.subject.\
-                            get_dep_var_array_and_classes(self.default_dep_var,
-                                                          'boolean')
-        self.assertEqual("array[({0}) = '3']::integer[]".format(self.default_dep_var),
-                         dep_var_array_str.lower())
-
-    def test_get_dep_var_array_str_array_type(self):
-        dep_var_array_str, _ = self.subject.\
-                        get_dep_var_array_and_classes(self.default_dep_var,
-                                                      'some_array[]')
-
-        self.assertEqual(self.default_dep_var, dep_var_array_str)
-
-    def test_get_dep_var_array_str_numeric_type(self):
-        dep_var_array_str, _ = self.subject. \
-            get_dep_var_array_and_classes(self.default_dep_var, 'integer')
-
-        self.assertEqual("array[{0}]".format(self.default_dep_var),
-                                            dep_var_array_str.lower())
-
-    def test_get_dep_var_array_str_other_type(self):
+                         dep_var_array_expr.lower())
+
+        self.module.get_expr_type = MagicMock(return_value="numeric")
+        obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                self.default_source_table,
+                                                self.default_output_table,
+                                                self.default_dep_var,
+                                                self.default_ind_var,
+                                                self.grouping_cols,
+                                                self.default_buffer_size,
+                                                one_hot_encode_int_dep_var=False)
+        dep_var_array_expr = obj.get_dep_var_array_expr()
+        self.assertEqual("array[({0})]".format(self.default_dep_var),
+            dep_var_array_expr.lower())
+
+
+    def test_get_dep_var_array_expr_text_type(self):
+        self.module.get_expr_type = MagicMock(return_value="character")
+        self.module.get_distinct_col_levels = Mock(return_value = ["'a'","'b'","'c'"])
+        obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                            self.default_source_table,
+                                            self.default_output_table,
+                                            self.default_dep_var,
+                                            self.default_ind_var,
+                                            self.grouping_cols,
+                                            self.default_buffer_size)
+        dep_var_array_expr = obj.get_dep_var_array_expr()
+        self.assertEqual("array[({0}) = 'a', ({0}) = 'b', ({0}) = 'c']::integer[]".
+                     format(self.default_dep_var),
+                     dep_var_array_expr.lower())
+
+    def test_get_dep_var_array_expr_bool_type(self): #TODO check for boolean types
+        self.module.get_expr_type = MagicMock(return_value="boolean")
+        self.module.get_distinct_col_levels = Mock(return_value = ['True', False])
+        obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                self.default_source_table,
+                                                self.default_output_table,
+                                                self.default_dep_var,
+                                                self.default_ind_var,
+                                                self.grouping_cols,
+                                                self.default_buffer_size)
+        dep_var_array_expr = obj.get_dep_var_array_expr()
+        self.assertEqual("array[({0}) = True, ({0}) = False]::integer[]".
+                         format(self.default_dep_var).lower(),
+                         dep_var_array_expr.lower())
+
+
+    def test_get_dep_var_array_expr_other_type(self):
+        self.module.get_expr_type = MagicMock(return_value="other")
         with self.assertRaises(plpy.PLPYException):
-            self.subject.get_dep_var_array_and_classes(self.default_dep_var,
-                                                       'other')
-
-    def test_get_indep_var_array_str_passes(self):
-        ind_var_array_str = self.subject.get_indep_var_array_str('array[x1,x2,x3]')
-        self.assertEqual("(array[x1,x2,x3])::double precision[]", ind_var_array_str.lower())
+            self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                                self.default_source_table,
+                                                self.default_output_table,
+                                                self.default_dep_var,
+                                                self.default_ind_var,
+                                                self.grouping_cols,
+                                                self.default_buffer_size)
+        self.module.get_expr_type = MagicMock(return_value="other[]")
+        with self.assertRaises(plpy.PLPYException):
+            self.module.MiniBatchPreProcessor(self.default_schema_madlib,
+                                              self.default_source_table,
+                                              self.default_output_table,
+                                              self.default_dep_var,
+                                              self.default_ind_var,
+                                              self.grouping_cols,
+                                              self.default_buffer_size)
+
+    def test_get_indep_var_array_expr_passes(self):
+        ind_var_array_str = self.subject.get_indep_var_array_expr()
+        self.assertEqual("({0})::double precision[]".format(self.default_ind_var), ind_var_array_str.lower())
 
 class MiniBatchQueryStandardizerTestCase(unittest.TestCase):
     def setUp(self):
@@ -227,7 +234,7 @@ class MiniBatchQueryStandardizerTestCase(unittest.TestCase):
         self.mean_std_calculator_mock = Mock()
         patches = {
             'plpy': plpy,
-            'mean_std_dev_calculator': self.mean_std_calculator_mock
+            'utilities.mean_std_dev_calculator': self.mean_std_calculator_mock
         }
         self.x_mean = "5678"
         self.x_std_dev = "4.789"
@@ -243,8 +250,8 @@ class MiniBatchQueryStandardizerTestCase(unittest.TestCase):
         self.module_patcher = patch.dict('sys.modules', patches)
         self.module_patcher.start()
 
-        import minibatch_preprocessing
-        self.module = minibatch_preprocessing
+        import utilities.minibatch_preprocessing
+        self.module = utilities.minibatch_preprocessing
         self.subject = self.module.MiniBatchStandardizer(self.default_schema,
                                                          self.default_source_table,
                                                          self.default_dep_var,
@@ -281,13 +288,13 @@ class MiniBatchBufferSizeCalculatorTestCase(unittest.TestCase):
     def setUp(self):
         patches = {
             'plpy': plpy,
-            'mean_std_dev_calculator': Mock()
+            'utilities.mean_std_dev_calculator': Mock()
         }
         self.a = 'a'
         self.module_patcher = patch.dict('sys.modules', patches)
         self.module_patcher.start()
-        import minibatch_preprocessing
-        self.module = minibatch_preprocessing
+        import utilities.minibatch_preprocessing
+        self.module = utilities.minibatch_preprocessing
         self.subject = self.module.MiniBatchBufferSizeCalculator
 
     def tearDown(self):

http://git-wip-us.apache.org/repos/asf/madlib/blob/4713b24e/src/ports/postgres/modules/utilities/test/unit_tests/test_utilities.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/unit_tests/test_utilities.py_in b/src/ports/postgres/modules/utilities/test/unit_tests/test_utilities.py_in
index 7ad6253..407a3c0 100644
--- a/src/ports/postgres/modules/utilities/test/unit_tests/test_utilities.py_in
+++ b/src/ports/postgres/modules/utilities/test/unit_tests/test_utilities.py_in
@@ -70,6 +70,19 @@ class UtilitiesTestCase(unittest.TestCase):
                                                   self.default_dep_var,
                                                   self.default_module, None)
         self.subject.input_tbl_valid.assert_any_call(self.default_source_table,
+                                                     self.default_module)
+        self.subject.output_tbl_valid.assert_any_call(self.default_output_table,
+                                                      self.default_module)
+
+    def test_validate_module_input_params_source_and_output_table_are_tested(self):
+        self.subject.input_tbl_valid = Mock()
+        self.subject.output_tbl_valid = Mock()
+        self.subject.validate_module_input_params(self.default_source_table,
+                                                  self.default_output_table,
+                                                  self.default_ind_var,
+                                                  self.default_dep_var,
+                                                  self.default_module, None)
+        self.subject.input_tbl_valid.assert_any_call(self.default_source_table,
                                                       self.default_module)
         self.subject.output_tbl_valid.assert_any_call(self.default_output_table,
                                                       self.default_module)
@@ -208,13 +221,28 @@ class UtilitiesTestCase(unittest.TestCase):
         self.assertFalse(self.subject.is_psql_char_type('1character'))
 
     def test_is_psql_char_type_excludes_list(self):
-        self.assertTrue(self.subject.is_psql_char_type('text', ['varchar','char']))
-        self.assertFalse(self.subject.is_psql_char_type('text', ['text','char']))
+        self.assertTrue(self.subject.is_psql_char_type('text', ['varchar', 'char']))
+        self.assertFalse(self.subject.is_psql_char_type('text', ['text', 'char']))
         self.assertFalse(self.subject.is_psql_char_type('varchar', 'varchar'))
 
     def test_is_psql_boolean_type(self):
         self.assertTrue(self.subject.is_psql_boolean_type('boolean'))
         self.assertFalse(self.subject.is_psql_boolean_type('not boolean'))
 
+    def test_is_valid_psql_type(self):
+        s = self.subject
+        self.assertTrue(s.is_valid_psql_type('boolean', s.TEXT | s.BOOLEAN))
+        self.assertFalse(s.is_valid_psql_type('boolean', s.TEXT))
+        self.assertTrue(s.is_valid_psql_type('boolean[]', s.BOOLEAN | s.INCLUDE_ARRAY))
+        self.assertTrue(s.is_valid_psql_type('boolean[]', s.BOOLEAN | s.ONLY_ARRAY))
+        self.assertFalse(s.is_valid_psql_type(
+            'boolean', s.BOOLEAN | s.ONLY_ARRAY | s.INCLUDE_ARRAY))
+        self.assertTrue(s.is_valid_psql_type(
+            'boolean[]', s.BOOLEAN | s.ONLY_ARRAY | s.INCLUDE_ARRAY))
+        self.assertFalse(s.is_valid_psql_type('boolean', s.INCLUDE_ARRAY | s.ONLY_ARRAY))
+        self.assertFalse(s.is_valid_psql_type('boolean[]', s.INCLUDE_ARRAY | s.ONLY_ARRAY))
+        self.assertFalse(s.is_valid_psql_type('boolean', s.ONLY_ARRAY))
+        self.assertFalse(s.is_valid_psql_type('boolean[]', s.ONLY_ARRAY))
+
 if __name__ == '__main__':
     unittest.main()