You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ri...@apache.org on 2017/07/20 21:24:26 UTC
[1/3] incubator-madlib git commit: Graph: Update Python code to follow PEP-8
Repository: incubator-madlib
Updated Branches:
refs/heads/master 8c9b955cd -> d487df3c4
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/sssp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
index 4839d2d..93497c4 100644
--- a/src/ports/postgres/modules/graph/sssp.py_in
+++ b/src/ports/postgres/modules/graph/sssp.py_in
@@ -33,21 +33,22 @@ from graph_utils import get_graph_usage
from graph_utils import _grp_from_table
from graph_utils import _check_groups
from utilities.control import MinWarning
+
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import split_quoted_delimited_str
+from utilities.utilities import is_platform_pg, is_platform_hawq
+
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import get_expr_type
-m4_changequote(`<!', `!>')
def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_args, source_vertex, out_table, grouping_cols, **kwargs):
-
- """
+ edge_args, source_vertex, out_table, grouping_cols, **kwargs):
+ """
Single source shortest path function for graphs using the Bellman-Ford
algorhtm [1].
Args:
@@ -63,383 +64,382 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
[1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
"""
- with MinWarning("warning"):
-
- INT_MAX = 2147483647
- INFINITY = "'Infinity'"
- EPSILON = 0.000001
-
- message = unique_string(desp='message')
-
- oldupdate = unique_string(desp='oldupdate')
- newupdate = unique_string(desp='newupdate')
-
- params_types = {'src': str, 'dest': str, 'weight': str}
- default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types,
- default_args)
-
- # Prepare the input for recording in the summary table
- if vertex_id is None:
- v_st= "NULL"
- vertex_id = "id"
- else:
- v_st = vertex_id
- if edge_args is None:
- e_st = "NULL"
- else:
- e_st = edge_args
- if grouping_cols is None:
- g_st = "NULL"
- glist = None
- else:
- g_st = grouping_cols
- glist = split_quoted_delimited_str(grouping_cols)
-
- src = edge_params["src"]
- dest = edge_params["dest"]
- weight = edge_params["weight"]
-
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
- local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY (id)"!>)
-
- is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
- _validate_sssp(vertex_table, vertex_id, edge_table,
- edge_params, source_vertex, out_table, glist)
-
- plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
- message,oldupdate,newupdate))
-
- # Initialize grouping related variables
- comma_grp = ""
- comma_grp_e = ""
- comma_grp_m = ""
- grp_comma = ""
- checkg_oo = ""
- checkg_eo = ""
- checkg_ex = ""
- checkg_om = ""
- group_by = ""
-
- if grouping_cols is not None:
- comma_grp = " , " + grouping_cols
- group_by = " , " + _grp_from_table(edge_table,glist)
- comma_grp_e = " , " + _grp_from_table(edge_table,glist)
- comma_grp_m = " , " + _grp_from_table("message",glist)
- grp_comma = grouping_cols + " , "
-
- checkg_oo_sub = _check_groups(out_table,"oldupdate",glist)
- checkg_oo = " AND " + checkg_oo_sub
- checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist)
- checkg_ex = " AND " + _check_groups(edge_table,"x",glist)
- checkg_om = " AND " + _check_groups("out_table","message",glist)
-
- w_type = get_expr_type(weight,edge_table).lower()
- init_w = INT_MAX
- if w_type in ['real','double precision','float8']:
- init_w = INFINITY
-
- # We keep a table of every vertex, the minimum cost to that destination
- # seen so far and the parent to this vertex in the associated shortest
- # path. This table will be updated throughout the execution.
- plpy.execute(
- """ CREATE TABLE {out_table} AS ( SELECT
- {grp_comma} {src} AS {vertex_id}, {weight},
- {src} AS parent FROM {edge_table} LIMIT 0)
- {distribution} """.format(**locals()))
-
- # We keep a summary table to keep track of the parameters used for this
- # SSSP run. This table is used in the path finding function to eliminate
- # the need for repetition.
- plpy.execute( """ CREATE TABLE {out_table}_summary (
- vertex_table TEXT,
- vertex_id TEXT,
- edge_table TEXT,
- edge_args TEXT,
- source_vertex INTEGER,
- out_table TEXT,
- grouping_cols TEXT)
- """.format(**locals()))
- plpy.execute( """ INSERT INTO {out_table}_summary VALUES
- ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
- {source_vertex}, '{out_table}', '{g_st}')
- """.format(**locals()))
-
- # We keep 2 update tables and alternate them during the execution.
- # This is necessary since we need to know which vertices are updated in
- # the previous iteration to calculate the next set of updates.
- plpy.execute(
- """ CREATE TEMP TABLE {oldupdate} AS ( SELECT
- {src} AS id, {weight},
- {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
- {local_distribution}
- """.format(**locals()))
- plpy.execute(
- """ CREATE TEMP TABLE {newupdate} AS ( SELECT
- {src} AS id, {weight},
- {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
- {local_distribution}
- """.format(**locals()))
-
- # Since HAWQ does not allow us to update, we create a new table and
- # rename at every iteration.
- if is_hawq:
- temp_table = unique_string(desp='temp')
- sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} )
- {distribution} """
- plpy.execute(sql.format(**locals()))
-
- # GPDB and HAWQ have distributed by clauses to help them with indexing.
- # For Postgres we add the indices manually.
- sql_index = m4_ifdef(<!__POSTGRESQL__!>,
- <!""" CREATE INDEX ON {out_table} ({vertex_id});
- CREATE INDEX ON {oldupdate} (id);
- CREATE INDEX ON {newupdate} (id);
- """.format(**locals())!>,
- <!''!>)
- plpy.execute(sql_index)
-
- # The initialization step is quite different when grouping is involved
- # since not every group (subgraph) will have the same set of vertices.
-
- # Example:
- # Assume there are two grouping columns g1 and g2
- # g1 values are 0 and 1. g2 values are 5 and 6
- if grouping_cols is not None:
-
- distinct_grp_table = unique_string(desp='grp')
- plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """.
- format(**locals()))
- plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS
- SELECT DISTINCT {grouping_cols} FROM {edge_table} """.
- format(**locals()))
- subq = unique_string(desp='subquery')
-
- checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist)
- grp_d_comma = _grp_from_table(distinct_grp_table,glist) +","
-
- plpy.execute(
- """ INSERT INTO {out_table}
- SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
- {init_w} AS {weight}, NULL::INT AS parent
- FROM {distinct_grp_table} INNER JOIN
- (
- SELECT {src} AS {vertex_id} {comma_grp}
- FROM {edge_table}
- UNION
- SELECT {dest} AS {vertex_id} {comma_grp}
- FROM {edge_table}
- ) {subq} ON ({checkg_ds_sub})
- WHERE {vertex_id} IS NOT NULL
- """.format(**locals()))
-
- plpy.execute(
- """ INSERT INTO {oldupdate}
- SELECT {source_vertex}, 0, {source_vertex},
- {grouping_cols}
- FROM {distinct_grp_table}
- """.format(**locals()))
-
- # The maximum number of vertices for any group.
- # Used for determining negative cycles.
- v_cnt = plpy.execute(
- """ SELECT max(count) as max FROM (
- SELECT count({vertex_id}) AS count
- FROM {out_table}
- GROUP BY {grouping_cols}) x
- """.format(**locals()))[0]['max']
- plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
- else:
- plpy.execute(
- """ INSERT INTO {out_table}
- SELECT {vertex_id} AS {vertex_id},
- {init_w} AS {weight},
- NULL AS parent
- FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- """.format(**locals()))
-
- # The source can be reached with 0 cost and it has itself as the
- # parent.
- plpy.execute(
- """ INSERT INTO {oldupdate}
- VALUES({source_vertex},0,{source_vertex})
- """.format(**locals()))
-
- v_cnt = plpy.execute(
- """ SELECT count(*) FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- """.format(**locals()))[0]['count']
-
- for i in range(0,v_cnt+1):
-
- # Apply the updates calculated in the last iteration.
- if is_hawq:
- sql = """
- TRUNCATE TABLE {temp_table};
- INSERT INTO {temp_table}
- SELECT *
- FROM {out_table}
- WHERE NOT EXISTS (
- SELECT 1
- FROM {oldupdate} as oldupdate
- WHERE {out_table}.{vertex_id} = oldupdate.id
- {checkg_oo})
- UNION
- SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
- """
- plpy.execute(sql.format(**locals()))
- plpy.execute("DROP TABLE {0}".format(out_table))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".
- format(temp_table,out_table))
- sql = """ CREATE TABLE {temp_table} AS (
- SELECT * FROM {out_table} LIMIT 0)
- {distribution};"""
- plpy.execute(sql.format(**locals()))
- ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
- format(oldupdate))
- else:
- sql = """
- UPDATE {out_table} SET
- {weight}=oldupdate.{weight},
- parent=oldupdate.parent
- FROM
- {oldupdate} AS oldupdate
- WHERE
- {out_table}.{vertex_id}=oldupdate.id AND
- {out_table}.{weight}>oldupdate.{weight} {checkg_oo}
- """
- ret = plpy.execute(sql.format(**locals()))
-
- if ret.nrows() == 0:
- break
-
- plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
-
- # 'oldupdate' table has the update info from the last iteration
-
- # Consider every edge that has an updated source
- # From these edges:
- # For every destination vertex, find the min total cost to reach.
- # Note that, just calling an aggregate function with group by won't
- # let us store the src field of the edge (needed for the parent).
- # This is why we need the 'x'; it gives a list of destinations and
- # associated min values. Using these values, we identify which edge
- # is selected.
-
- # Since using '=' with floats is dangerous we use an epsilon value
- # for comparison.
-
- # Once we have a list of edges and values (stores as 'message'),
- # we check if these values are lower than the existing shortest
- # path values.
-
- sql = (""" INSERT INTO {newupdate}
- SELECT DISTINCT ON (message.id {comma_grp})
- message.id AS id,
- message.{weight} AS {weight},
- message.parent AS parent {comma_grp_m}
- FROM {out_table} AS out_table INNER JOIN
- (
- SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
- oldupdate.id AS parent {comma_grp_e}
- FROM {oldupdate} AS oldupdate INNER JOIN
- {edge_table} ON
- ({edge_table}.{src} = oldupdate.id {checkg_eo})
- INNER JOIN
- (
- SELECT {edge_table}.{dest} AS id,
- min(oldupdate.{weight} +
- {edge_table}.{weight}) AS {weight} {comma_grp_e}
- FROM {oldupdate} AS oldupdate INNER JOIN
- {edge_table} ON
- ({edge_table}.{src}=oldupdate.id {checkg_eo})
- GROUP BY {edge_table}.{dest} {comma_grp_e}
- ) x
- ON ({edge_table}.{dest} = x.id {checkg_ex} )
- WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
- - x.{weight}) < {EPSILON}
- ) message
- ON (message.id = out_table.{vertex_id} {checkg_om})
- WHERE message.{weight}<out_table.{weight}
- """.format(**locals()))
-
- plpy.execute(sql)
-
- # Swap the update tables for the next iteration.
- tmp = oldupdate
- oldupdate = newupdate
- newupdate = tmp
-
- plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
- # The algorithm should converge in less than |V| iterations.
- # Otherwise there is a negative cycle in the graph.
- if i == v_cnt:
- if grouping_cols is None:
- plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
- format(out_table, out_table+"_summary", oldupdate))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
- plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
-
- # It is possible that not all groups has negative cycles.
- else:
-
- # negs is the string created by collating grouping columns.
- # By looking at the oldupdate table we can see which groups
- # are in a negative cycle.
-
- negs = plpy.execute(
- """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
- FROM {oldupdate}
- """.format(**locals()))[0]['grp']
-
- # Delete the groups with negative cycles from the output table.
- if is_hawq:
- sql_del = """
- TRUNCATE TABLE {temp_table};
- INSERT INTO {temp_table}
- SELECT *
- FROM {out_table}
- WHERE NOT EXISTS(
- SELECT 1
- FROM {oldupdate} as oldupdate
- WHERE {checkg_oo_sub}
- );"""
- plpy.execute(sql_del.format(**locals()))
- plpy.execute("DROP TABLE {0}".format(out_table))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".
- format(temp_table,out_table))
- else:
- sql_del = """ DELETE FROM {out_table}
- USING {oldupdate} AS oldupdate
- WHERE {checkg_oo_sub}"""
- plpy.execute(sql_del.format(**locals()))
-
- # If every group has a negative cycle,
- # drop the output table as well.
- if table_is_empty(out_table):
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table,out_table+"_summary"))
-
- plpy.warning(
- """Graph SSSP: Detected a negative cycle in the """ +
- """sub-graphs of following groups: {0}.""".
- format(str(negs)[1:-1]))
-
- plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
- format(**locals()))
-
- return None
+ with MinWarning("warning"):
+
+ INT_MAX = 2147483647
+ INFINITY = "'Infinity'"
+ EPSILON = 0.000001
+
+ message = unique_string(desp='message')
+
+ oldupdate = unique_string(desp='oldupdate')
+ newupdate = unique_string(desp='newupdate')
+
+ params_types = {'src': str, 'dest': str, 'weight': str}
+ default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+ edge_params = extract_keyvalue_params(edge_args,
+ params_types,
+ default_args)
+
+ # Prepare the input for recording in the summary table
+ if vertex_id is None:
+ v_st = "NULL"
+ vertex_id = "id"
+ else:
+ v_st = vertex_id
+ if edge_args is None:
+ e_st = "NULL"
+ else:
+ e_st = edge_args
+ if grouping_cols is None:
+ g_st = "NULL"
+ glist = None
+ else:
+ g_st = grouping_cols
+ glist = split_quoted_delimited_str(grouping_cols)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ weight = edge_params["weight"]
+
+ distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
+ local_distribution = '' if is_platform_pg() else "DISTRIBUTED BY (id)"
+
+ is_hawq = is_platform_hawq()
+
+ _validate_sssp(vertex_table, vertex_id, edge_table,
+ edge_params, source_vertex, out_table, glist)
+
+ plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
+ message, oldupdate, newupdate))
+
+ # Initialize grouping related variables
+ comma_grp = ""
+ comma_grp_e = ""
+ comma_grp_m = ""
+ grp_comma = ""
+ checkg_oo = ""
+ checkg_eo = ""
+ checkg_ex = ""
+ checkg_om = ""
+ group_by = ""
+
+ if grouping_cols is not None:
+ comma_grp = " , " + grouping_cols
+ group_by = " , " + _grp_from_table(edge_table, glist)
+ comma_grp_e = " , " + _grp_from_table(edge_table, glist)
+ comma_grp_m = " , " + _grp_from_table("message", glist)
+ grp_comma = grouping_cols + " , "
+
+ checkg_oo_sub = _check_groups(out_table, "oldupdate", glist)
+ checkg_oo = " AND " + checkg_oo_sub
+ checkg_eo = " AND " + _check_groups(edge_table, "oldupdate", glist)
+ checkg_ex = " AND " + _check_groups(edge_table, "x", glist)
+ checkg_om = " AND " + _check_groups("out_table", "message", glist)
+
+ w_type = get_expr_type(weight, edge_table).lower()
+ init_w = INT_MAX
+ if w_type in ['real', 'double precision', 'float8']:
+ init_w = INFINITY
+
+ # We keep a table of every vertex, the minimum cost to that destination
+ # seen so far and the parent to this vertex in the associated shortest
+ # path. This table will be updated throughout the execution.
+ plpy.execute(
+ """ CREATE TABLE {out_table} AS (SELECT
+ {grp_comma} {src} AS {vertex_id}, {weight},
+ {src} AS parent FROM {edge_table} LIMIT 0)
+ {distribution} """.format(**locals()))
+
+ # We keep a summary table to keep track of the parameters used for this
+ # SSSP run. This table is used in the path finding function to eliminate
+ # the need for repetition.
+ plpy.execute(""" CREATE TABLE {out_table}_summary (
+ vertex_table TEXT,
+ vertex_id TEXT,
+ edge_table TEXT,
+ edge_args TEXT,
+ source_vertex INTEGER,
+ out_table TEXT,
+ grouping_cols TEXT)
+ """.format(**locals()))
+ plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+ ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+ {source_vertex}, '{out_table}', '{g_st}')
+ """.format(**locals()))
+
+ # We keep 2 update tables and alternate them during the execution.
+ # This is necessary since we need to know which vertices are updated in
+ # the previous iteration to calculate the next set of updates.
+ plpy.execute(
+ """ CREATE TEMP TABLE {oldupdate} AS (SELECT
+ {src} AS id, {weight},
+ {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+ {local_distribution}
+ """.format(**locals()))
+ plpy.execute(
+ """ CREATE TEMP TABLE {newupdate} AS (SELECT
+ {src} AS id, {weight},
+ {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+ {local_distribution}
+ """.format(**locals()))
+
+ # Since HAWQ does not allow us to update, we create a new table and
+ # rename at every iteration.
+ if is_hawq:
+ temp_table = unique_string(desp='temp')
+ sql = """ CREATE TABLE {temp_table} AS (SELECT * FROM {out_table} )
+ {distribution} """
+ plpy.execute(sql.format(**locals()))
+
+ # GPDB and HAWQ have distributed by clauses to help them with indexing.
+ # For Postgres we add the indices manually.
+ if is_platform_pg():
+ plpy.execute("""
+ CREATE INDEX ON {out_table} ({vertex_id});
+ CREATE INDEX ON {oldupdate} (id);
+ CREATE INDEX ON {newupdate} (id);
+ """.format(**locals()))
+
+ # The initialization step is quite different when grouping is involved
+ # since not every group (subgraph) will have the same set of vertices.
+
+ # Example:
+ # Assume there are two grouping columns g1 and g2
+ # g1 values are 0 and 1. g2 values are 5 and 6
+ if grouping_cols is not None:
+
+ distinct_grp_table = unique_string(desp='grp')
+ plpy.execute("DROP TABLE IF EXISTS {distinct_grp_table}".
+ format(**locals()))
+ plpy.execute("""
+ CREATE TEMP TABLE {distinct_grp_table} AS
+ SELECT DISTINCT {grouping_cols} FROM {edge_table}
+ """.format(**locals()))
+ subq = unique_string(desp='subquery')
+
+ checkg_ds_sub = _check_groups(distinct_grp_table, subq, glist)
+ grp_d_comma = _grp_from_table(distinct_grp_table, glist) + ","
+
+ plpy.execute("""
+ INSERT INTO {out_table}
+ SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
+ {init_w} AS {weight}, NULL::INT AS parent
+ FROM {distinct_grp_table} INNER JOIN
+ (
+ SELECT {src} AS {vertex_id} {comma_grp}
+ FROM {edge_table}
+ UNION
+ SELECT {dest} AS {vertex_id} {comma_grp}
+ FROM {edge_table}
+ ) {subq} ON ({checkg_ds_sub})
+ WHERE {vertex_id} IS NOT NULL
+ """.format(**locals()))
+
+ plpy.execute("""
+ INSERT INTO {oldupdate}
+ SELECT {source_vertex}, 0, {source_vertex},
+ {grouping_cols}
+ FROM {distinct_grp_table}
+ """.format(**locals()))
+
+ # The maximum number of vertices for any group.
+ # Used for determining negative cycles.
+ v_cnt = plpy.execute("""
+ SELECT max(count) as max FROM (
+ SELECT count({vertex_id}) AS count
+ FROM {out_table}
+ GROUP BY {grouping_cols}) x
+ """.format(**locals()))[0]['max']
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+ else:
+ plpy.execute("""
+ INSERT INTO {out_table}
+ SELECT {vertex_id} AS {vertex_id},
+ {init_w} AS {weight},
+ NULL AS parent
+ FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ """.format(**locals()))
+
+ # The source can be reached with 0 cost and it has itself as the
+ # parent.
+ plpy.execute("""
+ INSERT INTO {oldupdate}
+ VALUES({source_vertex},0,{source_vertex})
+ """.format(**locals()))
+
+ v_cnt = plpy.execute("""
+ SELECT count(*) FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ """.format(**locals()))[0]['count']
+
+ for i in range(0, v_cnt + 1):
+
+ # Apply the updates calculated in the last iteration.
+ if is_hawq:
+ sql = """
+ TRUNCATE TABLE {temp_table};
+ INSERT INTO {temp_table}
+ SELECT *
+ FROM {out_table}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM {oldupdate} as oldupdate
+ WHERE {out_table}.{vertex_id} = oldupdate.id
+ {checkg_oo})
+ UNION
+ SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
+ """
+ plpy.execute(sql.format(**locals()))
+ plpy.execute("DROP TABLE {0}".format(out_table))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+ format(temp_table, out_table))
+ sql = """ CREATE TABLE {temp_table} AS (
+ SELECT * FROM {out_table} LIMIT 0)
+ {distribution};"""
+ plpy.execute(sql.format(**locals()))
+ ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
+ format(oldupdate))
+ else:
+ sql = """
+ UPDATE {out_table} SET
+ {weight} = oldupdate.{weight},
+ parent = oldupdate.parent
+ FROM
+ {oldupdate} AS oldupdate
+ WHERE
+ {out_table}.{vertex_id} = oldupdate.id AND
+ {out_table}.{weight} > oldupdate.{weight} {checkg_oo}
+ """
+ ret = plpy.execute(sql.format(**locals()))
+
+ if ret.nrows() == 0:
+ break
+
+ plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
+
+ # 'oldupdate' table has the update info from the last iteration
+
+ # Consider every edge that has an updated source
+ # From these edges:
+ # For every destination vertex, find the min total cost to reach.
+ # Note that, just calling an aggregate function with group by won't
+ # let us store the src field of the edge (needed for the parent).
+ # This is why we need the 'x'; it gives a list of destinations and
+ # associated min values. Using these values, we identify which edge
+ # is selected.
+
+ # Since using '=' with floats is dangerous we use an epsilon value
+ # for comparison.
+
+ # Once we have a list of edges and values (stores as 'message'),
+ # we check if these values are lower than the existing shortest
+ # path values.
+
+ sql = (""" INSERT INTO {newupdate}
+ SELECT DISTINCT ON (message.id {comma_grp})
+ message.id AS id,
+ message.{weight} AS {weight},
+ message.parent AS parent {comma_grp_m}
+ FROM {out_table} AS out_table INNER JOIN
+ (
+ SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
+ oldupdate.id AS parent {comma_grp_e}
+ FROM {oldupdate} AS oldupdate INNER JOIN
+ {edge_table} ON
+ ({edge_table}.{src} = oldupdate.id {checkg_eo})
+ INNER JOIN
+ (
+ SELECT {edge_table}.{dest} AS id,
+ min(oldupdate.{weight} +
+ {edge_table}.{weight}) AS {weight} {comma_grp_e}
+ FROM {oldupdate} AS oldupdate INNER JOIN
+ {edge_table} ON
+ ({edge_table}.{src}=oldupdate.id {checkg_eo})
+ GROUP BY {edge_table}.{dest} {comma_grp_e}
+ ) x
+ ON ({edge_table}.{dest} = x.id {checkg_ex} )
+ WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
+ - x.{weight}) < {EPSILON}
+ ) message
+ ON (message.id = out_table.{vertex_id} {checkg_om})
+ WHERE message.{weight}<out_table.{weight}
+ """.format(**locals()))
+
+ plpy.execute(sql)
+
+ # Swap the update tables for the next iteration.
+ tmp = oldupdate
+ oldupdate = newupdate
+ newupdate = tmp
+
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
+ # The algorithm should converge in less than |V| iterations.
+ # Otherwise there is a negative cycle in the graph.
+ if i == v_cnt:
+ if grouping_cols is None:
+ plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
+ format(out_table, out_table + "_summary", oldupdate))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
+ plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
+
+ # It is possible that not all groups has negative cycles.
+ else:
+
+ # negs is the string created by collating grouping columns.
+ # By looking at the oldupdate table we can see which groups
+ # are in a negative cycle.
+
+ negs = plpy.execute(
+ """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+ FROM {oldupdate}
+ """.format(**locals()))[0]['grp']
+
+ # Delete the groups with negative cycles from the output table.
+ if is_hawq:
+ sql_del = """
+ TRUNCATE TABLE {temp_table};
+ INSERT INTO {temp_table}
+ SELECT *
+ FROM {out_table}
+ WHERE NOT EXISTS(
+ SELECT 1
+ FROM {oldupdate} as oldupdate
+ WHERE {checkg_oo_sub}
+ );"""
+ plpy.execute(sql_del.format(**locals()))
+ plpy.execute("DROP TABLE {0}".format(out_table))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+ format(temp_table, out_table))
+ else:
+ sql_del = """ DELETE FROM {out_table}
+ USING {oldupdate} AS oldupdate
+ WHERE {checkg_oo_sub}"""
+ plpy.execute(sql_del.format(**locals()))
+
+ # If every group has a negative cycle,
+ # drop the output table as well.
+ if table_is_empty(out_table):
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+
+ plpy.warning(
+ """Graph SSSP: Detected a negative cycle in the """ +
+ """sub-graphs of following groups: {0}.""".
+ format(str(negs)[1:-1]))
+
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
+ format(**locals()))
+ return None
+
def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
- **kwargs):
- """
+ **kwargs):
+ """
Helper function that can be used to get the shortest path for a vertex
Args:
@param sssp_table Name of the table that contains the SSSP output.
@@ -447,188 +447,187 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
desired path.
@param path_table Name of the output table that contains the path.
- """
- with MinWarning("warning"):
- _validate_get_path(sssp_table, dest_vertex, path_table)
-
- temp1_name = unique_string(desp='temp1')
- temp2_name = unique_string(desp='temp2')
-
- select_grps = ""
- check_grps_t1 = ""
- check_grps_t2 = ""
- grp_comma = ""
- tmp = ""
-
- summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
- vertex_id = summary[0]['vertex_id']
- source_vertex = summary[0]['source_vertex']
-
- if vertex_id == "NULL":
- vertex_id = "id"
-
- grouping_cols = summary[0]['grouping_cols']
- if grouping_cols == "NULL":
- grouping_cols = None
-
- if grouping_cols is not None:
- glist = split_quoted_delimited_str(grouping_cols)
- select_grps = _grp_from_table(sssp_table,glist) + " , "
- check_grps_t1 = " AND " + _check_groups(
- sssp_table,temp1_name,glist)
- check_grps_t2 = " AND " + _check_groups(
- sssp_table,temp2_name,glist)
-
- grp_comma = grouping_cols + " , "
-
- if source_vertex == dest_vertex:
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
- FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
- """.format(**locals()))
- return
-
- plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
- format(temp1_name,temp2_name));
- out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
- SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
- ARRAY[{dest_vertex}] AS path
- FROM {sssp_table}
- WHERE {vertex_id} = {dest_vertex}
- AND {sssp_table}.parent IS NOT NULL
- """.format(**locals()))
-
- plpy.execute("""
- CREATE TEMP TABLE {temp2_name} AS
- SELECT * FROM {temp1_name} LIMIT 0
- """.format(**locals()))
-
- # Follow the 'parent' chain until you reach the source.
- while out.nrows() > 0:
-
- plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
- # If the vertex id is not the source vertex,
- # Add it to the path and move to its parent
- out = plpy.execute(
- """ INSERT INTO {temp2_name}
- SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
- {sssp_table}.{vertex_id} || {temp1_name}.path AS path
- FROM {sssp_table} INNER JOIN {temp1_name} ON
- ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
- {check_grps_t1})
- WHERE {source_vertex} <> {sssp_table}.{vertex_id}
- """.format(**locals()))
-
- tmp = temp2_name
- temp2_name = temp1_name
- temp1_name = tmp
-
- tmp = check_grps_t1
- check_grps_t1 = check_grps_t2
- check_grps_t2 = tmp
-
- # Add the source vertex to the beginning of every path and
- # add the empty arrays for the groups that don't have a path to reach
- # the destination vertex
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} {source_vertex} || path AS path
- FROM {temp2_name}
- UNION
- SELECT {grp_comma} '{{}}'::INT[] AS path
- FROM {sssp_table}
- WHERE {vertex_id} = {dest_vertex}
- AND {sssp_table}.parent IS NULL
- """.format(**locals()))
-
- out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
- if out.nrows() == 0:
- plpy.error(
- "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
- format(dest_vertex,sssp_table))
-
- plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
- format(**locals()))
-
- return None
+ """
+ with MinWarning("warning"):
+ _validate_get_path(sssp_table, dest_vertex, path_table)
+
+ temp1_name = unique_string(desp='temp1')
+ temp2_name = unique_string(desp='temp2')
+
+ select_grps = ""
+ check_grps_t1 = ""
+ check_grps_t2 = ""
+ grp_comma = ""
+ tmp = ""
+
+ summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
+ vertex_id = summary[0]['vertex_id']
+ source_vertex = summary[0]['source_vertex']
+
+ if vertex_id == "NULL":
+ vertex_id = "id"
+
+ grouping_cols = summary[0]['grouping_cols']
+ if grouping_cols == "NULL":
+ grouping_cols = None
+
+ if grouping_cols is not None:
+ glist = split_quoted_delimited_str(grouping_cols)
+ select_grps = _grp_from_table(sssp_table, glist) + " , "
+ check_grps_t1 = " AND " + _check_groups(
+ sssp_table, temp1_name, glist)
+ check_grps_t2 = " AND " + _check_groups(
+ sssp_table, temp2_name, glist)
+
+ grp_comma = grouping_cols + " , "
+
+ if source_vertex == dest_vertex:
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+ FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
+ """.format(**locals()))
+ return
+
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(temp1_name, temp2_name))
+ out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+ SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
+ ARRAY[{dest_vertex}] AS path
+ FROM {sssp_table}
+ WHERE {vertex_id} = {dest_vertex}
+ AND {sssp_table}.parent IS NOT NULL
+ """.format(**locals()))
+
+ plpy.execute("""
+ CREATE TEMP TABLE {temp2_name} AS
+ SELECT * FROM {temp1_name} LIMIT 0
+ """.format(**locals()))
+
+ # Follow the 'parent' chain until you reach the source.
+ while out.nrows() > 0:
+
+ plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+ # If the vertex id is not the source vertex,
+ # Add it to the path and move to its parent
+ out = plpy.execute(
+ """ INSERT INTO {temp2_name}
+ SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
+ {sssp_table}.{vertex_id} || {temp1_name}.path AS path
+ FROM {sssp_table} INNER JOIN {temp1_name} ON
+ ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
+ {check_grps_t1})
+ WHERE {source_vertex} <> {sssp_table}.{vertex_id}
+ """.format(**locals()))
+
+ tmp = temp2_name
+ temp2_name = temp1_name
+ temp1_name = tmp
+
+ tmp = check_grps_t1
+ check_grps_t1 = check_grps_t2
+ check_grps_t2 = tmp
+
+ # Add the source vertex to the beginning of every path and
+ # add the empty arrays for the groups that don't have a path to reach
+ # the destination vertex
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} {source_vertex} || path AS path
+ FROM {temp2_name}
+ UNION
+ SELECT {grp_comma} '{{}}'::INT[] AS path
+ FROM {sssp_table}
+ WHERE {vertex_id} = {dest_vertex}
+ AND {sssp_table}.parent IS NULL
+ """.format(**locals()))
+
+ out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+ if out.nrows() == 0:
+ plpy.error(
+ "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
+ format(dest_vertex, sssp_table))
+
+ plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
+ format(**locals()))
+ return None
def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
- source_vertex, out_table, glist, **kwargs):
+ source_vertex, out_table, glist, **kwargs):
- validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table,'SSSP')
+ validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+ out_table, 'SSSP')
- _assert(isinstance(source_vertex,int),
- """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
- format(**locals()))
- src_exists = plpy.execute("""
- SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
- """.format(**locals()))
+ _assert(isinstance(source_vertex, int),
+ """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
+ format(**locals()))
+ src_exists = plpy.execute("""
+ SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
+ """.format(**locals()))
- if src_exists.nrows() == 0:
- plpy.error(
- """Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""".
- format(**locals()))
+ if src_exists.nrows() == 0:
+ plpy.error("Graph SSSP: Source vertex {source_vertex} is not present "
+ "in the vertex table {vertex_table}.".format(**locals()))
- vt_error = plpy.execute(
- """ SELECT {vertex_id}
- FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- GROUP BY {vertex_id}
- HAVING count(*) > 1 """.format(**locals()))
+ vt_error = plpy.execute("""
+ SELECT {vertex_id}
+ FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ GROUP BY {vertex_id}
+ HAVING count(*) > 1
+ """.format(**locals()))
- if vt_error.nrows() != 0:
- plpy.error(
- """Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
- format(**locals()))
+ if vt_error.nrows() != 0:
+ plpy.error("Graph SSSP: Source vertex table {vertex_table} "
+ "contains duplicate vertex id's.".format(**locals()))
- _assert(not table_exists(out_table+"_summary"),
- "Graph SSSP: Output summary table already exists!")
+ _assert(not table_exists(out_table + "_summary"),
+ "Graph SSSP: Output summary table already exists!")
- if glist is not None:
- _assert(columns_exist_in_table(edge_table, glist),
- """Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
- format(**locals()))
+ if glist is not None:
+ _assert(columns_exist_in_table(edge_table, glist),
+ "Graph SSSP: Not all columns from {glist} are present in "
+ "edge table ({edge_table}).".format(**locals()))
+ return None
- return None
def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
- _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
- "Graph SSSP: Invalid SSSP table name!")
- _assert(table_exists(sssp_table),
- "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
- _assert(not table_is_empty(sssp_table),
- "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
+ _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
+ "Graph SSSP: Invalid SSSP table name!")
+ _assert(table_exists(sssp_table),
+ "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
+ _assert(not table_is_empty(sssp_table),
+ "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
- summary = sssp_table+"_summary"
- _assert(table_exists(summary),
- "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
- _assert(not table_is_empty(summary),
- "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
+ summary = sssp_table + "_summary"
+ _assert(table_exists(summary),
+ "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
+ _assert(not table_is_empty(summary),
+ "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
- _assert(not table_exists(path_table),
- "Graph SSSP: Output path table already exists!")
+ _assert(not table_exists(path_table),
+ "Graph SSSP: Output path table already exists!")
+
+ return None
- return None
def graph_sssp_help(schema_madlib, message, **kwargs):
- """
- Help function for graph_sssp and graph_sssp_get_path
-
- Args:
- @param schema_madlib
- @param message: string, Help message string
- @param kwargs
-
- Returns:
- String. Help/usage information
- """
- if not message:
- help_string = """
+ """
+ Help function for graph_sssp and graph_sssp_get_path
+
+ Args:
+ @param schema_madlib
+ @param message: string, Help message string
+ @param kwargs
+
+ Returns:
+ String. Help/usage information
+ """
+ if not message:
+ help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
@@ -640,8 +639,8 @@ weights of its constituent edges is minimized.
For more details on function usage:
SELECT {schema_madlib}.graph_sssp('usage')
"""
- elif message.lower() in ['usage', 'help', '?']:
- help_string = """
+ elif message.lower() in ['usage', 'help', '?']:
+ help_string = """
Given a graph and a source vertex, single source shortest path (SSSP)
algorithm finds a path for every vertex such that the sum of the
weights of its constituent edges is minimized.
@@ -651,8 +650,8 @@ weights of its constituent edges is minimized.
To retrieve the path for a specific vertex:
SELECT {schema_madlib}.graph_sssp_get_path(
- sssp_table TEXT, -- Name of the table that contains the SSSP output.
- dest_vertex INT, -- The vertex that will be the destination of the
+ sssp_table TEXT, -- Name of the table that contains the SSSP output.
+ dest_vertex INT, -- The vertex that will be the destination of the
-- desired path.
path_table TEXT -- Name of the output table that contains the path.
);
@@ -679,8 +678,8 @@ every group and has the following columns:
- path (ARRAY) : The shortest path from the source vertex (as specified
in the SSSP execution) to the destination vertex.
"""
- elif message.lower() in ("example", "examples"):
- help_string = """
+ elif message.lower() in ("example", "examples"):
+ help_string = """
----------------------------------------------------------------------------
EXAMPLES
----------------------------------------------------------------------------
@@ -723,12 +722,12 @@ INSERT INTO edge VALUES
-- Compute the SSSP:
DROP TABLE IF EXISTS out;
SELECT madlib.graph_sssp(
- 'vertex', -- Vertex table
- 'id', -- Vertix id column
- 'edge', -- Edge table
- 'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments
- 0, -- The source vertex
- 'out' -- Output table of SSSP
+ 'vertex', -- Vertex table
+ 'id', -- Vertix id column
+ 'edge', -- Edge table
+ 'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments
+ 0, -- The source vertex
+ 'out' -- Output table of SSSP
);
-- View the SSSP costs for every vertex:
SELECT * FROM out ORDER BY id;
@@ -752,12 +751,14 @@ INSERT INTO edge_gr VALUES
DROP TABLE IF EXISTS out_gr, out_gr_summary;
SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
"""
- else:
- help_string = "No such option. Use {schema_madlib}.graph_sssp()"
-
- return help_string.format(schema_madlib=schema_madlib,
- graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
- """source_vertex INT, -- The source vertex id for the algorithm to start.
- out_table TEXT, -- Name of the table to store the result of SSSP.
- grouping_cols TEXT -- The list of grouping columns."""))
+ else:
+ help_string = "No such option. Use {schema_madlib}.graph_sssp()"
+
+ common_usage_string = get_graph_usage(
+ schema_madlib, 'graph_sssp',
+ """source_vertex INT, -- The source vertex id for the algorithm to start.
+ out_table TEXT, -- Name of the table to store the result of SSSP.
+ grouping_cols TEXT -- The list of grouping columns.""")
+ return help_string.format(schema_madlib=schema_madlib,
+ graph_usage=common_usage_string)
# ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/wcc.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in
index 02cceeb..1f6a81f 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -31,34 +31,37 @@ import plpy
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
-from utilities.validate_args import columns_exist_in_table, get_cols_and_types
-from graph_utils import *
+from utilities.validate_args import columns_exist_in_table
+from utilities.utilities import is_platform_pg, is_platform_hawq
+from graph_utils import validate_graph_coding, get_graph_usage
-m4_changequote(`<!', `!>')
def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, grouping_cols_list, module_name):
+ edge_params, out_table, grouping_cols_list, module_name):
"""
Function to validate input parameters for wcc
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table, module_name)
+ out_table, module_name)
if grouping_cols_list:
# validate the grouping columns. We currently only support grouping_cols
# to be column names in the edge_table, and not expressions!
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
- "Weakly Connected Components error: One or more grouping columns specified do not exist!")
+ "Weakly Connected Components error: "
+ "One or more grouping columns specified do not exist!")
def prefix_tablename_to_colnames(table, cols_list):
return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list])
+
def get_where_condition(table1, table2, cols_list):
return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col)
- for col in cols_list])
+ for col in cols_list])
+
def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
- out_table, grouping_cols, **kwargs):
+ out_table, grouping_cols, **kwargs):
"""
Function that computes the wcc
@@ -78,8 +81,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
plpy.execute('SET client_min_messages TO warning')
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types, default_args)
+ edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
# populate default values for optional params if null
if vertex_id is None:
@@ -89,7 +91,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, grouping_cols_list, 'Weakly Connected Components')
+ edge_params, out_table, grouping_cols_list,
+ 'Weakly Connected Components')
src = edge_params["src"]
dest = edge_params["dest"]
@@ -99,21 +102,22 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
toupdate = unique_string(desp='toupdate')
temp_out_table = unique_string(desp='tempout')
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+ distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
subq_prefixed_grouping_cols = ''
comma_toupdate_prefixed_grouping_cols = ''
comma_oldupdate_prefixed_grouping_cols = ''
old_new_update_where_condition = ''
new_to_update_where_condition = ''
edge_to_update_where_condition = ''
- is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+ is_hawq = is_platform_hawq()
INT_MAX = 2147483647
component_id = 'component_id'
+ grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
+
if grouping_cols:
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>)
+ distribution = ('' if is_platform_pg() else
+ "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, vertex_id))
# Update some variables useful for grouping based query strings
subq = unique_string(desp='subquery')
distinct_grp_table = unique_string(desp='grptable')
@@ -121,18 +125,20 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
CREATE TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_table}
""".format(**locals()))
- comma_toupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(toupdate,
- grouping_cols_list)
- comma_oldupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(
- oldupdate, grouping_cols_list)
- subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq,
- grouping_cols_list)
- old_new_update_where_condition = ' AND ' + get_where_condition(
- oldupdate, newupdate, grouping_cols_list)
- new_to_update_where_condition = ' AND ' + get_where_condition(
- newupdate, toupdate, grouping_cols_list)
- edge_to_update_where_condition = ' AND ' + get_where_condition(
- edge_table, toupdate, grouping_cols_list)
+
+ pttc = prefix_tablename_to_colnames
+ gwc = get_where_condition
+
+ comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, grouping_cols_list)
+ comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, grouping_cols_list)
+ subq_prefixed_grouping_cols = pttc(subq, grouping_cols_list)
+ old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, grouping_cols_list)
+ new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, grouping_cols_list)
+ edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, grouping_cols_list)
+ join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list)
+ group_by_clause = ('' if not grouping_cols else
+ '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
+ subq, vertex_id))
plpy.execute("""
CREATE TABLE {newupdate} AS
SELECT {subq}.{vertex_id},
@@ -148,13 +154,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
ON {join_grouping_cols}
GROUP BY {group_by_clause}
{distribution}
- """.format(select_grouping_cols=','+subq_prefixed_grouping_cols,
- join_grouping_cols=get_where_condition(subq,
- distinct_grp_table, grouping_cols_list),
- group_by_clause='' if not grouping_cols else
- subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, vertex_id),
- select_grouping_cols_clause='' if not grouping_cols else
- grouping_cols+', ', **locals()))
+ """.format(select_grouping_cols=',' + subq_prefixed_grouping_cols,
+ select_grouping_cols_clause=grouping_cols_comma,
+ **locals()))
plpy.execute("""
CREATE TEMP TABLE {message} AS
SELECT {vertex_id},
@@ -162,8 +164,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
{select_grouping_cols_clause}
FROM {newupdate}
{distribution}
- """.format(select_grouping_cols_clause='' if not grouping_cols else
- ', '+grouping_cols, **locals()))
+ """.format(select_grouping_cols_clause=grouping_cols_comma,
+ **locals()))
else:
plpy.execute("""
CREATE TABLE {newupdate} AS
@@ -186,13 +188,14 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
""".format(**locals()))
nodes_to_update = 1
while nodes_to_update > 0:
- # This idea here is simple. Look at all the neighbors of a node, and
- # assign the smallest node id among the neighbors as its component_id.
- # The next table starts off with very high component_id (INT_MAX). The
- # component_id of all nodes which obtain a smaller component_id after
- # looking at its neighbors are updated in the next table. At every
- # iteration update only those nodes whose component_id in the previous
- # iteration are greater than what was found in the current iteration.
+ # Look at all the neighbors of a node, and assign the smallest node id
+ # among the neighbors as its component_id. The next table starts off
+ # with very high component_id (INT_MAX). The component_id of all nodes
+ # which obtain a smaller component_id after looking at its neighbors are
+ # updated in the next table. At every iteration update only those nodes
+ # whose component_id in the previous iteration are greater than what was
+ # found in the current iteration.
+
plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
plpy.execute("""
CREATE TEMP TABLE {oldupdate} AS
@@ -202,10 +205,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
FROM {message}
GROUP BY {group_by_clause} {vertex_id}
{distribution}
- """.format(grouping_cols_select='' if not grouping_cols else
- ', {0}'.format(grouping_cols), group_by_clause=''
- if not grouping_cols else '{0}, '.format(grouping_cols),
- **locals()))
+ """.format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols),
+ group_by_clause=grouping_cols_comma,
+ **locals()))
plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
plpy.execute("""
@@ -236,8 +238,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
SELECT * FROM {toupdate};
""".format(**locals()))
plpy.execute("DROP TABLE {0}".format(newupdate))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table,
- newupdate))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+ format(temp_out_table, newupdate))
plpy.execute("""
CREATE TABLE {temp_out_table} AS
SELECT * FROM {newupdate}
@@ -275,9 +277,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
) AS t
GROUP BY {group_by_clause} {vertex_id}
""".format(select_grouping_cols='' if not grouping_cols
- else ', {0}'.format(grouping_cols), group_by_clause=''
- if not grouping_cols else ' {0}, '.format(grouping_cols),
- **locals()))
+ else ', {0}'.format(grouping_cols), group_by_clause=''
+ if not grouping_cols else ' {0}, '.format(grouping_cols),
+ **locals()))
plpy.execute("DROP TABLE {0}".format(oldupdate))
if grouping_cols:
@@ -300,6 +302,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
if is_hawq:
plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table))
+
def wcc_help(schema_madlib, message, **kwargs):
"""
Help function for wcc
@@ -315,11 +318,13 @@ def wcc_help(schema_madlib, message, **kwargs):
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
- help_string = get_graph_usage(schema_madlib, 'Weakly Connected Components',
+ help_string = get_graph_usage(
+ schema_madlib,
+ 'Weakly Connected Components',
"""out_table TEXT, -- Output table of weakly connected components
- grouping_col TEXT -- Comma separated column names to group on
- -- (DEFAULT = NULL, no grouping)
-""")
+ grouping_col TEXT -- Comma separated column names to group on
+ -- (DEFAULT = NULL, no grouping)
+ """)
else:
if message is not None and \
message.lower() in ("example", "examples"):
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/utilities/utilities.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in
index 6a5e8f9..b28a5f3 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -14,32 +14,43 @@ if __name__ != "__main__":
m4_changequote(`<!', `!>')
+def has_function_properties():
+ """ __HAS_FUNCTION_PROPERTIES__ variable defined during configure """
+ return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+
+
+def is_platform_pg():
+ """ __POSTGRESQL__ variable defined during configure """
+ return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
+def is_platform_hawq():
+ """ __HAWQ__ variable defined during configure """
+ return m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
def get_seg_number():
""" Find out how many primary segments exist in the distribution
Might be useful for partitioning data.
"""
- m4_ifdef(<!__POSTGRESQL__!>, <!return 1!>, <!
- return plpy.execute(
- """
- SELECT count(*) from gp_segment_configuration
- WHERE role = 'p'
- """)[0]['count']
- !>)
+ if is_platform_pg():
+ return 1
+ else:
+ return plpy.execute("""
+ SELECT count(*) from gp_segment_configuration
+ WHERE role = 'p'
+ """)[0]['count']
# ------------------------------------------------------------------------------
def is_orca():
- m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!
- optimizer = plpy.execute("show optimizer")[0]["optimizer"]
- return True if optimizer == 'on' else False
- !>, <!
+ if has_function_properties():
+ optimizer = plpy.execute("show optimizer")[0]["optimizer"]
+ if optimizer == 'on':
+ return True
return False
- !>)
-# ------------------------------------------------------------------------------
-
-
-def is_platform_pg():
- return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
# ------------------------------------------------------------------------------
[2/3] incubator-madlib git commit: Graph: Update Python code to
follow PEP-8
Posted by ri...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/pagerank.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in
index 4ef5876..95bc4b4 100644
--- a/src/ports/postgres/modules/graph/pagerank.py_in
+++ b/src/ports/postgres/modules/graph/pagerank.py_in
@@ -32,36 +32,38 @@ from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
+from utilities.utilities import is_platform_pg
+
from utilities.validate_args import columns_exist_in_table, get_cols_and_types
from graph_utils import *
-m4_changequote(`<!', `!>')
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, damping_factor, max_iter, threshold,
- grouping_cols_list, module_name):
+ edge_params, out_table, damping_factor, max_iter,
+ threshold, grouping_cols_list):
"""
Function to validate input parameters for PageRank
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table, module_name)
+ out_table, 'PageRank')
_assert(damping_factor >= 0.0 and damping_factor <= 1.0,
- """PageRank: Invalid damping factor value ({0}), must be between 0 and 1.""".
- format(damping_factor))
+ "PageRank: Invalid damping factor value ({0}), must be between 0 and 1.".
+ format(damping_factor))
_assert(not threshold or (threshold >= 0.0 and threshold <= 1.0),
- """PageRank: Invalid threshold value ({0}), must be between 0 and 1.""".
- format(threshold))
+ "PageRank: Invalid threshold value ({0}), must be between 0 and 1.".
+ format(threshold))
_assert(max_iter > 0,
- """PageRank: Invalid max_iter value ({0}), must be a positive integer.""".
- format(max_iter))
+ """PageRank: Invalid max_iter value ({0}), must be a positive integer.""".
+ format(max_iter))
if grouping_cols_list:
# validate the grouping columns. We currently only support grouping_cols
# to be column names in the edge_table, and not expressions!
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
"PageRank error: One or more grouping columns specified do not exist!")
+
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
- out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
+ out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
"""
Function that computes the PageRank
@@ -76,458 +78,451 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
@param max_iter
@param threshold
"""
- old_msg_level = plpy.execute("""
- SELECT setting
- FROM pg_settings
- WHERE name='client_min_messages'
- """)[0]['setting']
- plpy.execute('SET client_min_messages TO warning')
- params_types = {'src': str, 'dest': str}
- default_args = {'src': 'src', 'dest': 'dest'}
- edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
-
- # populate default values for optional params if null
- if damping_factor is None:
- damping_factor = 0.85
- if max_iter is None:
- max_iter = 100
- if vertex_id is None:
- vertex_id = "id"
- if not grouping_cols:
- grouping_cols = ''
-
- grouping_cols_list = split_quoted_delimited_str(grouping_cols)
- validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, damping_factor, max_iter, threshold,
- grouping_cols_list, 'PageRank')
- summary_table = out_table + "_summary"
- _assert(not table_exists(summary_table),
- "Graph PageRank: Output summary table ({summary_table}) already exists."
- .format(**locals()))
- src = edge_params["src"]
- dest = edge_params["dest"]
- nvertices = plpy.execute("""
- SELECT COUNT({0}) AS cnt
- FROM {1}
- """.format(vertex_id, vertex_table))[0]["cnt"]
- # A fixed threshold value, of say 1e-5, might not work well when the
- # number of vertices is a billion, since the initial pagerank value
- # of all nodes would then be 1/1e-9. So, assign default threshold
- # value based on number of nodes in the graph.
- # NOTE: The heuristic below is not based on any scientific evidence.
- if threshold is None:
- threshold = 1.0/(nvertices*1000)
-
- # table/column names used when grouping_cols is set.
- distinct_grp_table = ''
- vertices_per_group = ''
- vpg = ''
- grouping_where_clause = ''
- group_by_clause = ''
- random_prob = ''
-
- edge_temp_table = unique_string(desp='temp_edge')
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
- if grouping_cols else '', dest)!>)
- plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
- plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
- SELECT * FROM {edge_table}
- {distribution}
- """.format(**locals()))
- # GPDB and HAWQ have distributed by clauses to help them with indexing.
- # For Postgres we add the index explicitly.
- sql_index = m4_ifdef(<!__POSTGRESQL__!>,
- <!"""CREATE INDEX ON {edge_temp_table} ({src});
- """.format(**locals())!>,
- <!''!>)
- plpy.execute(sql_index)
-
- # Intermediate tables required.
- cur = unique_string(desp='cur')
- message = unique_string(desp='message')
- cur_unconv = unique_string(desp='cur_unconv')
- message_unconv = unique_string(desp='message_unconv')
- out_cnts = unique_string(desp='out_cnts')
- out_cnts_cnt = unique_string(desp='cnt')
- v1 = unique_string(desp='v1')
-
- cur_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
- if grouping_cols else '', vertex_id)!>)
- cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
- if grouping_cols else '', vertex_id)!>)
- cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
- """.format(**locals())
- out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
- """.format(**locals())
- v1_join_clause = """{v1}.{vertex_id}={edge_temp_table}.{src}
- """.format(**locals())
-
- random_probability = (1.0-damping_factor)/nvertices
- ######################################################################
- # Create several strings that will be used to construct required
- # queries. These strings will be required only during grouping.
- random_jump_prob = random_probability
- ignore_group_clause_first = ''
- limit = ' LIMIT 1 '
-
- grouping_cols_select_pr = ''
- vertices_per_group_inner_join_pr = ''
- ignore_group_clause_pr= ''
-
- grouping_cols_select_ins = ''
- vpg_from_clause_ins = ''
- vpg_where_clause_ins = ''
- message_grp_where_ins = ''
- ignore_group_clause_ins = ''
-
- nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
- ignore_group_clause_ins_noincoming = ''
-
- grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
- group_by_grouping_cols_conv = ''
- message_grp_clause_conv = ''
- ignore_group_clause_conv = ''
- ######################################################################
-
- # Queries when groups are involved need a lot more conditions in
- # various clauses, so populating the required variables. Some intermediate
- # tables are unnecessary when no grouping is involved, so create some
- # tables and certain columns only during grouping.
- if grouping_cols:
- distinct_grp_table = unique_string(desp='grp')
- plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
- plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
- SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
- """.format(**locals()))
- vertices_per_group = unique_string(desp='nvert_grp')
- init_pr = unique_string(desp='init')
- random_prob = unique_string(desp='rand')
- subq = unique_string(desp='subquery')
- rand_damp = 1-damping_factor
- grouping_where_clause = ' AND '.join(
- [distinct_grp_table+'.'+col+'='+subq+'.'+col
- for col in grouping_cols_list])
- group_by_clause = ', '.join([distinct_grp_table+'.'+col
- for col in grouping_cols_list])
- # Find number of vertices in each group, this is the normalizing factor
- # for computing the random_prob
- plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
- plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
- SELECT {distinct_grp_table}.*,
- 1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
- {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
- FROM {distinct_grp_table} INNER JOIN (
- SELECT {grouping_cols}, {src} AS __vertices__
- FROM {edge_temp_table}
- UNION
- SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
- ){subq}
- ON {grouping_where_clause}
- GROUP BY {group_by_clause}
- """.format(**locals()))
-
- grouping_where_clause = ' AND '.join(
- [vertices_per_group+'.'+col+'='+subq+'.'+col
- for col in grouping_cols_list])
- group_by_clause = ', '.join([vertices_per_group+'.'+col
- for col in grouping_cols_list])
+ with MinWarning('warning'):
+ params_types = {'src': str, 'dest': str}
+ default_args = {'src': 'src', 'dest': 'dest'}
+ edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
+
+ # populate default values for optional params if null
+ if damping_factor is None:
+ damping_factor = 0.85
+ if max_iter is None:
+ max_iter = 100
+ if vertex_id is None:
+ vertex_id = "id"
+ if not grouping_cols:
+ grouping_cols = ''
+
+ grouping_cols_list = split_quoted_delimited_str(grouping_cols)
+ validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
+ edge_params, out_table, damping_factor,
+ max_iter, threshold, grouping_cols_list)
+ summary_table = out_table + "_summary"
+ _assert(not table_exists(summary_table),
+ "Graph PageRank: Output summary table ({summary_table}) already exists."
+ .format(**locals()))
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ n_vertices = plpy.execute("""
+ SELECT COUNT({0}) AS cnt
+ FROM {1}
+ """.format(vertex_id, vertex_table))[0]["cnt"]
+ # A fixed threshold value, of say 1e-5, might not work well when the
+ # number of vertices is a billion, since the initial pagerank value
+ # of all nodes would then be 1/1e-9. So, assign default threshold
+ # value based on number of nodes in the graph.
+ # NOTE: The heuristic below is not based on any scientific evidence.
+ if threshold is None:
+ threshold = 1.0 / (n_vertices * 1000)
+
+ # table/column names used when grouping_cols is set.
+ distinct_grp_table = ''
+ vertices_per_group = ''
+ vpg = ''
+ grouping_where_clause = ''
+ group_by_clause = ''
+ random_prob = ''
+
+ edge_temp_table = unique_string(desp='temp_edge')
+ grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
+ distribution = ('' if is_platform_pg() else
+ "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, dest))
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
plpy.execute("""
- CREATE TEMP TABLE {cur} AS
- SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
- {init_pr} AS pagerank
- FROM {vertices_per_group} INNER JOIN (
- SELECT {grouping_cols}, {src} AS __vertices__
- FROM {edge_temp_table}
- UNION
- SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
- ){subq}
- ON {grouping_where_clause}
- {cur_distribution}
- """.format(**locals()))
- vpg = unique_string(desp='vpg')
- # Compute the out-degree of every node in the group-based subgraphs.
- plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
- plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
- SELECT {grouping_cols_select} {src} AS {vertex_id},
- COUNT({dest}) AS {out_cnts_cnt}
- FROM {edge_temp_table}
- GROUP BY {grouping_cols_select} {src}
- {cnts_distribution}
- """.format(grouping_cols_select=grouping_cols+','
- if grouping_cols else '', **locals()))
-
- message_grp = ' AND '.join(
- ["{cur}.{col}={message}.{col}".format(**locals())
- for col in grouping_cols_list])
- cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
- ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
- for col in grouping_cols_list])
- out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
- ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
- for col in grouping_cols_list])
- v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
- ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
- for col in grouping_cols_list])
- vpg_join_clause = ' AND '.join(
- ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
- for col in grouping_cols_list])
- vpg_t1_join_clause = ' AND '.join(
- ["__t1__.{col}={vpg}.{col}".format(**locals())
- for col in grouping_cols_list])
- # join clause specific to populating random_prob for nodes without any
- # incoming edges.
- edge_grouping_cols_select = ', '.join(
- ["{edge_temp_table}.{col}".format(**locals())
- for col in grouping_cols_list])
- cur_grouping_cols_select = ', '.join(
- ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
- # Create output summary table:
- cols_names_types = get_cols_and_types(edge_table)
- grouping_cols_clause = ', '.join([c_name+" "+c_type
- for (c_name, c_type) in cols_names_types
- if c_name in grouping_cols_list])
- plpy.execute("""
- CREATE TABLE {summary_table} (
- {grouping_cols_clause},
- __iterations__ INTEGER
- )
- """.format(**locals()))
- # Create output table. This will be updated whenever a group converges
- # Note that vertex_id is assumed to be an integer (as described in
- # documentation)
- plpy.execute("""
- CREATE TABLE {out_table} (
- {grouping_cols_clause},
- {vertex_id} INTEGER,
- pagerank DOUBLE PRECISION
- )
- """.format(**locals()))
- temp_summary_table = unique_string(desp='temp_summary')
- plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
- plpy.execute("""
- CREATE TABLE {temp_summary_table} (
- {grouping_cols_clause}
- )
+ CREATE TEMP TABLE {edge_temp_table} AS
+ SELECT * FROM {edge_table}
+ {distribution}
""".format(**locals()))
+
+ # GPDB and HAWQ have distributed by clauses to help them with indexing.
+ # For Postgres we add the index explicitly.
+ if is_platform_pg():
+ plpy.execute("CREATE INDEX ON {0}({1})".format(edge_temp_table, src))
+
+ # Intermediate tables required.
+ cur = unique_string(desp='cur')
+ message = unique_string(desp='message')
+ cur_unconv = unique_string(desp='cur_unconv')
+ message_unconv = unique_string(desp='message_unconv')
+ out_cnts = unique_string(desp='out_cnts')
+ out_cnts_cnt = unique_string(desp='cnt')
+ v1 = unique_string(desp='v1')
+
+ if is_platform_pg():
+ cur_distribution = cnts_distribution = ''
+ else:
+ cur_distribution = cnts_distribution = \
+ "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, vertex_id)
+ cur_join_clause = "{edge_temp_table}.{dest} = {cur}.{vertex_id}".format(**locals())
+ out_cnts_join_clause = "{out_cnts}.{vertex_id} = {edge_temp_table}.{src}".format(**locals())
+ v1_join_clause = "{v1}.{vertex_id} = {edge_temp_table}.{src}".format(**locals())
+
+ random_probability = (1.0 - damping_factor) / n_vertices
+ ######################################################################
+ # Create several strings that will be used to construct required
+ # queries. These strings will be required only during grouping.
+ random_jump_prob = random_probability
+ ignore_group_clause_first = ''
+ limit = ' LIMIT 1 '
+
+ grouping_cols_select_pr = ''
+ vertices_per_group_inner_join_pr = ''
+ ignore_group_clause_pr = ''
+
+ grouping_cols_select_ins = ''
+ vpg_from_clause_ins = ''
+ vpg_where_clause_ins = ''
+ message_grp_where_ins = ''
+ ignore_group_clause_ins = ''
+
+ nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
+ ignore_group_clause_ins_noincoming = ''
+
+ grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
+ group_by_grouping_cols_conv = ''
+ message_grp_clause_conv = ''
+ ignore_group_clause_conv = ''
######################################################################
- # Strings required for the main PageRank computation query
- grouping_cols_select_pr = edge_grouping_cols_select+', '
- random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
- vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
- AS {vpg} ON {vpg_join_clause}""".format(**locals())
- ignore_group_clause_pr=' WHERE '+get_ignore_groups(summary_table,
- edge_temp_table, grouping_cols_list)
- ignore_group_clause_ins_noincoming = ' WHERE '+get_ignore_groups(
- summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
- # Strings required for updating PageRank scores of vertices that have
- # no incoming edges
- grouping_cols_select_ins = cur_grouping_cols_select+','
- vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
- **locals())
- vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
- **locals())
- message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
- ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table,
- cur, grouping_cols_list)
- # Strings required for convergence test query
- grouping_cols_select_conv = cur_grouping_cols_select
- group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
- cur_grouping_cols_select)
- message_grp_clause_conv = '{0} AND '.format(message_grp)
- ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table,
- cur, grouping_cols_list)
- limit = ''
-
- # Find all nodes, in each group, that have no incoming edges. The PageRank
- # value of these nodes are not updated using the first query in the
- # following for loop. They must be explicitly plugged back in to the
- # message table, with their corresponding group's random_prob as their
- # PageRank values.
- plpy.execute("""
- CREATE TABLE {nodes_with_no_incoming_edges} AS
- SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
- {vpg}.{random_prob} AS pagerank
- FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
- WHERE NOT EXISTS (
- SELECT 1
- FROM {edge_temp_table} AS __t2__
- WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
- ) {vpg_where_clause_ins}
- """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
- for col in grouping_cols_list]),
- where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
- for col in grouping_cols_list]),
- **locals()))
- else:
- # cur and out_cnts tables can be simpler when no grouping is involved.
- init_value = 1.0/nvertices
- plpy.execute("""
- CREATE TEMP TABLE {cur} AS
- SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
- FROM {vertex_table}
- {cur_distribution}
- """.format(**locals()))
- # Compute the out-degree of every node in the graph.
- plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
- plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
- SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
- FROM {edge_temp_table}
- GROUP BY {src}
- {cnts_distribution}
- """.format(**locals()))
- # The summary table when there is no grouping will contain only
- # the iteration column. We don't need to create the out_table
- # when no grouping is used since the 'cur' table will be renamed
- # to out_table after pagerank computation is completed.
- plpy.execute("""
- CREATE TABLE {summary_table} (
- __iterations__ INTEGER
- )
- """.format(**locals()))
- # Find all nodes in the graph that don't have any incoming edges and
- # assign random_prob as their pagerank values.
- plpy.execute("""
- CREATE TABLE {nodes_with_no_incoming_edges} AS
- SELECT DISTINCT({src}), {random_probability} AS pagerank
- FROM {edge_temp_table}
- EXCEPT
- (SELECT DISTINCT({dest}), {random_probability} AS pagerank
- FROM {edge_temp_table})
- """.format(**locals()))
- unconverged = 0
- iteration_num = 0
- for iteration_num in range(max_iter):
- #####################################################################
- # PageRank for node 'A' at any given iteration 'i' is given by:
- # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
- # PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
- # where 'N' is the number of vertices in the graph,
- # B, C are nodes that have edges to node A, and
- # degree(node) represents the number of outgoing edges from 'node'
- #####################################################################
- # Essentially, the pagerank for a node is based on an aggregate of a
- # fraction of the pagerank values of all the nodes that have incoming
- # edges to it, along with a small random probability.
- # More information can be found at:
- # https://en.wikipedia.org/wiki/PageRank#Damping_factor
-
- # The query below computes the PageRank of each node using the above
- # formula. A small explanatory note on ignore_group_clause:
- # This is used only when grouping is set. This essentially will have
- # the condition that will help skip the PageRank computation on groups
- # that have converged.
- plpy.execute("""
- CREATE TABLE {message} AS
- SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
- SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+ # Queries when groups are involved need a lot more conditions in
+ # various clauses, so populating the required variables. Some intermediate
+ # tables are unnecessary when no grouping is involved, so create some
+ # tables and certain columns only during grouping.
+ if grouping_cols:
+ distinct_grp_table = unique_string(desp='grp')
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+ plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
+ SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
+ """.format(**locals()))
+ vertices_per_group = unique_string(desp='nvert_grp')
+ init_pr = unique_string(desp='init')
+ random_prob = unique_string(desp='rand')
+ subq = unique_string(desp='subquery')
+ rand_damp = 1 - damping_factor
+ grouping_where_clause = ' AND '.join(
+ [distinct_grp_table + '.' + col + '=' + subq + '.' + col
+ for col in grouping_cols_list])
+ group_by_clause = ', '.join([distinct_grp_table + '.' + col
+ for col in grouping_cols_list])
+ # Find number of vertices in each group, this is the normalizing factor
+ # for computing the random_prob
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
+ plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
+ SELECT {distinct_grp_table}.*,
+ 1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
+ {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
+ FROM {distinct_grp_table} INNER JOIN (
+ SELECT {grouping_cols}, {src} AS __vertices__
+ FROM {edge_temp_table}
+ UNION
+ SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+ ){subq}
+ ON {grouping_where_clause}
+ GROUP BY {group_by_clause}
+ """.format(**locals()))
+
+ grouping_where_clause = ' AND '.join(
+ [vertices_per_group + '.' + col + '=' + subq + '.' + col
+ for col in grouping_cols_list])
+ group_by_clause = ', '.join([vertices_per_group + '.' + col
+ for col in grouping_cols_list])
+ plpy.execute("""
+ CREATE TEMP TABLE {cur} AS
+ SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
+ {init_pr} AS pagerank
+ FROM {vertices_per_group} INNER JOIN (
+ SELECT {grouping_cols}, {src} AS __vertices__
+ FROM {edge_temp_table}
+ UNION
+ SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+ ){subq}
+ ON {grouping_where_clause}
+ {cur_distribution}
+ """.format(**locals()))
+ vpg = unique_string(desp='vpg')
+ # Compute the out-degree of every node in the group-based subgraphs.
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+ plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+ SELECT {grouping_cols_select} {src} AS {vertex_id},
+ COUNT({dest}) AS {out_cnts_cnt}
FROM {edge_temp_table}
- INNER JOIN {cur} ON {cur_join_clause}
- INNER JOIN {out_cnts} ON {out_cnts_join_clause}
- INNER JOIN {cur} AS {v1} ON {v1_join_clause}
- {vertices_per_group_inner_join_pr}
- {ignore_group_clause}
- GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
- {cur_distribution}
- """.format(ignore_group_clause=ignore_group_clause_pr
- if iteration_num>0 else ignore_group_clause_first,
- **locals()))
- # If there are nodes that have no incoming edges, they are not
- # captured in the message table. Insert entries for such nodes,
- # with random_prob.
- plpy.execute("""
- INSERT INTO {message}
- SELECT *
- FROM {nodes_with_no_incoming_edges}
- {ignore_group_clause}
- """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
- if iteration_num>0 else ignore_group_clause_first,
- **locals()))
- # Check for convergence:
- ## Check for convergence only if threshold != 0.
- if threshold != 0:
- # message_unconv and cur_unconv will contain the unconverged groups
- # after current # and previous iterations respectively. Groups that
- # are missing in message_unconv but appear in cur_unconv are the
- # groups that have converged after this iteration's computations.
- # If no grouping columns are specified, then we check if there is
- # at least one unconverged node (limit 1 is used in the query).
+ GROUP BY {grouping_cols_select} {src}
+ {cnts_distribution}
+ """.format(grouping_cols_select=grouping_cols + ','
+ if grouping_cols else '', **locals()))
+
+ message_grp = ' AND '.join(
+ ["{cur}.{col}={message}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
+ ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
+ ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
+ ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ vpg_join_clause = ' AND '.join(
+ ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ vpg_t1_join_clause = ' AND '.join(
+ ["__t1__.{col}={vpg}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ # join clause specific to populating random_prob for nodes without any
+ # incoming edges.
+ edge_grouping_cols_select = ', '.join(
+ ["{edge_temp_table}.{col}".format(**locals())
+ for col in grouping_cols_list])
+ cur_grouping_cols_select = ', '.join(
+ ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
+ # Create output summary table:
+ cols_names_types = get_cols_and_types(edge_table)
+ grouping_cols_clause = ', '.join([c_name + " " + c_type
+ for (c_name, c_type) in cols_names_types
+ if c_name in grouping_cols_list])
plpy.execute("""
- CREATE TEMP TABLE {message_unconv} AS
- SELECT {grouping_cols_select_conv}
- FROM {message}
- INNER JOIN {cur}
- ON {cur}.{vertex_id}={message}.{vertex_id}
- WHERE {message_grp_clause_conv}
- ABS({cur}.pagerank-{message}.pagerank) > {threshold}
- {ignore_group_clause}
- {group_by_grouping_cols_conv}
- {limit}
- """.format(ignore_group_clause=ignore_group_clause_ins
- if iteration_num>0 else ignore_group_clause_conv,
- **locals()))
- unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
- """.format(message_unconv))[0]["cnt"]
- if iteration_num > 0 and grouping_cols:
- # Update result and summary tables for groups that have
- # converged
- # since the last iteration.
- update_result_tables(temp_summary_table, iteration_num,
- summary_table, out_table, message, grouping_cols_list,
- cur_unconv, message_unconv)
- plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
- plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
- {cur_unconv} """.format(**locals()))
+ CREATE TABLE {summary_table} (
+ {grouping_cols_clause},
+ __iterations__ INTEGER
+ )
+ """.format(**locals()))
+ # Create output table. This will be updated whenever a group converges
+ # Note that vertex_id is assumed to be an integer (as described in
+ # documentation)
+ plpy.execute("""
+ CREATE TABLE {out_table} (
+ {grouping_cols_clause},
+ {vertex_id} INTEGER,
+ pagerank DOUBLE PRECISION
+ )
+ """.format(**locals()))
+ temp_summary_table = unique_string(desp='temp_summary')
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
+ plpy.execute("""
+ CREATE TABLE {temp_summary_table} (
+ {grouping_cols_clause}
+ )
+ """.format(**locals()))
+ ######################################################################
+ # Strings required for the main PageRank computation query
+ grouping_cols_select_pr = edge_grouping_cols_select + ', '
+ random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
+ vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
+ AS {vpg} ON {vpg_join_clause}""".format(**locals())
+ ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(summary_table,
+ edge_temp_table, grouping_cols_list)
+ ignore_group_clause_ins_noincoming = ' WHERE ' + get_ignore_groups(
+ summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
+ # Strings required for updating PageRank scores of vertices that have
+ # no incoming edges
+ grouping_cols_select_ins = cur_grouping_cols_select + ','
+ vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
+ **locals())
+ vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
+ **locals())
+ message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
+ ignore_group_clause_ins = ' AND ' + get_ignore_groups(summary_table,
+ cur, grouping_cols_list)
+ # Strings required for convergence test query
+ grouping_cols_select_conv = cur_grouping_cols_select
+ group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
+ cur_grouping_cols_select)
+ message_grp_clause_conv = '{0} AND '.format(message_grp)
+ ignore_group_clause_conv = ' AND ' + get_ignore_groups(summary_table,
+ cur, grouping_cols_list)
+ limit = ''
+
+ # Find all nodes, in each group, that have no incoming edges. The PageRank
+ # values of these nodes are not updated using the first query in the
+ # following for loop. They must be explicitly plugged back in to the
+ # message table, with their corresponding group's random_prob as their
+ # PageRank values.
+ plpy.execute("""
+ CREATE TABLE {nodes_with_no_incoming_edges} AS
+ SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
+ {vpg}.{random_prob} AS pagerank
+ FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM {edge_temp_table} AS __t2__
+ WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
+ ) {vpg_where_clause_ins}
+ """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
+ for col in grouping_cols_list]),
+ where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
+ for col in grouping_cols_list]),
+ **locals()))
else:
- # Do not run convergence test if threshold=0, since that implies
- # the user wants to run max_iter iterations.
- unconverged = 1
- plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
- plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+ # cur and out_cnts tables can be simpler when no grouping is involved.
+ init_value = 1.0 / n_vertices
+ plpy.execute("""
+ CREATE TEMP TABLE {cur} AS
+ SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
+ FROM {vertex_table}
+ {cur_distribution}
""".format(**locals()))
- if unconverged == 0:
- break
- # If there still are some unconverged groups/(entire table),
- # update results.
- if grouping_cols:
- if unconverged > 0:
+ # Compute the out-degree of every node in the graph.
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+ plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+ SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
+ FROM {edge_temp_table}
+ GROUP BY {src}
+ {cnts_distribution}
+ """.format(**locals()))
+ # The summary table when there is no grouping will contain only
+ # the iteration column. We don't need to create the out_table
+ # when no grouping is used since the 'cur' table will be renamed
+ # to out_table after pagerank computation is completed.
+ plpy.execute("""
+ CREATE TABLE {summary_table} (
+ __iterations__ INTEGER
+ )
+ """.format(**locals()))
+ # Find all nodes in the graph that don't have any incoming edges and
+ # assign random_prob as their pagerank values.
+ plpy.execute("""
+ CREATE TABLE {nodes_with_no_incoming_edges} AS
+ SELECT DISTINCT({src}), {random_probability} AS pagerank
+ FROM {edge_temp_table}
+ EXCEPT
+ (SELECT DISTINCT({dest}), {random_probability} AS pagerank
+ FROM {edge_temp_table})
+ """.format(**locals()))
+ unconverged = 0
+ iteration_num = 0
+ for iteration_num in range(max_iter):
+ #####################################################################
+ # PageRank for node 'A' at any given iteration 'i' is given by:
+ # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
+ # PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
+ # where 'N' is the number of vertices in the graph,
+ # B, C are nodes that have edges to node A, and
+ # degree(node) represents the number of outgoing edges from 'node'
+ #####################################################################
+ # Essentially, the pagerank for a node is based on an aggregate of a
+ # fraction of the pagerank values of all the nodes that have incoming
+ # edges to it, along with a small random probability.
+ # More information can be found at:
+ # https://en.wikipedia.org/wiki/PageRank#Damping_factor
+
+ # The query below computes the PageRank of each node using the above
+ # formula. A small explanatory note on ignore_group_clause:
+ # This is used only when grouping is set. This essentially will have
+ # the condition that will help skip the PageRank computation on groups
+ # that have converged.
+ plpy.execute("""
+ CREATE TABLE {message} AS
+ SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
+ SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+ FROM {edge_temp_table}
+ INNER JOIN {cur} ON {cur_join_clause}
+ INNER JOIN {out_cnts} ON {out_cnts_join_clause}
+ INNER JOIN {cur} AS {v1} ON {v1_join_clause}
+ {vertices_per_group_inner_join_pr}
+ {ignore_group_clause}
+ GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
+ {cur_distribution}
+ """.format(ignore_group_clause=ignore_group_clause_pr
+ if iteration_num > 0 else ignore_group_clause_first,
+ **locals()))
+ # If there are nodes that have no incoming edges, they are not
+ # captured in the message table. Insert entries for such nodes,
+ # with random_prob.
+ plpy.execute("""
+ INSERT INTO {message}
+ SELECT *
+ FROM {nodes_with_no_incoming_edges}
+ {ignore_group_clause}
+ """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
+ if iteration_num > 0 else ignore_group_clause_first,
+ **locals()))
+ # Check for convergence:
+ # Check for convergence only if threshold != 0.
if threshold != 0:
- # We completed max_iters, but there are still some unconverged
- # groups # Update the result and summary tables for unconverged
- # groups.
- update_result_tables(temp_summary_table, iteration_num,
- summary_table, out_table, cur, grouping_cols_list,
- cur_unconv)
+ # message_unconv and cur_unconv will contain the unconverged groups
+ # after current and previous iterations respectively. Groups that
+ # are missing in message_unconv but appear in cur_unconv are the
+ # groups that have converged after this iteration's computations.
+ # If no grouping columns are specified, then we check if there is
+ # at least one unconverged node (limit 1 is used in the query).
+ plpy.execute("""
+ CREATE TEMP TABLE {message_unconv} AS
+ SELECT {grouping_cols_select_conv}
+ FROM {message}
+ INNER JOIN {cur}
+ ON {cur}.{vertex_id}={message}.{vertex_id}
+ WHERE {message_grp_clause_conv}
+ ABS({cur}.pagerank-{message}.pagerank) > {threshold}
+ {ignore_group_clause}
+ {group_by_grouping_cols_conv}
+ {limit}
+ """.format(ignore_group_clause=ignore_group_clause_ins
+ if iteration_num > 0 else ignore_group_clause_conv,
+ **locals()))
+ unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
+ """.format(message_unconv))[0]["cnt"]
+ if iteration_num > 0 and grouping_cols:
+ # Update result and summary tables for groups that have
+ # converged
+ # since the last iteration.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, message, grouping_cols_list,
+ cur_unconv, message_unconv)
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
+ plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
+ {cur_unconv} """.format(**locals()))
else:
- # No group has converged. List of all group values are in
- # distinct_grp_table.
- update_result_tables(temp_summary_table, iteration_num,
- summary_table, out_table, cur, grouping_cols_list,
- distinct_grp_table)
- else:
- plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
- """.format(table_name=cur, **locals()))
- plpy.execute("""
+ # Do not run convergence test if threshold=0, since that implies
+ # the user wants to run max_iter iterations.
+ unconverged = 1
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
+ plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+ """.format(**locals()))
+ if unconverged == 0:
+ break
+
+ # If there still are some unconverged groups/(entire table),
+ # update results.
+ if grouping_cols:
+ if unconverged > 0:
+ if threshold != 0:
+ # We completed max_iters, but there are still some unconverged
+ # groups. Update the result and summary tables for unconverged
+ # groups.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, cur, grouping_cols_list,
+ cur_unconv)
+ else:
+ # No group has converged. The list of all group values is in
+ # distinct_grp_table.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, cur, grouping_cols_list,
+ distinct_grp_table)
+ else:
+ plpy.execute("""
+ ALTER TABLE {table_name}
+ RENAME TO {out_table}
+ """.format(table_name=cur, **locals()))
+ plpy.execute("""
INSERT INTO {summary_table} VALUES
- ({iteration_num}+1);
- """.format(**locals()))
+ ({iteration_num}+1)
+ """.format(**locals()))
+
+ # Step 4: Cleanup
+ plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
+ """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
+ message_unconv, nodes_with_no_incoming_edges))
+ if grouping_cols:
+ plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
+ """.format(vertices_per_group, temp_summary_table,
+ distinct_grp_table))
- ## Step 4: Cleanup
- plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6};
- """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
- message_unconv, nodes_with_no_incoming_edges))
- if grouping_cols:
- plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
- """.format(vertices_per_group, temp_summary_table,
- distinct_grp_table))
- plpy.execute("SET client_min_messages TO %s" % old_msg_level)
def update_result_tables(temp_summary_table, i, summary_table, out_table,
- res_table, grouping_cols_list, cur_unconv, message_unconv=None):
+ res_table, grouping_cols_list, cur_unconv,
+ message_unconv=None):
"""
This function updates the summary and output tables only for those
groups that have converged. This is found out by looking at groups
@@ -556,7 +551,7 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
FROM {cur_unconv}
WHERE {join_condition}
""".format(join_condition=get_ignore_groups(
- message_unconv, cur_unconv, grouping_cols_list), **locals()))
+ message_unconv, cur_unconv, grouping_cols_list), **locals()))
plpy.execute("""
INSERT INTO {summary_table}
SELECT *, {i}+1 AS __iteration__
@@ -569,21 +564,24 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
INNER JOIN {temp_summary_table}
ON {join_condition}
""".format(join_condition=' AND '.join(
- ["{res_table}.{col}={temp_summary_table}.{col}".format(
- **locals())
- for col in grouping_cols_list]), **locals()))
+ ["{res_table}.{col}={temp_summary_table}.{col}".format(
+ **locals())
+ for col in grouping_cols_list]), **locals()))
+
def get_ignore_groups(first_table, second_table, grouping_cols_list):
"""
This function generates the necessary clause to only select the
groups that appear in second_table and not in first_table.
"""
- return """({second_table_cols}) NOT IN (SELECT {grouping_cols} FROM
- {first_table}) """.format(second_table_cols=', '.join(
- ["{second_table}.{col}".format(**locals())
- for col in grouping_cols_list]),
- grouping_cols=', '.join([col for col in grouping_cols_list]),
- **locals())
+ second_table_cols = ', '.join(["{0}.{1}".format(second_table, col)
+ for col in grouping_cols_list])
+ grouping_cols = ', '.join([col for col in grouping_cols_list])
+ return """({second_table_cols}) NOT IN
+ (SELECT {grouping_cols}
+ FROM {first_table})
+ """.format(**locals())
+
def pagerank_help(schema_madlib, message, **kwargs):
"""
@@ -601,7 +599,7 @@ def pagerank_help(schema_madlib, message, **kwargs):
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
help_string = get_graph_usage(schema_madlib, 'PageRank',
- """out_table TEXT, -- Name of the output table for PageRank
+ """out_table TEXT, -- Name of the output table for PageRank
damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
-- (DEFAULT = 0.85)
max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)
[3/3] incubator-madlib git commit: Graph: Update Python code to
follow PEP-8
Posted by ri...@apache.org.
Graph: Update Python code to follow PEP-8
- Changed indentation to use spaces instead of tabs
- Updated to PEP-8 guidelines
- Updated to follow style guide convention
- Refactored few functions to clean code and design
Closes #148
Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/d487df3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/d487df3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/d487df3c
Branch: refs/heads/master
Commit: d487df3c46f984fad6c15b8a1d3e50dc30f6b6f6
Parents: 8c9b955
Author: Rahul Iyer <ri...@apache.org>
Authored: Fri Jul 7 22:23:18 2017 -0700
Committer: Rahul Iyer <ri...@apache.org>
Committed: Thu Jul 20 14:23:22 2017 -0700
----------------------------------------------------------------------
src/ports/postgres/modules/graph/apsp.py_in | 1294 +++++++++---------
.../postgres/modules/graph/graph_utils.py_in | 170 +--
src/ports/postgres/modules/graph/pagerank.py_in | 916 +++++++------
src/ports/postgres/modules/graph/sssp.py_in | 1125 +++++++--------
src/ports/postgres/modules/graph/wcc.py_in | 119 +-
.../postgres/modules/utilities/utilities.py_in | 45 +-
6 files changed, 1842 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/apsp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/apsp.py_in b/src/ports/postgres/modules/graph/apsp.py_in
index 1331301..ab8a566 100644
--- a/src/ports/postgres/modules/graph/apsp.py_in
+++ b/src/ports/postgres/modules/graph/apsp.py_in
@@ -38,17 +38,16 @@ from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import split_quoted_delimited_str
-from utilities.validate_args import get_cols
+from utilities.utilities import is_platform_pg, is_platform_hawq
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import get_expr_type
-m4_changequote(`<!', `!>')
def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_args, out_table, grouping_cols, **kwargs):
- """
+ edge_args, out_table, grouping_cols, **kwargs):
+ """
All Pairs shortest path function for graphs using the matrix
multiplication based algorithm [1].
[1] http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml
@@ -57,655 +56,656 @@ def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
@param vertex_id Name of the column containing the vertex ids.
@param edge_table Name of the table that contains the edge data.
@param edge_args A comma-delimited string containing multiple
- named arguments of the form "name=value".
- @param out_table Name of the table to store the result of APSP.
+ named arguments of the form "name=value".
+ @param out_table Name of the table to store the result of APSP.
@param grouping_cols The list of grouping columns.
"""
+ with MinWarning("warning"):
+
+ INT_MAX = 2147483647
+ INFINITY = "'Infinity'"
+ EPSILON = 0.000001
+
+ params_types = {'src': str, 'dest': str, 'weight': str}
+ default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+ edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
+
+ # Prepare the input for recording in the summary table
+ if vertex_id is None:
+ v_st = "NULL"
+ vertex_id = "id"
+ else:
+ v_st = vertex_id
+ if edge_args is None:
+ e_st = "NULL"
+ else:
+ e_st = edge_args
+ if grouping_cols is None:
+ g_st = "NULL"
+ glist = None
+ else:
+ g_st = grouping_cols
+ glist = split_quoted_delimited_str(grouping_cols)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ weight = edge_params["weight"]
+
+ distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(src)
+ is_hawq = is_platform_hawq()
+
+ _validate_apsp(vertex_table, vertex_id, edge_table,
+ edge_params, out_table, glist)
+
+ out_table_1 = unique_string(desp='out_table_1')
+ out_table_2 = unique_string(desp='out_table_2')
+ tmp_view = unique_string(desp='tmp_view')
+ v1 = unique_string(desp='v1')
+ v2 = unique_string(desp='v2')
+ message = unique_string(desp='message')
+
+ # Initialize grouping related variables
+ comma_grp = ""
+ comma_grp_e = ""
+ comma_grp_m = ""
+ grp_comma = ""
+ grp_v1_comma = ""
+ grp_o1_comma = ""
+ grp_o_comma = ""
+ checkg_eo = ""
+ checkg_eout = ""
+ checkg_ex = ""
+ checkg_om = ""
+ checkg_o1t_sub = ""
+ checkg_ot_sub = ""
+ checkg_ot = ""
+ checkg_o1t = ""
+ checkg_vv = ""
+ checkg_o2v = ""
+ checkg_oy = ""
+ checkg_vv_sub = "TRUE"
+ grp_by = ""
+
+ if grouping_cols is not None:
+
+ # We use actual table names in some cases and aliases in others
+ # In some cases, we swap the table names so use of an alias is
+ # necessary. In other cases, they are used to simplify debugging.
+
+ comma_grp = " , " + grouping_cols
+ comma_grp_e = " , " + _grp_from_table("edge", glist)
+ comma_grp_m = " , " + _grp_from_table(message, glist)
+ grp_comma = grouping_cols + " , "
+ grp_v1_comma = _grp_from_table("v1", glist) + " , "
+ grp_o1_comma = _grp_from_table(out_table_1, glist) + " , "
+ grp_o_comma = _grp_from_table("out", glist) + " , "
+
+ checkg_eo = " AND " + _check_groups(edge_table, out_table, glist)
+ checkg_eout = " AND " + _check_groups("edge", "out", glist)
+ checkg_ex = " AND " + _check_groups("edge", "x", glist)
+ checkg_om = " AND " + _check_groups(out_table, message, glist)
+ checkg_o1t_sub = _check_groups("out", tmp_view, glist)
+ checkg_ot_sub = _check_groups(out_table, tmp_view, glist)
+ checkg_ot = " AND " + _check_groups(out_table, tmp_view, glist)
+ checkg_o1t = " AND " + _check_groups("out", "t", glist)
+ checkg_vv = " AND " + _check_groups("v1", "v2", glist)
+ checkg_o2v = " AND " + _check_groups(out_table_2, "v2", glist)
+ checkg_oy = " AND " + _check_groups("out", "y", glist)
+ checkg_vv_sub = _check_groups("v1", "v2", glist)
+ grp_by = " GROUP BY " + grouping_cols
+
+ w_type = get_expr_type(weight, edge_table).lower()
+ init_w = INT_MAX
+ if w_type in ['real', 'double precision', 'float8']:
+ init_w = INFINITY
+
+ # We keep a summary table to keep track of the parameters used for this
+ # APSP run. This table is used in the path finding function to eliminate
+ # the need for repetition.
+ plpy.execute(""" CREATE TABLE {out_table}_summary (
+ vertex_table TEXT,
+ vertex_id TEXT,
+ edge_table TEXT,
+ edge_args TEXT,
+ out_table TEXT,
+ grouping_cols TEXT)
+ """.format(**locals()))
+ plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+ ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+ '{out_table}', '{g_st}') """.format(**locals()))
+
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ # Find all of the vertices involved with a given group
+ plpy.execute(""" CREATE VIEW {tmp_view} AS
+ SELECT {src} AS {vertex_id} {comma_grp}
+ FROM {edge_table} WHERE {src} IS NOT NULL
+ UNION
+ SELECT {dest} AS {vertex_id} {comma_grp}
+ FROM {edge_table} WHERE {dest} IS NOT NULL
+ """.format(**locals()))
+
+ # Don't use the unnecessary rows of the output table during joins.
+ ot_sql = """ SELECT * FROM {out_table}
+ WHERE {weight} != {init_w} AND {src} != {dest} """
+
+ # HAWQ does not support UPDATE so the initialization has to differ.
+ if is_hawq:
+
+ plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ # Create 2 identical tables to swap at every iteration.
+ plpy.execute(""" CREATE TABLE {out_table_1} AS
+ SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
+ FROM {edge_table} LIMIT 0 {distribution}
+ """.format(**locals()))
+ plpy.execute(""" CREATE TABLE {out_table_2} AS
+ SELECT * FROM {out_table_1} LIMIT 0 {distribution}
+ """.format(**locals()))
+
+ # The source can be reached with 0 cost and next is itself.
+ plpy.execute(""" INSERT INTO {out_table_2}
+ SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
+ 0 AS {weight}, {vertex_id} AS parent
+ FROM {tmp_view} """.format(**locals()))
+ # Distance = 1: every edge means there is a path from src to dest
+ plpy.execute(""" INSERT INTO {out_table_2}
+ SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
+ FROM {edge_table} """.format(**locals()))
+
+ # Fill the rest of the possible pairs with infinite initial weights
+ fill_sql = """ INSERT INTO {out_table_1}
+ SELECT {grp_v1_comma}
+ v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+ {init_w} AS {weight}, NULL::INT AS parent
+ FROM {tmp_view} v1, {tmp_view} v2
+ WHERE NOT EXISTS
+ (SELECT 1 FROM {out_table_2}
+ WHERE v1.{vertex_id} = {src} AND
+ v2.{vertex_id} = {dest}
+ {checkg_vv} {checkg_o2v})
+ {checkg_vv}
+ UNION
+ SELECT * FROM {out_table_2}
+ """.format(**locals())
+ plpy.execute(fill_sql)
+
+ ot_sql1 = ot_sql.format(out_table=out_table_1, init_w=init_w,
+ weight=weight, src=src, dest=dest)
+ ot_sql2 = ot_sql.format(out_table=out_table_2, init_w=init_w,
+ weight=weight, src=src, dest=dest)
+
+ # PostgreSQL & GPDB initialization
+ else:
+
+ plpy.execute(""" CREATE TABLE {out_table} AS
+ (SELECT {grp_comma} {src}, {dest}, {weight},
+ {src} AS parent FROM {edge_table} LIMIT 0)
+ {distribution} """.format(**locals()))
+
+ plpy.execute(""" INSERT INTO {out_table}
+ SELECT {grp_v1_comma}
+ v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+ {init_w} AS {weight}, NULL::INT AS parent
+ FROM
+ {tmp_view} AS v1 INNER JOIN
+ {tmp_view} AS v2 ON ({checkg_vv_sub})
+ """.format(**locals()))
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ # GPDB and HAWQ have distributed by clauses to help them with indexing.
+ # For Postgres we add the indices manually.
+ if is_platform_pg():
+ sql_index = "CREATE INDEX ON {0}({1})".format(out_table, src)
+ else:
+ sql_index = ''
+
+ plpy.execute(sql_index)
+
+ # The source can be reached with 0 cost and next is itself.
+ plpy.execute(
+ """ UPDATE {out_table} SET
+ {weight} = 0, parent = {vertex_id}
+ FROM {vertex_table}
+ WHERE {out_table}.{src} = {vertex_id}
+ AND {out_table}.{dest} = {vertex_id}
+ """.format(**locals()))
+
+ # Distance = 1: every edge means there is a path from src to dest
+
+ # There may be multiple edges defined as a->b,
+ # we only need the minimum weighted one.
+
+ plpy.execute(
+ """ CREATE VIEW {tmp_view} AS
+ SELECT {grp_comma} {src}, {dest},
+ min({weight}) AS {weight}
+ FROM {edge_table}
+ GROUP BY {grp_comma} {src}, {dest}
+ """.format(**locals()))
+ plpy.execute(
+ """ UPDATE {out_table} SET
+ {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
+ FROM {tmp_view}
+ WHERE {out_table}.{src} = {tmp_view}.{src}
+ AND {out_table}.{dest} = {tmp_view}.{dest}
+ AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
+ """.format(**locals()))
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ ot_sql1 = ot_sql.format(**locals())
+ out_table_1 = out_table
+
+ # Find the maximum number of iterations to try
+ # If not done by v_cnt iterations, there is a negative cycle.
+ v_cnt = plpy.execute(
+ """ SELECT max(count) as max FROM (
+ SELECT count(DISTINCT {src}) AS count
+ FROM {out_table_1}
+ {grp_by}) x
+ """.format(**locals()))[0]['max']
+
+ if v_cnt < 2:
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ if grouping_cols:
+ plpy.error(("Graph APSP: {0} has less than 2 vertices in " +
+ "every group.").format(edge_table))
+ else:
+ plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
+ vertex_table))
+
+ for i in range(0, v_cnt + 1):
+
+ """
+ Create a view that will be used to update the output table
+
+ The implementation is based on the matrix multiplication idea.
+ The initial output table consists of 3 sets of vertex pairs.
+ 1) for every vervex v: v -> v, weight 0, parent v
+ 2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
+ 3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
+ parent NULL
+
+ The algorithm "relaxes" the paths: finds alternate paths with less
+ weights
+ At every step, we look at every combination of non-infinite
+ existing paths and edges to see if we can relax a path.
+
+ Assume the graph is a chain: 1->2->3->...
+ The initial output table will have a finite weighted path for 1->2
+ and infinite for the rest. At ith iteration, the output table will
+ have 1->i path and will relax 1->i+1 path from infinite to a
+ finite value (weight of 1->i path + weight of i->i+1 edge) and
+ assign i as the parent.
+
+ Since using '=' with floats is dangerous we use an epsilon value
+ for comparison.
+ """
+
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ update_sql = """ CREATE VIEW {tmp_view} AS
+ SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
+ {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
+ FROM {out_table_1} AS out
+ INNER JOIN
+ (SELECT x.{src}, x.{dest}, x.{weight},
+ out.{dest} as parent {comma_grp_e}
+ FROM
+ ({ot_sql1}) AS out
+ INNER JOIN
+ {edge_table} AS edge
+ ON (out.{dest} = edge.{src} {checkg_eout})
+ INNER JOIN
+ (SELECT out.{src}, edge.{dest},
+ min(out.{weight}+edge.{weight}) AS {weight}
+ {comma_grp_e}
+ FROM
+ ({ot_sql1}) AS out,
+ {edge_table} AS edge
+ WHERE out.{dest} = edge.{src} {checkg_eout}
+ GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
+ ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
+ WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
+ < {EPSILON}) y
+ ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
+ WHERE y.{weight} < out.{weight}
+ """.format(**locals())
+ plpy.execute(update_sql)
+
+ # HAWQ employs alternating tables and has to be handled separately
+ if is_hawq:
+
+ # Stop if therea re no more updates
+ if table_is_empty(tmp_view):
+
+ plpy.execute("DROP VIEW {0}".format(tmp_view))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
+ out_table_1, out_table))
+ break
+
+ # The new output table will still have the old values for
+ # every vertex pair that does not appear on the update view.
+
+ plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
+ fill_sql = """ INSERT INTO {out_table_2}
+ SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
+ (SELECT 1 FROM {tmp_view} AS t
+ WHERE t.{src} = out.{src} AND
+ t.{dest} = out.{dest} {checkg_o1t})
+ UNION
+ SELECT * FROM {tmp_view}""".format(**locals())
+ plpy.execute(fill_sql)
+
+ # Swap the table names and the sql command we use for filtering
+ tmpname = out_table_1
+ out_table_1 = out_table_2
+ out_table_2 = tmpname
+
+ tmpname = ot_sql1
+ ot_sql1 = ot_sql2
+ ot_sql2 = tmpname
+
+ else:
+
+ updates = plpy.execute(
+ """ UPDATE {out_table}
+ SET {weight} = {tmp_view}.{weight},
+ parent = {tmp_view}.parent
+ FROM {tmp_view}
+ WHERE {tmp_view}.{src} = {out_table}.{src} AND
+ {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
+ """.format(**locals()))
+ if updates.nrows() == 0:
+ break
+
+ # The algorithm should have reached a break command by this point.
+ # This check handles the existence of a negative cycle.
+ if i == v_cnt:
+
+ # If there are no groups, clean up and give error.
+ if grouping_cols is None:
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ plpy.error("Graph APSP: Detected a negative cycle in the graph.")
+
+ # It is possible that not all groups has negative cycles.
+ else:
+ # negs is the string created by collating grouping columns.
+ # By looking at the update view, we can see which groups
+ # are in a negative cycle.
+
+ negs = plpy.execute(
+ """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+ FROM {tmp_view}
+ """.format(**locals()))[0]['grp']
+
+ # Delete the groups with negative cycles from the output table.
+
+ # HAWQ doesn't support DELETE so we have to copy the valid results.
+ if is_hawq:
+ sql_del = """ CREATE TABLE {out_table} AS
+ SELECT *
+ FROM {out_table_1} AS out
+ WHERE NOT EXISTS(
+ SELECT 1
+ FROM {tmp_view}
+ WHERE {checkg_o1t_sub}
+ );"""
+ plpy.execute(sql_del.format(**locals()))
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+ out_table_1, out_table_2))
+ else:
+ sql_del = """ DELETE FROM {out_table}
+ USING {tmp_view}
+ WHERE {checkg_ot_sub}"""
+ plpy.execute(sql_del.format(**locals()))
+
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+ # If every group has a negative cycle,
+ # drop the output table as well.
+ if table_is_empty(out_table):
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+ plpy.warning(
+ """Graph APSP: Detected a negative cycle in the """ +
+ """sub-graphs of following groups: {0}.""".
+ format(str(negs)[1:-1]))
+
+ else:
+ plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
+
+ return None
- with MinWarning("warning"):
-
- INT_MAX = 2147483647
- INFINITY = "'Infinity'"
- EPSILON = 0.000001
-
- params_types = {'src': str, 'dest': str, 'weight': str}
- default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types,
- default_args)
-
- # Prepare the input for recording in the summary table
- if vertex_id is None:
- v_st= "NULL"
- vertex_id = "id"
- else:
- v_st = vertex_id
- if edge_args is None:
- e_st = "NULL"
- else:
- e_st = edge_args
- if grouping_cols is None:
- g_st = "NULL"
- glist = None
- else:
- g_st = grouping_cols
- glist = split_quoted_delimited_str(grouping_cols)
-
- src = edge_params["src"]
- dest = edge_params["dest"]
- weight = edge_params["weight"]
-
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0})".format(src)!>)
-
- is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-
- _validate_apsp(vertex_table, vertex_id, edge_table,
- edge_params, out_table, glist)
-
- out_table_1 = unique_string(desp='out_table_1')
- out_table_2 = unique_string(desp='out_table_2')
- tmp_view = unique_string(desp='tmp_view')
- v1 = unique_string(desp='v1')
- v2 = unique_string(desp='v2')
- message = unique_string(desp='message')
-
- # Initialize grouping related variables
- comma_grp = ""
- comma_grp_e = ""
- comma_grp_m = ""
- grp_comma = ""
- grp_v1_comma = ""
- grp_o1_comma = ""
- grp_o_comma = ""
- checkg_eo = ""
- checkg_eout = ""
- checkg_ex = ""
- checkg_om = ""
- checkg_o1t_sub = ""
- checkg_ot_sub = ""
- checkg_ot = ""
- checkg_o1t = ""
- checkg_vv = ""
- checkg_o2v = ""
- checkg_oy = ""
- checkg_vv_sub = "TRUE"
- grp_by = ""
-
- if grouping_cols is not None:
-
- # We use actual table names in some cases and aliases in others
- # In some cases, we swap the table names so use of an alias is
- # necessary. In other cases, they are used to simplify debugging.
-
- comma_grp = " , " + grouping_cols
- comma_grp_e = " , " + _grp_from_table("edge",glist)
- comma_grp_m = " , " + _grp_from_table(message,glist)
- grp_comma = grouping_cols + " , "
- grp_v1_comma = _grp_from_table("v1",glist) + " , "
- grp_o1_comma = _grp_from_table(out_table_1,glist) + " , "
- grp_o_comma = _grp_from_table("out",glist) + " , "
-
- checkg_eo = " AND " + _check_groups(edge_table,out_table,glist)
- checkg_eout = " AND " + _check_groups("edge","out",glist)
- checkg_ex = " AND " + _check_groups("edge","x",glist)
- checkg_om = " AND " + _check_groups(out_table,message,glist)
- checkg_o1t_sub = _check_groups("out",tmp_view,glist)
- checkg_ot_sub = _check_groups(out_table,tmp_view,glist)
- checkg_ot = " AND " + _check_groups(out_table,tmp_view,glist)
- checkg_o1t = " AND " + _check_groups("out","t",glist)
- checkg_vv = " AND " + _check_groups("v1","v2",glist)
- checkg_o2v = " AND " + _check_groups(out_table_2,"v2",glist)
- checkg_oy = " AND " + _check_groups("out","y",glist)
- checkg_vv_sub = _check_groups("v1","v2",glist)
- grp_by = " GROUP BY " + grouping_cols
-
- w_type = get_expr_type(weight,edge_table).lower()
- init_w = INT_MAX
- if w_type in ['real','double precision','float8']:
- init_w = INFINITY
-
- # We keep a summary table to keep track of the parameters used for this
- # APSP run. This table is used in the path finding function to eliminate
- # the need for repetition.
- plpy.execute( """ CREATE TABLE {out_table}_summary (
- vertex_table TEXT,
- vertex_id TEXT,
- edge_table TEXT,
- edge_args TEXT,
- out_table TEXT,
- grouping_cols TEXT)
- """.format(**locals()))
- plpy.execute( """ INSERT INTO {out_table}_summary VALUES
- ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
- '{out_table}', '{g_st}') """.format(**locals()))
-
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- # Find all of the vertices involved with a given group
- plpy.execute(""" CREATE VIEW {tmp_view} AS
- SELECT {src} AS {vertex_id} {comma_grp}
- FROM {edge_table} WHERE {src} IS NOT NULL
- UNION
- SELECT {dest} AS {vertex_id} {comma_grp}
- FROM {edge_table} WHERE {dest} IS NOT NULL
- """.format(**locals()))
-
- # Don't use the unnecessary rows of the output table during joins.
- ot_sql = """ SELECT * FROM {out_table}
- WHERE {weight} != {init_w} AND {src} != {dest} """
-
- # HAWQ does not support UPDATE so the initialization has to differ.
- if is_hawq:
-
- plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- # Create 2 identical tables to swap at every iteration.
- plpy.execute(""" CREATE TABLE {out_table_1} AS
- SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
- FROM {edge_table} LIMIT 0 {distribution}
- """.format(**locals()))
- plpy.execute(""" CREATE TABLE {out_table_2} AS
- SELECT * FROM {out_table_1} LIMIT 0 {distribution}
- """.format(**locals()))
-
- # The source can be reached with 0 cost and next is itself.
- plpy.execute(""" INSERT INTO {out_table_2}
- SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
- 0 AS {weight}, {vertex_id} AS parent
- FROM {tmp_view} """.format(**locals()))
- # Distance = 1: every edge means there is a path from src to dest
- plpy.execute(""" INSERT INTO {out_table_2}
- SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
- FROM {edge_table} """.format(**locals()))
-
- # Fill the rest of the possible pairs with infinite initial weights
- fill_sql = """ INSERT INTO {out_table_1}
- SELECT {grp_v1_comma}
- v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
- {init_w} AS {weight}, NULL::INT AS parent
- FROM {tmp_view} v1, {tmp_view} v2
- WHERE NOT EXISTS
- (SELECT 1 FROM {out_table_2}
- WHERE v1.{vertex_id} = {src} AND
- v2.{vertex_id} = {dest}
- {checkg_vv} {checkg_o2v})
- {checkg_vv}
- UNION
- SELECT * FROM {out_table_2}
- """.format(**locals())
- plpy.execute(fill_sql)
-
- ot_sql1 = ot_sql.format(out_table = out_table_1, init_w = init_w,
- weight = weight, src = src, dest = dest)
- ot_sql2 = ot_sql.format(out_table = out_table_2, init_w = init_w,
- weight = weight, src = src, dest = dest)
-
- # PostgreSQL & GPDB initialization
- else:
-
- plpy.execute( """ CREATE TABLE {out_table} AS
- (SELECT {grp_comma} {src}, {dest}, {weight},
- {src} AS parent FROM {edge_table} LIMIT 0)
- {distribution} """.format(**locals()))
-
- plpy.execute( """ INSERT INTO {out_table}
- SELECT {grp_v1_comma}
- v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
- {init_w} AS {weight}, NULL::INT AS parent
- FROM
- {tmp_view} AS v1 INNER JOIN
- {tmp_view} AS v2 ON ({checkg_vv_sub})
- """.format(**locals()))
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- # GPDB and HAWQ have distributed by clauses to help them with indexing.
- # For Postgres we add the indices manually.
- sql_index = m4_ifdef(<!__POSTGRESQL__!>,
- <!""" CREATE INDEX ON {out_table} ({src});
- """.format(**locals())!>, <!''!>)
- plpy.execute(sql_index)
-
- # The source can be reached with 0 cost and next is itself.
- plpy.execute(
- """ UPDATE {out_table} SET
- {weight} = 0, parent = {vertex_id}
- FROM {vertex_table}
- WHERE {out_table}.{src} = {vertex_id}
- AND {out_table}.{dest} = {vertex_id}
- """.format(**locals()))
-
- # Distance = 1: every edge means there is a path from src to dest
-
- # There may be multiple edges defined as a->b,
- # we only need the minimum weighted one.
-
- plpy.execute(
- """ CREATE VIEW {tmp_view} AS
- SELECT {grp_comma} {src}, {dest},
- min({weight}) AS {weight}
- FROM {edge_table}
- GROUP BY {grp_comma} {src}, {dest}
- """.format(**locals()))
- plpy.execute(
- """ UPDATE {out_table} SET
- {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
- FROM {tmp_view}
- WHERE {out_table}.{src} = {tmp_view}.{src}
- AND {out_table}.{dest} = {tmp_view}.{dest}
- AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
- """.format(**locals()))
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- ot_sql1 = ot_sql.format(**locals())
- out_table_1 = out_table
-
- # Find the maximum number of iterations to try
- # If not done by v_cnt iterations, there is a negative cycle.
- v_cnt = plpy.execute(
- """ SELECT max(count) as max FROM (
- SELECT count(DISTINCT {src}) AS count
- FROM {out_table_1}
- {grp_by}) x
- """.format(**locals()))[0]['max']
-
- if v_cnt < 2:
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table, out_table+"_summary"))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- if grouping_cols:
- plpy.error(("Graph APSP: {0} has less than 2 vertices in "+
- "every group.").format(edge_table))
- else:
- plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
- vertex_table))
-
- for i in range(0,v_cnt+1):
-
- """
- Create a view that will be used to update the output table
-
- The implementation is based on the matrix multiplication idea.
- The initial output table consists of 3 sets of vertex pairs.
- 1) for every vervex v: v -> v, weight 0, parent v
- 2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
- 3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
- parent NULL
-
- The algorithm "relaxes" the paths: finds alternate paths with less
- weights
- At every step, we look at every combination of non-infinite
- existing paths and edges to see if we can relax a path.
-
- Assume the graph is a chain: 1->2->3->...
- The initial output table will have a finite weighted path for 1->2
- and infinite for the rest. At ith iteration, the output table will
- have 1->i path and will relax 1->i+1 path from infinite to a
- finite value (weight of 1->i path + weight of i->i+1 edge) and
- assign i as the parent.
-
- Since using '=' with floats is dangerous we use an epsilon value
- for comparison.
- """
-
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- update_sql = """ CREATE VIEW {tmp_view} AS
- SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
- {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
- FROM {out_table_1} AS out
- INNER JOIN
- (SELECT x.{src}, x.{dest}, x.{weight},
- out.{dest} as parent {comma_grp_e}
- FROM
- ({ot_sql1}) AS out
- INNER JOIN
- {edge_table} AS edge
- ON (out.{dest} = edge.{src} {checkg_eout})
- INNER JOIN
- (SELECT out.{src}, edge.{dest},
- min(out.{weight}+edge.{weight}) AS {weight}
- {comma_grp_e}
- FROM
- ({ot_sql1}) AS out,
- {edge_table} AS edge
- WHERE out.{dest} = edge.{src} {checkg_eout}
- GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
- ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
- WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
- < {EPSILON}) y
- ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
- WHERE y.{weight} < out.{weight}
- """.format(**locals())
- plpy.execute(update_sql)
-
- # HAWQ employs alternating tables and has to be handled separately
- if is_hawq:
-
- # Stop if therea re no more updates
- if table_is_empty(tmp_view):
-
- plpy.execute("DROP VIEW {0}".format(tmp_view))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
- out_table_1,out_table))
- break
-
- # The new output table will still have the old values for
- # every vertex pair that does not appear on the update view.
-
- plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
- fill_sql = """ INSERT INTO {out_table_2}
- SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
- (SELECT 1 FROM {tmp_view} AS t
- WHERE t.{src} = out.{src} AND
- t.{dest} = out.{dest} {checkg_o1t})
- UNION
- SELECT * FROM {tmp_view}""".format(**locals())
- plpy.execute(fill_sql)
-
- # Swap the table names and the sql command we use for filtering
- tmpname = out_table_1
- out_table_1 = out_table_2
- out_table_2 = tmpname
-
- tmpname = ot_sql1
- ot_sql1 = ot_sql2
- ot_sql2 = tmpname
-
- else:
-
- updates = plpy.execute(
- """ UPDATE {out_table}
- SET {weight} = {tmp_view}.{weight},
- parent = {tmp_view}.parent
- FROM {tmp_view}
- WHERE {tmp_view}.{src} = {out_table}.{src} AND
- {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
- """.format(**locals()))
- if updates.nrows() == 0:
- break
-
- # The algorithm should have reached a break command by this point.
- # This check handles the existence of a negative cycle.
- if i == v_cnt:
-
- # If there are no groups, clean up and give error.
- if grouping_cols is None:
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table, out_table+"_summary"))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- plpy.error("Graph APSP: Detected a negative cycle in the graph.")
-
- # It is possible that not all groups has negative cycles.
- else:
- # negs is the string created by collating grouping columns.
- # By looking at the update view, we can see which groups
- # are in a negative cycle.
-
- negs = plpy.execute(
- """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
- FROM {tmp_view}
- """.format(**locals()))[0]['grp']
-
- # Delete the groups with negative cycles from the output table.
-
- # HAWQ doesn't support DELETE so we have to copy the valid results.
- if is_hawq:
- sql_del = """ CREATE TABLE {out_table} AS
- SELECT *
- FROM {out_table_1} AS out
- WHERE NOT EXISTS(
- SELECT 1
- FROM {tmp_view}
- WHERE {checkg_o1t_sub}
- );"""
- plpy.execute(sql_del.format(**locals()))
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
- out_table_1,out_table_2))
- else:
- sql_del = """ DELETE FROM {out_table}
- USING {tmp_view}
- WHERE {checkg_ot_sub}"""
- plpy.execute(sql_del.format(**locals()))
-
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
- # If every group has a negative cycle,
- # drop the output table as well.
- if table_is_empty(out_table):
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table,out_table+"_summary"))
- plpy.warning(
- """Graph APSP: Detected a negative cycle in the """ +
- """sub-graphs of following groups: {0}.""".
- format(str(negs)[1:-1]))
-
- else:
- plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
-
- return None
def graph_apsp_get_path(schema_madlib, apsp_table,
- source_vertex, dest_vertex, path_table, **kwargs):
- """
- Helper function that can be used to get the shortest path between any 2
- vertices
+ source_vertex, dest_vertex, path_table, **kwargs):
+ """
+ Helper function that can be used to get the shortest path between any 2
+ vertices
Args:
- @param apsp_table Name of the table that contains the APSP
- output.
- @param source_vertex The vertex that will be the source of the
- desired path.
- @param dest_vertex The vertex that will be the destination of the
- desired path.
- """
-
- with MinWarning("warning"):
- _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
-
- temp1_name = unique_string(desp='temp1')
- temp2_name = unique_string(desp='temp2')
-
- summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
- vertex_id = summary[0]['vertex_id']
- edge_args = summary[0]['edge_args']
- grouping_cols = summary[0]['grouping_cols']
-
- params_types = {'src': str, 'dest': str, 'weight': str}
- default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types,
- default_args)
-
- src = edge_params["src"]
- dest = edge_params["dest"]
- weight = edge_params["weight"]
-
- if vertex_id == "NULL":
- vertex_id = "id"
-
- if grouping_cols == "NULL":
- grouping_cols = None
-
- select_grps = ""
- check_grps_t1 = ""
- check_grps_t2 = ""
- grp_comma = ""
- tmp = ""
-
- if grouping_cols is not None:
- glist = split_quoted_delimited_str(grouping_cols)
- select_grps = _grp_from_table(apsp_table,glist) + " , "
- check_grps_t1 = " AND " + _check_groups(
- apsp_table,temp1_name,glist)
- check_grps_t2 = " AND " + _check_groups(
- apsp_table,temp2_name,glist)
-
- grp_comma = grouping_cols + " , "
-
- # If the source and destination is the same vertex.
- # There is no need to check the paths for any group.
- if source_vertex == dest_vertex:
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
- FROM {apsp_table} WHERE {src} = {source_vertex} AND
- {dest} = {dest_vertex}
- """.format(**locals()))
- return
-
- plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
- format(temp1_name,temp2_name));
-
- # Initialize the temporary tables
- out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
- SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
- ARRAY[{dest_vertex}] AS path
- FROM {apsp_table}
- WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
- AND {apsp_table}.parent IS NOT NULL
- """.format(**locals()))
-
- plpy.execute("""
- CREATE TEMP TABLE {temp2_name} AS
- SELECT * FROM {temp1_name} LIMIT 0
- """.format(**locals()))
-
- # Follow the 'parent' chain until you reach the case where the parent
- # is the same as destination. This means it is the last vertex before
- # the source.
- while out.nrows() > 0:
-
- plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
- # If the parent id is not the same as dest,
- # that means we have to follow the chain:
- # Add it to the path and move to its parent
- out = plpy.execute(
- """ INSERT INTO {temp2_name}
- SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
- {apsp_table}.{dest} || {temp1_name}.path AS path
- FROM {apsp_table} INNER JOIN {temp1_name} ON
- ({apsp_table}.{dest} = {temp1_name}.{vertex_id}
- {check_grps_t1})
- WHERE {src} = {source_vertex} AND
- {apsp_table}.parent <> {apsp_table}.{dest}
- """.format(**locals()))
-
- tmp = temp2_name
- temp2_name = temp1_name
- temp1_name = tmp
-
- tmp = check_grps_t1
- check_grps_t1 = check_grps_t2
- check_grps_t2 = tmp
-
- # We have to consider 3 cases.
- # 1) The path has more than 2 vertices:
- # Add the current parent and the source vertex
- # 2) The path has exactly 2 vertices (an edge between src and dest is
- # the shortest path).
- # Add the source vertex
- # 3) The path has 0 vertices (unreachable)
- # Add an empty array.
-
- # Path with 1 vertex (src == dest) has been handled before
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
- FROM {temp2_name}
- WHERE {vertex_id} <> {dest_vertex}
- UNION
- SELECT {grp_comma} {source_vertex} || path AS path
- FROM {temp2_name}
- WHERE {vertex_id} = {dest_vertex}
- UNION
- SELECT {grp_comma} '{{}}'::INT[] AS path
- FROM {apsp_table}
- WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
- AND {apsp_table}.parent IS NULL
- """.format(**locals()))
-
- out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
- if out.nrows() == 0:
- plpy.error( ("Graph APSP: Vertex {0} and/or {1} is not present"+
- " in the APSP table {1}").format(
- source_vertex,dest_vertex,apsp_table))
-
- plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
- format(**locals()))
-
- return None
+ @param apsp_table Name of the table that contains the APSP
+ output.
+ @param source_vertex The vertex that will be the source of the
+ desired path.
+ @param dest_vertex The vertex that will be the destination of the
+ desired path.
+ """
+
+ with MinWarning("warning"):
+ _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
+
+ temp1_name = unique_string(desp='temp1')
+ temp2_name = unique_string(desp='temp2')
+
+ summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
+ vertex_id = summary[0]['vertex_id']
+ edge_args = summary[0]['edge_args']
+ grouping_cols = summary[0]['grouping_cols']
+
+ params_types = {'src': str, 'dest': str, 'weight': str}
+ default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+ edge_params = extract_keyvalue_params(edge_args,
+ params_types,
+ default_args)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ weight = edge_params["weight"]
+
+ if vertex_id == "NULL":
+ vertex_id = "id"
+
+ if grouping_cols == "NULL":
+ grouping_cols = None
+
+ select_grps = ""
+ check_grps_t1 = ""
+ check_grps_t2 = ""
+ grp_comma = ""
+ tmp = ""
+
+ if grouping_cols is not None:
+ glist = split_quoted_delimited_str(grouping_cols)
+ select_grps = _grp_from_table(apsp_table, glist) + " , "
+ check_grps_t1 = " AND " + _check_groups(
+ apsp_table, temp1_name, glist)
+ check_grps_t2 = " AND " + _check_groups(
+ apsp_table, temp2_name, glist)
+
+ grp_comma = grouping_cols + " , "
+
+ # If the source and destination are the same vertex,
+ # there is no need to check the paths for any group.
+ if source_vertex == dest_vertex:
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+ FROM {apsp_table} WHERE {src} = {source_vertex} AND
+ {dest} = {dest_vertex}
+ """.format(**locals()))
+ return
+
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(temp1_name, temp2_name))
+
+ # Initialize the temporary tables
+ out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+ SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
+ ARRAY[{dest_vertex}] AS path
+ FROM {apsp_table}
+ WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+ AND {apsp_table}.parent IS NOT NULL
+ """.format(**locals()))
+
+ plpy.execute("""
+ CREATE TEMP TABLE {temp2_name} AS
+ SELECT * FROM {temp1_name} LIMIT 0
+ """.format(**locals()))
+
+ # Follow the 'parent' chain until you reach the case where the parent
+ # is the same as destination. This means it is the last vertex before
+ # the source.
+ while out.nrows() > 0:
+
+ plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+ # If the parent id is not the same as dest,
+ # that means we have to follow the chain:
+ # Add it to the path and move to its parent
+ out = plpy.execute(
+ """ INSERT INTO {temp2_name}
+ SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
+ {apsp_table}.{dest} || {temp1_name}.path AS path
+ FROM {apsp_table} INNER JOIN {temp1_name} ON
+ ({apsp_table}.{dest} = {temp1_name}.{vertex_id}
+ {check_grps_t1})
+ WHERE {src} = {source_vertex} AND
+ {apsp_table}.parent <> {apsp_table}.{dest}
+ """.format(**locals()))
+
+ tmp = temp2_name
+ temp2_name = temp1_name
+ temp1_name = tmp
+
+ tmp = check_grps_t1
+ check_grps_t1 = check_grps_t2
+ check_grps_t2 = tmp
+
+ # We have to consider 3 cases.
+ # 1) The path has more than 2 vertices:
+ # Add the current parent and the source vertex
+ # 2) The path has exactly 2 vertices (an edge between src and dest is
+ # the shortest path).
+ # Add the source vertex
+ # 3) The path has 0 vertices (unreachable)
+ # Add an empty array.
+
+ # Path with 1 vertex (src == dest) has been handled before
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
+ FROM {temp2_name}
+ WHERE {vertex_id} <> {dest_vertex}
+ UNION
+ SELECT {grp_comma} {source_vertex} || path AS path
+ FROM {temp2_name}
+ WHERE {vertex_id} = {dest_vertex}
+ UNION
+ SELECT {grp_comma} '{{}}'::INT[] AS path
+ FROM {apsp_table}
+ WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+ AND {apsp_table}.parent IS NULL
+ """.format(**locals()))
+
+ out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+ if out.nrows() == 0:
+ plpy.error(("Graph APSP: Vertex {0} and/or {1} is not present" +
+ " in the APSP table {1}").format(
+ source_vertex, dest_vertex, apsp_table))
+
+ plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
+ format(**locals()))
+
+ return None
+
def _validate_apsp(vertex_table, vertex_id, edge_table, edge_params,
- out_table, glist, **kwargs):
+ out_table, glist, **kwargs):
+
+ validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+ out_table, 'APSP')
- validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table,'APSP')
+ vt_error = plpy.execute(
+ """ SELECT {vertex_id}
+ FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ GROUP BY {vertex_id}
+ HAVING count(*) > 1 """.format(**locals()))
- vt_error = plpy.execute(
- """ SELECT {vertex_id}
- FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- GROUP BY {vertex_id}
- HAVING count(*) > 1 """.format(**locals()))
+ if vt_error.nrows() != 0:
+ plpy.error(
+ """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
+ format(**locals()))
- if vt_error.nrows() != 0:
- plpy.error(
- """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
- format(**locals()))
+ _assert(not table_exists(out_table + "_summary"),
+ "Graph APSP: Output summary table already exists!")
- _assert(not table_exists(out_table+"_summary"),
- "Graph APSP: Output summary table already exists!")
+ if glist is not None:
+ _assert(columns_exist_in_table(edge_table, glist),
+ """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
+ format(**locals()))
- if glist is not None:
- _assert(columns_exist_in_table(edge_table, glist),
- """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
- format(**locals()))
def _validate_get_path(apsp_table, source_vertex, dest_vertex,
- path_table, **kwargs):
+ path_table, **kwargs):
- _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
- "Graph APSP: Invalid APSP table name!")
- _assert(table_exists(apsp_table),
- "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
- _assert(not table_is_empty(apsp_table),
- "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
+ _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
+ "Graph APSP: Invalid APSP table name!")
+ _assert(table_exists(apsp_table),
+ "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
+ _assert(not table_is_empty(apsp_table),
+ "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
- summary = apsp_table+"_summary"
- _assert(table_exists(summary),
- "Graph APSP: APSP summary table ({0}) is missing!".format(summary))
- _assert(not table_is_empty(summary),
- "Graph APSP: APSP summary table ({0}) is empty!".format(summary))
+ summary = apsp_table + "_summary"
+ _assert(table_exists(summary),
+ "Graph APSP: APSP summary table ({0}) is missing!".format(summary))
+ _assert(not table_is_empty(summary),
+ "Graph APSP: APSP summary table ({0}) is empty!".format(summary))
- _assert(not table_exists(path_table),
- "Graph APSP: Output path table already exists!")
+ _assert(not table_exists(path_table),
+ "Graph APSP: Output path table already exists!")
+
+ return None
- return None
def graph_apsp_help(schema_madlib, message, **kwargs):
- """
- Help function for graph_apsp and graph_apsp_get_path
-
- Args:
- @param schema_madlib
- @param message: string, Help message string
- @param kwargs
-
- Returns:
- String. Help/usage information
- """
- if not message:
- help_string = """
+ """
+ Help function for graph_apsp and graph_apsp_get_path
+
+ Args:
+ @param schema_madlib
+ @param message: string, Help message string
+ @param kwargs
+
+ Returns:
+ String. Help/usage information
+ """
+ if not message:
+ help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
@@ -717,8 +717,8 @@ edges is minimized.
For more details on function usage:
SELECT {schema_madlib}.graph_apsp('usage')
"""
- elif message.lower() in ['usage', 'help', '?']:
- help_string = """
+ elif message.lower() in ['usage', 'help', '?']:
+ help_string = """
Given a graph, all pairs shortest path (apsp) algorithm finds a path for
every vertex pair such that the sum of the weights of its constituent
edges is minimized.
@@ -728,10 +728,10 @@ edges is minimized.
To retrieve the path for a specific vertex pair:
SELECT {schema_madlib}.graph_apsp_get_path(
- apsp_table TEXT, -- Name of the table that contains the apsp output.
+ apsp_table TEXT, -- Name of the table that contains the apsp output.
source_vertex INT, -- The vertex that will be the source of the
-- desired path.
- dest_vertex INT, -- The vertex that will be the destination of the
+ dest_vertex INT, -- The vertex that will be the destination of the
-- desired path.
path_table TEXT -- Name of the output table that contains the path.
);
@@ -762,8 +762,8 @@ every group and has the following columns:
- path (ARRAY) : The shortest path from the source vertex to the
destination vertex.
"""
- elif message.lower() in ("example", "examples"):
- help_string = """
+ elif message.lower() in ("example", "examples"):
+ help_string = """
----------------------------------------------------------------------------
EXAMPLES
----------------------------------------------------------------------------
@@ -806,11 +806,11 @@ INSERT INTO edge VALUES
-- Compute the apsp:
DROP TABLE IF EXISTS out;
SELECT madlib.graph_apsp(
- 'vertex', -- Vertex table
- 'id', -- Vertix id column
- 'edge', -- Edge table
- 'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
- 'out' -- Output table of apsp
+ 'vertex', -- Vertex table
+ 'id', -- Vertix id column
+ 'edge', -- Edge table
+ 'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
+ 'out' -- Output table of apsp
);
-- View the apsp costs for every vertex:
SELECT * FROM out ORDER BY src, dest;
@@ -834,11 +834,11 @@ INSERT INTO edge_gr VALUES
DROP TABLE IF EXISTS out_gr, out_gr_summary;
SELECT graph_apsp('vertex',NULL,'edge_gr',NULL,'out_gr','grp');
"""
- else:
- help_string = "No such option. Use {schema_madlib}.graph_apsp()"
+ else:
+ help_string = "No such option. Use {schema_madlib}.graph_apsp()"
- return help_string.format(schema_madlib=schema_madlib,
- graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
- """out_table TEXT, -- Name of the table to store the result of apsp.
+ return help_string.format(schema_madlib=schema_madlib,
+ graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
+ """out_table TEXT, -- Name of the table to store the result of apsp.
grouping_cols TEXT -- The list of grouping columns."""))
# ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/graph_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in
index 944c301..9d31345 100644
--- a/src/ports/postgres/modules/graph/graph_utils.py_in
+++ b/src/ports/postgres/modules/graph/graph_utils.py_in
@@ -27,115 +27,115 @@
@namespace graph
"""
-import plpy
-from utilities.control import MinWarning
from utilities.utilities import _assert
-from utilities.utilities import extract_keyvalue_params
-from utilities.utilities import unique_string
from utilities.validate_args import get_cols
from utilities.validate_args import unquote_ident
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
+
def _grp_null_checks(grp_list):
"""
- Helper function for generating NULL checks for grouping columns
+ Helper function for generating NULL checks for grouping columns
to be used within a WHERE clause
Args:
@param grp_list The list of grouping columns
"""
return ' AND '.join([" {i} IS NOT NULL ".format(**locals())
- for i in grp_list])
+ for i in grp_list])
+
def _check_groups(tbl1, tbl2, grp_list):
+ """
+ Helper function for joining tables with groups.
+ Args:
+ @param tbl1 Name of the first table
+ @param tbl2 Name of the second table
+ @param grp_list The list of grouping columns
+ """
- """
- Helper function for joining tables with groups.
- Args:
- @param tbl1 Name of the first table
- @param tbl2 Name of the second table
- @param grp_list The list of grouping columns
- """
+ return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
+ for i in grp_list])
- return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
- for i in grp_list])
def _grp_from_table(tbl, grp_list):
+ """
+ Helper function for selecting grouping columns of a table
+ Args:
+ @param tbl Name of the table
+ @param grp_list The list of grouping columns
+ """
+ return ' , '.join([" {tbl}.{i} ".format(**locals())
+ for i in grp_list])
- """
- Helper function for selecting grouping columns of a table
- Args:
- @param tbl Name of the table
- @param grp_list The list of grouping columns
- """
- return ' , '.join([" {tbl}.{i} ".format(**locals())
- for i in grp_list])
def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table, func_name, **kwargs):
- """
- Validates graph tables (vertex and edge) as well as the output table.
- """
- _assert(out_table and out_table.strip().lower() not in ('null', ''),
- "Graph {func_name}: Invalid output table name!".format(**locals()))
- _assert(not table_exists(out_table),
- "Graph {func_name}: Output table already exists!".format(**locals()))
-
- _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
- "Graph {func_name}: Invalid vertex table name!".format(**locals()))
- _assert(table_exists(vertex_table),
- "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format(
- **locals()))
- _assert(not table_is_empty(vertex_table),
- "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format(
- **locals()))
-
- _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
- "Graph {func_name}: Invalid edge table name!".format(**locals()))
- _assert(table_exists(edge_table),
- "Graph {func_name}: Edge table ({edge_table}) is missing!".format(
- **locals()))
- _assert(not table_is_empty(edge_table),
- "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
- **locals()))
-
- existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
- _assert(vertex_id in existing_cols,
- """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """.
- format(**locals()))
- _assert(columns_exist_in_table(edge_table, edge_params.values()),
- """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""".
- format(cols=edge_params.values(), **locals()))
-
- return None
+ out_table, func_name, **kwargs):
+ """
+ Validates graph tables (vertex and edge) as well as the output table.
+ """
+ _assert(out_table and out_table.strip().lower() not in ('null', ''),
+ "Graph {func_name}: Invalid output table name!".format(**locals()))
+ _assert(not table_exists(out_table),
+ "Graph {func_name}: Output table already exists!".format(**locals()))
+
+ _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
+ "Graph {func_name}: Invalid vertex table name!".format(**locals()))
+ _assert(table_exists(vertex_table),
+ "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format(
+ **locals()))
+ _assert(not table_is_empty(vertex_table),
+ "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format(
+ **locals()))
+
+ _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
+ "Graph {func_name}: Invalid edge table name!".format(**locals()))
+ _assert(table_exists(edge_table),
+ "Graph {func_name}: Edge table ({edge_table}) is missing!".format(
+ **locals()))
+ _assert(not table_is_empty(edge_table),
+ "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
+ **locals()))
+
+ existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
+ _assert(vertex_id in existing_cols,
+ """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """.
+ format(**locals()))
+ _assert(columns_exist_in_table(edge_table, edge_params.values()),
+ """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""".
+ format(cols=edge_params.values(), **locals()))
+
+ return None
+
def get_graph_usage(schema_madlib, func_name, other_text):
- usage = """
-----------------------------------------------------------------------------
- USAGE
-----------------------------------------------------------------------------
- SELECT {schema_madlib}.{func_name}(
- vertex_table TEXT, -- Name of the table that contains the vertex data.
- vertex_id TEXT, -- Name of the column containing the vertex ids.
- edge_table TEXT, -- Name of the table that contains the edge data.
- edge_args TEXT{comma} -- A comma-delimited string containing multiple
- -- named arguments of the form "name=value".
- {other_text}
-);
-
-The following parameters are supported for edge table arguments ('edge_args'
- above):
-
-src (default = 'src') : Name of the column containing the source
- vertex ids in the edge table.
-dest (default = 'dest') : Name of the column containing the destination
- vertex ids in the edge table.
-weight (default = 'weight') : Name of the column containing the weight of
- edges in the edge table.
-""".format(schema_madlib=schema_madlib, func_name=func_name,
- other_text=other_text, comma = ',' if other_text is not None else ' ')
-
- return usage
+ usage = """
+ ----------------------------------------------------------------------------
+ USAGE
+ ----------------------------------------------------------------------------
+ SELECT {schema_madlib}.{func_name}(
+ vertex_table TEXT, -- Name of the table that contains the vertex data.
+ vertex_id TEXT, -- Name of the column containing the vertex ids.
+ edge_table TEXT, -- Name of the table that contains the edge data.
+ edge_args TEXT{comma} -- A comma-delimited string containing multiple
+ -- named arguments of the form "name=value".
+ {other_text}
+ );
+
+ The following parameters are supported for edge table arguments
+ ('edge_args' above):
+
+ src (default = 'src'): Name of the column containing the source
+ vertex ids in the edge table.
+ dest (default = 'dest'): Name of the column containing the destination
+ vertex ids in the edge table.
+ weight (default = 'weight'): Name of the column containing the weight of
+ edges in the edge table.
+ """.format(schema_madlib=schema_madlib,
+ func_name=func_name,
+ other_text=other_text,
+ comma=',' if other_text is not None else ' ')
+ return usage