You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nj...@apache.org on 2017/01/19 19:53:33 UTC
[6/6] incubator-madlib git commit: Bug fixes and minor changes
Bug fixes and minor changes
* Validate parameters earlier than before (at the start of the wrapper functions).
* Fix an issue with pca_project on Greenplum caused by calling
row_number() multiple times (while creating a mapping table
between the original row_id and the serially increasing row id
introduced in the code). row_number() is now called only
once.
Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/edb69dd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/edb69dd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/edb69dd4
Branch: refs/heads/master
Commit: edb69dd4126125f0a6ecc7eaf4149022da157442
Parents: 8bd46ae
Author: Nandish Jayaram <nj...@users.noreply.github.com>
Authored: Thu Jan 5 09:48:02 2017 -0800
Committer: Nandish Jayaram <nj...@users.noreply.github.com>
Committed: Wed Jan 18 14:05:09 2017 -0800
----------------------------------------------------------------------
.../postgres/modules/linalg/matrix_ops.py_in | 9 +-
src/ports/postgres/modules/pca/pca.py_in | 154 +++++------
src/ports/postgres/modules/pca/pca.sql_in | 1 +
.../postgres/modules/pca/pca_project.py_in | 257 +++++++++----------
.../postgres/modules/pca/pca_project.sql_in | 1 +
src/ports/postgres/modules/pca/test/pca.sql_in | 42 ++-
6 files changed, 219 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/linalg/matrix_ops.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/linalg/matrix_ops.py_in b/src/ports/postgres/modules/linalg/matrix_ops.py_in
index 51ae7e3..9f3215c 100644
--- a/src/ports/postgres/modules/linalg/matrix_ops.py_in
+++ b/src/ports/postgres/modules/linalg/matrix_ops.py_in
@@ -86,7 +86,8 @@ def _matrix_column_to_array_format(source_table, row_id, output_table,
def create_temp_sparse_matrix_table_with_dims(source_table,
out_table,
row_id, col_id, value,
- row_dim, col_dim):
+ row_dim, col_dim,
+ sparse_where_condition=None):
"""
Make a copy of the input sparse table and add (row_dim, col_dim, NULL) to it
@@ -102,6 +103,8 @@ def create_temp_sparse_matrix_table_with_dims(source_table,
Returns:
None
"""
+ if not sparse_where_condition:
+ sparse_where_condition = ''
plpy.execute("""
CREATE TABLE {out_table} as
SELECT
@@ -110,11 +113,13 @@ def create_temp_sparse_matrix_table_with_dims(source_table,
{value}
FROM {source_table}
WHERE {value} is not NULL
+ {sparse_where_condition}
""".format(row_id=row_id,
col_id=col_id,
value=value,
source_table=source_table,
- out_table=out_table))
+ out_table=out_table,
+ sparse_where_condition=sparse_where_condition))
res_row_dim, res_col_dim = get_dims(out_table, {'row': row_id,
'col': col_id,
'val': value})
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca.py_in b/src/ports/postgres/modules/pca/pca.py_in
index 9a13b2f..71146d5 100644
--- a/src/ports/postgres/modules/pca/pca.py_in
+++ b/src/ports/postgres/modules/pca/pca.py_in
@@ -108,7 +108,15 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
""")[0]['setting']
plpy.execute('SET client_min_messages TO warning')
grouping_cols_list = []
- _validate_args_output_table(pc_table, result_summary_table)
+ if is_sparse:
+ _validate_args(schema_madlib, source_table, pc_table, k, row_id, col_id,
+ val_id, row_dim, col_dim, lanczos_iter,
+ use_correlation, result_summary_table, variance)
+ else:
+ _validate_args(schema_madlib, source_table, pc_table, k,
+ row_id, None, None, None, None,
+ lanczos_iter, use_correlation,
+ result_summary_table,variance)
if(grouping_cols):
# validate the grouping columns. We currently only support grouping_cols
# to be column names in the source_table, and not expressions!
@@ -162,21 +170,26 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
# declare variables whose values will be different for each group, if
# grouping_cols is specified
grouping_where_clause = ''
+ sparse_where_condition = ''
select_grouping_cols = ''
temp_table_columns = ''
result_summary_table_temp = ''
- other_columns_in_table.remove(row_id)
- temp_table_columns = """ ROW_NUMBER() OVER({partition_over}) AS row_id,
- """.format(partition_over='' if not grouping_cols else 'PARTITION BY {0}'.format(grouping_cols)) +\
- ','.join(other_columns_in_table)
+ # For Dense matrix format only:
# We can now ignore the original row_id for all computations since we will
- # create a new table with a row_id column that has perfect serially increasing
- # row_id value. This is to support the scenario where users are not forced
- # to have a row_id that follows a particular format. This restriction of having to
- # provide a serially increasing row_id value starting from 1 becomes a pain
- # point when grouping is used, since the row_id for each group will then have
- # to start from 1.
- row_id = 'row_id'
+ # create a new table with a row_id column that has no duplicates and ranges
+ # from 1 to number of rows in the group/table. This is mainly to support the
+ # grouping scenario where the row_id values might not range between 1 and
+ # number of rows in the group, for each group. Doing this also just extends
+ # this behavior for non-grouping scenarios too. If creating a new temp table
+ # that corrects the row_id column is not of much importance in non-grouping
+ # cases, we can avoid creating the temp table and save some computation time.
+ # But, at the moment, the code creates the temp table even for the non-grouping
+ # scenario.
+ # We don't need to do this for sparse representation because of the nature
+ # of its definition.
+ other_columns_in_table.remove(row_id)
+ temp_table_columns = """ ROW_NUMBER() OVER() AS row_id, """ + ','.join(other_columns_in_table)
+
pca_union_call_list = []
grp_id = 0
if not is_sparse:
@@ -189,16 +202,18 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
result_summary_table_temp = result_summary_table + unique_string() + "_" + str(grp_id)
if grouping_cols:
grp_value_dict = distinct_grouping_values[grp_id]
- grouping_where_clause = ' WHERE ' + ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+ where_conditions = ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+ sparse_where_condition = ' AND ' + where_conditions
+ grouping_where_clause = ' WHERE ' + where_conditions
select_grouping_cols = ', ' + ', '.join([str(value)+" AS "+key for (key, value) in grp_value_dict.items()])
pca_union_call_list.append("""
{schema_madlib}._pca_union('{source_table}', '{pc_table}', '{pc_table_mean}', '{row_id}',
{k}, '{grouping_cols}', {lanczos_iter}, {use_correlation},
'{result_summary_table}', '{result_summary_table_temp}', {variance},
- {grp_id}, '{grouping_where_clause}', '{select_grouping_cols}',
- '{temp_table_columns}', {is_sparse}, '{col_id}', '{val_id}',
- {row_dim}, {col_dim})
+ {grp_id}, '{grouping_where_clause}', '{sparse_where_condition}',
+ '{select_grouping_cols}', '{temp_table_columns}', {is_sparse},
+ '{col_id}', '{val_id}', {row_dim}, {col_dim})
""".format(schema_madlib=schema_madlib,
source_table=source_table, pc_table=pc_table,
pc_table_mean=pc_table_mean, row_id=row_id,
@@ -208,6 +223,7 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
result_summary_table_temp=result_summary_table_temp,
variance='NULL' if variance==None else variance,
grp_id=grp_id, grouping_where_clause=grouping_where_clause,
+ sparse_where_condition=sparse_where_condition,
select_grouping_cols=select_grouping_cols,
temp_table_columns=temp_table_columns, is_sparse=is_sparse,
col_id=col_id, val_id=val_id, row_dim=row_dim, col_dim=col_dim))
@@ -217,20 +233,7 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
# "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each
# <query_i> in parallel.
pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list)
- try:
- plpy.execute(pca_union_call)
- except Exception as e:
- ## drop the output tables that were created if PCA errored out.
- plpy.execute("""
- DROP TABLE IF EXISTS {0};
- DROP TABLE IF EXISTS {1};
- """.format(pc_table, pc_table_mean))
- if result_summary_table:
- plpy.execute("""
- DROP TABLE IF EXISTS {0};
- """.format(result_summary_table))
- plpy.error(str(e) + "\n" + str(e.args) + "\n" + str(e.strerror))
- raise
+ plpy.execute(pca_union_call)
plpy.execute("SET client_min_messages TO %s" % old_msg_level)
@@ -238,9 +241,9 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
row_id, k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table, result_summary_table_temp, variance,
- grp_id, grouping_where_clause, select_grouping_cols,
- temp_table_columns, is_sparse, col_id, val_id, row_dim,
- col_dim, **kwargs):
+ grp_id, grouping_where_clause, sparse_where_condition,
+ select_grouping_cols, temp_table_columns, is_sparse, col_id,
+ val_id, row_dim, col_dim, **kwargs):
"""
This function does all the heavy lifting of PCA, for both pca and pca_sparse.
Compute the PCA of the matrix in source_table. This function is the specific
@@ -272,36 +275,22 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
Returns:
None
"""
- ## Creation of this temp table is unnecessary if the scenario does not involve
- ## grouping, and/or, the input table had perfect values for the row_id column.
- ## This temp table will ensure pca works even when row_id of the source_table
- ## does not have serially increasing numbers starting from 1;
- source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
- plpy.execute("""
- CREATE TEMP TABLE {source_table_grouped} AS
- SELECT {temp_table_columns}
- FROM {source_table}
- {grouping_where_clause}
- """.format(source_table_grouped=source_table_grouped,
- source_table=source_table, grouping_where_clause=grouping_where_clause,
- temp_table_columns=temp_table_columns))
startTime = time.time() # measure the starting time
- # Step 1: Validate the input arguments
+ # Step 1: Modify data format for sparse input
if is_sparse:
- _validate_args(schema_madlib, source_table_grouped, k, row_id, col_id,
- val_id, row_dim, col_dim, lanczos_iter,
- use_correlation, variance)
# Step 1.1: Densify the matrix for sparse input tables
# We densify the matrix because the recentering process will generate a
# dense matrix, so we just wrap around regular PCA.
# First we must copy the sparse matrix and add in the dimension information
sparse_temp = "pg_temp." + unique_string() + "_sparse"
-
# Add in the dimension information needed by the densifying process
- create_temp_sparse_matrix_table_with_dims(source_table_grouped, sparse_temp,
+ create_temp_sparse_matrix_table_with_dims(source_table, sparse_temp,
row_id, col_id, val_id,
- row_dim, col_dim)
-
+ row_dim, col_dim, sparse_where_condition)
+ validate_sparse(sparse_temp,
+ {'row': row_id, 'col': col_id, 'val': val_id},
+ check_col=False)
+ # Step 1.2: Densify the input matrix
x_dense = "pg_temp." + unique_string() + "_dense"
plpy.execute("""
SELECT {schema_madlib}.matrix_densify(
@@ -315,12 +304,21 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
DROP TABLE IF EXISTS {0};
""".format(sparse_temp))
source_table_grouped = x_dense
- row_id = 'row_id'
else:
- _validate_args(schema_madlib, source_table_grouped, k,
- row_id, None, None, None, None,
- lanczos_iter, use_correlation, variance)
-
+ # Creation of this temp table is unnecessary if the scenario does not involve
+ # grouping, and/or, the input table had perfect values for the row_id column.
+ # This temp table will ensure pca works even when the value of row_id column
+ # in dense matrix format does not have values ranging from 1 to number of rows.
+ source_table_grouped = unique_string() + "group_" + str(grp_id)
+ plpy.execute("""
+ CREATE TABLE {source_table_grouped} AS
+ SELECT {temp_table_columns}
+ FROM {source_table}
+ {grouping_where_clause}
+ """.format(source_table_grouped=source_table_grouped,
+ source_table=source_table, grouping_where_clause=grouping_where_clause,
+ temp_table_columns=temp_table_columns))
+ row_id = 'row_id'
# Make sure that the table has row_id and row_vec
source_table_copy = "pg_temp." + unique_string() + "_reformated_names"
created_new_table = cast_dense_input_table_to_correct_columns(
@@ -328,7 +326,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
if(created_new_table):
source_table_grouped = source_table_copy
-
[row_dim, col_dim] = get_dims(source_table_grouped,
{'row': 'row_id', 'col': 'col_id',
'val': 'row_vec'})
@@ -351,10 +348,8 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
else:
if variance: #lanczos_iter overrides the proportion default for k
curK = lanczos_iter
-
# Note: we currently don't support grouping columns or correlation matrices
if not use_correlation:
-
# Step 2: Normalize the data (Column means)
dimension = col_dim
scaled_source_table = "pg_temp." + unique_string() + "_scaled_table"
@@ -371,7 +366,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
result_summary_table_string = ''
else:
result_summary_table_string = ", '{0}'".format(result_summary_table_temp)
-
# Step 4: Perform SVD
# Step 4.1: Perform upper part of SVD
if result_summary_table_temp:
@@ -392,7 +386,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
)
FROM {scaled_source_table}
""".format(**locals()))[0]['array_sum']
-
# Step 4.2: Adjust the k value
if variance:
variance_tmp_table = "pg_temp."+ unique_string()+ "_var_tmp"
@@ -416,7 +409,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
plpy.execute("""
DROP TABLE IF EXISTS {variance_tmp_table}
""".format(variance_tmp_table=variance_tmp_table))
-
# Step 4.3: Perform the lower part of SVD
tmp_matrix_table = "temp_"+ unique_string()+ "_matrix"
tmp_matrix_s_table = add_postfix(tmp_matrix_table, "_s")
@@ -444,7 +436,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
tmp_matrix_table = svd_output_temp_table
_svd_lower_wrap(schema_madlib, source_table_svd,
svd_output_temp_table, row_id, curK, lanczos_iter, bd_pref)
-
# Step 4.4: Create the SVD result table
if result_summary_table_temp:
t1 = time.time()
@@ -559,6 +550,7 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
# ------------------------------------------------------------------------
def _validate_args(schema_madlib,
source_table,
+ pc_table,
k,
row_id,
col_id=None,
@@ -567,12 +559,14 @@ def _validate_args(schema_madlib,
col_dim=None,
lanczos_iter=0,
use_correlation=False,
+ result_summary_table=None,
variance=None):
"""
Validates all arguments passed to the PCA function
Args:
@param schema_madlib Name of the schema where MADlib is installed
@param source_table Name of the source table
+ @param pc_table Name of the output table
@param k Number of singular vectors to return
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@@ -580,6 +574,7 @@ def _validate_args(schema_madlib,
@param grouping_cols The columns that the data should be grouped by
@param lanczos_iter The number of lanczos iterations to use in the SVD calculation
@param use_correlation If the correlation matrix should be used instead of the covariance matrix
+ @param result_summary_table Name of summary table
@param variance Proportion of variance
Returns:
@@ -603,6 +598,14 @@ def _validate_args(schema_madlib,
if (variance <= 0) or (variance >1):
plpy.error("""PCA error: components_param must be either
a positive integer or a float in the range (0.0,1.0]!""")
+ # confirm output tables are valid
+ if pc_table:
+ _assert(not table_exists(pc_table, only_first_schema=True) and
+ not table_exists(pc_table + '_mean', only_first_schema=True),
+ "PCA error: Output table {pc_table}/{pc_table}_mean "
+ "already exist!".format(pc_table=pc_table))
+ else:
+ plpy.error("PCA error: Invalid output table prefix!")
_assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
@@ -640,33 +643,18 @@ def _validate_args(schema_madlib,
if col_dim <= 0:
plpy.error("PCA error: The column dimension must be larger than 0!")
- validate_sparse(source_table,
- {'row': row_id, 'col': col_id, 'val': val_id},
- check_col=False)
if use_correlation:
plpy.error("PCA error: Using the correlation matrix is not enabled! \
This value must be set to FALSE")
-# ========================================================================
-def _validate_args_output_table(pc_table, result_summary_table=None):
- """
- confirm output tables are valid
- @param pc_table Name of the output table
- @param result_summary_table Name of summary table
- """
- if pc_table:
- _assert(not table_exists(pc_table, only_first_schema=True) and
- not table_exists(pc_table + '_mean', only_first_schema=True),
- "PCA error: Output table {pc_table}/{pc_table}_mean "
- "already exist!".format(pc_table=pc_table))
- else:
- plpy.error("PCA error: Invalid output table prefix!")
if result_summary_table:
if not result_summary_table.strip():
plpy.error("PCA error: Invalid result summary table name!")
_assert(not table_exists(result_summary_table, only_first_schema=True),
"PCA error: Result summary table {0} \
already exists!".format(result_summary_table))
+# ========================================================================
+
def _recenter_data(schema_madlib, source_table, output_table, row_id,
col_name, dimension):
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca.sql_in b/src/ports/postgres/modules/pca/pca.sql_in
index 3edf304..9de5559 100644
--- a/src/ports/postgres/modules/pca/pca.sql_in
+++ b/src/ports/postgres/modules/pca/pca.sql_in
@@ -1022,6 +1022,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA._pca_union(
variance DOUBLE PRECISION, -- The proportion of variance (Default: NULL)
grp_id INTEGER, -- a place holder id for each group
grouping_where_clause TEXT, -- WHERE clause using grouping_cols
+ sparse_where_condition TEXT, -- WHERE clause used when creating temp sparse matrix table with dims
select_grouping_cols TEXT, -- SELECT clause using grouping_cols
temp_table_columns TEXT, -- SELECT caluse for creating temporary copy of the source_table
is_sparse BOOLEAN, -- specifies if the PCA call is for sparse or dense matrices
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca_project.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca_project.py_in b/src/ports/postgres/modules/pca/pca_project.py_in
index 1e61d3c..52d14ec 100644
--- a/src/ports/postgres/modules/pca/pca_project.py_in
+++ b/src/ports/postgres/modules/pca/pca_project.py_in
@@ -331,54 +331,17 @@ SELECT {schema_madlib}.pca_sparse_project('usage');
--
""".format(schema_madlib=schema_madlib)
-
-def _validate_args_output_table(out_table, residual_table=None,
- result_summary_table=None):
- """
- Validates the output table(s) arguments to the PCA project function
-
- Args:
- @param out_table Name of output table to store projection result
- @param residual_table Name of the residual table (to store error in projection)
- @param result_summary_table Name of result summary table
- Returns:
- None
- Throws:
- plpy.error if any argument is invalid
- """
- # Make sure that the output table does not exist
- # Also check that the output table is not null
- _assert(out_table and out_table.strip(),
- "PCA error: Invalid output table name.")
- _assert(not table_exists(out_table, only_first_schema=True),
- "PCA error: Output table {0} already exists!".format(str(out_table)))
-
- # Check that the result summary table is not empty
- if result_summary_table is not None:
- _assert(result_summary_table.strip(),
- "PCA error: Invalid result summary table name!")
- _assert(not table_exists(result_summary_table, only_first_schema=True),
- "PCA error: Result summary table {0} already exists!".
- format(result_summary_table))
-
- # Check that the result summary table is not empty
- if residual_table is not None:
- _assert(residual_table.strip(),
- "PCA error: Invalid residual table name!")
- _assert(not table_exists(residual_table, only_first_schema=True),
- "PCA error: Residual table {0} already exists!".
- format(residual_table))
-
-# Validate arguments: Same as pca
-# ------------------------------------------------------------------------
def _validate_args(schema_madlib,
source_table,
pc_table,
+ out_table,
row_id,
col_id=None,
val_id=None,
row_dim=None,
- col_dim=None):
+ col_dim=None,
+ residual_table=None,
+ result_summary_table=None):
"""
Validates all arguments passed to the PCA function
@@ -408,6 +371,29 @@ def _validate_args(schema_madlib,
_assert(table_exists(add_postfix(pc_table, "_mean")),
"PCA error: Source data table column means does not exist!")
+ # Make sure that the output table does not exist
+ # Also check that the output table is not null
+ _assert(out_table and out_table.strip(),
+ "PCA error: Invalid output table name.")
+ _assert(not table_exists(out_table, only_first_schema=True),
+ "PCA error: Output table {0} already exists!".format(str(out_table)))
+
+ # Check that the result summary table is not empty
+ if result_summary_table is not None:
+ _assert(result_summary_table.strip(),
+ "PCA error: Invalid result summary table name!")
+ _assert(not table_exists(result_summary_table, only_first_schema=True),
+ "PCA error: Result summary table {0} already exists!".
+ format(result_summary_table))
+
+ # Check that the residual table name is not empty
+ if residual_table is not None:
+ _assert(residual_table.strip(),
+ "PCA error: Invalid residual table name!")
+ _assert(not table_exists(residual_table, only_first_schema=True),
+ "PCA error: Residual table {0} already exists!".
+ format(residual_table))
+
# Check that the row_id exists
_assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
@@ -428,11 +414,9 @@ def _validate_args(schema_madlib,
_assert(row_dim > 0 and col_dim > 0,
"PCA error: row_dim/col_dim should be positive integer")
- validate_sparse(source_table,
- {'row': row_id, 'col': col_id, 'val': val_id},
- check_col=False)
# ------------------------------------------------------------------------
+
def pca_sparse_project(schema_madlib,
source_table,
pc_table,
@@ -523,6 +507,14 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
WHERE name='client_min_messages'
""")[0]['setting']
plpy.execute('SET client_min_messages TO warning')
+ if is_sparse:
+ _validate_args(schema_madlib, source_table, pc_table, out_table,
+ row_id, col_id, val_id, row_dim, col_dim, residual_table,
+ result_summary_table)
+ else:
+ _validate_args(schema_madlib, source_table, pc_table, out_table,
+ row_id, None, None, None, None,
+ residual_table, result_summary_table)
# If we add new columns to the pca_train output table in the future, they should
# be included in this list:
pc_table_model_cols = ['row_id', 'principal_components', 'std_dev', 'proportion']
@@ -530,7 +522,6 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
grouping_cols = ''
if grouping_cols_list:
grouping_cols = ', '.join(grouping_cols_list)
- _validate_args_output_table(out_table, residual_table, result_summary_table)
other_columns_in_table = [col for col in get_cols(source_table) if col not in grouping_cols_list]
grouping_cols_clause = ''
@@ -547,7 +538,7 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
cols_names_types = get_cols_and_types(source_table)
grouping_cols_clause = ', ' + ', '.join([c_name+" "+c_type
for (c_name, c_type) in cols_names_types if c_name in grouping_cols_list])
- ## Create all output tables
+ # Create all output tables
plpy.execute("""
DROP TABLE IF EXISTS {0};
CREATE TABLE {0} (
@@ -567,7 +558,7 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
) """.format(result_summary_table, grouping_cols_clause))
else:
result_summary_table = ''
- if residual_table and grouping_cols:
+ if residual_table:
plpy.execute("""
DROP TABLE IF EXISTS {0};
CREATE TABLE {0} (
@@ -581,25 +572,17 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
# declare variables whose values will be different for each group, if
# grouping_cols is specified
grouping_where_clause = ''
+ sparse_where_condition = ''
select_grouping_cols = ''
- temp_source_table_columns = ''
grouping_cols_values = ''
result_summary_table_temp = ''
other_columns_in_pc_table = [col for col in get_cols(pc_table) if col not in grouping_cols_list]
temp_pc_table_columns = ', '.join(other_columns_in_pc_table)
original_row_id = row_id
+
other_columns_in_table.remove(row_id)
- temp_source_table_columns = """ ROW_NUMBER() OVER({partition_over}) AS row_id,
- """.format(partition_over='' if not grouping_cols else 'PARTITION BY {0}'.format(grouping_cols)) +\
- ','.join(other_columns_in_table)
- # We can now ignore the original row_id for all computations since we will
- # create a new table with a row_id column that has perfect serially increasing
- # row_id value. This is to support the scenario where users are not forced
- # to have a row_id that follows a particular format. This restriction of having to
- # provide a serially increasing row_id value starting from 1 becomes a pain
- # point when grouping is used, since the row_id for each group will then have
- # to start from 1.
- row_id = 'row_id'
+ temp_source_table_columns = ','.join(other_columns_in_table)
+
pca_union_call_list = []
grp_id = 0
if not is_sparse:
@@ -610,15 +593,17 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
while True:
if grouping_cols:
grp_value_dict = distinct_grouping_values[grp_id]
- grouping_where_clause = ' WHERE ' + ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+ where_conditions = ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+ sparse_where_condition = ' AND ' + where_conditions
+ grouping_where_clause = ' WHERE ' + where_conditions
select_grouping_cols = ', ' + ', '.join([str(value)+" AS "+key for (key, value) in grp_value_dict.items()])
grouping_cols_values = ', ' + ', '.join([str(value) for (key, value) in grp_value_dict.items()])
pca_union_call_list.append("""
{schema_madlib}._pca_project_union('{source_table}', '{pc_table}', '{out_table}',
'{row_id}', '{original_row_id}', '{grouping_cols}',
- '{grouping_cols_clause}', '{residual_table}',
- '{result_summary_table}', {grp_id}, '{grouping_where_clause}', '{select_grouping_cols}',
+ '{grouping_cols_clause}', '{residual_table}', '{result_summary_table}',
+ {grp_id}, '{grouping_where_clause}', '{sparse_where_condition}','{select_grouping_cols}',
'{grouping_cols_values}', '{temp_source_table_columns}', '{temp_pc_table_columns}',
{is_sparse}, '{col_id}', '{val_id}', {row_dim}, {col_dim})
""".format(schema_madlib=schema_madlib,
@@ -630,6 +615,7 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
residual_table=residual_table,
result_summary_table=result_summary_table,
grp_id=grp_id, grouping_where_clause=grouping_where_clause,
+ sparse_where_condition=sparse_where_condition,
select_grouping_cols=select_grouping_cols,
grouping_cols_values=grouping_cols_values,
temp_source_table_columns=temp_source_table_columns,
@@ -641,31 +627,15 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
# "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each
# <query_i> in parallel.
pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list)
- try:
- plpy.execute(pca_union_call)
- except Exception as e:
- ## drop the output tables that were created if PCA errored out.
- plpy.execute("""
- DROP TABLE IF EXISTS {0};
- """.format(out_table))
- if result_summary_table:
- plpy.execute("""
- DROP TABLE IF EXISTS {0}
- """.format(result_summary_table))
- if residual_table:
- plpy.execute("""
- DROP TABLE IF EXISTS {0}
- """.format(residual_table))
- plpy.error(str(e) + "\n" + str(e.args) + "\n" + str(e.strerror))
- raise
-
+ plpy.execute(pca_union_call)
plpy.execute("SET client_min_messages TO %s" % old_msg_level)
def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
row_id, original_row_id, grouping_cols, grouping_cols_clause,
residual_table, result_summary_table, grp_id, grouping_where_clause,
- select_grouping_cols, grouping_cols_values, temp_source_table_columns,
- temp_pc_table_columns, is_sparse, col_id, val_id, row_dim, col_dim, **kwargs):
+ sparse_where_condition, select_grouping_cols, grouping_cols_values,
+ temp_source_table_columns, temp_pc_table_columns, is_sparse, col_id,
+ val_id, row_dim, col_dim, **kwargs):
"""
The pca_project is performed over each group, if any.
@@ -696,23 +666,10 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
None
"""
out_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
- ## Creation of this temp table is unnecessary if the scenario does not involve
- ## grouping, and/or, the input table had perfect values for the row_id column.
- ## This temp table will ensure pca works even when row_id of the source_table
- ## does not have serially increasing numbers starting from 1;
- source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
- plpy.execute("""
- CREATE TEMP TABLE {source_table_grouped} AS
- SELECT {temp_source_table_columns}
- FROM {source_table}
- {grouping_where_clause}
- """.format(source_table_grouped=source_table_grouped,
- source_table=source_table, grouping_where_clause=grouping_where_clause,
- temp_source_table_columns=temp_source_table_columns))
if grouping_cols:
pc_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
plpy.execute("""
- CREATE TEMP TABLE {pc_table_grouped} AS
+ CREATE TABLE {pc_table_grouped} AS
SELECT {temp_pc_table_columns}
FROM {pc_table}
{grouping_where_clause}
@@ -725,15 +682,15 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
t0 = time.time() # measure the starting time
# Step 1: Validate the input arguments
if is_sparse:
- _validate_args(schema_madlib, source_table_grouped, pc_table, row_id,
- col_id, val_id, row_dim, col_dim)
# Step 1.1: Create a copy of the sparse matrix and add row_dims and col_dims
# Warning: This changes the column names of the table
sparse_table_copy = "pg_temp." + unique_string() + "_sparse_table_copy"
- create_temp_sparse_matrix_table_with_dims(source_table_grouped, sparse_table_copy,
+ create_temp_sparse_matrix_table_with_dims(source_table, sparse_table_copy,
row_id, col_id, val_id,
- row_dim, col_dim)
-
+ row_dim, col_dim, sparse_where_condition)
+ validate_sparse(sparse_table_copy,
+ {'row': row_id, 'col': col_id, 'val': val_id},
+ check_col=False)
# Step 1.2: Densify the input matrix
x_dense = "pg_temp." + unique_string() + "_dense"
plpy.execute("""
@@ -747,16 +704,29 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
DROP TABLE IF EXISTS {0};
""".format(sparse_table_copy))
source_table_grouped = x_dense
- row_id = 'row_id'
else:
+ # For Dense matrix format only:
+ # We can now ignore the original row_id for all computations since we will
+ # create a new table with a row_id column that has no duplicates and ranges
+ # from 1 to number of rows in the group/table. This is mainly to support the
+ # grouping scenario where the row_id values might not range between 1 and
+ # number of rows in the group, for each group. Doing this also extends the
+ # same behavior to non-grouping scenarios. If creating a new temp table
+ # that corrects the row_id column is not of much importance in non-grouping
+ # cases, we can avoid creating the temp table and save some computation time.
+ # But, at the moment, the code creates the temp table even for the non-grouping
+ # scenario.
+ # We don't need to do this for sparse representation because of the nature
+ # of its definition.
+
# Preserve the mapping between new row_id created and the original row_id. This is
# required only for dense input format.
temp_row_id = original_row_id + unique_string()
row_id_map_table = "rowid" + unique_string()
plpy.execute("""
- CREATE TEMP TABLE {row_id_map_table} AS
+ CREATE TABLE {row_id_map_table} AS
SELECT
- {original_row_id} AS {temp_row_id},
+ {source_table}.{original_row_id} AS {temp_row_id},
{select_clause}
FROM {source_table}
{grouping_where_clause}
@@ -764,14 +734,31 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
original_row_id=original_row_id,
temp_row_id=temp_row_id,
source_table=source_table,
- select_clause="""
- ROW_NUMBER() OVER({partition_over}) AS row_id
- """.format(partition_over='' if not grouping_cols else 'PARTITION BY {0}'.format(grouping_cols)),
+ select_clause=""" ROW_NUMBER() OVER() AS row_id """,
grouping_where_clause=grouping_where_clause))
- ## Validate the arguments
- _validate_args(schema_madlib, source_table_grouped, pc_table,
- row_id, None, None, None, None)
+ # Creation of this temp table is unnecessary if the scenario does not involve
+ # grouping and/or the input table already has ideal values in the row_id column.
+ # This temp table ensures pca works even when row_id of the source_table
+ # does not have serially increasing numbers starting from 1.
+ source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
+ plpy.execute("""
+ CREATE TABLE {source_table_grouped} AS
+ SELECT {row_id_map_table}.row_id, {temp_source_table_columns}
+ FROM
+ (
+ SELECT *
+ FROM {source_table}
+ {grouping_where_clause}
+ ) t1
+ INNER JOIN {row_id_map_table}
+ ON {row_id_map_table}.{temp_row_id}=t1.{row_id}
+ """.format(source_table_grouped=source_table_grouped,
+ temp_row_id=temp_row_id, row_id_map_table=row_id_map_table, row_id=row_id,
+ source_table=source_table, grouping_where_clause=grouping_where_clause,
+ temp_source_table_columns=temp_source_table_columns))
+
+ row_id = 'row_id'
# Make sure that the table has row_id and row_vec
source_table_copy = "pg_temp." + unique_string()
need_new_column_names = cast_dense_input_table_to_correct_columns(
@@ -779,7 +766,6 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
if(need_new_column_names):
source_table_grouped = source_table_copy
-
[row_dim, col_dim] = get_dims(source_table_grouped,
{'row': 'row_id', 'col': 'col_id',
'val': 'row_vec'})
@@ -833,10 +819,7 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
# Residual table: res = mat - proj
create_residual_table = False
if residual_table or result_summary_table:
- if grouping_cols:
- residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual"
- else:
- residual_table_grouped = residual_table
+ residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual"
create_temp_residual_table = False
if not residual_table:
create_temp_residual_table = True
@@ -928,14 +911,13 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
if is_sparse:
## We don't have to join based on row_id for sparse project.
if create_residual_table:
- if grouping_cols:
- plpy.execute("""
- INSERT INTO {residual_table}
- SELECT * {select_grouping_cols}
- FROM {residual_table_grouped}
- """.format(residual_table=residual_table,
- select_grouping_cols=select_grouping_cols,
- residual_table_grouped=residual_table_grouped))
+ plpy.execute("""
+ INSERT INTO {residual_table}
+ SELECT * {select_grouping_cols}
+ FROM {residual_table_grouped}
+ """.format(residual_table=residual_table,
+ select_grouping_cols=select_grouping_cols,
+ residual_table_grouped=residual_table_grouped))
plpy.execute("""
INSERT INTO {out_table}
SELECT * {select_grouping_cols}
@@ -954,21 +936,16 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
out_table_cols=', '.join(output_table_cols),
select_grouping_cols=select_grouping_cols)
if create_residual_table:
- if grouping_cols:
- plpy.execute("""
- INSERT INTO {residual_table}
- SELECT {select_clause}
- FROM {residual_table_grouped}
- INNER JOIN {row_id_map_table}
- ON {row_id_map_table}.row_id={residual_table_grouped}.row_id
- """.format(residual_table=residual_table,
- select_clause=output_table_select_clause,
- residual_table_grouped=residual_table_grouped,
- row_id_map_table=row_id_map_table))
- plpy.execute("""
- DROP TABLE IF EXISTS {0}
- """.format(residual_table_grouped))
-
+ plpy.execute("""
+ INSERT INTO {residual_table}
+ SELECT {select_clause}
+ FROM {residual_table_grouped}
+ INNER JOIN {row_id_map_table}
+ ON {row_id_map_table}.row_id={residual_table_grouped}.row_id
+ """.format(residual_table=residual_table,
+ select_clause=output_table_select_clause,
+ residual_table_grouped=residual_table_grouped,
+ row_id_map_table=row_id_map_table))
plpy.execute("""
INSERT INTO {out_table}
SELECT {select_clause}
@@ -982,6 +959,10 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(row_id_map_table))
+ if residual_table or result_summary_table:
+ plpy.execute("""
+ DROP TABLE IF EXISTS {0}
+ """.format(residual_table_grouped))
plpy.execute("""
DROP TABLE IF EXISTS {0};
DROP TABLE IF EXISTS {1};
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca_project.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca_project.sql_in b/src/ports/postgres/modules/pca/pca_project.sql_in
index 814292c..0a0a69c 100644
--- a/src/ports/postgres/modules/pca/pca_project.sql_in
+++ b/src/ports/postgres/modules/pca/pca_project.sql_in
@@ -682,6 +682,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA._pca_project_union(
result_summary_table TEXT, -- Table name to store summary of results (Default: NULL)
grp_id INTEGER, -- a place holder id for each group
grouping_where_clause TEXT, -- WHERE clause using grouping_cols
+ sparse_where_condition TEXT, -- WHERE clause used when creating temp sparse matrix table with dims
select_grouping_cols TEXT, -- SELECT clause using grouping_cols
grouping_cols_values TEXT, -- distinct values of the grouping_cols
temp_source_table_columns TEXT, -- SELECT caluse for creating temporary copy of the source_table
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/test/pca.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/test/pca.sql_in b/src/ports/postgres/modules/pca/test/pca.sql_in
index 20093ac..12d8ab1 100644
--- a/src/ports/postgres/modules/pca/test/pca.sql_in
+++ b/src/ports/postgres/modules/pca/test/pca.sql_in
@@ -308,48 +308,46 @@ select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
select * from result_table_214712398172490837;
select * from result_table_214712398172490838;
+-------------------------------------------------------------------------
+-- test a different column name
+alter table sparse_mat rename column row_id to rownr;
+alter table sparse_mat rename column col_id to colnr;
+alter table sparse_mat rename column val_id to vals;
+
+drop table if exists result_table_214712398172490837;
+drop table if exists result_table_214712398172490837_mean;
+select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
+'rownr', 'colnr', 'vals', 10, 10, 10);
+select * from result_table_214712398172490837;
+
-- Sparse input data with grouping column
-DROP TABLE IF EXISTS sparse_mat;
-CREATE TABLE sparse_mat (
+DROP TABLE IF EXISTS sparse_mat_grp;
+CREATE TABLE sparse_mat_grp (
id integer,
col_id integer,
val_id integer,
grp integer
);
-COPY sparse_mat (id, col_id, val_id, grp) FROM stdin delimiter '|';
+COPY sparse_mat_grp (id, col_id, val_id, grp) FROM stdin delimiter '|';
1|2|4|1
1|5|6|1
3|8|4|1
5|4|2|1
-6|6|12|2
-8|7|2|2
-8|1|2|2
-9|8|2|2
-9|3|4|2
+1|2|4|2
+1|5|6|2
+3|8|4|2
+5|4|2|2
\.
-- Learn individaul PCA models based on grouping column (grp)
drop table if exists result_table_214712398172490837;
drop table if exists result_table_214712398172490837_mean;
drop table if exists result_table_214712398172490838;
-select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
+select pca_sparse_train('sparse_mat_grp', 'result_table_214712398172490837',
'id', 'col_id', 'val_id', 10, 10, 0.8, 'grp', 0, FALSE, 'result_table_214712398172490838');
select * from result_table_214712398172490837;
select * from result_table_214712398172490838;
-------------------------------------------------------------------------
--- test a different column name
-alter table sparse_mat rename column id to rownr;
-alter table sparse_mat rename column col_id to colnr;
-alter table sparse_mat rename column val_id to vals;
-
-drop table if exists result_table_214712398172490837;
-drop table if exists result_table_214712398172490837_mean;
-select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
-'rownr', 'colnr', 'vals', 10, 10, 10);
-select * from result_table_214712398172490837;
-
-
--------------------------------------------------------------------------
drop table if exists mat;
-- Check the second input matrix format produces the same results as the first format
CREATE TABLE mat (