You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2019/04/10 22:29:37 UTC
[madlib] branch master updated (57b6e50 -> 50d8b59)
This is an automated email from the ASF dual-hosted git repository.
okislal pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git.
from 57b6e50 DL: Simplify madlib_keras_predict interface
new 5cee52e WCC: Duplicate edges to improve performance on Greenplum
new 50d8b59 Madpack: Fix IC/DC check to include wcc
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
src/madpack/madpack.py | 4 ++-
src/ports/postgres/modules/graph/wcc.py_in | 50 +++++++++++++++++++----------
src/ports/postgres/modules/graph/wcc.sql_in | 5 +++
3 files changed, 41 insertions(+), 18 deletions(-)
[madlib] 01/02: WCC: Duplicate edges to improve performance on
Greenplum
Posted by ok...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 5cee52e5bc1b980ba63eb92c6178118473421512
Author: Ekta Khanna <ek...@pivotal.io>
AuthorDate: Wed Apr 10 15:27:55 2019 -0700
WCC: Duplicate edges to improve performance on Greenplum
JIRA: MADLIB-1320
Prior to this update, we used the edge table in join clauses with both
src and dest columns.
This commit updates the query on the message table such that the JOIN
always operates on the column that the table is distributed on. To this
end, we duplicate the edge table to distribute it on the dest column.
This separation of accesses reduces the redistribute motion on gpdb.
In addition, we split the union all query to ensure that the working set
is limited with very large graphs.
Closes #364
Co-authored-by: Orhan Kislal <ok...@apache.org>
---
src/ports/postgres/modules/graph/wcc.py_in | 50 +++++++++++++++++++----------
src/ports/postgres/modules/graph/wcc.sql_in | 5 +++
2 files changed, 38 insertions(+), 17 deletions(-)
diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in
index de1af27..4b4b05d 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -108,6 +108,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
newupdate = unique_string(desp='newupdate')
toupdate = unique_string(desp='toupdate')
temp_out_table = unique_string(desp='tempout')
+ edge_inverse = unique_string(desp='edge_inverse')
distribution = '' if is_platform_pg() else \
"DISTRIBUTED BY ({0})".format(vertex_id)
@@ -117,12 +118,23 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
old_new_update_where_condition = ''
new_to_update_where_condition = ''
edge_to_update_where_condition = ''
+ edge_inverse_to_update_where_condition = ''
INT_MAX = 2147483647
component_id = 'component_id'
grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
comma_grouping_cols = '' if not grouping_cols else ',' + grouping_cols
+ if not is_platform_pg():
+ # In Greenplum, to avoid redistribution of data in later queries,
+ # edge_table is duplicated by creating a temporary table distributed
+ # on dest column
+ plpy.execute(""" CREATE TEMP TABLE {edge_inverse} AS
+ SELECT * FROM {edge_table} DISTRIBUTED BY ({dest})
+ """.format(**locals()))
+ else:
+ edge_inverse = edge_table
+
if grouping_cols:
distribution = ('' if is_platform_pg() else
"DISTRIBUTED BY ({0}, {1})".format(grouping_cols,
@@ -146,6 +158,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
_check_groups(newupdate, toupdate, grouping_cols_list)
edge_to_update_where_condition = ' AND ' + \
_check_groups(edge_table, toupdate, grouping_cols_list)
+ edge_inverse_to_update_where_condition = ' AND ' + \
+ _check_groups(edge_inverse, toupdate, grouping_cols_list)
join_grouping_cols = _check_groups(subq, distinct_grp_table, grouping_cols_list)
group_by_clause_newupdate = ('' if not grouping_cols else
'{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
@@ -160,7 +174,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
FROM {edge_table}
UNION
SELECT {select_grouping_cols_clause} {dest} AS {vertex_id}
- FROM {edge_table}
+ FROM {edge_inverse}
) {subq}
ON {join_grouping_cols}
GROUP BY {group_by_clause_newupdate}
@@ -201,7 +215,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
# updated in the next table. At every iteration update only those nodes
# whose component_id in the previous iteration are greater than what was
# found in the current iteration.
-
plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
plpy.execute("""
CREATE TEMP TABLE {oldupdate} AS
@@ -215,7 +228,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
', {0}'.format(grouping_cols),
group_by_clause=grouping_cols_comma,
**locals()))
-
plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
plpy.execute("""
CREATE TEMP TABLE {toupdate} AS
@@ -240,27 +252,28 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
plpy.execute("DROP TABLE IF EXISTS {0}".format(message))
plpy.execute("""
CREATE TEMP TABLE {message} AS
- SELECT {vertex_id}, MIN({component_id}) AS {component_id}
- {select_grouping_cols}
- FROM (
- SELECT {edge_table}.{src} AS {vertex_id},
- {toupdate}.{component_id}
+ SELECT {edge_inverse}.{src} AS {vertex_id},
+ MIN({toupdate}.{component_id}) AS {component_id}
{comma_toupdate_prefixed_grouping_cols}
- FROM {toupdate}, {edge_table}
- WHERE {edge_table}.{dest} = {toupdate}.{vertex_id}
- {edge_to_update_where_condition}
- UNION ALL
+ FROM {toupdate}, {edge_inverse}
+ WHERE {edge_inverse}.{dest} = {toupdate}.{vertex_id}
+ {edge_inverse_to_update_where_condition}
+ GROUP BY {edge_inverse}.{src} {comma_toupdate_prefixed_grouping_cols}
+ """.format(select_grouping_cols='' if not grouping_cols
+ else ', {0}'.format(grouping_cols),
+ **locals()))
+
+ plpy.execute("""
+ INSERT INTO {message}
SELECT {edge_table}.{dest} AS {vertex_id},
- {toupdate}.{component_id}
+ MIN({toupdate}.{component_id}) AS {component_id}
{comma_toupdate_prefixed_grouping_cols}
FROM {toupdate}, {edge_table}
WHERE {edge_table}.{src} = {toupdate}.{vertex_id}
{edge_to_update_where_condition}
- ) AS t
- GROUP BY {group_by_clause} {vertex_id}
+ GROUP BY {edge_table}.{dest} {comma_toupdate_prefixed_grouping_cols}
""".format(select_grouping_cols='' if not grouping_cols
- else ', {0}'.format(grouping_cols), group_by_clause=''
- if not grouping_cols else ' {0}, '.format(grouping_cols),
+ else ', {0}'.format(grouping_cols),
**locals()))
plpy.execute("DROP TABLE {0}".format(oldupdate))
@@ -278,6 +291,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
SELECT COUNT(*) AS cnt FROM {toupdate}
""".format(**locals()))[0]["cnt"]
+ if not is_platform_pg():
+ # Drop intermediate table created for Greenplum
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_inverse))
plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table))
# Create summary table. We only need the vertex_id and grouping columns
# in it.
diff --git a/src/ports/postgres/modules/graph/wcc.sql_in b/src/ports/postgres/modules/graph/wcc.sql_in
index f5879b9..1c3808b 100644
--- a/src/ports/postgres/modules/graph/wcc.sql_in
+++ b/src/ports/postgres/modules/graph/wcc.sql_in
@@ -115,6 +115,11 @@ weakly connected components are generated for all data
</dl>
+@note On Greenplum cluster, the edge table should be distributed on the src
+column for better performance. In addition, the user should note that this
+function creates a duplicate of the edge table (on Greenplum cluster) for
+better performance.
+
@anchor rlcc
@par Retrieve Largest Connected Component
[madlib] 02/02: Madpack: Fix IC/DC check to include wcc
Posted by ok...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 50d8b5942fb872f44457a266dac22683756275c0
Author: Ekta Khanna <ek...@pivotal.io>
AuthorDate: Wed Apr 10 15:28:30 2019 -0700
Madpack: Fix IC/DC check to include wcc
Closes #364
Co-authored-by: Orhan Kislal <ok...@apache.org>
---
src/madpack/madpack.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/src/madpack/madpack.py b/src/madpack/madpack.py
index 6c191ca..d5eb3da 100755
--- a/src/madpack/madpack.py
+++ b/src/madpack/madpack.py
@@ -666,7 +666,7 @@ def _process_py_sql_files_in_modules(modset, args_dict):
if madpack_cmd == 'install-check':
mask = maddir_mod_sql + '/' + module + '/test/*.ic.sql_in'
else:
- mask = maddir_mod_sql + '/' + module + '/test/*[!ic].sql_in'
+ mask = maddir_mod_sql + '/' + module + '/test/*.sql_in'
elif calling_operation == UNIT_TEST:
mask = maddir_mod_py + '/' + module + '/test/unit_tests/test_*.py'
else:
@@ -674,6 +674,8 @@ def _process_py_sql_files_in_modules(modset, args_dict):
# Loop through all SQL files for this module
source_files = glob.glob(mask)
+ if calling_operation == INSTALL_DEV_CHECK and madpack_cmd != 'install-check':
+ source_files = [s for s in source_files if '.ic' not in s]
# Do this error check only when running install/reinstall/upgrade
if calling_operation == DB_CREATE_OBJECTS and not source_files: