You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2019/04/10 22:29:37 UTC

[madlib] branch master updated (57b6e50 -> 50d8b59)

This is an automated email from the ASF dual-hosted git repository.

okislal pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git.


    from 57b6e50  DL: Simplify madlib_keras_predict interface
     new 5cee52e  WCC: Duplicate edges to improve performance on Greenplum
     new 50d8b59  Madpack: Fix IC/DC check to include wcc

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/madpack/madpack.py                      |  4 ++-
 src/ports/postgres/modules/graph/wcc.py_in  | 50 +++++++++++++++++++----------
 src/ports/postgres/modules/graph/wcc.sql_in |  5 +++
 3 files changed, 41 insertions(+), 18 deletions(-)


[madlib] 01/02: WCC: Duplicate edges to improve performance on Greenplum

Posted by ok...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 5cee52e5bc1b980ba63eb92c6178118473421512
Author: Ekta Khanna <ek...@pivotal.io>
AuthorDate: Wed Apr 10 15:27:55 2019 -0700

    WCC: Duplicate edges to improve performance on Greenplum
    
    JIRA: MADLIB-1320
    
    Prior to this update, we used the edge table in join clauses with both
    src and dest columns.
    
    This commit updates the query on the message table such that the JOIN
    always operates on the column that the table is distributed on. To this
    end, we duplicate the edge table to distribute it on the dest column.
    This separation of accesses reduces the redistribute motion on gpdb.
    
    In addition, we split the union all query to ensure that the working set
    is limited with very large graphs.
    
    Closes #364
    
    Co-authored-by: Orhan Kislal <ok...@apache.org>
---
 src/ports/postgres/modules/graph/wcc.py_in  | 50 +++++++++++++++++++----------
 src/ports/postgres/modules/graph/wcc.sql_in |  5 +++
 2 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in
index de1af27..4b4b05d 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -108,6 +108,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     newupdate = unique_string(desp='newupdate')
     toupdate = unique_string(desp='toupdate')
     temp_out_table = unique_string(desp='tempout')
+    edge_inverse = unique_string(desp='edge_inverse')
 
     distribution = '' if is_platform_pg() else \
         "DISTRIBUTED BY ({0})".format(vertex_id)
@@ -117,12 +118,23 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     old_new_update_where_condition = ''
     new_to_update_where_condition = ''
     edge_to_update_where_condition = ''
+    edge_inverse_to_update_where_condition = ''
 
     INT_MAX = 2147483647
     component_id = 'component_id'
     grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
     comma_grouping_cols = '' if not grouping_cols else ',' + grouping_cols
 
+    if not is_platform_pg():
+        # In Greenplum, to avoid redistribution of data when in later queries,
+        # edge_table is duplicated by creating a temporary table distributed
+        # on dest column
+        plpy.execute(""" CREATE TEMP TABLE {edge_inverse} AS
+                            SELECT * FROM {edge_table} DISTRIBUTED BY ({dest})
+                     """.format(**locals()))
+    else:
+        edge_inverse = edge_table
+
     if grouping_cols:
         distribution = ('' if is_platform_pg() else
                         "DISTRIBUTED BY ({0}, {1})".format(grouping_cols,
@@ -146,6 +158,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             _check_groups(newupdate, toupdate, grouping_cols_list)
         edge_to_update_where_condition = ' AND ' + \
             _check_groups(edge_table, toupdate, grouping_cols_list)
+        edge_inverse_to_update_where_condition = ' AND ' + \
+            _check_groups(edge_inverse, toupdate, grouping_cols_list)
         join_grouping_cols = _check_groups(subq, distinct_grp_table, grouping_cols_list)
         group_by_clause_newupdate = ('' if not grouping_cols else
                                      '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
@@ -160,7 +174,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                     FROM {edge_table}
                     UNION
                     SELECT {select_grouping_cols_clause} {dest} AS {vertex_id}
-                    FROM {edge_table}
+                    FROM {edge_inverse}
                 ) {subq}
                 ON {join_grouping_cols}
                 GROUP BY {group_by_clause_newupdate}
@@ -201,7 +215,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         # updated in the next table. At every iteration update only those nodes
         # whose component_id in the previous iteration are greater than what was
         # found in the current iteration.
-
         plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
         plpy.execute("""
             CREATE TEMP TABLE {oldupdate} AS
@@ -215,7 +228,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                    ', {0}'.format(grouping_cols),
                    group_by_clause=grouping_cols_comma,
                    **locals()))
-
         plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
         plpy.execute("""
             CREATE TEMP TABLE {toupdate} AS
@@ -240,27 +252,28 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         plpy.execute("DROP TABLE IF EXISTS {0}".format(message))
         plpy.execute("""
             CREATE TEMP TABLE {message} AS
-            SELECT {vertex_id}, MIN({component_id}) AS {component_id}
-                    {select_grouping_cols}
-            FROM (
-                SELECT {edge_table}.{src} AS {vertex_id},
-                    {toupdate}.{component_id}
+                SELECT {edge_inverse}.{src} AS {vertex_id},
+                    MIN({toupdate}.{component_id}) AS {component_id}
                     {comma_toupdate_prefixed_grouping_cols}
-                FROM {toupdate}, {edge_table}
-                WHERE {edge_table}.{dest} = {toupdate}.{vertex_id}
-                    {edge_to_update_where_condition}
-                UNION ALL
+                FROM {toupdate}, {edge_inverse}
+                WHERE {edge_inverse}.{dest} = {toupdate}.{vertex_id}
+                    {edge_inverse_to_update_where_condition}
+                GROUP BY {edge_inverse}.{src} {comma_toupdate_prefixed_grouping_cols}
+        """.format(select_grouping_cols='' if not grouping_cols
+                        else ', {0}'.format(grouping_cols),
+                   **locals()))
+
+        plpy.execute("""
+            INSERT INTO {message}
                 SELECT {edge_table}.{dest} AS {vertex_id},
-                    {toupdate}.{component_id}
+                    MIN({toupdate}.{component_id}) AS {component_id}
                     {comma_toupdate_prefixed_grouping_cols}
                 FROM {toupdate}, {edge_table}
                 WHERE {edge_table}.{src} = {toupdate}.{vertex_id}
                     {edge_to_update_where_condition}
-            ) AS t
-            GROUP BY {group_by_clause} {vertex_id}
+                GROUP BY {edge_table}.{dest} {comma_toupdate_prefixed_grouping_cols}
         """.format(select_grouping_cols='' if not grouping_cols
-                   else ', {0}'.format(grouping_cols), group_by_clause=''
-                   if not grouping_cols else ' {0}, '.format(grouping_cols),
+                        else ', {0}'.format(grouping_cols),
                    **locals()))
 
         plpy.execute("DROP TABLE {0}".format(oldupdate))
@@ -278,6 +291,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                                 SELECT COUNT(*) AS cnt FROM {toupdate}
                             """.format(**locals()))[0]["cnt"]
 
+    if not is_platform_pg():
+        # Drop intermediate table created for Greenplum
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_inverse))
     plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table))
     # Create summary table. We only need the vertex_id and grouping columns
     # in it.
diff --git a/src/ports/postgres/modules/graph/wcc.sql_in b/src/ports/postgres/modules/graph/wcc.sql_in
index f5879b9..1c3808b 100644
--- a/src/ports/postgres/modules/graph/wcc.sql_in
+++ b/src/ports/postgres/modules/graph/wcc.sql_in
@@ -115,6 +115,11 @@ weakly connected components are generated for all data
 
 </dl>
 
+@note On a Greenplum cluster, the edge table should be distributed on the src
+column for better performance. In addition, the user should note that this
+function creates a duplicate of the edge table (on a Greenplum cluster) for
+better performance.
+
 @anchor rlcc
 @par Retrieve Largest Connected Component
 


[madlib] 02/02: Madpack: Fix IC/DC check to include wcc

Posted by ok...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 50d8b5942fb872f44457a266dac22683756275c0
Author: Ekta Khanna <ek...@pivotal.io>
AuthorDate: Wed Apr 10 15:28:30 2019 -0700

    Madpack: Fix IC/DC check to include wcc
    
    Closes #364
    
    Co-authored-by: Orhan Kislal <ok...@apache.org>
---
 src/madpack/madpack.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/madpack/madpack.py b/src/madpack/madpack.py
index 6c191ca..d5eb3da 100755
--- a/src/madpack/madpack.py
+++ b/src/madpack/madpack.py
@@ -666,7 +666,7 @@ def _process_py_sql_files_in_modules(modset, args_dict):
             if madpack_cmd == 'install-check':
                 mask = maddir_mod_sql + '/' + module + '/test/*.ic.sql_in'
             else:
-                mask = maddir_mod_sql + '/' + module + '/test/*[!ic].sql_in'
+                mask = maddir_mod_sql + '/' + module + '/test/*.sql_in'
         elif calling_operation == UNIT_TEST:
             mask = maddir_mod_py + '/' + module + '/test/unit_tests/test_*.py'
         else:
@@ -674,6 +674,8 @@ def _process_py_sql_files_in_modules(modset, args_dict):
 
         # Loop through all SQL files for this module
         source_files = glob.glob(mask)
+        if calling_operation == INSTALL_DEV_CHECK and madpack_cmd != 'install-check':
+            source_files = [s for s in source_files if '.ic' not in s]
 
         # Do this error check only when running install/reinstall/upgrade
         if calling_operation == DB_CREATE_OBJECTS and not source_files: