You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2017/08/29 20:41:56 UTC
[19/50] [abbrv] incubator-madlib git commit: Graph: Update Python
code to follow PEP-8
Graph: Update Python code to follow PEP-8
- Changed indentation to use spaces instead of tabs
- Updated to PEP-8 guidelines
- Updated to follow style guide convention
- Refactored a few functions to clean code and design
Closes #148
Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/d487df3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/d487df3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/d487df3c
Branch: refs/heads/latest_release
Commit: d487df3c46f984fad6c15b8a1d3e50dc30f6b6f6
Parents: 8c9b955
Author: Rahul Iyer <ri...@apache.org>
Authored: Fri Jul 7 22:23:18 2017 -0700
Committer: Rahul Iyer <ri...@apache.org>
Committed: Thu Jul 20 14:23:22 2017 -0700
----------------------------------------------------------------------
src/ports/postgres/modules/graph/apsp.py_in | 1294 +++++++++---------
.../postgres/modules/graph/graph_utils.py_in | 170 +--
src/ports/postgres/modules/graph/pagerank.py_in | 916 +++++++------
src/ports/postgres/modules/graph/sssp.py_in | 1125 +++++++--------
src/ports/postgres/modules/graph/wcc.py_in | 119 +-
.../postgres/modules/utilities/utilities.py_in | 45 +-
6 files changed, 1842 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/apsp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/apsp.py_in b/src/ports/postgres/modules/graph/apsp.py_in
index 1331301..ab8a566 100644
--- a/src/ports/postgres/modules/graph/apsp.py_in
+++ b/src/ports/postgres/modules/graph/apsp.py_in
@@ -38,17 +38,16 @@ from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import split_quoted_delimited_str
-from utilities.validate_args import get_cols
+from utilities.utilities import is_platform_pg, is_platform_hawq
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import get_expr_type
-m4_changequote(`<!', `!>')
def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_args, out_table, grouping_cols, **kwargs):
- """
+ edge_args, out_table, grouping_cols, **kwargs):
+ """
All Pairs shortest path function for graphs using the matrix
multiplication based algorithm [1].
[1] http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml
@@ -57,655 +56,656 @@ def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
@param vertex_id Name of the column containing the vertex ids.
@param edge_table Name of the table that contains the edge data.
@param edge_args A comma-delimited string containing multiple
- named arguments of the form "name=value".
- @param out_table Name of the table to store the result of APSP.
+ named arguments of the form "name=value".
+ @param out_table Name of the table to store the result of APSP.
@param grouping_cols The list of grouping columns.
"""
+ with MinWarning("warning"):
+
+ INT_MAX = 2147483647
+ INFINITY = "'Infinity'"
+ EPSILON = 0.000001
+
+ params_types = {'src': str, 'dest': str, 'weight': str}
+ default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+ edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
+
+ # Prepare the input for recording in the summary table
+ if vertex_id is None:
+ v_st = "NULL"
+ vertex_id = "id"
+ else:
+ v_st = vertex_id
+ if edge_args is None:
+ e_st = "NULL"
+ else:
+ e_st = edge_args
+ if grouping_cols is None:
+ g_st = "NULL"
+ glist = None
+ else:
+ g_st = grouping_cols
+ glist = split_quoted_delimited_str(grouping_cols)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ weight = edge_params["weight"]
+
+ distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(src)
+ is_hawq = is_platform_hawq()
+
+ _validate_apsp(vertex_table, vertex_id, edge_table,
+ edge_params, out_table, glist)
+
+ out_table_1 = unique_string(desp='out_table_1')
+ out_table_2 = unique_string(desp='out_table_2')
+ tmp_view = unique_string(desp='tmp_view')
+ v1 = unique_string(desp='v1')
+ v2 = unique_string(desp='v2')
+ message = unique_string(desp='message')
+
+ # Initialize grouping related variables
+ comma_grp = ""
+ comma_grp_e = ""
+ comma_grp_m = ""
+ grp_comma = ""
+ grp_v1_comma = ""
+ grp_o1_comma = ""
+ grp_o_comma = ""
+ checkg_eo = ""
+ checkg_eout = ""
+ checkg_ex = ""
+ checkg_om = ""
+ checkg_o1t_sub = ""
+ checkg_ot_sub = ""
+ checkg_ot = ""
+ checkg_o1t = ""
+ checkg_vv = ""
+ checkg_o2v = ""
+ checkg_oy = ""
+ checkg_vv_sub = "TRUE"
+ grp_by = ""
+
+ if grouping_cols is not None:
+
+ # We use actual table names in some cases and aliases in others
+ # In some cases, we swap the table names so use of an alias is
+ # necessary. In other cases, they are used to simplify debugging.
+
+ comma_grp = " , " + grouping_cols
+ comma_grp_e = " , " + _grp_from_table("edge", glist)
+ comma_grp_m = " , " + _grp_from_table(message, glist)
+ grp_comma = grouping_cols + " , "
+ grp_v1_comma = _grp_from_table("v1", glist) + " , "
+ grp_o1_comma = _grp_from_table(out_table_1, glist) + " , "
+ grp_o_comma = _grp_from_table("out", glist) + " , "
+
+ checkg_eo = " AND " + _check_groups(edge_table, out_table, glist)
+ checkg_eout = " AND " + _check_groups("edge", "out", glist)
+ checkg_ex = " AND " + _check_groups("edge", "x", glist)
+ checkg_om = " AND " + _check_groups(out_table, message, glist)
+ checkg_o1t_sub = _check_groups("out", tmp_view, glist)
+ checkg_ot_sub = _check_groups(out_table, tmp_view, glist)
+ checkg_ot = " AND " + _check_groups(out_table, tmp_view, glist)
+ checkg_o1t = " AND " + _check_groups("out", "t", glist)
+ checkg_vv = " AND " + _check_groups("v1", "v2", glist)
+ checkg_o2v = " AND " + _check_groups(out_table_2, "v2", glist)
+ checkg_oy = " AND " + _check_groups("out", "y", glist)
+ checkg_vv_sub = _check_groups("v1", "v2", glist)
+ grp_by = " GROUP BY " + grouping_cols
+
+ w_type = get_expr_type(weight, edge_table).lower()
+ init_w = INT_MAX
+ if w_type in ['real', 'double precision', 'float8']:
+ init_w = INFINITY
+
+ # We keep a summary table to keep track of the parameters used for this
+ # APSP run. This table is used in the path finding function to eliminate
+ # the need for repetition.
+ plpy.execute(""" CREATE TABLE {out_table}_summary (
+ vertex_table TEXT,
+ vertex_id TEXT,
+ edge_table TEXT,
+ edge_args TEXT,
+ out_table TEXT,
+ grouping_cols TEXT)
+ """.format(**locals()))
+ plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+ ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+ '{out_table}', '{g_st}') """.format(**locals()))
+
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ # Find all of the vertices involved with a given group
+ plpy.execute(""" CREATE VIEW {tmp_view} AS
+ SELECT {src} AS {vertex_id} {comma_grp}
+ FROM {edge_table} WHERE {src} IS NOT NULL
+ UNION
+ SELECT {dest} AS {vertex_id} {comma_grp}
+ FROM {edge_table} WHERE {dest} IS NOT NULL
+ """.format(**locals()))
+
+ # Don't use the unnecessary rows of the output table during joins.
+ ot_sql = """ SELECT * FROM {out_table}
+ WHERE {weight} != {init_w} AND {src} != {dest} """
+
+ # HAWQ does not support UPDATE so the initialization has to differ.
+ if is_hawq:
+
+ plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ # Create 2 identical tables to swap at every iteration.
+ plpy.execute(""" CREATE TABLE {out_table_1} AS
+ SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
+ FROM {edge_table} LIMIT 0 {distribution}
+ """.format(**locals()))
+ plpy.execute(""" CREATE TABLE {out_table_2} AS
+ SELECT * FROM {out_table_1} LIMIT 0 {distribution}
+ """.format(**locals()))
+
+ # The source can be reached with 0 cost and next is itself.
+ plpy.execute(""" INSERT INTO {out_table_2}
+ SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
+ 0 AS {weight}, {vertex_id} AS parent
+ FROM {tmp_view} """.format(**locals()))
+ # Distance = 1: every edge means there is a path from src to dest
+ plpy.execute(""" INSERT INTO {out_table_2}
+ SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
+ FROM {edge_table} """.format(**locals()))
+
+ # Fill the rest of the possible pairs with infinite initial weights
+ fill_sql = """ INSERT INTO {out_table_1}
+ SELECT {grp_v1_comma}
+ v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+ {init_w} AS {weight}, NULL::INT AS parent
+ FROM {tmp_view} v1, {tmp_view} v2
+ WHERE NOT EXISTS
+ (SELECT 1 FROM {out_table_2}
+ WHERE v1.{vertex_id} = {src} AND
+ v2.{vertex_id} = {dest}
+ {checkg_vv} {checkg_o2v})
+ {checkg_vv}
+ UNION
+ SELECT * FROM {out_table_2}
+ """.format(**locals())
+ plpy.execute(fill_sql)
+
+ ot_sql1 = ot_sql.format(out_table=out_table_1, init_w=init_w,
+ weight=weight, src=src, dest=dest)
+ ot_sql2 = ot_sql.format(out_table=out_table_2, init_w=init_w,
+ weight=weight, src=src, dest=dest)
+
+ # PostgreSQL & GPDB initialization
+ else:
+
+ plpy.execute(""" CREATE TABLE {out_table} AS
+ (SELECT {grp_comma} {src}, {dest}, {weight},
+ {src} AS parent FROM {edge_table} LIMIT 0)
+ {distribution} """.format(**locals()))
+
+ plpy.execute(""" INSERT INTO {out_table}
+ SELECT {grp_v1_comma}
+ v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+ {init_w} AS {weight}, NULL::INT AS parent
+ FROM
+ {tmp_view} AS v1 INNER JOIN
+ {tmp_view} AS v2 ON ({checkg_vv_sub})
+ """.format(**locals()))
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ # GPDB and HAWQ have distributed by clauses to help them with indexing.
+ # For Postgres we add the indices manually.
+ if is_platform_pg():
+ sql_index = "CREATE INDEX ON {0}({1})".format(out_table, src)
+ else:
+ sql_index = ''
+
+ plpy.execute(sql_index)
+
+ # The source can be reached with 0 cost and next is itself.
+ plpy.execute(
+ """ UPDATE {out_table} SET
+ {weight} = 0, parent = {vertex_id}
+ FROM {vertex_table}
+ WHERE {out_table}.{src} = {vertex_id}
+ AND {out_table}.{dest} = {vertex_id}
+ """.format(**locals()))
+
+ # Distance = 1: every edge means there is a path from src to dest
+
+ # There may be multiple edges defined as a->b,
+ # we only need the minimum weighted one.
+
+ plpy.execute(
+ """ CREATE VIEW {tmp_view} AS
+ SELECT {grp_comma} {src}, {dest},
+ min({weight}) AS {weight}
+ FROM {edge_table}
+ GROUP BY {grp_comma} {src}, {dest}
+ """.format(**locals()))
+ plpy.execute(
+ """ UPDATE {out_table} SET
+ {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
+ FROM {tmp_view}
+ WHERE {out_table}.{src} = {tmp_view}.{src}
+ AND {out_table}.{dest} = {tmp_view}.{dest}
+ AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
+ """.format(**locals()))
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ ot_sql1 = ot_sql.format(**locals())
+ out_table_1 = out_table
+
+ # Find the maximum number of iterations to try
+ # If not done by v_cnt iterations, there is a negative cycle.
+ v_cnt = plpy.execute(
+ """ SELECT max(count) as max FROM (
+ SELECT count(DISTINCT {src}) AS count
+ FROM {out_table_1}
+ {grp_by}) x
+ """.format(**locals()))[0]['max']
+
+ if v_cnt < 2:
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ if grouping_cols:
+ plpy.error(("Graph APSP: {0} has less than 2 vertices in " +
+ "every group.").format(edge_table))
+ else:
+ plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
+ vertex_table))
+
+ for i in range(0, v_cnt + 1):
+
+ """
+ Create a view that will be used to update the output table
+
+ The implementation is based on the matrix multiplication idea.
+ The initial output table consists of 3 sets of vertex pairs.
+ 1) for every vertex v: v -> v, weight 0, parent v
+ 2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
+ 3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
+ parent NULL
+
+ The algorithm "relaxes" the paths: finds alternate paths with less
+ weights
+ At every step, we look at every combination of non-infinite
+ existing paths and edges to see if we can relax a path.
+
+ Assume the graph is a chain: 1->2->3->...
+ The initial output table will have a finite weighted path for 1->2
+ and infinite for the rest. At ith iteration, the output table will
+ have 1->i path and will relax 1->i+1 path from infinite to a
+ finite value (weight of 1->i path + weight of i->i+1 edge) and
+ assign i as the parent.
+
+ Since using '=' with floats is dangerous we use an epsilon value
+ for comparison.
+ """
+
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ update_sql = """ CREATE VIEW {tmp_view} AS
+ SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
+ {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
+ FROM {out_table_1} AS out
+ INNER JOIN
+ (SELECT x.{src}, x.{dest}, x.{weight},
+ out.{dest} as parent {comma_grp_e}
+ FROM
+ ({ot_sql1}) AS out
+ INNER JOIN
+ {edge_table} AS edge
+ ON (out.{dest} = edge.{src} {checkg_eout})
+ INNER JOIN
+ (SELECT out.{src}, edge.{dest},
+ min(out.{weight}+edge.{weight}) AS {weight}
+ {comma_grp_e}
+ FROM
+ ({ot_sql1}) AS out,
+ {edge_table} AS edge
+ WHERE out.{dest} = edge.{src} {checkg_eout}
+ GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
+ ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
+ WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
+ < {EPSILON}) y
+ ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
+ WHERE y.{weight} < out.{weight}
+ """.format(**locals())
+ plpy.execute(update_sql)
+
+ # HAWQ employs alternating tables and has to be handled separately
+ if is_hawq:
+
+ # Stop if there are no more updates
+ if table_is_empty(tmp_view):
+
+ plpy.execute("DROP VIEW {0}".format(tmp_view))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
+ out_table_1, out_table))
+ break
+
+ # The new output table will still have the old values for
+ # every vertex pair that does not appear on the update view.
+
+ plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
+ fill_sql = """ INSERT INTO {out_table_2}
+ SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
+ (SELECT 1 FROM {tmp_view} AS t
+ WHERE t.{src} = out.{src} AND
+ t.{dest} = out.{dest} {checkg_o1t})
+ UNION
+ SELECT * FROM {tmp_view}""".format(**locals())
+ plpy.execute(fill_sql)
+
+ # Swap the table names and the sql command we use for filtering
+ tmpname = out_table_1
+ out_table_1 = out_table_2
+ out_table_2 = tmpname
+
+ tmpname = ot_sql1
+ ot_sql1 = ot_sql2
+ ot_sql2 = tmpname
+
+ else:
+
+ updates = plpy.execute(
+ """ UPDATE {out_table}
+ SET {weight} = {tmp_view}.{weight},
+ parent = {tmp_view}.parent
+ FROM {tmp_view}
+ WHERE {tmp_view}.{src} = {out_table}.{src} AND
+ {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
+ """.format(**locals()))
+ if updates.nrows() == 0:
+ break
+
+ # The algorithm should have reached a break command by this point.
+ # This check handles the existence of a negative cycle.
+ if i == v_cnt:
+
+ # If there are no groups, clean up and give error.
+ if grouping_cols is None:
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ plpy.error("Graph APSP: Detected a negative cycle in the graph.")
+
+ # It is possible that not all groups have negative cycles.
+ else:
+ # negs is the string created by collating grouping columns.
+ # By looking at the update view, we can see which groups
+ # are in a negative cycle.
+
+ negs = plpy.execute(
+ """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+ FROM {tmp_view}
+ """.format(**locals()))[0]['grp']
+
+ # Delete the groups with negative cycles from the output table.
+
+ # HAWQ doesn't support DELETE so we have to copy the valid results.
+ if is_hawq:
+ sql_del = """ CREATE TABLE {out_table} AS
+ SELECT *
+ FROM {out_table_1} AS out
+ WHERE NOT EXISTS(
+ SELECT 1
+ FROM {tmp_view}
+ WHERE {checkg_o1t_sub}
+ );"""
+ plpy.execute(sql_del.format(**locals()))
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ else:
+ sql_del = """ DELETE FROM {out_table}
+ USING {tmp_view}
+ WHERE {checkg_ot_sub}"""
+ plpy.execute(sql_del.format(**locals()))
+
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ # If every group has a negative cycle,
+ # drop the output table as well.
+ if table_is_empty(out_table):
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+ plpy.warning(
+ """Graph APSP: Detected a negative cycle in the """ +
+ """sub-graphs of following groups: {0}.""".
+ format(str(negs)[1:-1]))
+
+ else:
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
+
+ return None
- with MinWarning("warning"):
-
- INT_MAX = 2147483647
- INFINITY = "'Infinity'"
- EPSILON = 0.000001
-
- params_types = {'src': str, 'dest': str, 'weight': str}
- default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types,
- default_args)
-
- # Prepare the input for recording in the summary table
- if vertex_id is None:
- v_st= "NULL"
- vertex_id = "id"
- else:
- v_st = vertex_id
- if edge_args is None:
- e_st = "NULL"
- else:
- e_st = edge_args
- if grouping_cols is None:
- g_st = "NULL"
- glist = None
- else:
- g_st = grouping_cols
- glist = split_quoted_delimited_str(grouping_cols)
-
- src = edge_params["src"]
- dest = edge_params["dest"]
- weight = edge_params["weight"]
-
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0})".format(src)!>)
-
- is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-
- _validate_apsp(vertex_table, vertex_id, edge_table,
- edge_params, out_table, glist)
-
- out_table_1 = unique_string(desp='out_table_1')
- out_table_2 = unique_string(desp='out_table_2')
- tmp_view = unique_string(desp='tmp_view')
- v1 = unique_string(desp='v1')
- v2 = unique_string(desp='v2')
- message = unique_string(desp='message')
-
- # Initialize grouping related variables
- comma_grp = ""
- comma_grp_e = ""
- comma_grp_m = ""
- grp_comma = ""
- grp_v1_comma = ""
- grp_o1_comma = ""
- grp_o_comma = ""
- checkg_eo = ""
- checkg_eout = ""
- checkg_ex = ""
- checkg_om = ""
- checkg_o1t_sub = ""
- checkg_ot_sub = ""
- checkg_ot = ""
- checkg_o1t = ""
- checkg_vv = ""
- checkg_o2v = ""
- checkg_oy = ""
- checkg_vv_sub = "TRUE"
- grp_by = ""
-
- if grouping_cols is not None:
-
- # We use actual table names in some cases and aliases in others
- # In some cases, we swap the table names so use of an alias is
- # necessary. In other cases, they are used to simplify debugging.
-
- comma_grp = " , " + grouping_cols
- comma_grp_e = " , " + _grp_from_table("edge",glist)
- comma_grp_m = " , " + _grp_from_table(message,glist)
- grp_comma = grouping_cols + " , "
- grp_v1_comma = _grp_from_table("v1",glist) + " , "
- grp_o1_comma = _grp_from_table(out_table_1,glist) + " , "
- grp_o_comma = _grp_from_table("out",glist) + " , "
-
- checkg_eo = " AND " + _check_groups(edge_table,out_table,glist)
- checkg_eout = " AND " + _check_groups("edge","out",glist)
- checkg_ex = " AND " + _check_groups("edge","x",glist)
- checkg_om = " AND " + _check_groups(out_table,message,glist)
- checkg_o1t_sub = _check_groups("out",tmp_view,glist)
- checkg_ot_sub = _check_groups(out_table,tmp_view,glist)
- checkg_ot = " AND " + _check_groups(out_table,tmp_view,glist)
- checkg_o1t = " AND " + _check_groups("out","t",glist)
- checkg_vv = " AND " + _check_groups("v1","v2",glist)
- checkg_o2v = " AND " + _check_groups(out_table_2,"v2",glist)
- checkg_oy = " AND " + _check_groups("out","y",glist)
- checkg_vv_sub = _check_groups("v1","v2",glist)
- grp_by = " GROUP BY " + grouping_cols
-
- w_type = get_expr_type(weight,edge_table).lower()
- init_w = INT_MAX
- if w_type in ['real','double precision','float8']:
- init_w = INFINITY
-
- # We keep a summary table to keep track of the parameters used for this
- # APSP run. This table is used in the path finding function to eliminate
- # the need for repetition.
- plpy.execute( """ CREATE TABLE {out_table}_summary (
- vertex_table TEXT,
- vertex_id TEXT,
- edge_table TEXT,
- edge_args TEXT,
- out_table TEXT,
- grouping_cols TEXT)
- """.format(**locals()))
- plpy.execute( """ INSERT INTO {out_table}_summary VALUES
- ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
- '{out_table}', '{g_st}') """.format(**locals()))
-
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- # Find all of the vertices involved with a given group
- plpy.execute(""" CREATE VIEW {tmp_view} AS
- SELECT {src} AS {vertex_id} {comma_grp}
- FROM {edge_table} WHERE {src} IS NOT NULL
- UNION
- SELECT {dest} AS {vertex_id} {comma_grp}
- FROM {edge_table} WHERE {dest} IS NOT NULL
- """.format(**locals()))
-
- # Don't use the unnecessary rows of the output table during joins.
- ot_sql = """ SELECT * FROM {out_table}
- WHERE {weight} != {init_w} AND {src} != {dest} """
-
- # HAWQ does not support UPDATE so the initialization has to differ.
- if is_hawq:
-
- plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- # Create 2 identical tables to swap at every iteration.
- plpy.execute(""" CREATE TABLE {out_table_1} AS
- SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
- FROM {edge_table} LIMIT 0 {distribution}
- """.format(**locals()))
- plpy.execute(""" CREATE TABLE {out_table_2} AS
- SELECT * FROM {out_table_1} LIMIT 0 {distribution}
- """.format(**locals()))
-
- # The source can be reached with 0 cost and next is itself.
- plpy.execute(""" INSERT INTO {out_table_2}
- SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
- 0 AS {weight}, {vertex_id} AS parent
- FROM {tmp_view} """.format(**locals()))
- # Distance = 1: every edge means there is a path from src to dest
- plpy.execute(""" INSERT INTO {out_table_2}
- SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
- FROM {edge_table} """.format(**locals()))
-
- # Fill the rest of the possible pairs with infinite initial weights
- fill_sql = """ INSERT INTO {out_table_1}
- SELECT {grp_v1_comma}
- v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
- {init_w} AS {weight}, NULL::INT AS parent
- FROM {tmp_view} v1, {tmp_view} v2
- WHERE NOT EXISTS
- (SELECT 1 FROM {out_table_2}
- WHERE v1.{vertex_id} = {src} AND
- v2.{vertex_id} = {dest}
- {checkg_vv} {checkg_o2v})
- {checkg_vv}
- UNION
- SELECT * FROM {out_table_2}
- """.format(**locals())
- plpy.execute(fill_sql)
-
- ot_sql1 = ot_sql.format(out_table = out_table_1, init_w = init_w,
- weight = weight, src = src, dest = dest)
- ot_sql2 = ot_sql.format(out_table = out_table_2, init_w = init_w,
- weight = weight, src = src, dest = dest)
-
- # PostgreSQL & GPDB initialization
- else:
-
- plpy.execute( """ CREATE TABLE {out_table} AS
- (SELECT {grp_comma} {src}, {dest}, {weight},
- {src} AS parent FROM {edge_table} LIMIT 0)
- {distribution} """.format(**locals()))
-
- plpy.execute( """ INSERT INTO {out_table}
- SELECT {grp_v1_comma}
- v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
- {init_w} AS {weight}, NULL::INT AS parent
- FROM
- {tmp_view} AS v1 INNER JOIN
- {tmp_view} AS v2 ON ({checkg_vv_sub})
- """.format(**locals()))
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- # GPDB and HAWQ have distributed by clauses to help them with indexing.
- # For Postgres we add the indices manually.
- sql_index = m4_ifdef(<!__POSTGRESQL__!>,
- <!""" CREATE INDEX ON {out_table} ({src});
- """.format(**locals())!>, <!''!>)
- plpy.execute(sql_index)
-
- # The source can be reached with 0 cost and next is itself.
- plpy.execute(
- """ UPDATE {out_table} SET
- {weight} = 0, parent = {vertex_id}
- FROM {vertex_table}
- WHERE {out_table}.{src} = {vertex_id}
- AND {out_table}.{dest} = {vertex_id}
- """.format(**locals()))
-
- # Distance = 1: every edge means there is a path from src to dest
-
- # There may be multiple edges defined as a->b,
- # we only need the minimum weighted one.
-
- plpy.execute(
- """ CREATE VIEW {tmp_view} AS
- SELECT {grp_comma} {src}, {dest},
- min({weight}) AS {weight}
- FROM {edge_table}
- GROUP BY {grp_comma} {src}, {dest}
- """.format(**locals()))
- plpy.execute(
- """ UPDATE {out_table} SET
- {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
- FROM {tmp_view}
- WHERE {out_table}.{src} = {tmp_view}.{src}
- AND {out_table}.{dest} = {tmp_view}.{dest}
- AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
- """.format(**locals()))
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- ot_sql1 = ot_sql.format(**locals())
- out_table_1 = out_table
-
- # Find the maximum number of iterations to try
- # If not done by v_cnt iterations, there is a negative cycle.
- v_cnt = plpy.execute(
- """ SELECT max(count) as max FROM (
- SELECT count(DISTINCT {src}) AS count
- FROM {out_table_1}
- {grp_by}) x
- """.format(**locals()))[0]['max']
-
- if v_cnt < 2:
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table, out_table+"_summary"))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- if grouping_cols:
- plpy.error(("Graph APSP: {0} has less than 2 vertices in "+
- "every group.").format(edge_table))
- else:
- plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
- vertex_table))
-
- for i in range(0,v_cnt+1):
-
- """
- Create a view that will be used to update the output table
-
- The implementation is based on the matrix multiplication idea.
- The initial output table consists of 3 sets of vertex pairs.
- 1) for every vertex v: v -> v, weight 0, parent v
- 2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
- 3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
- parent NULL
-
- The algorithm "relaxes" the paths: finds alternate paths with less
- weights
- At every step, we look at every combination of non-infinite
- existing paths and edges to see if we can relax a path.
-
- Assume the graph is a chain: 1->2->3->...
- The initial output table will have a finite weighted path for 1->2
- and infinite for the rest. At ith iteration, the output table will
- have 1->i path and will relax 1->i+1 path from infinite to a
- finite value (weight of 1->i path + weight of i->i+1 edge) and
- assign i as the parent.
-
- Since using '=' with floats is dangerous we use an epsilon value
- for comparison.
- """
-
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- update_sql = """ CREATE VIEW {tmp_view} AS
- SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
- {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
- FROM {out_table_1} AS out
- INNER JOIN
- (SELECT x.{src}, x.{dest}, x.{weight},
- out.{dest} as parent {comma_grp_e}
- FROM
- ({ot_sql1}) AS out
- INNER JOIN
- {edge_table} AS edge
- ON (out.{dest} = edge.{src} {checkg_eout})
- INNER JOIN
- (SELECT out.{src}, edge.{dest},
- min(out.{weight}+edge.{weight}) AS {weight}
- {comma_grp_e}
- FROM
- ({ot_sql1}) AS out,
- {edge_table} AS edge
- WHERE out.{dest} = edge.{src} {checkg_eout}
- GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
- ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
- WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
- < {EPSILON}) y
- ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
- WHERE y.{weight} < out.{weight}
- """.format(**locals())
- plpy.execute(update_sql)
-
- # HAWQ employs alternating tables and has to be handled separately
- if is_hawq:
-
- # Stop if there are no more updates
- if table_is_empty(tmp_view):
-
- plpy.execute("DROP VIEW {0}".format(tmp_view))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
- out_table_1,out_table))
- break
-
- # The new output table will still have the old values for
- # every vertex pair that does not appear on the update view.
-
- plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
- fill_sql = """ INSERT INTO {out_table_2}
- SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
- (SELECT 1 FROM {tmp_view} AS t
- WHERE t.{src} = out.{src} AND
- t.{dest} = out.{dest} {checkg_o1t})
- UNION
- SELECT * FROM {tmp_view}""".format(**locals())
- plpy.execute(fill_sql)
-
- # Swap the table names and the sql command we use for filtering
- tmpname = out_table_1
- out_table_1 = out_table_2
- out_table_2 = tmpname
-
- tmpname = ot_sql1
- ot_sql1 = ot_sql2
- ot_sql2 = tmpname
-
- else:
-
- updates = plpy.execute(
- """ UPDATE {out_table}
- SET {weight} = {tmp_view}.{weight},
- parent = {tmp_view}.parent
- FROM {tmp_view}
- WHERE {tmp_view}.{src} = {out_table}.{src} AND
- {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
- """.format(**locals()))
- if updates.nrows() == 0:
- break
-
- # The algorithm should have reached a break command by this point.
- # This check handles the existence of a negative cycle.
- if i == v_cnt:
-
- # If there are no groups, clean up and give error.
- if grouping_cols is None:
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table, out_table+"_summary"))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- plpy.error("Graph APSP: Detected a negative cycle in the graph.")
-
- # It is possible that not all groups have negative cycles.
- else:
- # negs is the string created by collating grouping columns.
- # By looking at the update view, we can see which groups
- # are in a negative cycle.
-
- negs = plpy.execute(
- """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
- FROM {tmp_view}
- """.format(**locals()))[0]['grp']
-
- # Delete the groups with negative cycles from the output table.
-
- # HAWQ doesn't support DELETE so we have to copy the valid results.
- if is_hawq:
- sql_del = """ CREATE TABLE {out_table} AS
- SELECT *
- FROM {out_table_1} AS out
- WHERE NOT EXISTS(
- SELECT 1
- FROM {tmp_view}
- WHERE {checkg_o1t_sub}
- );"""
- plpy.execute(sql_del.format(**locals()))
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- else:
- sql_del = """ DELETE FROM {out_table}
- USING {tmp_view}
- WHERE {checkg_ot_sub}"""
- plpy.execute(sql_del.format(**locals()))
-
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- # If every group has a negative cycle,
- # drop the output table as well.
- if table_is_empty(out_table):
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table,out_table+"_summary"))
- plpy.warning(
- """Graph APSP: Detected a negative cycle in the """ +
- """sub-graphs of following groups: {0}.""".
- format(str(negs)[1:-1]))
-
- else:
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
-
- return None
def graph_apsp_get_path(schema_madlib, apsp_table,
- source_vertex, dest_vertex, path_table, **kwargs):
- """
- Helper function that can be used to get the shortest path between any 2
- vertices
+ source_vertex, dest_vertex, path_table, **kwargs):
+ """
+ Helper function that can be used to get the shortest path between any 2
+ vertices
Args:
- @param apsp_table Name of the table that contains the APSP
- output.
- @param source_vertex The vertex that will be the source of the
- desired path.
- @param dest_vertex The vertex that will be the destination of the
- desired path.
- """
-
- with MinWarning("warning"):
- _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
-
- temp1_name = unique_string(desp='temp1')
- temp2_name = unique_string(desp='temp2')
-
- summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
- vertex_id = summary[0]['vertex_id']
- edge_args = summary[0]['edge_args']
- grouping_cols = summary[0]['grouping_cols']
-
- params_types = {'src': str, 'dest': str, 'weight': str}
- default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types,
- default_args)
-
- src = edge_params["src"]
- dest = edge_params["dest"]
- weight = edge_params["weight"]
-
- if vertex_id == "NULL":
- vertex_id = "id"
-
- if grouping_cols == "NULL":
- grouping_cols = None
-
- select_grps = ""
- check_grps_t1 = ""
- check_grps_t2 = ""
- grp_comma = ""
- tmp = ""
-
- if grouping_cols is not None:
- glist = split_quoted_delimited_str(grouping_cols)
- select_grps = _grp_from_table(apsp_table,glist) + " , "
- check_grps_t1 = " AND " + _check_groups(
- apsp_table,temp1_name,glist)
- check_grps_t2 = " AND " + _check_groups(
- apsp_table,temp2_name,glist)
-
- grp_comma = grouping_cols + " , "
-
- # If the source and destination is the same vertex.
- # There is no need to check the paths for any group.
- if source_vertex == dest_vertex:
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
- FROM {apsp_table} WHERE {src} = {source_vertex} AND
- {dest} = {dest_vertex}
- """.format(**locals()))
- return
-
- plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
- format(temp1_name,temp2_name));
-
- # Initialize the temporary tables
- out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
- SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
- ARRAY[{dest_vertex}] AS path
- FROM {apsp_table}
- WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
- AND {apsp_table}.parent IS NOT NULL
- """.format(**locals()))
-
- plpy.execute("""
- CREATE TEMP TABLE {temp2_name} AS
- SELECT * FROM {temp1_name} LIMIT 0
- """.format(**locals()))
-
- # Follow the 'parent' chain until you reach the case where the parent
- # is the same as destination. This means it is the last vertex before
- # the source.
- while out.nrows() > 0:
-
- plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
- # If the parent id is not the same as dest,
- # that means we have to follow the chain:
- # Add it to the path and move to its parent
- out = plpy.execute(
- """ INSERT INTO {temp2_name}
- SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
- {apsp_table}.{dest} || {temp1_name}.path AS path
- FROM {apsp_table} INNER JOIN {temp1_name} ON
- ({apsp_table}.{dest} = {temp1_name}.{vertex_id}
- {check_grps_t1})
- WHERE {src} = {source_vertex} AND
- {apsp_table}.parent <> {apsp_table}.{dest}
- """.format(**locals()))
-
- tmp = temp2_name
- temp2_name = temp1_name
- temp1_name = tmp
-
- tmp = check_grps_t1
- check_grps_t1 = check_grps_t2
- check_grps_t2 = tmp
-
- # We have to consider 3 cases.
- # 1) The path has more than 2 vertices:
- # Add the current parent and the source vertex
- # 2) The path has exactly 2 vertices (an edge between src and dest is
- # the shortest path).
- # Add the source vertex
- # 3) The path has 0 vertices (unreachable)
- # Add an empty array.
-
- # Path with 1 vertex (src == dest) has been handled before
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
- FROM {temp2_name}
- WHERE {vertex_id} <> {dest_vertex}
- UNION
- SELECT {grp_comma} {source_vertex} || path AS path
- FROM {temp2_name}
- WHERE {vertex_id} = {dest_vertex}
- UNION
- SELECT {grp_comma} '{{}}'::INT[] AS path
- FROM {apsp_table}
- WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
- AND {apsp_table}.parent IS NULL
- """.format(**locals()))
-
- out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
- if out.nrows() == 0:
- plpy.error( ("Graph APSP: Vertex {0} and/or {1} is not present"+
- " in the APSP table {1}").format(
- source_vertex,dest_vertex,apsp_table))
-
- plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
- format(**locals()))
-
- return None
+ @param apsp_table Name of the table that contains the APSP
+ output.
+ @param source_vertex The vertex that will be the source of the
+ desired path.
+ @param dest_vertex The vertex that will be the destination of the
+ desired path.
+ """
+
+ with MinWarning("warning"):
+ _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
+
+ temp1_name = unique_string(desp='temp1')
+ temp2_name = unique_string(desp='temp2')
+
+ summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
+ vertex_id = summary[0]['vertex_id']
+ edge_args = summary[0]['edge_args']
+ grouping_cols = summary[0]['grouping_cols']
+
+ params_types = {'src': str, 'dest': str, 'weight': str}
+ default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+ edge_params = extract_keyvalue_params(edge_args,
+ params_types,
+ default_args)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ weight = edge_params["weight"]
+
+ if vertex_id == "NULL":
+ vertex_id = "id"
+
+ if grouping_cols == "NULL":
+ grouping_cols = None
+
+ select_grps = ""
+ check_grps_t1 = ""
+ check_grps_t2 = ""
+ grp_comma = ""
+ tmp = ""
+
+ if grouping_cols is not None:
+ glist = split_quoted_delimited_str(grouping_cols)
+ select_grps = _grp_from_table(apsp_table, glist) + " , "
+ check_grps_t1 = " AND " + _check_groups(
+ apsp_table, temp1_name, glist)
+ check_grps_t2 = " AND " + _check_groups(
+ apsp_table, temp2_name, glist)
+
+ grp_comma = grouping_cols + " , "
+
+        # If the source and destination are the same vertex.
+ # There is no need to check the paths for any group.
+ if source_vertex == dest_vertex:
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+ FROM {apsp_table} WHERE {src} = {source_vertex} AND
+ {dest} = {dest_vertex}
+ """.format(**locals()))
+ return
+
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(temp1_name, temp2_name))
+
+ # Initialize the temporary tables
+ out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+ SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
+ ARRAY[{dest_vertex}] AS path
+ FROM {apsp_table}
+ WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+ AND {apsp_table}.parent IS NOT NULL
+ """.format(**locals()))
+
+ plpy.execute("""
+ CREATE TEMP TABLE {temp2_name} AS
+ SELECT * FROM {temp1_name} LIMIT 0
+ """.format(**locals()))
+
+ # Follow the 'parent' chain until you reach the case where the parent
+ # is the same as destination. This means it is the last vertex before
+ # the source.
+ while out.nrows() > 0:
+
+ plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+ # If the parent id is not the same as dest,
+ # that means we have to follow the chain:
+ # Add it to the path and move to its parent
+ out = plpy.execute(
+ """ INSERT INTO {temp2_name}
+ SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
+ {apsp_table}.{dest} || {temp1_name}.path AS path
+ FROM {apsp_table} INNER JOIN {temp1_name} ON
+ ({apsp_table}.{dest} = {temp1_name}.{vertex_id}
+ {check_grps_t1})
+ WHERE {src} = {source_vertex} AND
+ {apsp_table}.parent <> {apsp_table}.{dest}
+ """.format(**locals()))
+
+ tmp = temp2_name
+ temp2_name = temp1_name
+ temp1_name = tmp
+
+ tmp = check_grps_t1
+ check_grps_t1 = check_grps_t2
+ check_grps_t2 = tmp
+
+ # We have to consider 3 cases.
+ # 1) The path has more than 2 vertices:
+ # Add the current parent and the source vertex
+ # 2) The path has exactly 2 vertices (an edge between src and dest is
+ # the shortest path).
+ # Add the source vertex
+ # 3) The path has 0 vertices (unreachable)
+ # Add an empty array.
+
+ # Path with 1 vertex (src == dest) has been handled before
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
+ FROM {temp2_name}
+ WHERE {vertex_id} <> {dest_vertex}
+ UNION
+ SELECT {grp_comma} {source_vertex} || path AS path
+ FROM {temp2_name}
+ WHERE {vertex_id} = {dest_vertex}
+ UNION
+ SELECT {grp_comma} '{{}}'::INT[] AS path
+ FROM {apsp_table}
+ WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+ AND {apsp_table}.parent IS NULL
+ """.format(**locals()))
+
+ out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+ if out.nrows() == 0:
+            plpy.error(("Graph APSP: Vertex {0} and/or {1} is not present" +
+                        " in the APSP table {2}").format(
+                source_vertex, dest_vertex, apsp_table))
+
+        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp2_name}".
+                     format(**locals()))
+
+ return None
+
def _validate_apsp(vertex_table, vertex_id, edge_table, edge_params,
- out_table, glist, **kwargs):
+ out_table, glist, **kwargs):
+
+ validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+ out_table, 'APSP')
- validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table,'APSP')
+ vt_error = plpy.execute(
+ """ SELECT {vertex_id}
+ FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ GROUP BY {vertex_id}
+ HAVING count(*) > 1 """.format(**locals()))
- vt_error = plpy.execute(
- """ SELECT {vertex_id}
- FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- GROUP BY {vertex_id}
- HAVING count(*) > 1 """.format(**locals()))
+ if vt_error.nrows() != 0:
+ plpy.error(
+ """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
+ format(**locals()))
- if vt_error.nrows() != 0:
- plpy.error(
- """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
- format(**locals()))
+ _assert(not table_exists(out_table + "_summary"),
+ "Graph APSP: Output summary table already exists!")
- _assert(not table_exists(out_table+"_summary"),
- "Graph APSP: Output summary table already exists!")
+ if glist is not None:
+ _assert(columns_exist_in_table(edge_table, glist),
+ """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
+ format(**locals()))
- if glist is not None:
- _assert(columns_exist_in_table(edge_table, glist),
- """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
- format(**locals()))
def _validate_get_path(apsp_table, source_vertex, dest_vertex,
- path_table, **kwargs):
+ path_table, **kwargs):
- _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
- "Graph APSP: Invalid APSP table name!")
- _assert(table_exists(apsp_table),
- "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
- _assert(not table_is_empty(apsp_table),
- "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
+ _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
+ "Graph APSP: Invalid APSP table name!")
+ _assert(table_exists(apsp_table),
+ "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
+ _assert(not table_is_empty(apsp_table),
+ "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
- summary = apsp_table+"_summary"
- _assert(table_exists(summary),
- "Graph APSP: APSP summary table ({0}) is missing!".format(summary))
- _assert(not table_is_empty(summary),
- "Graph APSP: APSP summary table ({0}) is empty!".format(summary))
+ summary = apsp_table + "_summary"
+ _assert(table_exists(summary),
+ "Graph APSP: APSP summary table ({0}) is missing!".format(summary))
+ _assert(not table_is_empty(summary),
+ "Graph APSP: APSP summary table ({0}) is empty!".format(summary))
- _assert(not table_exists(path_table),
- "Graph APSP: Output path table already exists!")
+ _assert(not table_exists(path_table),
+ "Graph APSP: Output path table already exists!")
+
+ return None
- return None
def graph_apsp_help(schema_madlib, message, **kwargs):
- """
- Help function for graph_apsp and graph_apsp_get_path
-
- Args:
- @param schema_madlib
- @param message: string, Help message string
- @param kwargs
-
- Returns:
- String. Help/usage information
- """
- if not message:
- help_string = """
+ """
+ Help function for graph_apsp and graph_apsp_get_path
+
+ Args:
+ @param schema_madlib
+ @param message: string, Help message string
+ @param kwargs
+
+ Returns:
+ String. Help/usage information
+ """
+ if not message:
+ help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
@@ -717,8 +717,8 @@ edges is minimized.
For more details on function usage:
SELECT {schema_madlib}.graph_apsp('usage')
"""
- elif message.lower() in ['usage', 'help', '?']:
- help_string = """
+ elif message.lower() in ['usage', 'help', '?']:
+ help_string = """
Given a graph, all pairs shortest path (apsp) algorithm finds a path for
every vertex pair such that the sum of the weights of its constituent
edges is minimized.
@@ -728,10 +728,10 @@ edges is minimized.
To retrieve the path for a specific vertex pair:
SELECT {schema_madlib}.graph_apsp_get_path(
- apsp_table TEXT, -- Name of the table that contains the apsp output.
+ apsp_table TEXT, -- Name of the table that contains the apsp output.
source_vertex INT, -- The vertex that will be the source of the
-- desired path.
- dest_vertex INT, -- The vertex that will be the destination of the
+ dest_vertex INT, -- The vertex that will be the destination of the
-- desired path.
path_table TEXT -- Name of the output table that contains the path.
);
@@ -762,8 +762,8 @@ every group and has the following columns:
- path (ARRAY) : The shortest path from the source vertex to the
destination vertex.
"""
- elif message.lower() in ("example", "examples"):
- help_string = """
+ elif message.lower() in ("example", "examples"):
+ help_string = """
----------------------------------------------------------------------------
EXAMPLES
----------------------------------------------------------------------------
@@ -806,11 +806,11 @@ INSERT INTO edge VALUES
-- Compute the apsp:
DROP TABLE IF EXISTS out;
SELECT madlib.graph_apsp(
- 'vertex', -- Vertex table
- 'id', -- Vertix id column
- 'edge', -- Edge table
- 'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
- 'out' -- Output table of apsp
+ 'vertex', -- Vertex table
+    'id',                            -- Vertex id column
+ 'edge', -- Edge table
+ 'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
+ 'out' -- Output table of apsp
);
-- View the apsp costs for every vertex:
SELECT * FROM out ORDER BY src, dest;
@@ -834,11 +834,11 @@ INSERT INTO edge_gr VALUES
DROP TABLE IF EXISTS out_gr, out_gr_summary;
SELECT graph_apsp('vertex',NULL,'edge_gr',NULL,'out_gr','grp');
"""
- else:
- help_string = "No such option. Use {schema_madlib}.graph_apsp()"
+ else:
+ help_string = "No such option. Use {schema_madlib}.graph_apsp()"
- return help_string.format(schema_madlib=schema_madlib,
- graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
- """out_table TEXT, -- Name of the table to store the result of apsp.
+ return help_string.format(schema_madlib=schema_madlib,
+ graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
+ """out_table TEXT, -- Name of the table to store the result of apsp.
grouping_cols TEXT -- The list of grouping columns."""))
# ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/graph_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in
index 944c301..9d31345 100644
--- a/src/ports/postgres/modules/graph/graph_utils.py_in
+++ b/src/ports/postgres/modules/graph/graph_utils.py_in
@@ -27,115 +27,115 @@
@namespace graph
"""
-import plpy
-from utilities.control import MinWarning
from utilities.utilities import _assert
-from utilities.utilities import extract_keyvalue_params
-from utilities.utilities import unique_string
from utilities.validate_args import get_cols
from utilities.validate_args import unquote_ident
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
+
def _grp_null_checks(grp_list):
"""
- Helper function for generating NULL checks for grouping columns
+ Helper function for generating NULL checks for grouping columns
to be used within a WHERE clause
Args:
@param grp_list The list of grouping columns
"""
return ' AND '.join([" {i} IS NOT NULL ".format(**locals())
- for i in grp_list])
+ for i in grp_list])
+
def _check_groups(tbl1, tbl2, grp_list):
+ """
+ Helper function for joining tables with groups.
+ Args:
+ @param tbl1 Name of the first table
+ @param tbl2 Name of the second table
+ @param grp_list The list of grouping columns
+ """
- """
- Helper function for joining tables with groups.
- Args:
- @param tbl1 Name of the first table
- @param tbl2 Name of the second table
- @param grp_list The list of grouping columns
- """
+ return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
+ for i in grp_list])
- return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
- for i in grp_list])
def _grp_from_table(tbl, grp_list):
+ """
+ Helper function for selecting grouping columns of a table
+ Args:
+ @param tbl Name of the table
+ @param grp_list The list of grouping columns
+ """
+ return ' , '.join([" {tbl}.{i} ".format(**locals())
+ for i in grp_list])
- """
- Helper function for selecting grouping columns of a table
- Args:
- @param tbl Name of the table
- @param grp_list The list of grouping columns
- """
- return ' , '.join([" {tbl}.{i} ".format(**locals())
- for i in grp_list])
def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table, func_name, **kwargs):
- """
- Validates graph tables (vertex and edge) as well as the output table.
- """
- _assert(out_table and out_table.strip().lower() not in ('null', ''),
- "Graph {func_name}: Invalid output table name!".format(**locals()))
- _assert(not table_exists(out_table),
- "Graph {func_name}: Output table already exists!".format(**locals()))
-
- _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
- "Graph {func_name}: Invalid vertex table name!".format(**locals()))
- _assert(table_exists(vertex_table),
- "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format(
- **locals()))
- _assert(not table_is_empty(vertex_table),
- "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format(
- **locals()))
-
- _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
- "Graph {func_name}: Invalid edge table name!".format(**locals()))
- _assert(table_exists(edge_table),
- "Graph {func_name}: Edge table ({edge_table}) is missing!".format(
- **locals()))
- _assert(not table_is_empty(edge_table),
- "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
- **locals()))
-
- existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
- _assert(vertex_id in existing_cols,
- """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """.
- format(**locals()))
- _assert(columns_exist_in_table(edge_table, edge_params.values()),
- """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""".
- format(cols=edge_params.values(), **locals()))
-
- return None
+ out_table, func_name, **kwargs):
+ """
+ Validates graph tables (vertex and edge) as well as the output table.
+ """
+ _assert(out_table and out_table.strip().lower() not in ('null', ''),
+ "Graph {func_name}: Invalid output table name!".format(**locals()))
+ _assert(not table_exists(out_table),
+ "Graph {func_name}: Output table already exists!".format(**locals()))
+
+ _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
+ "Graph {func_name}: Invalid vertex table name!".format(**locals()))
+ _assert(table_exists(vertex_table),
+ "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format(
+ **locals()))
+ _assert(not table_is_empty(vertex_table),
+ "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format(
+ **locals()))
+
+ _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
+ "Graph {func_name}: Invalid edge table name!".format(**locals()))
+ _assert(table_exists(edge_table),
+ "Graph {func_name}: Edge table ({edge_table}) is missing!".format(
+ **locals()))
+ _assert(not table_is_empty(edge_table),
+ "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
+ **locals()))
+
+ existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
+ _assert(vertex_id in existing_cols,
+ """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """.
+ format(**locals()))
+ _assert(columns_exist_in_table(edge_table, edge_params.values()),
+ """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""".
+ format(cols=edge_params.values(), **locals()))
+
+ return None
+
def get_graph_usage(schema_madlib, func_name, other_text):
- usage = """
-----------------------------------------------------------------------------
- USAGE
-----------------------------------------------------------------------------
- SELECT {schema_madlib}.{func_name}(
- vertex_table TEXT, -- Name of the table that contains the vertex data.
- vertex_id TEXT, -- Name of the column containing the vertex ids.
- edge_table TEXT, -- Name of the table that contains the edge data.
- edge_args TEXT{comma} -- A comma-delimited string containing multiple
- -- named arguments of the form "name=value".
- {other_text}
-);
-
-The following parameters are supported for edge table arguments ('edge_args'
- above):
-
-src (default = 'src') : Name of the column containing the source
- vertex ids in the edge table.
-dest (default = 'dest') : Name of the column containing the destination
- vertex ids in the edge table.
-weight (default = 'weight') : Name of the column containing the weight of
- edges in the edge table.
-""".format(schema_madlib=schema_madlib, func_name=func_name,
- other_text=other_text, comma = ',' if other_text is not None else ' ')
-
- return usage
+ usage = """
+ ----------------------------------------------------------------------------
+ USAGE
+ ----------------------------------------------------------------------------
+ SELECT {schema_madlib}.{func_name}(
+ vertex_table TEXT, -- Name of the table that contains the vertex data.
+ vertex_id TEXT, -- Name of the column containing the vertex ids.
+ edge_table TEXT, -- Name of the table that contains the edge data.
+ edge_args TEXT{comma} -- A comma-delimited string containing multiple
+ -- named arguments of the form "name=value".
+ {other_text}
+ );
+
+ The following parameters are supported for edge table arguments
+ ('edge_args' above):
+
+ src (default = 'src'): Name of the column containing the source
+ vertex ids in the edge table.
+ dest (default = 'dest'): Name of the column containing the destination
+ vertex ids in the edge table.
+ weight (default = 'weight'): Name of the column containing the weight of
+ edges in the edge table.
+ """.format(schema_madlib=schema_madlib,
+ func_name=func_name,
+ other_text=other_text,
+ comma=',' if other_text is not None else ' ')
+ return usage