Posted to commits@madlib.apache.org by ri...@apache.org on 2016/12/16 00:03:23 UTC

[3/4] incubator-madlib git commit: Elastic Net: Add grouping support

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/6138b006/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
index 7371d5e..187cc83 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
@@ -2,8 +2,7 @@
 import plpy
 import math
 from utilities.utilities import unique_string
-from utilities.control import IterationController2S
-from elastic_net_utils import IterationControllerNoTableDrop
+from utilities.in_mem_group_control import GroupIterationController
 from elastic_net_utils import __compute_means
 from elastic_net_utils import __normalize_data
 from elastic_net_utils import __compute_data_scales
@@ -16,6 +15,7 @@ from elastic_net_utils import __process_warmup_lambdas
 from elastic_net_generate_result import __elastic_net_generate_result
 from utilities.utilities import __mad_version
 from utilities.utilities import preprocess_keyvalue_params
+from utilities.control import MinWarning
 
 version_wrapper = __mad_version()
 mad_vec = version_wrapper.select_vecfunc()
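
The import swap above is the heart of the change: the per-query IterationController2S and the NoTableDrop variant give way to a single GroupIterationController that keeps every group's optimizer state in memory. Its implementation is not part of this diff; the sketch below is only a rough illustration of the pattern, with run_grouped_iterations, step, and converged as hypothetical names.

    # Illustrative sketch only (not MADlib's GroupIterationController):
    # one optimizer state is kept per group key; each outer pass advances
    # every still-active group and retires the ones that have converged.
    def run_grouped_iterations(groups, step, converged, max_iter=100):
        states = dict((key, None) for key in groups)  # per-group state
        active = set(groups)
        for _ in range(max_iter):
            if not active:
                break
            for key in list(active):
                new_state = step(key, states[key])    # one aggregate pass
                if states[key] is not None and converged(states[key], new_state):
                    active.discard(key)               # this group is done
                states[key] = new_state
        return states
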
@@ -142,40 +142,6 @@ def __igd_params_parser(optimizer_params, lambda_value, tolerance, schema_madlib
     return name_value
 ## ========================================================================
 
-
-def __igd_create_tbl_args(**args):
-    """
-    create the temporary schema and argument table used in IGD iterations
-    """
-    (xmean_str0, ymean) = __compute_means(**args)
-
-    plpy.execute("""
-                 drop table if exists {tbl_igd_args};
-                 create temp table {tbl_igd_args} (
-                    {dimension_name}       integer,
-                    {stepsize_name}        double precision,
-                    {lambda_name}          double precision[],
-                    {alpha_name}           double precision,
-                    {total_rows_name}      integer,
-                    {max_iter_name}        integer,
-                    {tolerance_name}       double precision,
-                    {xmean_name}           double precision[],
-                    {ymean_name}           double precision
-                 );
-                 """.format(**args))
-    plpy.execute("""
-                 insert into {tbl_igd_args} values
-                    ({dimension}, {stepsize}, '{warmup_lambdas}'::double precision[],
-                     {alpha},
-                     {row_num}, {max_iter}, {tolerance},
-                     '{xmean_str0}'::double precision[], {ymean})
-                 """.format(xmean_str0=xmean_str0, ymean=ymean,
-                            **args))
-
-    return None
-## ========================================================================
-
-
 def __igd_construct_dict(schema_madlib, family, tbl_source,
                          col_ind_var, col_dep_var,
                          tbl_result, dimension, row_num, lambda_value, alpha,
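
With grouping in the picture, the one-row temp table that __igd_create_tbl_args used to build (and that the update query re-read on every iteration) no longer fits, so the parameters now travel inside the Python args dictionary instead. A minimal sketch of the replacement shape, with build_igd_args as a hypothetical name mirroring the dropped table's columns:

    # Hypothetical helper: the same values the temp table used to hold,
    # packaged as a dict that can be formatted straight into query text.
    def build_igd_args(dimension, stepsize, warmup_lambdas, alpha,
                       row_num, max_iter, tolerance, xmean, ymean):
        return {
            "dimension": dimension,            # number of features
            "stepsize": stepsize,              # IGD step size
            "warmup_lambdas": warmup_lambdas,  # lambda path, largest first
            "alpha": alpha,                    # elastic net mixing parameter
            "row_num": row_num,                # total number of rows
            "max_iter": max_iter,
            "tolerance": tolerance,
            "xmean_val": xmean,                # feature means
            "ymean_val": ymean,                # response mean
        }
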
@@ -189,10 +155,10 @@ def __igd_construct_dict(schema_madlib, family, tbl_source,
                 tbl_source=tbl_source,
                 tbl_data=tbl_source,  # argument name used in normalization
                 col_ind_var=col_ind_var, col_dep_var=col_dep_var,
-                col_ind_var_norm_new=unique_string(),  # for normalization usage
-                col_ind_var_tmp=unique_string(),
-                col_dep_var_norm_new=unique_string(),  # for normalization usage
-                col_dep_var_tmp=unique_string(),
+                col_ind_var_norm_new=unique_string(desp='temp_features_norm'),  # for normalization usage
+                col_ind_var_tmp=unique_string(desp='temp_features'),
+                col_dep_var_norm_new=unique_string(desp='temp_target_norm'),  # for normalization usage
+                col_dep_var_tmp=unique_string(desp='temp_target'),
                 tbl_result=tbl_result,
                 lambda_value=lambda_value, alpha=alpha,
                 dimension=dimension, row_num=row_num,
@@ -209,9 +175,9 @@ def __igd_construct_dict(schema_madlib, family, tbl_source,
     # actually have the same value. This is the price one pays for
     # saving the typing of argument names by using **args as the
     # function argument.
-    tbl_ind_scales = unique_string()
-    tbl_dep_scale = unique_string()
-    tbl_data_scaled = unique_string()
+    tbl_ind_scales = unique_string(desp='temp_ind_scales')
+    tbl_dep_scale = unique_string(desp='temp_dep_scales')
+    tbl_data_scaled = unique_string(desp='temp_data_scales')
     args.update(tbl_dep_scale=tbl_dep_scale,
                 tbl_ind_scales=tbl_ind_scales,
                 tbl_data_scaled=tbl_data_scaled)
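
The desp argument added to unique_string above presumably embeds a readable hint in the generated identifier, which makes the many temporary tables and columns easier to tell apart while debugging. A sketch of such a helper, assuming a uuid-based implementation:

    import uuid

    # Sketch of a unique_string-style helper (assumed implementation):
    # a collision-safe name, optionally tagged with a description.
    def unique_string(desp=''):
        tag = uuid.uuid4().hex
        middle = desp + '_' if desp else ''
        return '__madlib_temp_' + middle + tag + '__'

    # e.g. unique_string(desp='temp_features')
    #      -> '__madlib_temp_temp_features_<32 hex chars>__'
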
@@ -255,27 +221,30 @@ def __igd_cleanup_temp_tbls(**args):
 def __elastic_net_igd_train(schema_madlib, func_step_aggregate,
                             func_state_diff, family,
                             tbl_source, col_ind_var,
-                            col_dep_var, tbl_result, lambda_value, alpha,
+                            col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
                             normalization, optimizer_params, max_iter,
-                            tolerance, outstr_array, **kwargs):
-    __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var, tbl_result,
+                            tolerance, outstr_array, grouping_str,
+                            grouping_col, **kwargs):
+    __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var, tbl_result, tbl_summary,
                                 lambda_value, alpha, normalization, max_iter, tolerance)
 
     return __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
                                            func_state_diff, family,
                                            tbl_source, col_ind_var,
-                                           col_dep_var, tbl_result, lambda_value, alpha,
+                                           col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
                                            normalization, optimizer_params, max_iter,
-                                           tolerance, outstr_array, **kwargs)
+                                           tolerance, outstr_array, grouping_str,
+                                           grouping_col, **kwargs)
 ## ========================================================================
 
 
 def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
                                     func_state_diff, family,
                                     tbl_source, col_ind_var,
-                                    col_dep_var, tbl_result, lambda_value, alpha,
+                                    col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
                                     normalization, optimizer_params, max_iter,
-                                    tolerance, outstr_array, **kwargs):
+                                    tolerance, outstr_array, grouping_str,
+                                    grouping_col, **kwargs):
     """
     Fit linear model with elastic net regularization using IGD optimization.
 
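Both entry points now accept tbl_summary plus the pair grouping_str/grouping_col and simply thread them through to the compute function. How grouping_str is derived from grouping_col is not shown in this hunk; a plausible sketch, with make_grouping_str as a hypothetical name:

    # Hypothetical: normalize a comma-separated grouping_col argument into
    # the string form interpolated into per-group queries.
    def make_grouping_str(grouping_col):
        if not grouping_col:
            return None
        cols = [c.strip() for c in grouping_col.split(',') if c.strip()]
        return ', '.join(cols)

    # e.g. make_grouping_str("region , gender") -> "region, gender"
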
@@ -309,18 +278,21 @@ def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
                                 __igd_params_parser(optimizer_params, lambda_value,
                                                     tolerance, schema_madlib))
 
+    args.update({'grouping_col': grouping_col})
     # use normalized data or not
     if normalization:
         __normalize_data(args)
-        args["tbl_used"] = args["tbl_data_scaled"]
+        tbl_used = args["tbl_data_scaled"]
         args["col_ind_var_new"] = args["col_ind_var_norm_new"]
         args["col_dep_var_new"] = args["col_dep_var_norm_new"]
     else:
         __compute_data_scales(args)
-        args["tbl_used"] = tbl_source
+        tbl_used = tbl_source
         args["col_ind_var_new"] = col_ind_var
         args["col_dep_var_new"] = col_dep_var
 
+    args["tbl_used"] = tbl_used
+
     # average squares of each feature
     # used to estimate the largest lambda value
     # also used to screen out tiny values, so order is needed
@@ -329,47 +301,74 @@ def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
 
     if args["warmup_lambdas"] is not None:
         args["warm_no"] = len(args["warmup_lambdas"])
-        args["warmup_lambdas"] = _array_to_string(args["warmup_lambdas"])
+        args["warmup_lambdas"] = args["warmup_lambdas"]
 
     if args["warmup"] and args["warmup_lambdas"] is None:
-        args["warmup_lambdas"] = __generate_warmup_lambda_sequence(args["tbl_used"], args["col_ind_var_new"],
-                                                                   args["col_dep_var_new"],
-                                                                   dimension, row_num, lambda_value, alpha,
-                                                                   args["warmup_lambda_no"], args["sq"])
+        args["warmup_lambdas"] = \
+        __generate_warmup_lambda_sequence(
+           args["tbl_used"], args["col_ind_var_new"], args["col_dep_var_new"],
+           dimension, row_num, lambda_value, alpha,
+           args["warmup_lambda_no"], args["sq"])
         args["warm_no"] = len(args["warmup_lambdas"])
-        args["warmup_lambdas"] = _array_to_string(args["warmup_lambdas"])
+        args["warmup_lambdas"] = args["warmup_lambdas"]
     elif args["warmup"] is False:
         args["warm_no"] = 1
-        args["warmup_lambdas"] = _array_to_string([lambda_value]) # only one value
-
-    # create the temp table that passes parameter values to IGD optimizer
-    __igd_create_tbl_args(**args)
-
+        args["warmup_lambdas"] = [lambda_value]  # only one value
+
+    # parameter values required by the IGD optimizer
+    (xmean, ymean) = __compute_means(**args)
+
+    args.update({
+        'rel_args': args["tbl_igd_args"],
+        'rel_state': args["tbl_igd_state"],
+        'col_grp_iteration': unique_string(desp='col_grp_iteration'),
+        'col_grp_state': unique_string(desp='col_grp_state'),
+        'col_grp_key': unique_string(desp='col_grp_key'),
+        'col_n_tuples': unique_string(desp='col_n_tuples'),
+        'lambda_count': 1,
+        'state_type': "double precision[]",
+        'rel_source': tbl_used,
+        'grouping_str': grouping_str,
+        'xmean_val': xmean,
+        'ymean_val': ymean,
+        'tbl_source': tbl_source,
+        'tbl_summary': tbl_summary
+        })
+    if not args.get('parallel'):
+        func_step_aggregate += "_single_seg"
     # perform the actual calculation
     iteration_run = __compute_igd(schema_madlib,
                                   func_step_aggregate,
                                   func_state_diff,
                                   args["tbl_igd_args"],
-                                  args["tbl_igd_state"], args["tbl_used"],
-                                  args["col_ind_var_new"], args["col_dep_var_new"],
-                                  True,
-                                  max_iter = args["max_iter"],
-                                  tolerance = args["tolerance"],
-                                  warmup_tolerance = args["warmup_tolerance"],
-                                  warm_no = args["warm_no"],
-                                  parallel = args["parallel"],
-                                  step_decay = args["step_decay"],
-                                  col_ind_var_new = args["col_ind_var_new"],
-                                  col_dep_var_new = args["col_dep_var_new"],
-                                  dimension_name = args["dimension_name"],
-                                  stepsize_name = args["stepsize_name"],
-                                  lambda_name = args["lambda_name"],
-                                  alpha_name = args["alpha_name"],
-                                  total_rows_name = args["total_rows_name"],
-                                  max_iter_name = args["max_iter_name"],
-                                  tolerance_name = args["tolerance_name"],
-                                  xmean_name = args["xmean_name"],
-                                  ymean_name = args["ymean_name"])
+                                  args["tbl_igd_state"], 
+                                  tbl_used,
+                                  args["col_ind_var_new"], 
+                                  args["col_dep_var_new"],
+                                  grouping_str,
+                                  grouping_col,
+                                  start_iter=0,
+                                  max_iter= args["max_iter"],
+                                  tolerance= args["tolerance"],
+                                  warmup_tolerance= args["warmup_tolerance"],
+                                  warm_no= args["warm_no"],
+                                  step_decay= args["step_decay"],
+                                  dimension= args["dimension"],
+                                  stepsize= args["stepsize"],
+                                  lambda_name= args["warmup_lambdas"],
+                                  warmup_lambda_value = args.get('warmup_lambdas')[args["lambda_count"]-1],
+                                  alpha= args["alpha"],
+                                  row_num= args["row_num"],
+                                  xmean_val= args["xmean_val"],
+                                  ymean_val= args["ymean_val"],
+                                  lambda_count= args["lambda_count"],
+                                  rel_state= args["tbl_igd_state"],
+                                  col_grp_iteration= args["col_grp_iteration"],
+                                  col_grp_state= args["col_grp_state"],
+                                  col_grp_key= args["col_grp_key"],
+                                  col_n_tuples= args["col_n_tuples"],
+                                  rel_source= args["rel_source"],
+                                  state_type= args["state_type"],)
 
     __elastic_net_generate_result("igd", iteration_run, **args)
 
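__generate_warmup_lambda_sequence is defined elsewhere in the module; warm-up paths for elastic net are conventionally log-spaced, starting from a lambda large enough to zero out all coefficients and descending to the target so each solve warm-starts from the previous solution. A sketch under that assumption:

    # Illustrative only: a geometric (log-spaced) lambda path from
    # lambda_max down to the target value, largest first.
    def warmup_lambda_sequence(lambda_max, lambda_target, n):
        if n <= 1 or lambda_max <= lambda_target:
            return [lambda_target]
        ratio = (lambda_target / lambda_max) ** (1.0 / (n - 1))
        return [lambda_max * ratio ** i for i in range(n)]

    # e.g. warmup_lambda_sequence(1.0, 0.01, 5)
    #      -> approximately [1.0, 0.316, 0.1, 0.0316, 0.01]
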
@@ -382,7 +381,8 @@ def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
 
 def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
                   tbl_args, tbl_state, tbl_source,
-                  col_ind_var, col_dep_var, drop_table, **kwargs):
+                  col_ind_var, col_dep_var, grouping_str, grouping_col, 
+                  start_iter, **kwargs):
     """
     Driver function for elastic net with Gaussian response using IGD
 
@@ -394,8 +394,6 @@ def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
     @param rel_source Name of the relation containing input points
     @param col_ind_var Name of the independent variables column
     @param col_dep_var Name of the dependent variable column
-    @param drop_table Boolean, whether to use IterationController (True) or
-                      IterationControllerNoTableDrop (False)
+    @param grouping_str String form of the grouping columns, used when
+        composing per-group queries
+    @param grouping_col Grouping columns supplied by the caller (None when
+        no grouping is requested)
+    @param start_iter Iteration number from which the controller starts
     @param kwargs We allow the caller to specify additional arguments (all of
         which will be ignored though). The purpose of this is to allow the
         caller to unpack a dictionary whose element set is a superset of
@@ -404,87 +402,41 @@ def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
     @return The iteration number (i.e., the key) with which to look up the
         result in \c tbl_state
     """
-
-    m4_ifdef(`__HAWQ__', `
-    iterationCtrl = IterationController2S(
-        func_step_aggregate=func_step_aggregate,
-        func_state_diff=func_state_diff,
-        rel_args=tbl_args,
-        rel_state=tbl_state,
-        stateType="double precision[]",
-        truncAfterIteration=False,
-        schema_madlib=schema_madlib,  # Identifiers start here
-        rel_source=tbl_source,
-        col_ind_var=col_ind_var,
-        col_dep_var=col_dep_var,
-        lambda_count=1,
-        **kwargs)
-    ', `
-    if drop_table:
-        iterationCtrl = IterationController2S(
-            func_step_aggregate=func_step_aggregate,
-            func_state_diff=func_state_diff,
-            rel_args=tbl_args,
-            rel_state=tbl_state,
-            stateType="double precision[]",
-            truncAfterIteration=False,
-            schema_madlib=schema_madlib,  # Identifiers start here
-            rel_source=tbl_source,
-            col_ind_var=col_ind_var,
-            col_dep_var=col_dep_var,
-            lambda_count=1,
-            **kwargs)
-    else:
-        iterationCtrl = IterationControllerNoTableDrop(
-            rel_args=tbl_args,
-            rel_state=tbl_state,
-            stateType="double precision[]",
-            truncAfterIteration=False,
-            schema_madlib=schema_madlib,  # Identifiers start here
-            rel_source=tbl_source,
-            col_ind_var=col_ind_var,
-            col_dep_var=col_dep_var,
-            lambda_count=1,
-            **kwargs)
-    ')
-
+    args = locals()
+
+    for k, v in kwargs.iteritems():
+        if k not in args:
+            args[k] = v
+    iterationCtrl = GroupIterationController(args)
     with iterationCtrl as it:
-        it.iteration = 0
-
-        if it.kwargs["parallel"]:
-            it.kwargs["parallel_step_func"] = func_step_aggregate
-        else:
-            it.kwargs["parallel_step_func"] = func_step_aggregate + "_single_seg"
-
+        it.iteration = start_iter
         while True:
             # manually add the intercept term
+            if it.kwargs["lambda_count"] > len(args["lambda_name"]):
+                break
+            it.kwargs["warmup_lambda_value"] = args["lambda_name"][it.kwargs["lambda_count"] - 1]
             it.update("""
-                      select
-                        {schema_madlib}.{parallel_step_func}(
-                            {col_ind_var}::double precision[],
-                            {col_dep_var},
-                            m4_ifdef(`__HAWQ__', `({{__state__}})',
-                            `(select _state from {rel_state}
-                                where _iteration = {iteration})'),
-                            (_args.{lambda_name}[{lambda_count}])::double precision,
-                            (_args.{alpha_name})::double precision,
-                            (_args.{dimension_name})::integer,
-                            (_args.{stepsize_name})::double precision,
-                            (_args.{total_rows_name})::integer,
-                            (_args.{xmean_name})::double precision[],
-                            (_args.{ymean_name})::double precision,
-                            {step_decay}::double precision
-                      )
-                      from {rel_source} as _src, {rel_args} as _args
-                      """)
-
+                    {schema_madlib}.{func_step_aggregate}(
+                        ({col_ind_var})::double precision[],
+                        ({col_dep_var}),
+                        {rel_state}.{col_grp_state},
+                        ({warmup_lambda_value})::double precision,
+                        ({alpha})::double precision,
+                        ({dimension})::integer,
+                        ({stepsize})::double precision,
+                        ({row_num})::integer,
+                        ('{xmean_val}')::double precision[],
+                        ({ymean_val})::double precision,
+                        ({step_decay})::double precision
+                    )
+            """)
             if it.kwargs["lambda_count"] < it.kwargs["warm_no"]:
                 it.kwargs["use_tolerance"] = it.kwargs["warmup_tolerance"]
             else:
                 it.kwargs["use_tolerance"] = it.kwargs["tolerance"]
 
             if it.test("""
-                       {iteration} > _args.{max_iter_name} or
+                       {iteration} > {max_iter} or
                        {schema_madlib}.{func_state_diff}(
                             _state_previous, _state_current) < {use_tolerance}
                        """):
@@ -493,10 +445,11 @@ def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
                     it.kwargs["lambda_count"] += 1
                 else:
                     break
+
+        it.final()
         if it.kwargs["lambda_count"] < it.kwargs["warm_no"]:
             plpy.error("""
                        Elastic Net error: The final target lambda value is not
                        reached in warm-up iterations. You need more iterations!
                        """)
-
     return iterationCtrl.iteration

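The rewritten driver above replaces the HAWQ/Postgres m4 branches and the drop_table switch with one path: collect the named parameters via locals(), fold in any extra kwargs that do not collide, and hand the dict to GroupIterationController. The merge step in isolation behaves like this sketch (merge_args is an illustrative name):

    # Explicit parameters win; extra keyword arguments fill in the rest.
    def merge_args(explicit, kwargs):
        args = dict(explicit)
        for k, v in kwargs.items():
            if k not in args:
                args[k] = v
        return args

    # e.g. merge_args({'max_iter': 100}, {'max_iter': 5, 'tol': 1e-6})
    #      -> {'max_iter': 100, 'tol': 1e-06}
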
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/6138b006/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in b/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
index 144fe27..6b291ec 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
@@ -178,7 +178,7 @@ def __compute_log_likelihood(coef, intercept, **args):
 
 
 def __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
-                                tbl_result, lambda_value, alpha,
+                                tbl_result, tbl_summary, lambda_value, alpha,
                                 normalization, max_iter, tolerance):
     if (tbl_source is None or col_ind_var is None or col_dep_var is None
             or tbl_result is None or lambda_value is None or alpha is None
@@ -189,6 +189,9 @@ def __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
     if table_exists(tbl_result, only_first_schema=True):
         plpy.error("Elastic Net error: Output table " + tbl_result + " already exists!")
 
+    if table_exists(tbl_summary, only_first_schema=True):
+        plpy.error("Elastic Net error: Output summary table " + tbl_summary + " already exists!")
+
     if lambda_value < 0:
         plpy.error("Elastic Net error: The regularization parameter lambda cannot be negative!")
 
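The validator now guards the new summary table exactly the way it guards the main output table. The two checks could equally be folded into one loop; a small sketch, reusing table_exists and plpy as they are used in the surrounding module:

    # Sketch: one guard for any number of output tables,
    # e.g. validate_output_tables([tbl_result, tbl_summary]).
    def validate_output_tables(tbl_names):
        for tbl in tbl_names:
            if table_exists(tbl, only_first_schema=True):
                plpy.error("Elastic Net error: Output table " +
                           tbl + " already exists!")
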
@@ -241,7 +244,8 @@ def __normalize_data(args):
                            x_mean_str=args["xmean_str"],
                            x_std_str=_array_to_string(args["x_scales"]["std"]),
                            y_mean=args["y_scale"]["mean"],
-                           y_std=args["y_scale"]["std"])
+                           y_std=args["y_scale"]["std"],
+                           grouping_col=args["grouping_col"])
 
     return None
 # ========================================================================
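
Finally, __normalize_data now receives grouping_col, presumably so the grouping columns are carried through into the scaled data table and remain available to the per-group aggregates later on. A sketch of that select-list plumbing, with scaled_select_list as an illustrative name:

    # Assumed shape of the plumbing: keep grouping columns in the scaled
    # table's select list so rows can still be partitioned by group.
    def scaled_select_list(col_ind_new, col_dep_new, grouping_col):
        cols = [col_ind_new, col_dep_new]
        if grouping_col:
            cols.append(grouping_col)     # e.g. "region, gender"
        return ", ".join(cols)
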