You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nj...@apache.org on 2017/01/19 19:53:33 UTC

[6/6] incubator-madlib git commit: Bug fixes and minor changes

Bug fixes and minor changes

* Validate parameters earlier than what was done.
* There was an issue with pca_project on greenplum, due to
the usage of row_number() multiple times (while trying to
create a mapping table between original row_id and the
serially increasing row id introduced in the code). Changes
made to use row_number only once now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/edb69dd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/edb69dd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/edb69dd4

Branch: refs/heads/master
Commit: edb69dd4126125f0a6ecc7eaf4149022da157442
Parents: 8bd46ae
Author: Nandish Jayaram <nj...@users.noreply.github.com>
Authored: Thu Jan 5 09:48:02 2017 -0800
Committer: Nandish Jayaram <nj...@users.noreply.github.com>
Committed: Wed Jan 18 14:05:09 2017 -0800

----------------------------------------------------------------------
 .../postgres/modules/linalg/matrix_ops.py_in    |   9 +-
 src/ports/postgres/modules/pca/pca.py_in        | 154 +++++------
 src/ports/postgres/modules/pca/pca.sql_in       |   1 +
 .../postgres/modules/pca/pca_project.py_in      | 257 +++++++++----------
 .../postgres/modules/pca/pca_project.sql_in     |   1 +
 src/ports/postgres/modules/pca/test/pca.sql_in  |  42 ++-
 6 files changed, 219 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/linalg/matrix_ops.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/linalg/matrix_ops.py_in b/src/ports/postgres/modules/linalg/matrix_ops.py_in
index 51ae7e3..9f3215c 100644
--- a/src/ports/postgres/modules/linalg/matrix_ops.py_in
+++ b/src/ports/postgres/modules/linalg/matrix_ops.py_in
@@ -86,7 +86,8 @@ def _matrix_column_to_array_format(source_table, row_id, output_table,
 def create_temp_sparse_matrix_table_with_dims(source_table,
                                               out_table,
                                               row_id, col_id, value,
-                                              row_dim, col_dim):
+                                              row_dim, col_dim,
+                                              sparse_where_condition=None):
     """
     Make a copy of the input sparse table and add (row_dim, col_dim, NULL) to it
 
@@ -102,6 +103,8 @@ def create_temp_sparse_matrix_table_with_dims(source_table,
     Returns:
         None
     """
+    if not sparse_where_condition:
+        sparse_where_condition = ''
     plpy.execute("""
                  CREATE TABLE {out_table} as
                      SELECT
@@ -110,11 +113,13 @@ def create_temp_sparse_matrix_table_with_dims(source_table,
                          {value}
                      FROM {source_table}
                      WHERE {value} is not NULL
+                     {sparse_where_condition}
                  """.format(row_id=row_id,
                             col_id=col_id,
                             value=value,
                             source_table=source_table,
-                            out_table=out_table))
+                            out_table=out_table,
+                            sparse_where_condition=sparse_where_condition))
     res_row_dim, res_col_dim = get_dims(out_table, {'row': row_id,
                                                     'col': col_id,
                                                     'val': value})

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca.py_in b/src/ports/postgres/modules/pca/pca.py_in
index 9a13b2f..71146d5 100644
--- a/src/ports/postgres/modules/pca/pca.py_in
+++ b/src/ports/postgres/modules/pca/pca.py_in
@@ -108,7 +108,15 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
                                   """)[0]['setting']
     plpy.execute('SET client_min_messages TO warning')
     grouping_cols_list = []
-    _validate_args_output_table(pc_table, result_summary_table)
+    if is_sparse:
+        _validate_args(schema_madlib, source_table, pc_table, k, row_id, col_id,
+                   val_id, row_dim, col_dim, lanczos_iter,
+                   use_correlation, result_summary_table, variance)
+    else:
+        _validate_args(schema_madlib, source_table, pc_table, k,
+                   row_id, None, None, None, None,
+                   lanczos_iter, use_correlation,
+                   result_summary_table,variance)
     if(grouping_cols):
         # validate the grouping columns. We currently only support grouping_cols
         # to be column names in the source_table, and not expressions!
@@ -162,21 +170,26 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
     # declare variables whose values will be different for each group, if
     # grouping_cols is specified
     grouping_where_clause = ''
+    sparse_where_condition = ''
     select_grouping_cols = ''
     temp_table_columns = ''
     result_summary_table_temp = ''
-    other_columns_in_table.remove(row_id)
-    temp_table_columns = """ ROW_NUMBER() OVER({partition_over}) AS row_id,
-                    """.format(partition_over='' if not grouping_cols else 'PARTITION BY {0}'.format(grouping_cols)) +\
-                    ','.join(other_columns_in_table)
+    # For Dense matrix format only:
     # We can now ignore the original row_id for all computations since we will
-    # create a new table with a row_id column that has perfect serially increasing
-    # row_id value. This is to support the scenario where users are not forced
-    # to have a row_id that follows a particular format. This restriction of having to
-    # provide a serially increasing row_id value starting from 1 becomes a pain
-    # point when grouping is used, since the row_id for each group will then have
-    # to start from 1.
-    row_id = 'row_id'
+    # create a new table with a row_id column that has not duplicates and ranges
+    # from 1 to number of rows in the group/table. This is to mainly support the
+    # grouping scneario where the row_id values might not range between 1 and
+    # number of rows in the group, for each group. Doing this also just extends
+    # this behavior for non-grouping scenarios too. If creating a new temp table
+    # that corrects the row_id column is not of much importance in non-grouping
+    # cases, we can avoid creating the temp table and save some computation time.
+    # But, at the moment, the code creates the temp table even for the non-grouping
+    # scenario.
+    # We don't need to do this for sparse representation because of the nature
+    # of its definition.
+    other_columns_in_table.remove(row_id)
+    temp_table_columns = """ ROW_NUMBER() OVER() AS row_id, """ + ','.join(other_columns_in_table)
+
     pca_union_call_list = []
     grp_id = 0
     if not is_sparse:
@@ -189,16 +202,18 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
             result_summary_table_temp = result_summary_table + unique_string() + "_" + str(grp_id)
         if grouping_cols:
             grp_value_dict = distinct_grouping_values[grp_id]
-            grouping_where_clause = ' WHERE ' + ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+            where_conditions = ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+            sparse_where_condition = ' AND ' + where_conditions
+            grouping_where_clause = ' WHERE ' + where_conditions
             select_grouping_cols = ', ' + ', '.join([str(value)+" AS "+key for (key, value) in grp_value_dict.items()])
 
         pca_union_call_list.append("""
             {schema_madlib}._pca_union('{source_table}', '{pc_table}', '{pc_table_mean}', '{row_id}',
                 {k}, '{grouping_cols}', {lanczos_iter}, {use_correlation},
                 '{result_summary_table}', '{result_summary_table_temp}', {variance},
-                {grp_id}, '{grouping_where_clause}', '{select_grouping_cols}',
-                '{temp_table_columns}', {is_sparse}, '{col_id}', '{val_id}',
-                {row_dim}, {col_dim})
+                {grp_id}, '{grouping_where_clause}', '{sparse_where_condition}',
+                '{select_grouping_cols}', '{temp_table_columns}', {is_sparse},
+                '{col_id}', '{val_id}', {row_dim}, {col_dim})
             """.format(schema_madlib=schema_madlib,
                 source_table=source_table, pc_table=pc_table,
                 pc_table_mean=pc_table_mean, row_id=row_id,
@@ -208,6 +223,7 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
                 result_summary_table_temp=result_summary_table_temp,
                 variance='NULL' if variance==None else variance,
                 grp_id=grp_id, grouping_where_clause=grouping_where_clause,
+                sparse_where_condition=sparse_where_condition,
                 select_grouping_cols=select_grouping_cols,
                 temp_table_columns=temp_table_columns, is_sparse=is_sparse,
                 col_id=col_id, val_id=val_id, row_dim=row_dim, col_dim=col_dim))
@@ -217,20 +233,7 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
     # "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each
     # <query_i> in parallel.
     pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list)
-    try:
-        plpy.execute(pca_union_call)
-    except Exception as e:
-        ## drop the output tables that were created if PCA errored out.
-        plpy.execute("""
-            DROP TABLE IF EXISTS {0};
-            DROP TABLE IF EXISTS {1};
-            """.format(pc_table, pc_table_mean))
-        if result_summary_table:
-            plpy.execute("""
-                DROP TABLE IF EXISTS {0};
-                """.format(result_summary_table))
-        plpy.error(str(e) + "\n" + str(e.args) + "\n" + str(e.strerror))
-        raise
+    plpy.execute(pca_union_call)
 
     plpy.execute("SET client_min_messages TO %s" % old_msg_level)
 
@@ -238,9 +241,9 @@ def pca_wrap(schema_madlib, source_table, pc_table, row_id,
 def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
         row_id, k, grouping_cols, lanczos_iter, use_correlation,
         result_summary_table, result_summary_table_temp, variance,
-        grp_id, grouping_where_clause, select_grouping_cols,
-        temp_table_columns, is_sparse, col_id, val_id, row_dim,
-        col_dim, **kwargs):
+        grp_id, grouping_where_clause, sparse_where_condition,
+        select_grouping_cols, temp_table_columns, is_sparse, col_id,
+        val_id, row_dim, col_dim, **kwargs):
     """
     This function does all the heavy lifting of PCA, for both pca and pca_sparse.
     Compute the PCA of the matrix in source_table. This function is the specific
@@ -272,36 +275,22 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
     Returns:
         None
     """
-    ## Creation of this temp table is unnecessary if the scenario does not involve
-    ## grouping, and/or, the input table had perfect values for the row_id column.
-    ## This temp table will ensure pca works even when row_id of the source_table
-    ## does not have serially increasing numbers starting from 1;
-    source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
-    plpy.execute("""
-            CREATE TEMP TABLE {source_table_grouped} AS
-            SELECT {temp_table_columns}
-            FROM {source_table}
-            {grouping_where_clause}
-        """.format(source_table_grouped=source_table_grouped,
-            source_table=source_table, grouping_where_clause=grouping_where_clause,
-            temp_table_columns=temp_table_columns))
     startTime = time.time()  # measure the starting time
-    # Step 1: Validate the input arguments
+    # Step 1: Modify data format for sparse input
     if is_sparse:
-        _validate_args(schema_madlib, source_table_grouped, k, row_id, col_id,
-               val_id, row_dim, col_dim, lanczos_iter,
-               use_correlation, variance)
         # Step 1.1: Densify the matrix for sparse input tables
         # We densify the matrix because the recentering process will generate a
         # dense matrix, so we just wrap around regular PCA.
         # First we must copy the sparse matrix and add in the dimension information
         sparse_temp = "pg_temp." + unique_string() + "_sparse"
-
         # Add in the dimension information needed by the densifying process
-        create_temp_sparse_matrix_table_with_dims(source_table_grouped, sparse_temp,
+        create_temp_sparse_matrix_table_with_dims(source_table, sparse_temp,
                                                   row_id, col_id, val_id,
-                                                  row_dim, col_dim)
-
+                                                  row_dim, col_dim, sparse_where_condition)
+        validate_sparse(sparse_temp,
+                        {'row': row_id, 'col': col_id, 'val': val_id},
+                        check_col=False)
+        # Step 1.2: Densify the input matrix
         x_dense = "pg_temp." + unique_string() + "_dense"
         plpy.execute("""
             SELECT {schema_madlib}.matrix_densify(
@@ -315,12 +304,21 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
             DROP TABLE IF EXISTS {0};
             """.format(sparse_temp))
         source_table_grouped = x_dense
-        row_id = 'row_id'
     else:
-        _validate_args(schema_madlib, source_table_grouped, k,
-                       row_id, None, None, None, None,
-                       lanczos_iter, use_correlation, variance)
-
+        # Creation of this temp table is unnecessary if the scenario does not involve
+        # grouping, and/or, the input table had perfect values for the row_id column.
+        # This temp table will ensure pca works even when the value of row_id column
+        # in dense matrix format does not have values ranging from 1 to number of rows.
+        source_table_grouped = unique_string() + "group_" + str(grp_id)
+        plpy.execute("""
+                    CREATE TABLE {source_table_grouped} AS
+                    SELECT {temp_table_columns}
+                    FROM {source_table}
+                    {grouping_where_clause}
+                """.format(source_table_grouped=source_table_grouped,
+                    source_table=source_table, grouping_where_clause=grouping_where_clause,
+                    temp_table_columns=temp_table_columns))
+    row_id = 'row_id'
     # Make sure that the table has row_id and row_vec
     source_table_copy = "pg_temp." + unique_string() + "_reformated_names"
     created_new_table = cast_dense_input_table_to_correct_columns(
@@ -328,7 +326,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
 
     if(created_new_table):
         source_table_grouped = source_table_copy
-
     [row_dim, col_dim] = get_dims(source_table_grouped,
                                   {'row': 'row_id', 'col': 'col_id',
                                    'val': 'row_vec'})
@@ -351,10 +348,8 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
     else:
         if variance: #lanczos_iter overrides the proportion default for k
             curK = lanczos_iter
-
     # Note: we currently don't support grouping columns or correlation matrices
     if not use_correlation:
-
         # Step 2: Normalize the data (Column means)
         dimension = col_dim
         scaled_source_table = "pg_temp." + unique_string() + "_scaled_table"
@@ -371,7 +366,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
             result_summary_table_string = ''
         else:
             result_summary_table_string = ", '{0}'".format(result_summary_table_temp)
-
         # Step 4: Perform SVD
         # Step 4.1: Perform upper part of SVD
         if result_summary_table_temp:
@@ -392,7 +386,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
             )
             FROM {scaled_source_table}
             """.format(**locals()))[0]['array_sum']
-
         # Step 4.2: Adjust the k value
         if variance:
             variance_tmp_table = "pg_temp."+ unique_string()+ "_var_tmp"
@@ -416,7 +409,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
             plpy.execute("""
                 DROP TABLE IF EXISTS {variance_tmp_table}
                 """.format(variance_tmp_table=variance_tmp_table))
-
         # Step 4.3: Perform the lower part of SVD
         tmp_matrix_table = "temp_"+ unique_string()+ "_matrix"
         tmp_matrix_s_table = add_postfix(tmp_matrix_table, "_s")
@@ -444,7 +436,6 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
             tmp_matrix_table = svd_output_temp_table
             _svd_lower_wrap(schema_madlib, source_table_svd,
                 svd_output_temp_table, row_id, curK, lanczos_iter, bd_pref)
-
         # Step 4.4: Create the SVD result table
         if result_summary_table_temp:
             t1 = time.time()
@@ -559,6 +550,7 @@ def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
 # ------------------------------------------------------------------------
 def _validate_args(schema_madlib,
                    source_table,
+                   pc_table,
                    k,
                    row_id,
                    col_id=None,
@@ -567,12 +559,14 @@ def _validate_args(schema_madlib,
                    col_dim=None,
                    lanczos_iter=0,
                    use_correlation=False,
+                   result_summary_table=None,
                    variance=None):
     """
     Validates all arguments passed to the PCA function
     Args:
         @param schema_madlib    Name of the schema where MADlib is installed
         @param source_table     Name of the source table
+        @param output_table     Name of the output table
         @param k                Number of singular vectors to return
         @param row_id           Name of the row_id column
         @param col_id           Name of the col_id column
@@ -580,6 +574,7 @@ def _validate_args(schema_madlib,
         @param grouping_cols    The columns that the data should be grouped by
         @param lanczos_iter     The number of lanczos iterations to use in the SVD calculation
         @param use_correlation  If the correlation matrix should be used instead of the covariance matrix
+        @param result_summary_table  Name of summary table
         @param variance         Proportion of variance
 
     Returns:
@@ -603,6 +598,14 @@ def _validate_args(schema_madlib,
         if (variance <= 0) or (variance >1):
             plpy.error("""PCA error: components_param must be either
                 a positive integer or a float in the range (0.0,1.0]!""")
+    # confirm output tables are valid
+    if pc_table:
+        _assert(not table_exists(pc_table, only_first_schema=True) and
+                not table_exists(pc_table + '_mean', only_first_schema=True),
+                "PCA error: Output table {pc_table}/{pc_table}_mean "
+                "already exist!".format(pc_table=pc_table))
+    else:
+        plpy.error("PCA error: Invalid output table prefix!")
 
     _assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
             "PCA error: {1} column does not exist in {0}!".
@@ -640,33 +643,18 @@ def _validate_args(schema_madlib,
         if col_dim <= 0:
             plpy.error("PCA error: The column dimension must be larger than 0!")
 
-        validate_sparse(source_table,
-                        {'row': row_id, 'col': col_id, 'val': val_id},
-                        check_col=False)
     if use_correlation:
         plpy.error("PCA error: Using the correlation matrix is not enabled! \
         This value must be set to FALSE")
-# ========================================================================
 
-def _validate_args_output_table(pc_table, result_summary_table=None):
-    """
-        confirm output tables are valid
-        @param pc_table     Name of the output table
-        @param result_summary_table  Name of summary table
-    """
-    if pc_table:
-        _assert(not table_exists(pc_table, only_first_schema=True) and
-                not table_exists(pc_table + '_mean', only_first_schema=True),
-                "PCA error: Output table {pc_table}/{pc_table}_mean "
-                "already exist!".format(pc_table=pc_table))
-    else:
-        plpy.error("PCA error: Invalid output table prefix!")
     if result_summary_table:
         if not result_summary_table.strip():
             plpy.error("PCA error: Invalid result summary table name!")
         _assert(not table_exists(result_summary_table, only_first_schema=True),
                 "PCA error: Result summary table {0} \
                         already exists!".format(result_summary_table))
+# ========================================================================
+
 
 def _recenter_data(schema_madlib, source_table, output_table, row_id,
                    col_name, dimension):

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca.sql_in b/src/ports/postgres/modules/pca/pca.sql_in
index 3edf304..9de5559 100644
--- a/src/ports/postgres/modules/pca/pca.sql_in
+++ b/src/ports/postgres/modules/pca/pca.sql_in
@@ -1022,6 +1022,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA._pca_union(
     variance              DOUBLE PRECISION,   -- The proportion of variance (Default: NULL)
     grp_id                INTEGER, -- a place holder id for each group
     grouping_where_clause TEXT,    -- WHERE clause using grouping_cols
+    sparse_where_condition TEXT,   -- WHERE clause used when creating temp sparse matrix table with dims
     select_grouping_cols  TEXT,    -- SELECT clause using grouping_cols
     temp_table_columns    TEXT,    -- SELECT caluse for creating temporary copy of the source_table
     is_sparse             BOOLEAN, -- specifies if the PCA call is for sparse or dense matrices

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca_project.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca_project.py_in b/src/ports/postgres/modules/pca/pca_project.py_in
index 1e61d3c..52d14ec 100644
--- a/src/ports/postgres/modules/pca/pca_project.py_in
+++ b/src/ports/postgres/modules/pca/pca_project.py_in
@@ -331,54 +331,17 @@ SELECT {schema_madlib}.pca_sparse_project('usage');
 --
         """.format(schema_madlib=schema_madlib)
 
-
-def _validate_args_output_table(out_table, residual_table=None,
-                   result_summary_table=None):
-    """
-    Validates the output table(s) arguments to the PCA project function
-
-    Args:
-        @param out_table  Name of output table to store projection result
-        @param residual_table  Name of the residual table (to store error in projection)
-        @param result_summary_table  Name of result summary table
-    Returns:
-        None
-    Throws:
-        plpy.error if any argument is invalid
-    """
-    # Make sure that the output table does not exist
-    # Also check that the output table is not null
-    _assert(out_table and out_table.strip(),
-            "PCA error: Invalid output table name.")
-    _assert(not table_exists(out_table, only_first_schema=True),
-            "PCA error: Output table {0} already exists!".format(str(out_table)))
-
-        # Check that the result summary table is not empty
-    if result_summary_table is not None:
-        _assert(result_summary_table.strip(),
-                "PCA error: Invalid result summary table name!")
-        _assert(not table_exists(result_summary_table, only_first_schema=True),
-                "PCA error: Result summary table {0} already exists!".
-                format(result_summary_table))
-
-    # Check that the result summary table is not empty
-    if residual_table is not None:
-        _assert(residual_table.strip(),
-                "PCA error: Invalid residual table name!")
-        _assert(not table_exists(residual_table, only_first_schema=True),
-                "PCA error: Residual table {0} already exists!".
-                format(residual_table))
-
-# Validate arguments: Same as pca
-# ------------------------------------------------------------------------
 def _validate_args(schema_madlib,
                    source_table,
                    pc_table,
+                   out_table,
                    row_id,
                    col_id=None,
                    val_id=None,
                    row_dim=None,
-                   col_dim=None):
+                   col_dim=None,
+                   residual_table=None,
+                   result_summary_table=None):
     """
     Validates all arguments passed to the PCA function
 
@@ -408,6 +371,29 @@ def _validate_args(schema_madlib,
     _assert(table_exists(add_postfix(pc_table, "_mean")),
             "PCA error: Source data table column means does not exist!")
 
+    # Make sure that the output table does not exist
+    # Also check that the output table is not null
+    _assert(out_table and out_table.strip(),
+            "PCA error: Invalid output table name.")
+    _assert(not table_exists(out_table, only_first_schema=True),
+            "PCA error: Output table {0} already exists!".format(str(out_table)))
+
+    # Check that the result summary table is not empty
+    if result_summary_table is not None:
+        _assert(result_summary_table.strip(),
+                "PCA error: Invalid result summary table name!")
+        _assert(not table_exists(result_summary_table, only_first_schema=True),
+                "PCA error: Result summary table {0} already exists!".
+                format(result_summary_table))
+
+    # Check that the result summary table is not empty
+    if residual_table is not None:
+        _assert(residual_table.strip(),
+                "PCA error: Invalid residual table name!")
+        _assert(not table_exists(residual_table, only_first_schema=True),
+                "PCA error: Residual table {0} already exists!".
+                format(residual_table))
+
     # Check that the row_id exists
     _assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
             "PCA error: {1} column does not exist in {0}!".
@@ -428,11 +414,9 @@ def _validate_args(schema_madlib,
         _assert(row_dim > 0 and col_dim > 0,
                 "PCA error: row_dim/col_dim should be positive integer")
 
-        validate_sparse(source_table,
-                        {'row': row_id, 'col': col_id, 'val': val_id},
-                        check_col=False)
 # ------------------------------------------------------------------------
 
+
 def pca_sparse_project(schema_madlib,
                        source_table,
                        pc_table,
@@ -523,6 +507,14 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
                                   WHERE name='client_min_messages'
                                   """)[0]['setting']
     plpy.execute('SET client_min_messages TO warning')
+    if is_sparse:
+        _validate_args(schema_madlib, source_table, pc_table, out_table,
+                   row_id, col_id, val_id, row_dim, col_dim, residual_table,
+                   result_summary_table)
+    else:
+        _validate_args(schema_madlib, source_table, pc_table, out_table,
+                   row_id, None, None, None, None,
+                   residual_table, result_summary_table)
     # If we add new columns to the pca_train output table in the future, they should
     # be included in this list:
     pc_table_model_cols = ['row_id', 'principal_components', 'std_dev', 'proportion']
@@ -530,7 +522,6 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
     grouping_cols = ''
     if grouping_cols_list:
         grouping_cols = ', '.join(grouping_cols_list)
-    _validate_args_output_table(out_table, residual_table, result_summary_table)
 
     other_columns_in_table = [col for col in get_cols(source_table) if col not in grouping_cols_list]
     grouping_cols_clause = ''
@@ -547,7 +538,7 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
         cols_names_types = get_cols_and_types(source_table)
         grouping_cols_clause = ', ' + ', '.join([c_name+" "+c_type
             for (c_name, c_type) in cols_names_types if c_name in grouping_cols_list])
-    ## Create all output tables
+    # Create all output tables
     plpy.execute("""
             DROP TABLE IF EXISTS {0};
             CREATE TABLE {0} (
@@ -567,7 +558,7 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
                 ) """.format(result_summary_table, grouping_cols_clause))
     else:
         result_summary_table = ''
-    if residual_table and grouping_cols:
+    if residual_table:
         plpy.execute("""
             DROP TABLE IF EXISTS {0};
             CREATE TABLE {0} (
@@ -581,25 +572,17 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
     # declare variables whose values will be different for each group, if
     # grouping_cols is specified
     grouping_where_clause = ''
+    sparse_where_condition = ''
     select_grouping_cols = ''
-    temp_source_table_columns = ''
     grouping_cols_values = ''
     result_summary_table_temp = ''
     other_columns_in_pc_table = [col for col in get_cols(pc_table) if col not in grouping_cols_list]
     temp_pc_table_columns = ', '.join(other_columns_in_pc_table)
     original_row_id = row_id
+
     other_columns_in_table.remove(row_id)
-    temp_source_table_columns = """ ROW_NUMBER() OVER({partition_over}) AS row_id,
-                        """.format(partition_over='' if not grouping_cols else 'PARTITION BY {0}'.format(grouping_cols)) +\
-                        ','.join(other_columns_in_table)
-    # We can now ignore the original row_id for all computations since we will
-    # create a new table with a row_id column that has perfect serially increasing
-    # row_id value. This is to support the scenario where users are not forced
-    # to have a row_id that follows a particular format. This restriction of having to
-    # provide a serially increasing row_id value starting from 1 becomes a pain
-    # point when grouping is used, since the row_id for each group will then have
-    # to start from 1.
-    row_id = 'row_id'
+    temp_source_table_columns = ','.join(other_columns_in_table)
+
     pca_union_call_list = []
     grp_id = 0
     if not is_sparse:
@@ -610,15 +593,17 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
     while True:
         if grouping_cols:
             grp_value_dict = distinct_grouping_values[grp_id]
-            grouping_where_clause = ' WHERE ' + ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+            where_conditions = ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()])
+            sparse_where_condition = ' AND ' + where_conditions
+            grouping_where_clause = ' WHERE ' + where_conditions
             select_grouping_cols = ', ' + ', '.join([str(value)+" AS "+key for (key, value) in grp_value_dict.items()])
             grouping_cols_values = ', ' + ', '.join([str(value) for (key, value) in grp_value_dict.items()])
 
         pca_union_call_list.append("""
             {schema_madlib}._pca_project_union('{source_table}', '{pc_table}', '{out_table}',
                 '{row_id}', '{original_row_id}', '{grouping_cols}',
-                '{grouping_cols_clause}', '{residual_table}',
-                '{result_summary_table}', {grp_id}, '{grouping_where_clause}', '{select_grouping_cols}',
+                '{grouping_cols_clause}', '{residual_table}', '{result_summary_table}',
+                {grp_id}, '{grouping_where_clause}', '{sparse_where_condition}','{select_grouping_cols}',
                 '{grouping_cols_values}', '{temp_source_table_columns}', '{temp_pc_table_columns}',
                 {is_sparse}, '{col_id}', '{val_id}', {row_dim}, {col_dim})
             """.format(schema_madlib=schema_madlib,
@@ -630,6 +615,7 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
                 residual_table=residual_table,
                 result_summary_table=result_summary_table,
                 grp_id=grp_id, grouping_where_clause=grouping_where_clause,
+                sparse_where_condition=sparse_where_condition,
                 select_grouping_cols=select_grouping_cols,
                 grouping_cols_values=grouping_cols_values,
                 temp_source_table_columns=temp_source_table_columns,
@@ -641,31 +627,15 @@ def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
     # "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each
     # <query_i> in parallel.
     pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list)
-    try:
-        plpy.execute(pca_union_call)
-    except Exception as e:
-        ## drop the output tables that were created if PCA errored out.
-        plpy.execute("""
-            DROP TABLE IF EXISTS {0};
-            """.format(out_table))
-        if result_summary_table:
-            plpy.execute("""
-                    DROP TABLE IF EXISTS {0}
-                """.format(result_summary_table))
-        if residual_table:
-            plpy.execute("""
-                    DROP TABLE IF EXISTS {0}
-                """.format(residual_table))
-        plpy.error(str(e) + "\n" + str(e.args) + "\n" + str(e.strerror))
-        raise
-
+    plpy.execute(pca_union_call)
     plpy.execute("SET client_min_messages TO %s" % old_msg_level)
 
 def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
         row_id, original_row_id, grouping_cols, grouping_cols_clause,
         residual_table, result_summary_table, grp_id, grouping_where_clause,
-        select_grouping_cols, grouping_cols_values, temp_source_table_columns,
-        temp_pc_table_columns, is_sparse, col_id, val_id, row_dim, col_dim, **kwargs):
+        sparse_where_condition, select_grouping_cols, grouping_cols_values,
+        temp_source_table_columns, temp_pc_table_columns, is_sparse, col_id,
+        val_id, row_dim, col_dim, **kwargs):
     """
     The pca_project is performed over each group, if any.
 
@@ -696,23 +666,10 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
         None
     """
     out_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
-    ## Creation of this temp table is unnecessary if the scenario does not involve
-    ## grouping, and/or, the input table had perfect values for the row_id column.
-    ## This temp table will ensure pca works even when row_id of the source_table
-    ## does not have serially increasing numbers starting from 1;
-    source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
-    plpy.execute("""
-                CREATE TEMP TABLE {source_table_grouped} AS
-                SELECT {temp_source_table_columns}
-                FROM {source_table}
-                {grouping_where_clause}
-            """.format(source_table_grouped=source_table_grouped,
-                source_table=source_table, grouping_where_clause=grouping_where_clause,
-                temp_source_table_columns=temp_source_table_columns))
     if grouping_cols:
         pc_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
         plpy.execute("""
-                CREATE TEMP TABLE {pc_table_grouped} AS
+                CREATE TABLE {pc_table_grouped} AS
                 SELECT {temp_pc_table_columns}
                 FROM {pc_table}
                 {grouping_where_clause}
@@ -725,15 +682,15 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
     t0 = time.time()  # measure the starting time
     # Step 1: Validate the input arguments
     if is_sparse:
-        _validate_args(schema_madlib, source_table_grouped, pc_table, row_id,
-                       col_id, val_id, row_dim, col_dim)
         # Step 1.1: Create a copy of the sparse matrix and add row_dims and col_dims
         # Warning: This changes the column names of the table
         sparse_table_copy = "pg_temp." + unique_string() + "_sparse_table_copy"
-        create_temp_sparse_matrix_table_with_dims(source_table_grouped, sparse_table_copy,
+        create_temp_sparse_matrix_table_with_dims(source_table, sparse_table_copy,
                                                   row_id, col_id, val_id,
-                                                  row_dim, col_dim)
-
+                                                  row_dim, col_dim, sparse_where_condition)
+        validate_sparse(sparse_table_copy,
+                        {'row': row_id, 'col': col_id, 'val': val_id},
+                        check_col=False)
         # Step 1.2: Densify the input matrix
         x_dense = "pg_temp." + unique_string() + "_dense"
         plpy.execute("""
@@ -747,16 +704,29 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
             DROP TABLE IF EXISTS {0};
             """.format(sparse_table_copy))
         source_table_grouped = x_dense
-        row_id = 'row_id'
     else:
+        # For Dense matrix format only:
+        # We can now ignore the original row_id for all computations since we will
+        # create a new table with a row_id column that has not duplicates and ranges
+        # from 1 to number of rows in the group/table. This is to mainly support the
+        # grouping scneario where the row_id values might not range between 1 and
+        # number of rows in the group, for each group. Doing this also just extends
+        # this behavior for non-grouping scenarios too. If creating a new temp table
+        # that corrects the row_id column is not of much importance in non-grouping
+        # cases, we can avoid creating the temp table and save some computation time.
+        # But, at the moment, the code creates the temp table even for the non-grouping
+        # scenario.
+        # We don't need to do this for sparse representation because of the nature
+        # of its definition.
+
         # Preserve the mapping between new row_id created and the original row_id. This is
         # required only for dense input format.
         temp_row_id = original_row_id + unique_string()
         row_id_map_table = "rowid" + unique_string()
         plpy.execute("""
-                CREATE TEMP TABLE {row_id_map_table} AS
+                CREATE TABLE {row_id_map_table} AS
                 SELECT
-                    {original_row_id} AS {temp_row_id},
+                    {source_table}.{original_row_id} AS {temp_row_id},
                     {select_clause}
                 FROM {source_table}
                 {grouping_where_clause}
@@ -764,14 +734,31 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
                 original_row_id=original_row_id,
                 temp_row_id=temp_row_id,
                 source_table=source_table,
-                select_clause="""
-                    ROW_NUMBER() OVER({partition_over}) AS row_id
-                """.format(partition_over='' if not grouping_cols else 'PARTITION BY {0}'.format(grouping_cols)),
+                select_clause=""" ROW_NUMBER() OVER() AS row_id """,
                 grouping_where_clause=grouping_where_clause))
-        ## Validate the arguments
-        _validate_args(schema_madlib, source_table_grouped, pc_table,
-                       row_id, None, None, None, None)
 
+        # Creation of this temp table is unnecessary if the scenario does not involve
+        # grouping, and/or, the input table had perfect values for the row_id column.
+        # This temp table will ensure pca works even when row_id of the source_table
+        # does not have serially increasing numbers starting from 1;
+        source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
+        plpy.execute("""
+                    CREATE TABLE {source_table_grouped} AS
+                    SELECT {row_id_map_table}.row_id, {temp_source_table_columns}
+                    FROM
+                        (
+                            SELECT *
+                            FROM {source_table}
+                            {grouping_where_clause}
+                        ) t1
+                    INNER JOIN {row_id_map_table}
+                    ON {row_id_map_table}.{temp_row_id}=t1.{row_id}
+                """.format(source_table_grouped=source_table_grouped,
+                    temp_row_id=temp_row_id, row_id_map_table=row_id_map_table, row_id=row_id,
+                    source_table=source_table, grouping_where_clause=grouping_where_clause,
+                    temp_source_table_columns=temp_source_table_columns))
+
+    row_id = 'row_id'
     # Make sure that the table has row_id and row_vec
     source_table_copy = "pg_temp." + unique_string()
     need_new_column_names = cast_dense_input_table_to_correct_columns(
@@ -779,7 +766,6 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
 
     if(need_new_column_names):
         source_table_grouped = source_table_copy
-
     [row_dim, col_dim] = get_dims(source_table_grouped,
                                   {'row': 'row_id', 'col': 'col_id',
                                    'val': 'row_vec'})
@@ -833,10 +819,7 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
     # Residual table: res = mat - proj
     create_residual_table = False
     if residual_table or result_summary_table:
-        if grouping_cols:
-            residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual"
-        else:
-            residual_table_grouped = residual_table
+        residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual"
         create_temp_residual_table = False
         if not residual_table:
             create_temp_residual_table = True
@@ -928,14 +911,13 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
     if is_sparse:
         ## We don't have to join based on row_id for sparse project.
         if create_residual_table:
-            if grouping_cols:
-                plpy.execute("""
-                        INSERT INTO {residual_table}
-                        SELECT * {select_grouping_cols}
-                        FROM {residual_table_grouped}
-                    """.format(residual_table=residual_table,
-                        select_grouping_cols=select_grouping_cols,
-                        residual_table_grouped=residual_table_grouped))
+            plpy.execute("""
+                    INSERT INTO {residual_table}
+                    SELECT * {select_grouping_cols}
+                    FROM {residual_table_grouped}
+                """.format(residual_table=residual_table,
+                    select_grouping_cols=select_grouping_cols,
+                    residual_table_grouped=residual_table_grouped))
         plpy.execute("""
                 INSERT INTO {out_table}
                 SELECT * {select_grouping_cols}
@@ -954,21 +936,16 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
                 out_table_cols=', '.join(output_table_cols),
                 select_grouping_cols=select_grouping_cols)
         if create_residual_table:
-            if grouping_cols:
-                plpy.execute("""
-                    INSERT INTO {residual_table}
-                    SELECT {select_clause}
-                    FROM {residual_table_grouped}
-                    INNER JOIN {row_id_map_table}
-                    ON {row_id_map_table}.row_id={residual_table_grouped}.row_id
-                    """.format(residual_table=residual_table,
-                        select_clause=output_table_select_clause,
-                        residual_table_grouped=residual_table_grouped,
-                        row_id_map_table=row_id_map_table))
-                plpy.execute("""
-                        DROP TABLE IF EXISTS {0}
-                    """.format(residual_table_grouped))
-
+            plpy.execute("""
+                INSERT INTO {residual_table}
+                SELECT {select_clause}
+                FROM {residual_table_grouped}
+                INNER JOIN {row_id_map_table}
+                ON {row_id_map_table}.row_id={residual_table_grouped}.row_id
+                """.format(residual_table=residual_table,
+                    select_clause=output_table_select_clause,
+                    residual_table_grouped=residual_table_grouped,
+                    row_id_map_table=row_id_map_table))
         plpy.execute("""
                     INSERT INTO {out_table}
                     SELECT {select_clause}
@@ -982,6 +959,10 @@ def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
         plpy.execute("""
                 DROP TABLE IF EXISTS {0};
             """.format(row_id_map_table))
+    if residual_table or result_summary_table:
+        plpy.execute("""
+                DROP TABLE IF EXISTS {0}
+            """.format(residual_table_grouped))
     plpy.execute("""
             DROP TABLE IF EXISTS {0};
             DROP TABLE IF EXISTS {1};

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/pca_project.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca_project.sql_in b/src/ports/postgres/modules/pca/pca_project.sql_in
index 814292c..0a0a69c 100644
--- a/src/ports/postgres/modules/pca/pca_project.sql_in
+++ b/src/ports/postgres/modules/pca/pca_project.sql_in
@@ -682,6 +682,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA._pca_project_union(
     result_summary_table        TEXT,    -- Table name to store summary of results (Default: NULL)
     grp_id                      INTEGER, -- a place holder id for each group
     grouping_where_clause       TEXT,    -- WHERE clause using grouping_cols
+    sparse_where_condition      TEXT,   -- WHERE clause used when creating temp sparse matrix table with dims
     select_grouping_cols        TEXT,    -- SELECT clause using grouping_cols
     grouping_cols_values        TEXT,    -- distinct values of the grouping_cols
     temp_source_table_columns   TEXT,    -- SELECT caluse for creating temporary copy of the source_table

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/edb69dd4/src/ports/postgres/modules/pca/test/pca.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/test/pca.sql_in b/src/ports/postgres/modules/pca/test/pca.sql_in
index 20093ac..12d8ab1 100644
--- a/src/ports/postgres/modules/pca/test/pca.sql_in
+++ b/src/ports/postgres/modules/pca/test/pca.sql_in
@@ -308,48 +308,46 @@ select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
 select * from result_table_214712398172490837;
 select * from result_table_214712398172490838;
 
+-------------------------------------------------------------------------
+-- test a different column name
+alter table sparse_mat rename column row_id to rownr;
+alter table sparse_mat rename column col_id to colnr;
+alter table sparse_mat rename column val_id to vals;
+
+drop table if exists result_table_214712398172490837;
+drop table if exists result_table_214712398172490837_mean;
+select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
+'rownr', 'colnr', 'vals', 10, 10, 10);
+select * from result_table_214712398172490837;
+
 -- Sparse input data with grouping column
-DROP TABLE IF EXISTS sparse_mat;
-CREATE TABLE sparse_mat (
+DROP TABLE IF EXISTS sparse_mat_grp;
+CREATE TABLE sparse_mat_grp (
     id integer,
     col_id integer,
     val_id integer,
     grp    integer
 );
-COPY sparse_mat (id, col_id, val_id, grp) FROM stdin delimiter '|';
+COPY sparse_mat_grp (id, col_id, val_id, grp) FROM stdin delimiter '|';
 1|2|4|1
 1|5|6|1
 3|8|4|1
 5|4|2|1
-6|6|12|2
-8|7|2|2
-8|1|2|2
-9|8|2|2
-9|3|4|2
+1|2|4|2
+1|5|6|2
+3|8|4|2
+5|4|2|2
 \.
 -- Learn individaul PCA models based on grouping column (grp)
 drop table if exists result_table_214712398172490837;
 drop table if exists result_table_214712398172490837_mean;
 drop table if exists result_table_214712398172490838;
-select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
+select pca_sparse_train('sparse_mat_grp', 'result_table_214712398172490837',
 'id', 'col_id', 'val_id', 10, 10, 0.8, 'grp', 0, FALSE, 'result_table_214712398172490838');
 select * from result_table_214712398172490837;
 select * from result_table_214712398172490838;
 
 -------------------------------------------------------------------------
--- test a different column name
-alter table sparse_mat rename column id to rownr;
-alter table sparse_mat rename column col_id to colnr;
-alter table sparse_mat rename column val_id to vals;
-
-drop table if exists result_table_214712398172490837;
-drop table if exists result_table_214712398172490837_mean;
-select pca_sparse_train('sparse_mat', 'result_table_214712398172490837',
-'rownr', 'colnr', 'vals', 10, 10, 10);
-select * from result_table_214712398172490837;
-
-
--------------------------------------------------------------------------
 drop table if exists mat;
 -- Check the second input matrix format produces the same results as the first format
 CREATE TABLE mat (