You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2017/08/29 20:41:54 UTC
[17/50] [abbrv] incubator-madlib git commit: Graph: Update Python code to follow PEP-8
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/sssp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
index 4839d2d..93497c4 100644
--- a/src/ports/postgres/modules/graph/sssp.py_in
+++ b/src/ports/postgres/modules/graph/sssp.py_in
@@ -33,21 +33,22 @@ from graph_utils import get_graph_usage
from graph_utils import _grp_from_table
from graph_utils import _check_groups
from utilities.control import MinWarning
+
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import split_quoted_delimited_str
+from utilities.utilities import is_platform_pg, is_platform_hawq
+
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import get_expr_type
-m4_changequote(`<!', `!>')
def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_args, source_vertex, out_table, grouping_cols, **kwargs):
-
- """
+ edge_args, source_vertex, out_table, grouping_cols, **kwargs):
+ """
Single source shortest path function for graphs using the Bellman-Ford
algorhtm [1].
Args:
@@ -63,383 +64,382 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
[1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
"""
- with MinWarning("warning"):
-
- INT_MAX = 2147483647
- INFINITY = "'Infinity'"
- EPSILON = 0.000001
-
- message = unique_string(desp='message')
-
- oldupdate = unique_string(desp='oldupdate')
- newupdate = unique_string(desp='newupdate')
-
- params_types = {'src': str, 'dest': str, 'weight': str}
- default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types,
- default_args)
-
- # Prepare the input for recording in the summary table
- if vertex_id is None:
- v_st= "NULL"
- vertex_id = "id"
- else:
- v_st = vertex_id
- if edge_args is None:
- e_st = "NULL"
- else:
- e_st = edge_args
- if grouping_cols is None:
- g_st = "NULL"
- glist = None
- else:
- g_st = grouping_cols
- glist = split_quoted_delimited_str(grouping_cols)
-
- src = edge_params["src"]
- dest = edge_params["dest"]
- weight = edge_params["weight"]
-
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
- local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY (id)"!>)
-
- is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
- _validate_sssp(vertex_table, vertex_id, edge_table,
- edge_params, source_vertex, out_table, glist)
-
- plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
- message,oldupdate,newupdate))
-
- # Initialize grouping related variables
- comma_grp = ""
- comma_grp_e = ""
- comma_grp_m = ""
- grp_comma = ""
- checkg_oo = ""
- checkg_eo = ""
- checkg_ex = ""
- checkg_om = ""
- group_by = ""
-
- if grouping_cols is not None:
- comma_grp = " , " + grouping_cols
- group_by = " , " + _grp_from_table(edge_table,glist)
- comma_grp_e = " , " + _grp_from_table(edge_table,glist)
- comma_grp_m = " , " + _grp_from_table("message",glist)
- grp_comma = grouping_cols + " , "
-
- checkg_oo_sub = _check_groups(out_table,"oldupdate",glist)
- checkg_oo = " AND " + checkg_oo_sub
- checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist)
- checkg_ex = " AND " + _check_groups(edge_table,"x",glist)
- checkg_om = " AND " + _check_groups("out_table","message",glist)
-
- w_type = get_expr_type(weight,edge_table).lower()
- init_w = INT_MAX
- if w_type in ['real','double precision','float8']:
- init_w = INFINITY
-
- # We keep a table of every vertex, the minimum cost to that destination
- # seen so far and the parent to this vertex in the associated shortest
- # path. This table will be updated throughout the execution.
- plpy.execute(
- """ CREATE TABLE {out_table} AS ( SELECT
- {grp_comma} {src} AS {vertex_id}, {weight},
- {src} AS parent FROM {edge_table} LIMIT 0)
- {distribution} """.format(**locals()))
-
- # We keep a summary table to keep track of the parameters used for this
- # SSSP run. This table is used in the path finding function to eliminate
- # the need for repetition.
- plpy.execute( """ CREATE TABLE {out_table}_summary (
- vertex_table TEXT,
- vertex_id TEXT,
- edge_table TEXT,
- edge_args TEXT,
- source_vertex INTEGER,
- out_table TEXT,
- grouping_cols TEXT)
- """.format(**locals()))
- plpy.execute( """ INSERT INTO {out_table}_summary VALUES
- ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
- {source_vertex}, '{out_table}', '{g_st}')
- """.format(**locals()))
-
- # We keep 2 update tables and alternate them during the execution.
- # This is necessary since we need to know which vertices are updated in
- # the previous iteration to calculate the next set of updates.
- plpy.execute(
- """ CREATE TEMP TABLE {oldupdate} AS ( SELECT
- {src} AS id, {weight},
- {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
- {local_distribution}
- """.format(**locals()))
- plpy.execute(
- """ CREATE TEMP TABLE {newupdate} AS ( SELECT
- {src} AS id, {weight},
- {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
- {local_distribution}
- """.format(**locals()))
-
- # Since HAWQ does not allow us to update, we create a new table and
- # rename at every iteration.
- if is_hawq:
- temp_table = unique_string(desp='temp')
- sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} )
- {distribution} """
- plpy.execute(sql.format(**locals()))
-
- # GPDB and HAWQ have distributed by clauses to help them with indexing.
- # For Postgres we add the indices manually.
- sql_index = m4_ifdef(<!__POSTGRESQL__!>,
- <!""" CREATE INDEX ON {out_table} ({vertex_id});
- CREATE INDEX ON {oldupdate} (id);
- CREATE INDEX ON {newupdate} (id);
- """.format(**locals())!>,
- <!''!>)
- plpy.execute(sql_index)
-
- # The initialization step is quite different when grouping is involved
- # since not every group (subgraph) will have the same set of vertices.
-
- # Example:
- # Assume there are two grouping columns g1 and g2
- # g1 values are 0 and 1. g2 values are 5 and 6
- if grouping_cols is not None:
-
- distinct_grp_table = unique_string(desp='grp')
- plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """.
- format(**locals()))
- plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS
- SELECT DISTINCT {grouping_cols} FROM {edge_table} """.
- format(**locals()))
- subq = unique_string(desp='subquery')
-
- checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist)
- grp_d_comma = _grp_from_table(distinct_grp_table,glist) +","
-
- plpy.execute(
- """ INSERT INTO {out_table}
- SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
- {init_w} AS {weight}, NULL::INT AS parent
- FROM {distinct_grp_table} INNER JOIN
- (
- SELECT {src} AS {vertex_id} {comma_grp}
- FROM {edge_table}
- UNION
- SELECT {dest} AS {vertex_id} {comma_grp}
- FROM {edge_table}
- ) {subq} ON ({checkg_ds_sub})
- WHERE {vertex_id} IS NOT NULL
- """.format(**locals()))
-
- plpy.execute(
- """ INSERT INTO {oldupdate}
- SELECT {source_vertex}, 0, {source_vertex},
- {grouping_cols}
- FROM {distinct_grp_table}
- """.format(**locals()))
-
- # The maximum number of vertices for any group.
- # Used for determining negative cycles.
- v_cnt = plpy.execute(
- """ SELECT max(count) as max FROM (
- SELECT count({vertex_id}) AS count
- FROM {out_table}
- GROUP BY {grouping_cols}) x
- """.format(**locals()))[0]['max']
- plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
- else:
- plpy.execute(
- """ INSERT INTO {out_table}
- SELECT {vertex_id} AS {vertex_id},
- {init_w} AS {weight},
- NULL AS parent
- FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- """.format(**locals()))
-
- # The source can be reached with 0 cost and it has itself as the
- # parent.
- plpy.execute(
- """ INSERT INTO {oldupdate}
- VALUES({source_vertex},0,{source_vertex})
- """.format(**locals()))
-
- v_cnt = plpy.execute(
- """ SELECT count(*) FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- """.format(**locals()))[0]['count']
-
- for i in range(0,v_cnt+1):
-
- # Apply the updates calculated in the last iteration.
- if is_hawq:
- sql = """
- TRUNCATE TABLE {temp_table};
- INSERT INTO {temp_table}
- SELECT *
- FROM {out_table}
- WHERE NOT EXISTS (
- SELECT 1
- FROM {oldupdate} as oldupdate
- WHERE {out_table}.{vertex_id} = oldupdate.id
- {checkg_oo})
- UNION
- SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
- """
- plpy.execute(sql.format(**locals()))
- plpy.execute("DROP TABLE {0}".format(out_table))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".
- format(temp_table,out_table))
- sql = """ CREATE TABLE {temp_table} AS (
- SELECT * FROM {out_table} LIMIT 0)
- {distribution};"""
- plpy.execute(sql.format(**locals()))
- ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
- format(oldupdate))
- else:
- sql = """
- UPDATE {out_table} SET
- {weight}=oldupdate.{weight},
- parent=oldupdate.parent
- FROM
- {oldupdate} AS oldupdate
- WHERE
- {out_table}.{vertex_id}=oldupdate.id AND
- {out_table}.{weight}>oldupdate.{weight} {checkg_oo}
- """
- ret = plpy.execute(sql.format(**locals()))
-
- if ret.nrows() == 0:
- break
-
- plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
-
- # 'oldupdate' table has the update info from the last iteration
-
- # Consider every edge that has an updated source
- # From these edges:
- # For every destination vertex, find the min total cost to reach.
- # Note that, just calling an aggregate function with group by won't
- # let us store the src field of the edge (needed for the parent).
- # This is why we need the 'x'; it gives a list of destinations and
- # associated min values. Using these values, we identify which edge
- # is selected.
-
- # Since using '=' with floats is dangerous we use an epsilon value
- # for comparison.
-
- # Once we have a list of edges and values (stores as 'message'),
- # we check if these values are lower than the existing shortest
- # path values.
-
- sql = (""" INSERT INTO {newupdate}
- SELECT DISTINCT ON (message.id {comma_grp})
- message.id AS id,
- message.{weight} AS {weight},
- message.parent AS parent {comma_grp_m}
- FROM {out_table} AS out_table INNER JOIN
- (
- SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
- oldupdate.id AS parent {comma_grp_e}
- FROM {oldupdate} AS oldupdate INNER JOIN
- {edge_table} ON
- ({edge_table}.{src} = oldupdate.id {checkg_eo})
- INNER JOIN
- (
- SELECT {edge_table}.{dest} AS id,
- min(oldupdate.{weight} +
- {edge_table}.{weight}) AS {weight} {comma_grp_e}
- FROM {oldupdate} AS oldupdate INNER JOIN
- {edge_table} ON
- ({edge_table}.{src}=oldupdate.id {checkg_eo})
- GROUP BY {edge_table}.{dest} {comma_grp_e}
- ) x
- ON ({edge_table}.{dest} = x.id {checkg_ex} )
- WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
- - x.{weight}) < {EPSILON}
- ) message
- ON (message.id = out_table.{vertex_id} {checkg_om})
- WHERE message.{weight}<out_table.{weight}
- """.format(**locals()))
-
- plpy.execute(sql)
-
- # Swap the update tables for the next iteration.
- tmp = oldupdate
- oldupdate = newupdate
- newupdate = tmp
-
- plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
- # The algorithm should converge in less than |V| iterations.
- # Otherwise there is a negative cycle in the graph.
- if i == v_cnt:
- if grouping_cols is None:
- plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
- format(out_table, out_table+"_summary", oldupdate))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
- plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
-
- # It is possible that not all groups has negative cycles.
- else:
-
- # negs is the string created by collating grouping columns.
- # By looking at the oldupdate table we can see which groups
- # are in a negative cycle.
-
- negs = plpy.execute(
- """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
- FROM {oldupdate}
- """.format(**locals()))[0]['grp']
-
- # Delete the groups with negative cycles from the output table.
- if is_hawq:
- sql_del = """
- TRUNCATE TABLE {temp_table};
- INSERT INTO {temp_table}
- SELECT *
- FROM {out_table}
- WHERE NOT EXISTS(
- SELECT 1
- FROM {oldupdate} as oldupdate
- WHERE {checkg_oo_sub}
- );"""
- plpy.execute(sql_del.format(**locals()))
- plpy.execute("DROP TABLE {0}".format(out_table))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".
- format(temp_table,out_table))
- else:
- sql_del = """ DELETE FROM {out_table}
- USING {oldupdate} AS oldupdate
- WHERE {checkg_oo_sub}"""
- plpy.execute(sql_del.format(**locals()))
-
- # If every group has a negative cycle,
- # drop the output table as well.
- if table_is_empty(out_table):
- plpy.execute("DROP TABLE IF EXISTS {0},{1}".
- format(out_table,out_table+"_summary"))
-
- plpy.warning(
- """Graph SSSP: Detected a negative cycle in the """ +
- """sub-graphs of following groups: {0}.""".
- format(str(negs)[1:-1]))
-
- plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
- if is_hawq:
- plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
- format(**locals()))
-
- return None
+ with MinWarning("warning"):
+
+ INT_MAX = 2147483647
+ INFINITY = "'Infinity'"
+ EPSILON = 0.000001
+
+ message = unique_string(desp='message')
+
+ oldupdate = unique_string(desp='oldupdate')
+ newupdate = unique_string(desp='newupdate')
+
+ params_types = {'src': str, 'dest': str, 'weight': str}
+ default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+ edge_params = extract_keyvalue_params(edge_args,
+ params_types,
+ default_args)
+
+ # Prepare the input for recording in the summary table
+ if vertex_id is None:
+ v_st = "NULL"
+ vertex_id = "id"
+ else:
+ v_st = vertex_id
+ if edge_args is None:
+ e_st = "NULL"
+ else:
+ e_st = edge_args
+ if grouping_cols is None:
+ g_st = "NULL"
+ glist = None
+ else:
+ g_st = grouping_cols
+ glist = split_quoted_delimited_str(grouping_cols)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+ weight = edge_params["weight"]
+
+ distribution = ('' if is_platform_pg() else
+ "DISTRIBUTED BY ({0})".format(vertex_id))
+ local_distribution = '' if is_platform_pg() else "DISTRIBUTED BY (id)"
+ is_hawq = is_platform_hawq()
+
+ _validate_sssp(vertex_table, vertex_id, edge_table,
+ edge_params, source_vertex, out_table, glist)
+
+ plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
+ message, oldupdate, newupdate))
+
+ # Initialize grouping related variables
+ comma_grp = ""
+ comma_grp_e = ""
+ comma_grp_m = ""
+ grp_comma = ""
+ checkg_oo = ""
+ checkg_eo = ""
+ checkg_ex = ""
+ checkg_om = ""
+ group_by = ""
+
+ if grouping_cols is not None:
+ comma_grp = " , " + grouping_cols
+ group_by = " , " + _grp_from_table(edge_table, glist)
+ comma_grp_e = " , " + _grp_from_table(edge_table, glist)
+ comma_grp_m = " , " + _grp_from_table("message", glist)
+ grp_comma = grouping_cols + " , "
+
+ checkg_oo_sub = _check_groups(out_table, "oldupdate", glist)
+ checkg_oo = " AND " + checkg_oo_sub
+ checkg_eo = " AND " + _check_groups(edge_table, "oldupdate", glist)
+ checkg_ex = " AND " + _check_groups(edge_table, "x", glist)
+ checkg_om = " AND " + _check_groups("out_table", "message", glist)
+
+ w_type = get_expr_type(weight, edge_table).lower()
+ init_w = INT_MAX
+ if w_type in ['real', 'double precision', 'float8']:
+ init_w = INFINITY
+
+ # We keep a table of every vertex, the minimum cost to that destination
+ # seen so far and the parent to this vertex in the associated shortest
+ # path. This table will be updated throughout the execution.
+ plpy.execute(
+ """ CREATE TABLE {out_table} AS (SELECT
+ {grp_comma} {src} AS {vertex_id}, {weight},
+ {src} AS parent FROM {edge_table} LIMIT 0)
+ {distribution} """.format(**locals()))
+
+ # We keep a summary table to keep track of the parameters used for this
+ # SSSP run. This table is used in the path finding function to eliminate
+ # the need for repetition.
+ plpy.execute(""" CREATE TABLE {out_table}_summary (
+ vertex_table TEXT,
+ vertex_id TEXT,
+ edge_table TEXT,
+ edge_args TEXT,
+ source_vertex INTEGER,
+ out_table TEXT,
+ grouping_cols TEXT)
+ """.format(**locals()))
+ plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+ ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+ {source_vertex}, '{out_table}', '{g_st}')
+ """.format(**locals()))
+
+ # We keep 2 update tables and alternate them during the execution.
+ # This is necessary since we need to know which vertices are updated in
+ # the previous iteration to calculate the next set of updates.
+ plpy.execute(
+ """ CREATE TEMP TABLE {oldupdate} AS (SELECT
+ {src} AS id, {weight},
+ {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+ {local_distribution}
+ """.format(**locals()))
+ plpy.execute(
+ """ CREATE TEMP TABLE {newupdate} AS (SELECT
+ {src} AS id, {weight},
+ {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+ {local_distribution}
+ """.format(**locals()))
+
+ # Since HAWQ does not allow us to update, we create a new table and
+ # rename at every iteration.
+ if is_hawq:
+ temp_table = unique_string(desp='temp')
+ sql = """ CREATE TABLE {temp_table} AS (SELECT * FROM {out_table} )
+ {distribution} """
+ plpy.execute(sql.format(**locals()))
+
+ # GPDB and HAWQ have distributed by clauses to help them with indexing.
+ # For Postgres we add the indices manually.
+ if is_platform_pg():
+ plpy.execute("""
+ CREATE INDEX ON {out_table} ({vertex_id});
+ CREATE INDEX ON {oldupdate} (id);
+ CREATE INDEX ON {newupdate} (id);
+ """.format(**locals()))
+
+ # The initialization step is quite different when grouping is involved
+ # since not every group (subgraph) will have the same set of vertices.
+
+ # Example:
+ # Assume there are two grouping columns g1 and g2
+ # g1 values are 0 and 1. g2 values are 5 and 6
+ if grouping_cols is not None:
+
+ distinct_grp_table = unique_string(desp='grp')
+ plpy.execute("DROP TABLE IF EXISTS {distinct_grp_table}".
+ format(**locals()))
+ plpy.execute("""
+ CREATE TEMP TABLE {distinct_grp_table} AS
+ SELECT DISTINCT {grouping_cols} FROM {edge_table}
+ """.format(**locals()))
+ subq = unique_string(desp='subquery')
+
+ checkg_ds_sub = _check_groups(distinct_grp_table, subq, glist)
+ grp_d_comma = _grp_from_table(distinct_grp_table, glist) + ","
+
+ plpy.execute("""
+ INSERT INTO {out_table}
+ SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
+ {init_w} AS {weight}, NULL::INT AS parent
+ FROM {distinct_grp_table} INNER JOIN
+ (
+ SELECT {src} AS {vertex_id} {comma_grp}
+ FROM {edge_table}
+ UNION
+ SELECT {dest} AS {vertex_id} {comma_grp}
+ FROM {edge_table}
+ ) {subq} ON ({checkg_ds_sub})
+ WHERE {vertex_id} IS NOT NULL
+ """.format(**locals()))
+
+ plpy.execute("""
+ INSERT INTO {oldupdate}
+ SELECT {source_vertex}, 0, {source_vertex},
+ {grouping_cols}
+ FROM {distinct_grp_table}
+ """.format(**locals()))
+
+ # The maximum number of vertices for any group.
+ # Used for determining negative cycles.
+ v_cnt = plpy.execute("""
+ SELECT max(count) as max FROM (
+ SELECT count({vertex_id}) AS count
+ FROM {out_table}
+ GROUP BY {grouping_cols}) x
+ """.format(**locals()))[0]['max']
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+ else:
+ plpy.execute("""
+ INSERT INTO {out_table}
+ SELECT {vertex_id} AS {vertex_id},
+ {init_w} AS {weight},
+ NULL AS parent
+ FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ """.format(**locals()))
+
+ # The source can be reached with 0 cost and it has itself as the
+ # parent.
+ plpy.execute("""
+ INSERT INTO {oldupdate}
+ VALUES({source_vertex},0,{source_vertex})
+ """.format(**locals()))
+
+ v_cnt = plpy.execute("""
+ SELECT count(*) FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ """.format(**locals()))[0]['count']
+ converged = False
+ for i in range(0, v_cnt + 1):
+
+ # Apply the updates calculated in the last iteration.
+ if is_hawq:
+ sql = """
+ TRUNCATE TABLE {temp_table};
+ INSERT INTO {temp_table}
+ SELECT *
+ FROM {out_table}
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM {oldupdate} as oldupdate
+ WHERE {out_table}.{vertex_id} = oldupdate.id
+ {checkg_oo})
+ UNION
+ SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
+ """
+ plpy.execute(sql.format(**locals()))
+ plpy.execute("DROP TABLE {0}".format(out_table))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+ format(temp_table, out_table))
+ sql = """ CREATE TABLE {temp_table} AS (
+ SELECT * FROM {out_table} LIMIT 0)
+ {distribution};"""
+ plpy.execute(sql.format(**locals()))
+ ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
+ format(oldupdate))
+ else:
+ sql = """
+ UPDATE {out_table} SET
+ {weight} = oldupdate.{weight},
+ parent = oldupdate.parent
+ FROM
+ {oldupdate} AS oldupdate
+ WHERE
+ {out_table}.{vertex_id} = oldupdate.id AND
+ {out_table}.{weight} > oldupdate.{weight} {checkg_oo}
+ """
+ ret = plpy.execute(sql.format(**locals()))
+
+ if ret.nrows() == 0:
+ converged = True
+ break
+ plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
+
+ # 'oldupdate' table has the update info from the last iteration
+
+ # Consider every edge that has an updated source
+ # From these edges:
+ # For every destination vertex, find the min total cost to reach.
+ # Note that, just calling an aggregate function with group by won't
+ # let us store the src field of the edge (needed for the parent).
+ # This is why we need the 'x'; it gives a list of destinations and
+ # associated min values. Using these values, we identify which edge
+ # is selected.
+
+ # Since using '=' with floats is dangerous we use an epsilon value
+ # for comparison.
+
+ # Once we have a list of edges and values (stores as 'message'),
+ # we check if these values are lower than the existing shortest
+ # path values.
+
+ sql = (""" INSERT INTO {newupdate}
+ SELECT DISTINCT ON (message.id {comma_grp})
+ message.id AS id,
+ message.{weight} AS {weight},
+ message.parent AS parent {comma_grp_m}
+ FROM {out_table} AS out_table INNER JOIN
+ (
+ SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
+ oldupdate.id AS parent {comma_grp_e}
+ FROM {oldupdate} AS oldupdate INNER JOIN
+ {edge_table} ON
+ ({edge_table}.{src} = oldupdate.id {checkg_eo})
+ INNER JOIN
+ (
+ SELECT {edge_table}.{dest} AS id,
+ min(oldupdate.{weight} +
+ {edge_table}.{weight}) AS {weight} {comma_grp_e}
+ FROM {oldupdate} AS oldupdate INNER JOIN
+ {edge_table} ON
+ ({edge_table}.{src}=oldupdate.id {checkg_eo})
+ GROUP BY {edge_table}.{dest} {comma_grp_e}
+ ) x
+ ON ({edge_table}.{dest} = x.id {checkg_ex} )
+ WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
+ - x.{weight}) < {EPSILON}
+ ) message
+ ON (message.id = out_table.{vertex_id} {checkg_om})
+ WHERE message.{weight}<out_table.{weight}
+ """.format(**locals()))
+
+ plpy.execute(sql)
+
+ # Swap the update tables for the next iteration.
+ tmp = oldupdate
+ oldupdate = newupdate
+ newupdate = tmp
+
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
+ # Convergence leaves the loop via 'break' (possibly when i == v_cnt);
+ # running all |V| + 1 iterations without it means a negative cycle.
+ if not converged:
+ if grouping_cols is None:
+ plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
+ format(out_table, out_table + "_summary", oldupdate))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
+ plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
+
+ # It is possible that not all groups has negative cycles.
+ else:
+
+ # negs is the string created by collating grouping columns.
+ # By looking at the oldupdate table we can see which groups
+ # are in a negative cycle.
+
+ negs = plpy.execute(
+ """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+ FROM {oldupdate}
+ """.format(**locals()))[0]['grp']
+
+ # Delete the groups with negative cycles from the output table.
+ if is_hawq:
+ sql_del = """
+ TRUNCATE TABLE {temp_table};
+ INSERT INTO {temp_table}
+ SELECT *
+ FROM {out_table}
+ WHERE NOT EXISTS(
+ SELECT 1
+ FROM {oldupdate} as oldupdate
+ WHERE {checkg_oo_sub}
+ );"""
+ plpy.execute(sql_del.format(**locals()))
+ plpy.execute("DROP TABLE {0}".format(out_table))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+ format(temp_table, out_table))
+ else:
+ sql_del = """ DELETE FROM {out_table}
+ USING {oldupdate} AS oldupdate
+ WHERE {checkg_oo_sub}"""
+ plpy.execute(sql_del.format(**locals()))
+
+ # If every group has a negative cycle,
+ # drop the output table as well.
+ if table_is_empty(out_table):
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(out_table, out_table + "_summary"))
+
+ plpy.warning(
+ """Graph SSSP: Detected a negative cycle in the """ +
+ """sub-graphs of following groups: {0}.""".
+ format(str(negs)[1:-1]))
+
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
+ if is_hawq:
+ plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
+ format(**locals()))
+ return None
+
def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
- **kwargs):
- """
+ **kwargs):
+ """
Helper function that can be used to get the shortest path for a vertex
Args:
@param sssp_table Name of the table that contains the SSSP output.
@@ -447,188 +447,187 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
desired path.
@param path_table Name of the output table that contains the path.
- """
- with MinWarning("warning"):
- _validate_get_path(sssp_table, dest_vertex, path_table)
-
- temp1_name = unique_string(desp='temp1')
- temp2_name = unique_string(desp='temp2')
-
- select_grps = ""
- check_grps_t1 = ""
- check_grps_t2 = ""
- grp_comma = ""
- tmp = ""
-
- summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
- vertex_id = summary[0]['vertex_id']
- source_vertex = summary[0]['source_vertex']
-
- if vertex_id == "NULL":
- vertex_id = "id"
-
- grouping_cols = summary[0]['grouping_cols']
- if grouping_cols == "NULL":
- grouping_cols = None
-
- if grouping_cols is not None:
- glist = split_quoted_delimited_str(grouping_cols)
- select_grps = _grp_from_table(sssp_table,glist) + " , "
- check_grps_t1 = " AND " + _check_groups(
- sssp_table,temp1_name,glist)
- check_grps_t2 = " AND " + _check_groups(
- sssp_table,temp2_name,glist)
-
- grp_comma = grouping_cols + " , "
-
- if source_vertex == dest_vertex:
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
- FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
- """.format(**locals()))
- return
-
- plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
- format(temp1_name,temp2_name));
- out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
- SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
- ARRAY[{dest_vertex}] AS path
- FROM {sssp_table}
- WHERE {vertex_id} = {dest_vertex}
- AND {sssp_table}.parent IS NOT NULL
- """.format(**locals()))
-
- plpy.execute("""
- CREATE TEMP TABLE {temp2_name} AS
- SELECT * FROM {temp1_name} LIMIT 0
- """.format(**locals()))
-
- # Follow the 'parent' chain until you reach the source.
- while out.nrows() > 0:
-
- plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
- # If the vertex id is not the source vertex,
- # Add it to the path and move to its parent
- out = plpy.execute(
- """ INSERT INTO {temp2_name}
- SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
- {sssp_table}.{vertex_id} || {temp1_name}.path AS path
- FROM {sssp_table} INNER JOIN {temp1_name} ON
- ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
- {check_grps_t1})
- WHERE {source_vertex} <> {sssp_table}.{vertex_id}
- """.format(**locals()))
-
- tmp = temp2_name
- temp2_name = temp1_name
- temp1_name = tmp
-
- tmp = check_grps_t1
- check_grps_t1 = check_grps_t2
- check_grps_t2 = tmp
-
- # Add the source vertex to the beginning of every path and
- # add the empty arrays for the groups that don't have a path to reach
- # the destination vertex
- plpy.execute("""
- CREATE TABLE {path_table} AS
- SELECT {grp_comma} {source_vertex} || path AS path
- FROM {temp2_name}
- UNION
- SELECT {grp_comma} '{{}}'::INT[] AS path
- FROM {sssp_table}
- WHERE {vertex_id} = {dest_vertex}
- AND {sssp_table}.parent IS NULL
- """.format(**locals()))
-
- out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
- if out.nrows() == 0:
- plpy.error(
- "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
- format(dest_vertex,sssp_table))
-
- plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
- format(**locals()))
-
- return None
+ """
+ with MinWarning("warning"):
+ _validate_get_path(sssp_table, dest_vertex, path_table)
+
+ temp1_name = unique_string(desp='temp1')
+ temp2_name = unique_string(desp='temp2')
+
+ select_grps = ""
+ check_grps_t1 = ""
+ check_grps_t2 = ""
+ grp_comma = ""
+ tmp = ""
+
+ summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
+ vertex_id = summary[0]['vertex_id']
+ source_vertex = summary[0]['source_vertex']
+
+ if vertex_id == "NULL":
+ vertex_id = "id"
+
+ grouping_cols = summary[0]['grouping_cols']
+ if grouping_cols == "NULL":
+ grouping_cols = None
+
+ if grouping_cols is not None:
+ glist = split_quoted_delimited_str(grouping_cols)
+ select_grps = _grp_from_table(sssp_table, glist) + " , "
+ check_grps_t1 = " AND " + _check_groups(
+ sssp_table, temp1_name, glist)
+ check_grps_t2 = " AND " + _check_groups(
+ sssp_table, temp2_name, glist)
+
+ grp_comma = grouping_cols + " , "
+
+ if source_vertex == dest_vertex:
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+ FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
+ """.format(**locals()))
+ return
+
+ plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+ format(temp1_name, temp2_name))
+ out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+ SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
+ ARRAY[{dest_vertex}] AS path
+ FROM {sssp_table}
+ WHERE {vertex_id} = {dest_vertex}
+ AND {sssp_table}.parent IS NOT NULL
+ """.format(**locals()))
+
+ plpy.execute("""
+ CREATE TEMP TABLE {temp2_name} AS
+ SELECT * FROM {temp1_name} LIMIT 0
+ """.format(**locals()))
+
+ # Follow the 'parent' chain until you reach the source.
+ while out.nrows() > 0:
+
+ plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+ # If the vertex id is not the source vertex,
+ # Add it to the path and move to its parent
+ out = plpy.execute(
+ """ INSERT INTO {temp2_name}
+ SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
+ {sssp_table}.{vertex_id} || {temp1_name}.path AS path
+ FROM {sssp_table} INNER JOIN {temp1_name} ON
+ ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
+ {check_grps_t1})
+ WHERE {source_vertex} <> {sssp_table}.{vertex_id}
+ """.format(**locals()))
+
+ tmp = temp2_name
+ temp2_name = temp1_name
+ temp1_name = tmp
+
+ tmp = check_grps_t1
+ check_grps_t1 = check_grps_t2
+ check_grps_t2 = tmp
+
+ # Add the source vertex to the beginning of every path and
+ # add the empty arrays for the groups that don't have a path to reach
+ # the destination vertex
+ plpy.execute("""
+ CREATE TABLE {path_table} AS
+ SELECT {grp_comma} {source_vertex} || path AS path
+ FROM {temp2_name}
+ UNION
+ SELECT {grp_comma} '{{}}'::INT[] AS path
+ FROM {sssp_table}
+ WHERE {vertex_id} = {dest_vertex}
+ AND {sssp_table}.parent IS NULL
+ """.format(**locals()))
+
+ out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+ if out.nrows() == 0:
+ plpy.error(
+ "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
+ format(dest_vertex, sssp_table))
+ # Drop both work tables; their names were swapped inside the loop.
+ plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp2_name}".
+ format(**locals()))
+ return None
def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
- source_vertex, out_table, glist, **kwargs):
+ source_vertex, out_table, glist, **kwargs):
- validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table,'SSSP')
+ validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+ out_table, 'SSSP')
- _assert(isinstance(source_vertex,int),
- """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
- format(**locals()))
- src_exists = plpy.execute("""
- SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
- """.format(**locals()))
+ _assert(isinstance(source_vertex, int),
+ """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
+ format(**locals()))
+ src_exists = plpy.execute("""
+ SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
+ """.format(**locals()))
- if src_exists.nrows() == 0:
- plpy.error(
- """Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""".
- format(**locals()))
+ if src_exists.nrows() == 0:
+ plpy.error("Graph SSSP: Source vertex {source_vertex} is not present "
+ "in the vertex table {vertex_table}.".format(**locals()))
- vt_error = plpy.execute(
- """ SELECT {vertex_id}
- FROM {vertex_table}
- WHERE {vertex_id} IS NOT NULL
- GROUP BY {vertex_id}
- HAVING count(*) > 1 """.format(**locals()))
+ vt_error = plpy.execute("""
+ SELECT {vertex_id}
+ FROM {vertex_table}
+ WHERE {vertex_id} IS NOT NULL
+ GROUP BY {vertex_id}
+ HAVING count(*) > 1
+ """.format(**locals()))
- if vt_error.nrows() != 0:
- plpy.error(
- """Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
- format(**locals()))
+ if vt_error.nrows() != 0:
+ plpy.error("Graph SSSP: Source vertex table {vertex_table} "
+ "contains duplicate vertex id's.".format(**locals()))
- _assert(not table_exists(out_table+"_summary"),
- "Graph SSSP: Output summary table already exists!")
+ _assert(not table_exists(out_table + "_summary"),
+ "Graph SSSP: Output summary table already exists!")
- if glist is not None:
- _assert(columns_exist_in_table(edge_table, glist),
- """Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
- format(**locals()))
+ if glist is not None:
+ _assert(columns_exist_in_table(edge_table, glist),
+ "Graph SSSP: Not all columns from {glist} are present in "
+ "edge table ({edge_table}).".format(**locals()))
+ return None
- return None
def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
- _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
- "Graph SSSP: Invalid SSSP table name!")
- _assert(table_exists(sssp_table),
- "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
- _assert(not table_is_empty(sssp_table),
- "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
+ _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
+ "Graph SSSP: Invalid SSSP table name!")
+ _assert(table_exists(sssp_table),
+ "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
+ _assert(not table_is_empty(sssp_table),
+ "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
- summary = sssp_table+"_summary"
- _assert(table_exists(summary),
- "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
- _assert(not table_is_empty(summary),
- "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
+ summary = sssp_table + "_summary"
+ _assert(table_exists(summary),
+ "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
+ _assert(not table_is_empty(summary),
+ "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
- _assert(not table_exists(path_table),
- "Graph SSSP: Output path table already exists!")
+ _assert(not table_exists(path_table),
+ "Graph SSSP: Output path table already exists!")
+
+ return None
- return None
def graph_sssp_help(schema_madlib, message, **kwargs):
- """
- Help function for graph_sssp and graph_sssp_get_path
-
- Args:
- @param schema_madlib
- @param message: string, Help message string
- @param kwargs
-
- Returns:
- String. Help/usage information
- """
- if not message:
- help_string = """
+ """
+ Help function for graph_sssp and graph_sssp_get_path
+
+ Args:
+ @param schema_madlib
+ @param message: string, Help message string
+ @param kwargs
+
+ Returns:
+ String. Help/usage information
+ """
+ if not message:
+ help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
@@ -640,8 +639,8 @@ weights of its constituent edges is minimized.
For more details on function usage:
SELECT {schema_madlib}.graph_sssp('usage')
"""
- elif message.lower() in ['usage', 'help', '?']:
- help_string = """
+ elif message.lower() in ['usage', 'help', '?']:
+ help_string = """
Given a graph and a source vertex, single source shortest path (SSSP)
algorithm finds a path for every vertex such that the sum of the
weights of its constituent edges is minimized.
@@ -651,8 +650,8 @@ weights of its constituent edges is minimized.
To retrieve the path for a specific vertex:
SELECT {schema_madlib}.graph_sssp_get_path(
- sssp_table TEXT, -- Name of the table that contains the SSSP output.
- dest_vertex INT, -- The vertex that will be the destination of the
+ sssp_table TEXT, -- Name of the table that contains the SSSP output.
+ dest_vertex INT, -- The vertex that will be the destination of the
-- desired path.
path_table TEXT -- Name of the output table that contains the path.
);
@@ -679,8 +678,8 @@ every group and has the following columns:
- path (ARRAY) : The shortest path from the source vertex (as specified
in the SSSP execution) to the destination vertex.
"""
- elif message.lower() in ("example", "examples"):
- help_string = """
+ elif message.lower() in ("example", "examples"):
+ help_string = """
----------------------------------------------------------------------------
EXAMPLES
----------------------------------------------------------------------------
@@ -723,12 +722,12 @@ INSERT INTO edge VALUES
-- Compute the SSSP:
DROP TABLE IF EXISTS out;
SELECT madlib.graph_sssp(
- 'vertex', -- Vertex table
- 'id', -- Vertix id column
- 'edge', -- Edge table
- 'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments
- 0, -- The source vertex
- 'out' -- Output table of SSSP
+ 'vertex', -- Vertex table
+ 'id', -- Vertix id column
+ 'edge', -- Edge table
+ 'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments
+ 0, -- The source vertex
+ 'out' -- Output table of SSSP
);
-- View the SSSP costs for every vertex:
SELECT * FROM out ORDER BY id;
@@ -752,12 +751,14 @@ INSERT INTO edge_gr VALUES
DROP TABLE IF EXISTS out_gr, out_gr_summary;
SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
"""
- else:
- help_string = "No such option. Use {schema_madlib}.graph_sssp()"
-
- return help_string.format(schema_madlib=schema_madlib,
- graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
- """source_vertex INT, -- The source vertex id for the algorithm to start.
- out_table TEXT, -- Name of the table to store the result of SSSP.
- grouping_cols TEXT -- The list of grouping columns."""))
+ else:
+ help_string = "No such option. Use {schema_madlib}.graph_sssp()"
+
+ common_usage_string = get_graph_usage(
+ schema_madlib, 'graph_sssp',
+ """source_vertex INT, -- The source vertex id for the algorithm to start.
+ out_table TEXT, -- Name of the table to store the result of SSSP.
+ grouping_cols TEXT -- The list of grouping columns.""")
+ return help_string.format(schema_madlib=schema_madlib,
+ graph_usage=common_usage_string)
# ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/wcc.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in
index 02cceeb..1f6a81f 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -31,34 +31,37 @@ import plpy
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
-from utilities.validate_args import columns_exist_in_table, get_cols_and_types
-from graph_utils import *
+from utilities.validate_args import columns_exist_in_table
+from utilities.utilities import is_platform_pg, is_platform_hawq
+from graph_utils import validate_graph_coding, get_graph_usage
-m4_changequote(`<!', `!>')
def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, grouping_cols_list, module_name):
+ edge_params, out_table, grouping_cols_list, module_name):
"""
Function to validate input parameters for wcc
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
- out_table, module_name)
+ out_table, module_name)
if grouping_cols_list:
# validate the grouping columns. We currently only support grouping_cols
# to be column names in the edge_table, and not expressions!
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
- "Weakly Connected Components error: One or more grouping columns specified do not exist!")
+ "Weakly Connected Components error: "
+ "One or more grouping columns specified do not exist!")
def prefix_tablename_to_colnames(table, cols_list):
return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list])
+
def get_where_condition(table1, table2, cols_list):
return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col)
- for col in cols_list])
+ for col in cols_list])
+
def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
- out_table, grouping_cols, **kwargs):
+ out_table, grouping_cols, **kwargs):
"""
Function that computes the wcc
@@ -78,8 +81,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
plpy.execute('SET client_min_messages TO warning')
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
- edge_params = extract_keyvalue_params(edge_args,
- params_types, default_args)
+ edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
# populate default values for optional params if null
if vertex_id is None:
@@ -89,7 +91,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
- edge_params, out_table, grouping_cols_list, 'Weakly Connected Components')
+ edge_params, out_table, grouping_cols_list,
+ 'Weakly Connected Components')
src = edge_params["src"]
dest = edge_params["dest"]
@@ -99,21 +102,22 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
toupdate = unique_string(desp='toupdate')
temp_out_table = unique_string(desp='tempout')
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+ distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
subq_prefixed_grouping_cols = ''
comma_toupdate_prefixed_grouping_cols = ''
comma_oldupdate_prefixed_grouping_cols = ''
old_new_update_where_condition = ''
new_to_update_where_condition = ''
edge_to_update_where_condition = ''
- is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+ is_hawq = is_platform_hawq()
INT_MAX = 2147483647
component_id = 'component_id'
+ grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
+
if grouping_cols:
- distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
- <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>)
+ distribution = ('' if is_platform_pg() else
+ "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, vertex_id))
# Update some variables useful for grouping based query strings
subq = unique_string(desp='subquery')
distinct_grp_table = unique_string(desp='grptable')
@@ -121,18 +125,20 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
CREATE TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_table}
""".format(**locals()))
- comma_toupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(toupdate,
- grouping_cols_list)
- comma_oldupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(
- oldupdate, grouping_cols_list)
- subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq,
- grouping_cols_list)
- old_new_update_where_condition = ' AND ' + get_where_condition(
- oldupdate, newupdate, grouping_cols_list)
- new_to_update_where_condition = ' AND ' + get_where_condition(
- newupdate, toupdate, grouping_cols_list)
- edge_to_update_where_condition = ' AND ' + get_where_condition(
- edge_table, toupdate, grouping_cols_list)
+
+ pttc = prefix_tablename_to_colnames
+ gwc = get_where_condition
+
+ comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, grouping_cols_list)
+ comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, grouping_cols_list)
+ subq_prefixed_grouping_cols = pttc(subq, grouping_cols_list)
+ old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, grouping_cols_list)
+ new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, grouping_cols_list)
+ edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, grouping_cols_list)
+ join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list)
+ group_by_clause = ('' if not grouping_cols else
+ '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
+ subq, vertex_id))
plpy.execute("""
CREATE TABLE {newupdate} AS
SELECT {subq}.{vertex_id},
@@ -148,13 +154,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
ON {join_grouping_cols}
GROUP BY {group_by_clause}
{distribution}
- """.format(select_grouping_cols=','+subq_prefixed_grouping_cols,
- join_grouping_cols=get_where_condition(subq,
- distinct_grp_table, grouping_cols_list),
- group_by_clause='' if not grouping_cols else
- subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, vertex_id),
- select_grouping_cols_clause='' if not grouping_cols else
- grouping_cols+', ', **locals()))
+ """.format(select_grouping_cols=',' + subq_prefixed_grouping_cols,
+ select_grouping_cols_clause=grouping_cols_comma,
+ **locals()))
plpy.execute("""
CREATE TEMP TABLE {message} AS
SELECT {vertex_id},
@@ -162,8 +164,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
{select_grouping_cols_clause}
FROM {newupdate}
{distribution}
- """.format(select_grouping_cols_clause='' if not grouping_cols else
- ', '+grouping_cols, **locals()))
+ """.format(select_grouping_cols_clause=grouping_cols_comma,
+ **locals()))
else:
plpy.execute("""
CREATE TABLE {newupdate} AS
@@ -186,13 +188,14 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
""".format(**locals()))
nodes_to_update = 1
while nodes_to_update > 0:
- # This idea here is simple. Look at all the neighbors of a node, and
- # assign the smallest node id among the neighbors as its component_id.
- # The next table starts off with very high component_id (INT_MAX). The
- # component_id of all nodes which obtain a smaller component_id after
- # looking at its neighbors are updated in the next table. At every
- # iteration update only those nodes whose component_id in the previous
- # iteration are greater than what was found in the current iteration.
+ # Look at all the neighbors of a node, and assign the smallest node id
+ # among the neighbors as its component_id. The next table starts off
+ # with very high component_id (INT_MAX). The component_id of all nodes
+ # which obtain a smaller component_id after looking at its neighbors are
+ # updated in the next table. At every iteration update only those nodes
+ # whose component_id in the previous iteration are greater than what was
+ # found in the current iteration.
+
plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
plpy.execute("""
CREATE TEMP TABLE {oldupdate} AS
@@ -202,10 +205,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
FROM {message}
GROUP BY {group_by_clause} {vertex_id}
{distribution}
- """.format(grouping_cols_select='' if not grouping_cols else
- ', {0}'.format(grouping_cols), group_by_clause=''
- if not grouping_cols else '{0}, '.format(grouping_cols),
- **locals()))
+ """.format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols),
+ group_by_clause=grouping_cols_comma,
+ **locals()))
plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
plpy.execute("""
@@ -236,8 +238,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
SELECT * FROM {toupdate};
""".format(**locals()))
plpy.execute("DROP TABLE {0}".format(newupdate))
- plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table,
- newupdate))
+ plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+ format(temp_out_table, newupdate))
plpy.execute("""
CREATE TABLE {temp_out_table} AS
SELECT * FROM {newupdate}
@@ -275,9 +277,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
) AS t
GROUP BY {group_by_clause} {vertex_id}
""".format(select_grouping_cols='' if not grouping_cols
- else ', {0}'.format(grouping_cols), group_by_clause=''
- if not grouping_cols else ' {0}, '.format(grouping_cols),
- **locals()))
+ else ', {0}'.format(grouping_cols), group_by_clause=''
+ if not grouping_cols else ' {0}, '.format(grouping_cols),
+ **locals()))
plpy.execute("DROP TABLE {0}".format(oldupdate))
if grouping_cols:
@@ -300,6 +302,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
if is_hawq:
plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table))
+
def wcc_help(schema_madlib, message, **kwargs):
"""
Help function for wcc
@@ -315,11 +318,13 @@ def wcc_help(schema_madlib, message, **kwargs):
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
- help_string = get_graph_usage(schema_madlib, 'Weakly Connected Components',
+ help_string = get_graph_usage(
+ schema_madlib,
+ 'Weakly Connected Components',
"""out_table TEXT, -- Output table of weakly connected components
- grouping_col TEXT -- Comma separated column names to group on
- -- (DEFAULT = NULL, no grouping)
-""")
+ grouping_col TEXT -- Comma separated column names to group on
+ -- (DEFAULT = NULL, no grouping)
+ """)
else:
if message is not None and \
message.lower() in ("example", "examples"):
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/utilities/utilities.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in
index 6a5e8f9..b28a5f3 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -14,32 +14,43 @@ if __name__ != "__main__":
m4_changequote(`<!', `!>')
+def has_function_properties():
+ """ __HAS_FUNCTION_PROPERTIES__ variable defined during configure """
+ return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+
+
+def is_platform_pg():
+ """ __POSTGRESQL__ variable defined during configure """
+ return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
+def is_platform_hawq():
+ """ __HAWQ__ variable defined during configure """
+ return m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
def get_seg_number():
""" Find out how many primary segments exist in the distribution
Might be useful for partitioning data.
"""
- m4_ifdef(<!__POSTGRESQL__!>, <!return 1!>, <!
- return plpy.execute(
- """
- SELECT count(*) from gp_segment_configuration
- WHERE role = 'p'
- """)[0]['count']
- !>)
+ if is_platform_pg():
+ return 1
+ else:
+ return plpy.execute("""
+ SELECT count(*) from gp_segment_configuration
+ WHERE role = 'p'
+ """)[0]['count']
# ------------------------------------------------------------------------------
def is_orca():
- m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!
- optimizer = plpy.execute("show optimizer")[0]["optimizer"]
- return True if optimizer == 'on' else False
- !>, <!
+ if has_function_properties():
+ optimizer = plpy.execute("show optimizer")[0]["optimizer"]
+ if optimizer == 'on':
+ return True
return False
- !>)
-# ------------------------------------------------------------------------------
-
-
-def is_platform_pg():
- return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
# ------------------------------------------------------------------------------