You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@madlib.apache.org by ri...@apache.org on 2016/12/16 00:03:23 UTC
[3/4] incubator-madlib git commit: Elastic Net: Add grouping support
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/6138b006/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
index 7371d5e..187cc83 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
@@ -2,8 +2,7 @@
import plpy
import math
from utilities.utilities import unique_string
-from utilities.control import IterationController2S
-from elastic_net_utils import IterationControllerNoTableDrop
+from utilities.in_mem_group_control import GroupIterationController
from elastic_net_utils import __compute_means
from elastic_net_utils import __normalize_data
from elastic_net_utils import __compute_data_scales
@@ -16,6 +15,7 @@ from elastic_net_utils import __process_warmup_lambdas
from elastic_net_generate_result import __elastic_net_generate_result
from utilities.utilities import __mad_version
from utilities.utilities import preprocess_keyvalue_params
+from utilities.control import MinWarning
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
@@ -142,40 +142,6 @@ def __igd_params_parser(optimizer_params, lambda_value, tolerance, schema_madlib
return name_value
## ========================================================================
-
-def __igd_create_tbl_args(**args):
- """
- create the temporary schema and argument table used in IGD iterations
- """
- (xmean_str0, ymean) = __compute_means(**args)
-
- plpy.execute("""
- drop table if exists {tbl_igd_args};
- create temp table {tbl_igd_args} (
- {dimension_name} integer,
- {stepsize_name} double precision,
- {lambda_name} double precision[],
- {alpha_name} double precision,
- {total_rows_name} integer,
- {max_iter_name} integer,
- {tolerance_name} double precision,
- {xmean_name} double precision[],
- {ymean_name} double precision
- );
- """.format(**args))
- plpy.execute("""
- insert into {tbl_igd_args} values
- ({dimension}, {stepsize}, '{warmup_lambdas}'::double precision[],
- {alpha},
- {row_num}, {max_iter}, {tolerance},
- '{xmean_str0}'::double precision[], {ymean})
- """.format(xmean_str0=xmean_str0, ymean=ymean,
- **args))
-
- return None
-## ========================================================================
-
-
def __igd_construct_dict(schema_madlib, family, tbl_source,
col_ind_var, col_dep_var,
tbl_result, dimension, row_num, lambda_value, alpha,
@@ -189,10 +155,10 @@ def __igd_construct_dict(schema_madlib, family, tbl_source,
tbl_source=tbl_source,
tbl_data=tbl_source, # argument name used in normalization
col_ind_var=col_ind_var, col_dep_var=col_dep_var,
- col_ind_var_norm_new=unique_string(), # for normalization usage
- col_ind_var_tmp=unique_string(),
- col_dep_var_norm_new=unique_string(), # for normalization usage
- col_dep_var_tmp=unique_string(),
+ col_ind_var_norm_new=unique_string(desp='temp_features_norm'), # for normalization usage
+ col_ind_var_tmp=unique_string(desp='temp_features'),
+ col_dep_var_norm_new=unique_string(desp='temp_target_norm'), # for normalization usage
+ col_dep_var_tmp=unique_string(desp='temp_target'),
tbl_result=tbl_result,
lambda_value=lambda_value, alpha=alpha,
dimension=dimension, row_num=row_num,
@@ -209,9 +175,9 @@ def __igd_construct_dict(schema_madlib, family, tbl_source,
# actually have the same value. This is a price that one has to pay
# if he wants to save typing argument names by using **args as the
# function argument.
- tbl_ind_scales = unique_string()
- tbl_dep_scale = unique_string()
- tbl_data_scaled = unique_string()
+ tbl_ind_scales = unique_string(desp='temp_ind_scales')
+ tbl_dep_scale = unique_string(desp='temp_dep_scales')
+ tbl_data_scaled = unique_string(desp='temp_data_scales')
args.update(tbl_dep_scale=tbl_dep_scale,
tbl_ind_scales=tbl_ind_scales,
tbl_data_scaled=tbl_data_scaled)
@@ -255,27 +221,30 @@ def __igd_cleanup_temp_tbls(**args):
def __elastic_net_igd_train(schema_madlib, func_step_aggregate,
func_state_diff, family,
tbl_source, col_ind_var,
- col_dep_var, tbl_result, lambda_value, alpha,
+ col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
normalization, optimizer_params, max_iter,
- tolerance, outstr_array, **kwargs):
- __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var, tbl_result,
+ tolerance, outstr_array, grouping_str,
+ grouping_col, **kwargs):
+ __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var, tbl_result, tbl_summary,
lambda_value, alpha, normalization, max_iter, tolerance)
return __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
func_state_diff, family,
tbl_source, col_ind_var,
- col_dep_var, tbl_result, lambda_value, alpha,
+ col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
normalization, optimizer_params, max_iter,
- tolerance, outstr_array, **kwargs)
+ tolerance, outstr_array, grouping_str,
+ grouping_col, **kwargs)
## ========================================================================
def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
func_state_diff, family,
tbl_source, col_ind_var,
- col_dep_var, tbl_result, lambda_value, alpha,
+ col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
normalization, optimizer_params, max_iter,
- tolerance, outstr_array, **kwargs):
+ tolerance, outstr_array, grouping_str,
+ grouping_col, **kwargs):
"""
Fit linear model with elastic net regularization using IGD optimization.
@@ -309,18 +278,21 @@ def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
__igd_params_parser(optimizer_params, lambda_value,
tolerance, schema_madlib))
+ args.update({'grouping_col': grouping_col})
# use normalized data or not
if normalization:
__normalize_data(args)
- args["tbl_used"] = args["tbl_data_scaled"]
+ tbl_used = args["tbl_data_scaled"]
args["col_ind_var_new"] = args["col_ind_var_norm_new"]
args["col_dep_var_new"] = args["col_dep_var_norm_new"]
else:
__compute_data_scales(args)
- args["tbl_used"] = tbl_source
+ tbl_used = tbl_source
args["col_ind_var_new"] = col_ind_var
args["col_dep_var_new"] = col_dep_var
+ args["tbl_used"] = tbl_used
+
# average squares of each feature
# used to estimate the largest lambda value
# also used to screen out tiny values, so order is needed
@@ -329,47 +301,74 @@ def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
if args["warmup_lambdas"] is not None:
args["warm_no"] = len(args["warmup_lambdas"])
- args["warmup_lambdas"] = _array_to_string(args["warmup_lambdas"])
+ args["warmup_lambdas"] = args["warmup_lambdas"]
if args["warmup"] and args["warmup_lambdas"] is None:
- args["warmup_lambdas"] = __generate_warmup_lambda_sequence(args["tbl_used"], args["col_ind_var_new"],
- args["col_dep_var_new"],
- dimension, row_num, lambda_value, alpha,
- args["warmup_lambda_no"], args["sq"])
+ args["warmup_lambdas"] = \
+ __generate_warmup_lambda_sequence(
+ args["tbl_used"], args["col_ind_var_new"], args["col_dep_var_new"],
+ dimension, row_num, lambda_value, alpha,
+ args["warmup_lambda_no"], args["sq"])
args["warm_no"] = len(args["warmup_lambdas"])
- args["warmup_lambdas"] = _array_to_string(args["warmup_lambdas"])
+ args["warmup_lambdas"] = args["warmup_lambdas"]
elif args["warmup"] is False:
args["warm_no"] = 1
- args["warmup_lambdas"] = _array_to_string([lambda_value]) # only one value
-
- # create the temp table that passes parameter values to IGD optimizer
- __igd_create_tbl_args(**args)
-
+ args["warmup_lambdas"] = [lambda_value] # only one value
+
+ # parameter values required by the IGD optimizer
+ (xmean, ymean) = __compute_means(**args)
+
+ args.update({
+ 'rel_args': args["tbl_igd_args"],
+ 'rel_state': args["tbl_igd_state"],
+ 'col_grp_iteration': unique_string(desp='col_grp_iteration'),
+ 'col_grp_state': unique_string(desp='col_grp_state'),
+ 'col_grp_key': unique_string(desp='col_grp_key'),
+ 'col_n_tuples': unique_string(desp='col_n_tuples'),
+ 'lambda_count': 1,
+ 'state_type': "double precision[]",
+ 'rel_source': tbl_used,
+ 'grouping_str': grouping_str,
+ 'xmean_val': xmean,
+ 'ymean_val': ymean,
+ 'tbl_source': tbl_source,
+ 'tbl_summary': tbl_summary
+ })
+ if not args.get('parallel'):
+ func_step_aggregate += "_single_seg"
# perform the actual calculation
iteration_run = __compute_igd(schema_madlib,
func_step_aggregate,
func_state_diff,
args["tbl_igd_args"],
- args["tbl_igd_state"], args["tbl_used"],
- args["col_ind_var_new"], args["col_dep_var_new"],
- True,
- max_iter = args["max_iter"],
- tolerance = args["tolerance"],
- warmup_tolerance = args["warmup_tolerance"],
- warm_no = args["warm_no"],
- parallel = args["parallel"],
- step_decay = args["step_decay"],
- col_ind_var_new = args["col_ind_var_new"],
- col_dep_var_new = args["col_dep_var_new"],
- dimension_name = args["dimension_name"],
- stepsize_name = args["stepsize_name"],
- lambda_name = args["lambda_name"],
- alpha_name = args["alpha_name"],
- total_rows_name = args["total_rows_name"],
- max_iter_name = args["max_iter_name"],
- tolerance_name = args["tolerance_name"],
- xmean_name = args["xmean_name"],
- ymean_name = args["ymean_name"])
+ args["tbl_igd_state"],
+ tbl_used,
+ args["col_ind_var_new"],
+ args["col_dep_var_new"],
+ grouping_str,
+ grouping_col,
+ start_iter=0,
+ max_iter= args["max_iter"],
+ tolerance= args["tolerance"],
+ warmup_tolerance= args["warmup_tolerance"],
+ warm_no= args["warm_no"],
+ step_decay= args["step_decay"],
+ dimension= args["dimension"],
+ stepsize= args["stepsize"],
+ lambda_name= args["warmup_lambdas"],
+ warmup_lambda_value = args.get('warmup_lambdas')[args["lambda_count"]-1],
+ alpha= args["alpha"],
+ row_num= args["row_num"],
+ xmean_val= args["xmean_val"],
+ ymean_val= args["ymean_val"],
+ lambda_count= args["lambda_count"],
+ rel_state= args["tbl_igd_state"],
+ col_grp_iteration= args["col_grp_iteration"],
+ col_grp_state= args["col_grp_state"],
+ col_grp_key= args["col_grp_key"],
+ col_n_tuples= args["col_n_tuples"],
+ rel_source= args["rel_source"],
+ state_type= args["state_type"],)
__elastic_net_generate_result("igd", iteration_run, **args)
@@ -382,7 +381,8 @@ def __elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
tbl_args, tbl_state, tbl_source,
- col_ind_var, col_dep_var, drop_table, **kwargs):
+ col_ind_var, col_dep_var, grouping_str, grouping_col,
+ start_iter, **kwargs):
"""
Driver function for elastic net with Gaussian response using IGD
@@ -394,8 +394,6 @@ def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
@param rel_source Name of the relation containing input points
@param col_ind_var Name of the independent variables column
@param col_dep_var Name of the dependent variable column
- @param drop_table Boolean, whether to use IterationController (True) or
- IterationControllerNoTableDrop (False)
@param kwargs We allow the caller to specify additional arguments (all of
which will be ignored though). The purpose of this is to allow the
caller to unpack a dictionary whose element set is a superset of
@@ -404,87 +402,41 @@ def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
@return The iteration number (i.e., the key) with which to look up the
result in \c tbl_state
"""
-
- m4_ifdef(`__HAWQ__', `
- iterationCtrl = IterationController2S(
- func_step_aggregate=func_step_aggregate,
- func_state_diff=func_state_diff,
- rel_args=tbl_args,
- rel_state=tbl_state,
- stateType="double precision[]",
- truncAfterIteration=False,
- schema_madlib=schema_madlib, # Identifiers start here
- rel_source=tbl_source,
- col_ind_var=col_ind_var,
- col_dep_var=col_dep_var,
- lambda_count=1,
- **kwargs)
- ', `
- if drop_table:
- iterationCtrl = IterationController2S(
- func_step_aggregate=func_step_aggregate,
- func_state_diff=func_state_diff,
- rel_args=tbl_args,
- rel_state=tbl_state,
- stateType="double precision[]",
- truncAfterIteration=False,
- schema_madlib=schema_madlib, # Identifiers start here
- rel_source=tbl_source,
- col_ind_var=col_ind_var,
- col_dep_var=col_dep_var,
- lambda_count=1,
- **kwargs)
- else:
- iterationCtrl = IterationControllerNoTableDrop(
- rel_args=tbl_args,
- rel_state=tbl_state,
- stateType="double precision[]",
- truncAfterIteration=False,
- schema_madlib=schema_madlib, # Identifiers start here
- rel_source=tbl_source,
- col_ind_var=col_ind_var,
- col_dep_var=col_dep_var,
- lambda_count=1,
- **kwargs)
- ')
-
+ args = locals()
+
+ for k, v in kwargs.iteritems():
+ if k not in args:
+ args.update({k: v})
+ iterationCtrl = GroupIterationController(args)
with iterationCtrl as it:
- it.iteration = 0
-
- if it.kwargs["parallel"]:
- it.kwargs["parallel_step_func"] = func_step_aggregate
- else:
- it.kwargs["parallel_step_func"] = func_step_aggregate + "_single_seg"
-
+ it.iteration = start_iter
while True:
# manually add the intercept term
+ if (it.kwargs["lambda_count"] > len(args.get('lambda_name'))):
+ break
+ it.kwargs["warmup_lambda_value"] = args.get('lambda_name')[it.kwargs["lambda_count"]-1]
it.update("""
- select
- {schema_madlib}.{parallel_step_func}(
- {col_ind_var}::double precision[],
- {col_dep_var},
- m4_ifdef(`__HAWQ__', `({{__state__}})',
- `(select _state from {rel_state}
- where _iteration = {iteration})'),
- (_args.{lambda_name}[{lambda_count}])::double precision,
- (_args.{alpha_name})::double precision,
- (_args.{dimension_name})::integer,
- (_args.{stepsize_name})::double precision,
- (_args.{total_rows_name})::integer,
- (_args.{xmean_name})::double precision[],
- (_args.{ymean_name})::double precision,
- {step_decay}::double precision
- )
- from {rel_source} as _src, {rel_args} as _args
- """)
-
+ {schema_madlib}.{func_step_aggregate}(
+ ({col_ind_var})::double precision[],
+ ({col_dep_var}),
+ {rel_state}.{col_grp_state},
+ ({warmup_lambda_value})::double precision,
+ ({alpha})::double precision,
+ ({dimension})::integer,
+ ({stepsize})::double precision,
+ ({row_num})::integer,
+ ('{xmean_val}')::double precision[],
+ ({ymean_val})::double precision,
+ ({step_decay})::double precision
+ )
+ """)
if it.kwargs["lambda_count"] < it.kwargs["warm_no"]:
it.kwargs["use_tolerance"] = it.kwargs["warmup_tolerance"]
else:
it.kwargs["use_tolerance"] = it.kwargs["tolerance"]
if it.test("""
- {iteration} > _args.{max_iter_name} or
+ {iteration} > {max_iter} or
{schema_madlib}.{func_state_diff}(
_state_previous, _state_current) < {use_tolerance}
"""):
@@ -493,10 +445,11 @@ def __compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
it.kwargs["lambda_count"] += 1
else:
break
+
+ it.final()
if it.kwargs["lambda_count"] < it.kwargs["warm_no"]:
plpy.error("""
Elastic Net error: The final target lambda value is not
reached in warm-up iterations. You need more iterations!
""")
-
return iterationCtrl.iteration
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/6138b006/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in b/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
index 144fe27..6b291ec 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
@@ -178,7 +178,7 @@ def __compute_log_likelihood(coef, intercept, **args):
def __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
- tbl_result, lambda_value, alpha,
+ tbl_result, tbl_summary, lambda_value, alpha,
normalization, max_iter, tolerance):
if (tbl_source is None or col_ind_var is None or col_dep_var is None
or tbl_result is None or lambda_value is None or alpha is None
@@ -189,6 +189,9 @@ def __elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
if table_exists(tbl_result, only_first_schema=True):
plpy.error("Elastic Net error: Output table " + tbl_result + " already exists!")
+ if table_exists(tbl_summary, only_first_schema=True):
+ plpy.error("Elastic Net error: Output summary table " + tbl_summary + " already exists!")
+
if lambda_value < 0:
plpy.error("Elastic Net error: The regularization parameter lambda cannot be negative!")
@@ -241,7 +244,8 @@ def __normalize_data(args):
x_mean_str=args["xmean_str"],
x_std_str=_array_to_string(args["x_scales"]["std"]),
y_mean=args["y_scale"]["mean"],
- y_std=args["y_scale"]["std"])
+ y_std=args["y_scale"]["std"],
+ grouping_col=args["grouping_col"])
return None
# ========================================================================