Posted to commits@madlib.apache.org by ri...@apache.org on 2017/07/20 21:24:26 UTC

[1/3] incubator-madlib git commit: Graph: Update Python code to follow PEP-8

Repository: incubator-madlib
Updated Branches:
  refs/heads/master 8c9b955cd -> d487df3c4


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/sssp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
index 4839d2d..93497c4 100644
--- a/src/ports/postgres/modules/graph/sssp.py_in
+++ b/src/ports/postgres/modules/graph/sssp.py_in
@@ -33,21 +33,22 @@ from graph_utils import get_graph_usage
 from graph_utils import _grp_from_table
 from graph_utils import _check_groups
 from utilities.control import MinWarning
+
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
 from utilities.utilities import split_quoted_delimited_str
+from utilities.utilities import is_platform_pg, is_platform_hawq
+
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 from utilities.validate_args import get_expr_type
 
-m4_changequote(`<!', `!>')
 
 def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
-		edge_args, source_vertex, out_table, grouping_cols, **kwargs):
-
-	"""
+               edge_args, source_vertex, out_table, grouping_cols, **kwargs):
+    """
     Single source shortest path function for graphs using the Bellman-Ford
     algorithm [1].
     Args:
@@ -63,383 +64,382 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
     """
 
-	with MinWarning("warning"):
-
-		INT_MAX = 2147483647
-		INFINITY = "'Infinity'"
-		EPSILON = 0.000001
-
-		message = unique_string(desp='message')
-
-		oldupdate = unique_string(desp='oldupdate')
-		newupdate = unique_string(desp='newupdate')
-
-		params_types = {'src': str, 'dest': str, 'weight': str}
-		default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
-		edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-		# Prepare the input for recording in the summary table
-		if vertex_id is None:
-			v_st= "NULL"
-			vertex_id = "id"
-		else:
-			v_st = vertex_id
-		if edge_args is None:
-			e_st = "NULL"
-		else:
-			e_st = edge_args
-		if grouping_cols is None:
-			g_st = "NULL"
-			glist = None
-		else:
-			g_st = grouping_cols
-			glist = split_quoted_delimited_str(grouping_cols)
-
-		src = edge_params["src"]
-		dest = edge_params["dest"]
-		weight = edge_params["weight"]
-
-		distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-			<!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
-		local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-			<!"DISTRIBUTED BY (id)"!>)
-
-		is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-		_validate_sssp(vertex_table, vertex_id, edge_table,
-			edge_params, source_vertex, out_table, glist)
-
-		plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
-			message,oldupdate,newupdate))
-
-		# Initialize grouping related variables
-		comma_grp = ""
-		comma_grp_e = ""
-		comma_grp_m = ""
-		grp_comma = ""
-		checkg_oo = ""
-		checkg_eo = ""
-		checkg_ex = ""
-		checkg_om = ""
-		group_by = ""
-
-		if grouping_cols is not None:
-			comma_grp = " , " + grouping_cols
-			group_by = " , " + _grp_from_table(edge_table,glist)
-			comma_grp_e = " , " + _grp_from_table(edge_table,glist)
-			comma_grp_m = " , " + _grp_from_table("message",glist)
-			grp_comma = grouping_cols + " , "
-
-			checkg_oo_sub = _check_groups(out_table,"oldupdate",glist)
-			checkg_oo = " AND " + checkg_oo_sub
-			checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist)
-			checkg_ex = " AND " + _check_groups(edge_table,"x",glist)
-			checkg_om = " AND " + _check_groups("out_table","message",glist)
-
-		w_type = get_expr_type(weight,edge_table).lower()
-		init_w = INT_MAX
-		if w_type in ['real','double precision','float8']:
-			init_w = INFINITY
-
-		# We keep a table of every vertex, the minimum cost to that destination
-		# seen so far and the parent to this vertex in the associated shortest
-		# path. This table will be updated throughout the execution.
-		plpy.execute(
-			""" CREATE TABLE {out_table} AS ( SELECT
-					{grp_comma} {src} AS {vertex_id}, {weight},
-					{src} AS parent FROM {edge_table} LIMIT 0)
-				{distribution} """.format(**locals()))
-
-		# We keep a summary table to keep track of the parameters used for this
-		# SSSP run. This table is used in the path finding function to eliminate
-		# the need for repetition.
-		plpy.execute( """ CREATE TABLE {out_table}_summary  (
-			vertex_table            TEXT,
-			vertex_id               TEXT,
-			edge_table              TEXT,
-			edge_args               TEXT,
-			source_vertex           INTEGER,
-			out_table               TEXT,
-			grouping_cols           TEXT)
-			""".format(**locals()))
-		plpy.execute( """ INSERT INTO {out_table}_summary VALUES
-			('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
-			{source_vertex}, '{out_table}', '{g_st}')
-			""".format(**locals()))
-
-		# We keep 2 update tables and alternate them during the execution.
-		# This is necessary since we need to know which vertices are updated in
-		# the previous iteration to calculate the next set of updates.
-		plpy.execute(
-			""" CREATE TEMP TABLE {oldupdate} AS ( SELECT
-					{src} AS id, {weight},
-					{src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
-				{local_distribution}
-				""".format(**locals()))
-		plpy.execute(
-			""" CREATE TEMP TABLE {newupdate} AS ( SELECT
-					{src} AS id, {weight},
-					{src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
-				{local_distribution}
-				""".format(**locals()))
-
-		# Since HAWQ does not allow us to update, we create a new table and
-		# rename at every iteration.
-		if is_hawq:
-			temp_table = unique_string(desp='temp')
-			sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} )
-				{distribution} """
-			plpy.execute(sql.format(**locals()))
-
-		# GPDB and HAWQ have distributed by clauses to help them with indexing.
-		# For Postgres we add the indices manually.
-		sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-			<!""" CREATE INDEX ON {out_table} ({vertex_id});
-				CREATE INDEX ON {oldupdate} (id);
-				CREATE INDEX ON {newupdate} (id);
-			""".format(**locals())!>,
-			<!''!>)
-		plpy.execute(sql_index)
-
-		# The initialization step is quite different when grouping is involved
-		# since not every group (subgraph) will have the same set of vertices.
-
-		# Example:
-		# Assume there are two grouping columns g1 and g2
-		# g1 values are 0 and 1. g2 values are 5 and 6
-		if grouping_cols is not None:
-
-			distinct_grp_table = unique_string(desp='grp')
-			plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """.
-				format(**locals()))
-			plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS
-				SELECT DISTINCT {grouping_cols} FROM {edge_table} """.
-				format(**locals()))
-			subq = unique_string(desp='subquery')
-
-			checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist)
-			grp_d_comma = _grp_from_table(distinct_grp_table,glist) +","
-
-			plpy.execute(
-				""" INSERT INTO {out_table}
-				SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
-					{init_w} AS {weight}, NULL::INT AS parent
-				FROM {distinct_grp_table} INNER JOIN
-					(
-					SELECT {src} AS {vertex_id} {comma_grp}
-					FROM {edge_table}
-					UNION
-					SELECT {dest} AS {vertex_id} {comma_grp}
-					FROM {edge_table}
-					) {subq} ON ({checkg_ds_sub})
-				WHERE {vertex_id} IS NOT NULL
-				""".format(**locals()))
-
-			plpy.execute(
-				""" INSERT INTO {oldupdate}
-					SELECT {source_vertex}, 0, {source_vertex},
-					{grouping_cols}
-					FROM {distinct_grp_table}
-				""".format(**locals()))
-
-			# The maximum number of vertices for any group.
-			# Used for determining negative cycles.
-			v_cnt = plpy.execute(
-				""" SELECT max(count) as max FROM (
-						SELECT count({vertex_id}) AS count
-						FROM {out_table}
-						GROUP BY {grouping_cols}) x
-				""".format(**locals()))[0]['max']
-			plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
-		else:
-			plpy.execute(
-				""" INSERT INTO {out_table}
-				SELECT {vertex_id} AS {vertex_id},
-					{init_w} AS {weight},
-					NULL AS parent
-				FROM {vertex_table}
-				WHERE {vertex_id} IS NOT NULL
-				 """.format(**locals()))
-
-			# The source can be reached with 0 cost and it has itself as the
-			# parent.
-			plpy.execute(
-				""" INSERT INTO {oldupdate}
-					VALUES({source_vertex},0,{source_vertex})
-				""".format(**locals()))
-
-			v_cnt = plpy.execute(
-				""" SELECT count(*) FROM {vertex_table}
-				WHERE {vertex_id} IS NOT NULL
-				""".format(**locals()))[0]['count']
-
-		for i in range(0,v_cnt+1):
-
-			# Apply the updates calculated in the last iteration.
-			if is_hawq:
-				sql = """
-				TRUNCATE TABLE {temp_table};
-				INSERT INTO {temp_table}
-					SELECT *
-					FROM {out_table}
-					WHERE NOT EXISTS (
-						SELECT 1
-						FROM {oldupdate} as oldupdate
-						WHERE {out_table}.{vertex_id} = oldupdate.id
-						{checkg_oo})
-					UNION
-					SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
-				"""
-				plpy.execute(sql.format(**locals()))
-				plpy.execute("DROP TABLE {0}".format(out_table))
-				plpy.execute("ALTER TABLE {0} RENAME TO {1}".
-					format(temp_table,out_table))
-				sql = """ CREATE TABLE {temp_table} AS (
-					SELECT * FROM {out_table} LIMIT 0)
-					{distribution};"""
-				plpy.execute(sql.format(**locals()))
-				ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
-					format(oldupdate))
-			else:
-				sql = """
-				UPDATE {out_table} SET
-				{weight}=oldupdate.{weight},
-				parent=oldupdate.parent
-				FROM
-				{oldupdate} AS oldupdate
-				WHERE
-				{out_table}.{vertex_id}=oldupdate.id AND
-				{out_table}.{weight}>oldupdate.{weight} {checkg_oo}
-				"""
-				ret = plpy.execute(sql.format(**locals()))
-
-			if ret.nrows() == 0:
-				break
-
-			plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
-
-			# 'oldupdate' table has the update info from the last iteration
-
-			# Consider every edge that has an updated source
-			# From these edges:
-			# For every destination vertex, find the min total cost to reach.
-			# Note that, just calling an aggregate function with group by won't
-			# let us store the src field of the edge (needed for the parent).
-			# This is why we need the 'x'; it gives a list of destinations and
-			# associated min values. Using these values, we identify which edge
-			# is selected.
-
-			# Since using '=' with floats is dangerous we use an epsilon value
-			# for comparison.
-
-			# Once we have a list of edges and values (stores as 'message'),
-			# we check if these values are lower than the existing shortest
-			# path values.
-
-			sql = (""" INSERT INTO {newupdate}
-				SELECT DISTINCT ON (message.id {comma_grp})
-					message.id AS id,
-					message.{weight} AS {weight},
-					message.parent AS parent {comma_grp_m}
-				FROM {out_table} AS out_table INNER JOIN
-					(
-					SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
-						oldupdate.id AS parent {comma_grp_e}
-					FROM {oldupdate} AS oldupdate INNER JOIN
-						{edge_table}  ON
-							({edge_table}.{src} = oldupdate.id {checkg_eo})
-						INNER JOIN
-						(
-						SELECT {edge_table}.{dest} AS id,
-							min(oldupdate.{weight} +
-								{edge_table}.{weight}) AS {weight} {comma_grp_e}
-						FROM {oldupdate} AS oldupdate INNER JOIN
-							{edge_table}  ON
-							({edge_table}.{src}=oldupdate.id {checkg_eo})
-						GROUP BY {edge_table}.{dest} {comma_grp_e}
-						) x
-						ON ({edge_table}.{dest} = x.id {checkg_ex} )
-					WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
-								- x.{weight}) < {EPSILON}
-					) message
-					ON (message.id = out_table.{vertex_id} {checkg_om})
-				WHERE message.{weight}<out_table.{weight}
-				""".format(**locals()))
-
-			plpy.execute(sql)
-
-			# Swap the update tables for the next iteration.
-			tmp = oldupdate
-			oldupdate = newupdate
-			newupdate = tmp
-
-		plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
-		# The algorithm should converge in less than |V| iterations.
-		# Otherwise there is a negative cycle in the graph.
-		if i == v_cnt:
-			if grouping_cols is None:
-				plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
-					format(out_table, out_table+"_summary", oldupdate))
-				if is_hawq:
-					plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
-				plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
-
-			# It is possible that not all groups has negative cycles.
-			else:
-
-				# negs is the string created by collating grouping columns.
-				# By looking at the oldupdate table we can see which groups
-				# are in a negative cycle.
-
-				negs = plpy.execute(
-					""" SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
-						FROM {oldupdate}
-					""".format(**locals()))[0]['grp']
-
-				# Delete the groups with negative cycles from the output table.
-				if is_hawq:
-					sql_del = """
-						TRUNCATE TABLE {temp_table};
-						INSERT INTO {temp_table}
-							SELECT *
-							FROM {out_table}
-							WHERE NOT EXISTS(
-								SELECT 1
-								FROM {oldupdate} as oldupdate
-								WHERE {checkg_oo_sub}
-								);"""
-					plpy.execute(sql_del.format(**locals()))
-					plpy.execute("DROP TABLE {0}".format(out_table))
-					plpy.execute("ALTER TABLE {0} RENAME TO {1}".
-						format(temp_table,out_table))
-				else:
-					sql_del = """ DELETE FROM {out_table}
-						USING {oldupdate} AS oldupdate
-						WHERE {checkg_oo_sub}"""
-					plpy.execute(sql_del.format(**locals()))
-
-				# If every group has a negative cycle,
-				# drop the output table as well.
-				if table_is_empty(out_table):
-					plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-						format(out_table,out_table+"_summary"))
-
-				plpy.warning(
-					"""Graph SSSP: Detected a negative cycle in the """ +
-					"""sub-graphs of following groups: {0}.""".
-					format(str(negs)[1:-1]))
-
-		plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
-		if is_hawq:
-			plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
-				format(**locals()))
-
-	return None
+    with MinWarning("warning"):
+
+        INT_MAX = 2147483647
+        INFINITY = "'Infinity'"
+        EPSILON = 0.000001
+
+        message = unique_string(desp='message')
+
+        oldupdate = unique_string(desp='oldupdate')
+        newupdate = unique_string(desp='newupdate')
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args,
+                                              params_types,
+                                              default_args)
+
+        # Prepare the input for recording in the summary table
+        if vertex_id is None:
+            v_st = "NULL"
+            vertex_id = "id"
+        else:
+            v_st = vertex_id
+        if edge_args is None:
+            e_st = "NULL"
+        else:
+            e_st = edge_args
+        if grouping_cols is None:
+            g_st = "NULL"
+            glist = None
+        else:
+            g_st = grouping_cols
+            glist = split_quoted_delimited_str(grouping_cols)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
+        local_distribution = '' if is_platform_pg() else "DISTRIBUTED BY (id)"
+
+        is_hawq = is_platform_hawq()
+
+        _validate_sssp(vertex_table, vertex_id, edge_table,
+                       edge_params, source_vertex, out_table, glist)
+
+        plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
+            message, oldupdate, newupdate))
+
+        # Initialize grouping related variables
+        comma_grp = ""
+        comma_grp_e = ""
+        comma_grp_m = ""
+        grp_comma = ""
+        checkg_oo = ""
+        checkg_eo = ""
+        checkg_ex = ""
+        checkg_om = ""
+        group_by = ""
+
+        if grouping_cols is not None:
+            comma_grp = " , " + grouping_cols
+            group_by = " , " + _grp_from_table(edge_table, glist)
+            comma_grp_e = " , " + _grp_from_table(edge_table, glist)
+            comma_grp_m = " , " + _grp_from_table("message", glist)
+            grp_comma = grouping_cols + " , "
+
+            checkg_oo_sub = _check_groups(out_table, "oldupdate", glist)
+            checkg_oo = " AND " + checkg_oo_sub
+            checkg_eo = " AND " + _check_groups(edge_table, "oldupdate", glist)
+            checkg_ex = " AND " + _check_groups(edge_table, "x", glist)
+            checkg_om = " AND " + _check_groups("out_table", "message", glist)
+
+        w_type = get_expr_type(weight, edge_table).lower()
+        init_w = INT_MAX
+        if w_type in ['real', 'double precision', 'float8']:
+            init_w = INFINITY
+
+        # We keep a table of every vertex, the minimum cost to that destination
+        # seen so far and the parent to this vertex in the associated shortest
+        # path. This table will be updated throughout the execution.
+        plpy.execute(
+            """ CREATE TABLE {out_table} AS (SELECT
+                    {grp_comma} {src} AS {vertex_id}, {weight},
+                    {src} AS parent FROM {edge_table} LIMIT 0)
+                {distribution} """.format(**locals()))
+
+        # We keep a summary table to keep track of the parameters used for this
+        # SSSP run. This table is used in the path finding function to eliminate
+        # the need for repetition.
+        plpy.execute(""" CREATE TABLE {out_table}_summary  (
+            vertex_table            TEXT,
+            vertex_id               TEXT,
+            edge_table              TEXT,
+            edge_args               TEXT,
+            source_vertex           INTEGER,
+            out_table               TEXT,
+            grouping_cols           TEXT)
+            """.format(**locals()))
+        plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+            ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+            {source_vertex}, '{out_table}', '{g_st}')
+            """.format(**locals()))
+
+        # We keep 2 update tables and alternate them during the execution.
+        # This is necessary since we need to know which vertices are updated in
+        # the previous iteration to calculate the next set of updates.
+        plpy.execute(
+            """ CREATE TEMP TABLE {oldupdate} AS (SELECT
+                    {src} AS id, {weight},
+                    {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+                {local_distribution}
+                """.format(**locals()))
+        plpy.execute(
+            """ CREATE TEMP TABLE {newupdate} AS (SELECT
+                    {src} AS id, {weight},
+                    {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+                {local_distribution}
+                """.format(**locals()))
+
+        # Since HAWQ does not allow us to update, we create a new table and
+        # rename at every iteration.
+        if is_hawq:
+            temp_table = unique_string(desp='temp')
+            sql = """ CREATE TABLE {temp_table} AS (SELECT * FROM {out_table} )
+                {distribution} """
+            plpy.execute(sql.format(**locals()))
+
+        # GPDB and HAWQ use DISTRIBUTED BY clauses to speed up these lookups.
+        # For Postgres we add the indices manually.
+        if is_platform_pg():
+            plpy.execute("""
+                CREATE INDEX ON {out_table} ({vertex_id});
+                CREATE INDEX ON {oldupdate} (id);
+                CREATE INDEX ON {newupdate} (id);
+            """.format(**locals()))
+
+        # The initialization step is quite different when grouping is involved
+        # since not every group (subgraph) will have the same set of vertices.
+
+        # Example:
+        # Assume there are two grouping columns, g1 and g2, where g1 takes the
+        # values 0 and 1 and g2 takes the values 5 and 6. Each (g1, g2) pair
+        # that appears in the edge table defines its own subgraph, and every
+        # vertex of that subgraph gets its own row in the output table.
+        if grouping_cols is not None:
+
+            distinct_grp_table = unique_string(desp='grp')
+            plpy.execute("DROP TABLE IF EXISTS {distinct_grp_table}".
+                         format(**locals()))
+            plpy.execute("""
+                CREATE TEMP TABLE {distinct_grp_table} AS
+                SELECT DISTINCT {grouping_cols} FROM {edge_table}
+                """.format(**locals()))
+            subq = unique_string(desp='subquery')
+
+            checkg_ds_sub = _check_groups(distinct_grp_table, subq, glist)
+            grp_d_comma = _grp_from_table(distinct_grp_table, glist) + ","
+
+            plpy.execute("""
+                INSERT INTO {out_table}
+                SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM {distinct_grp_table} INNER JOIN
+                    (
+                    SELECT {src} AS {vertex_id} {comma_grp}
+                    FROM {edge_table}
+                    UNION
+                    SELECT {dest} AS {vertex_id} {comma_grp}
+                    FROM {edge_table}
+                    ) {subq} ON ({checkg_ds_sub})
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))
+
+            plpy.execute("""
+                INSERT INTO {oldupdate}
+                SELECT {source_vertex}, 0, {source_vertex},
+                       {grouping_cols}
+                FROM {distinct_grp_table}
+                """.format(**locals()))
+
+            # The maximum number of vertices for any group.
+            # Used for determining negative cycles.
+            v_cnt = plpy.execute("""
+                SELECT max(count) as max FROM (
+                    SELECT count({vertex_id}) AS count
+                    FROM {out_table}
+                    GROUP BY {grouping_cols}) x
+                """.format(**locals()))[0]['max']
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+        else:
+            plpy.execute("""
+                INSERT INTO {out_table}
+                SELECT {vertex_id} AS {vertex_id},
+                    {init_w} AS {weight},
+                    NULL AS parent
+                FROM {vertex_table}
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))
+
+            # The source can be reached with 0 cost and it has itself as the
+            # parent.
+            plpy.execute("""
+                INSERT INTO {oldupdate}
+                VALUES({source_vertex},0,{source_vertex})
+                """.format(**locals()))
+
+            v_cnt = plpy.execute("""
+                SELECT count(*) FROM {vertex_table}
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))[0]['count']
+
+        for i in range(0, v_cnt + 1):
+
+            # Apply the updates calculated in the last iteration.
+            if is_hawq:
+                sql = """
+                TRUNCATE TABLE {temp_table};
+                INSERT INTO {temp_table}
+                    SELECT *
+                    FROM {out_table}
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {oldupdate} as oldupdate
+                        WHERE {out_table}.{vertex_id} = oldupdate.id
+                        {checkg_oo})
+                    UNION
+                    SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
+                """
+                plpy.execute(sql.format(**locals()))
+                plpy.execute("DROP TABLE {0}".format(out_table))
+                plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                             format(temp_table, out_table))
+                sql = """ CREATE TABLE {temp_table} AS (
+                    SELECT * FROM {out_table} LIMIT 0)
+                    {distribution};"""
+                plpy.execute(sql.format(**locals()))
+                ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
+                                   format(oldupdate))
+            else:
+                sql = """
+                UPDATE {out_table} SET
+                    {weight} = oldupdate.{weight},
+                    parent = oldupdate.parent
+                FROM
+                    {oldupdate} AS oldupdate
+                WHERE
+                    {out_table}.{vertex_id} = oldupdate.id AND
+                    {out_table}.{weight} > oldupdate.{weight} {checkg_oo}
+                """
+                ret = plpy.execute(sql.format(**locals()))
+
+            if ret.nrows() == 0:
+                break
+
+            plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
+
+            # 'oldupdate' table has the update info from the last iteration
+
+            # Consider every edge that has an updated source
+            # From these edges:
+            # For every destination vertex, find the min total cost to reach.
+            # Note that just calling an aggregate function with GROUP BY won't
+            # let us store the src field of the edge (needed for the parent).
+            # This is why we need the 'x'; it gives a list of destinations and
+            # associated min values. Using these values, we identify which edge
+            # is selected.
+
+            # Since using '=' with floats is dangerous, we use an epsilon value
+            # for comparison.
+
+            # Once we have a list of edges and values (stored as 'message'),
+            # we check if these values are lower than the existing shortest
+            # path values.
+
+            sql = (""" INSERT INTO {newupdate}
+                SELECT DISTINCT ON (message.id {comma_grp})
+                    message.id AS id,
+                    message.{weight} AS {weight},
+                    message.parent AS parent {comma_grp_m}
+                FROM {out_table} AS out_table INNER JOIN
+                    (
+                    SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
+                        oldupdate.id AS parent {comma_grp_e}
+                    FROM {oldupdate} AS oldupdate INNER JOIN
+                        {edge_table}  ON
+                            ({edge_table}.{src} = oldupdate.id {checkg_eo})
+                        INNER JOIN
+                        (
+                        SELECT {edge_table}.{dest} AS id,
+                            min(oldupdate.{weight} +
+                                {edge_table}.{weight}) AS {weight} {comma_grp_e}
+                        FROM {oldupdate} AS oldupdate INNER JOIN
+                            {edge_table}  ON
+                            ({edge_table}.{src}=oldupdate.id {checkg_eo})
+                        GROUP BY {edge_table}.{dest} {comma_grp_e}
+                        ) x
+                        ON ({edge_table}.{dest} = x.id {checkg_ex} )
+                    WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
+                                - x.{weight}) < {EPSILON}
+                    ) message
+                    ON (message.id = out_table.{vertex_id} {checkg_om})
+                WHERE message.{weight}<out_table.{weight}
+                """.format(**locals()))
+
+            plpy.execute(sql)
+
+            # Swap the update tables for the next iteration.
+            tmp = oldupdate
+            oldupdate = newupdate
+            newupdate = tmp
+
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
+        # The algorithm should converge in less than |V| iterations.
+        # Otherwise there is a negative cycle in the graph.
+        if i == v_cnt:
+            if grouping_cols is None:
+                plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
+                             format(out_table, out_table + "_summary", oldupdate))
+                if is_hawq:
+                    plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
+                plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
+
+            # It is possible that not all groups have negative cycles.
+            else:
+
+                # negs is the array of distinct grouping column values that
+                # still had pending updates. By looking at the oldupdate table
+                # we can see which groups are in a negative cycle.
+
+                negs = plpy.execute(
+                    """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+                        FROM {oldupdate}
+                    """.format(**locals()))[0]['grp']
+
+                # Delete the groups with negative cycles from the output table.
+                if is_hawq:
+                    sql_del = """
+                        TRUNCATE TABLE {temp_table};
+                        INSERT INTO {temp_table}
+                            SELECT *
+                            FROM {out_table}
+                            WHERE NOT EXISTS(
+                                SELECT 1
+                                FROM {oldupdate} as oldupdate
+                                WHERE {checkg_oo_sub}
+                                );"""
+                    plpy.execute(sql_del.format(**locals()))
+                    plpy.execute("DROP TABLE {0}".format(out_table))
+                    plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                                 format(temp_table, out_table))
+                else:
+                    sql_del = """ DELETE FROM {out_table}
+                        USING {oldupdate} AS oldupdate
+                        WHERE {checkg_oo_sub}"""
+                    plpy.execute(sql_del.format(**locals()))
+
+                # If every group has a negative cycle,
+                # drop the output table as well.
+                if table_is_empty(out_table):
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                                 format(out_table, out_table + "_summary"))
+
+                plpy.warning(
+                    """Graph SSSP: Detected a negative cycle in the """ +
+                    """sub-graphs of the following groups: {0}.""".
+                    format(str(negs)[1:-1]))
+
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
+        if is_hawq:
+            plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
+                         format(**locals()))
+    return None
+
 
 def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
-	**kwargs):
-	"""
+                        **kwargs):
+    """
     Helper function that can be used to get the shortest path for a vertex
     Args:
         @param sssp_table   Name of the table that contains the SSSP output.
@@ -447,188 +447,187 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
                             desired path.
         @param path_table   Name of the output table that contains the path.
 
-	"""
-	with MinWarning("warning"):
-		_validate_get_path(sssp_table, dest_vertex, path_table)
-
-		temp1_name = unique_string(desp='temp1')
-		temp2_name = unique_string(desp='temp2')
-
-		select_grps = ""
-		check_grps_t1 = ""
-		check_grps_t2 = ""
-		grp_comma = ""
-		tmp = ""
-
-		summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
-		vertex_id = summary[0]['vertex_id']
-		source_vertex = summary[0]['source_vertex']
-
-		if vertex_id == "NULL":
-			vertex_id = "id"
-
-		grouping_cols = summary[0]['grouping_cols']
-		if grouping_cols == "NULL":
-			grouping_cols = None
-
-		if grouping_cols is not None:
-			glist = split_quoted_delimited_str(grouping_cols)
-			select_grps = _grp_from_table(sssp_table,glist) + " , "
-			check_grps_t1 = " AND " + _check_groups(
-				sssp_table,temp1_name,glist)
-			check_grps_t2 = " AND " + _check_groups(
-				sssp_table,temp2_name,glist)
-
-			grp_comma = grouping_cols + " , "
-
-		if source_vertex == dest_vertex:
-			plpy.execute("""
-				CREATE TABLE {path_table} AS
-				SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
-				FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
-				""".format(**locals()))
-			return
-
-		plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
-			format(temp1_name,temp2_name));
-		out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
-				SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
-					ARRAY[{dest_vertex}] AS path
-				FROM {sssp_table}
-				WHERE {vertex_id} = {dest_vertex}
-					AND {sssp_table}.parent IS NOT NULL
-			""".format(**locals()))
-
-		plpy.execute("""
-			CREATE TEMP TABLE {temp2_name} AS
-				SELECT * FROM {temp1_name} LIMIT 0
-			""".format(**locals()))
-
-		# Follow the 'parent' chain until you reach the source.
-		while out.nrows() > 0:
-
-			plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
-			# If the vertex id is not the source vertex,
-			# Add it to the path and move to its parent
-			out = plpy.execute(
-				""" INSERT INTO {temp2_name}
-				SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
-					{sssp_table}.{vertex_id} || {temp1_name}.path AS path
-				FROM {sssp_table} INNER JOIN {temp1_name} ON
-					({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
-						{check_grps_t1})
-				WHERE {source_vertex} <> {sssp_table}.{vertex_id}
-				""".format(**locals()))
-
-			tmp = temp2_name
-			temp2_name = temp1_name
-			temp1_name = tmp
-
-			tmp = check_grps_t1
-			check_grps_t1 = check_grps_t2
-			check_grps_t2 = tmp
-
-		# Add the source vertex to the beginning of every path and
-		# add the empty arrays for the groups that don't have a path to reach
-		# the destination vertex
-		plpy.execute("""
-			CREATE TABLE {path_table} AS
-			SELECT {grp_comma} {source_vertex} || path AS path
-			FROM {temp2_name}
-			UNION
-			SELECT {grp_comma} '{{}}'::INT[] AS path
-			FROM {sssp_table}
-			WHERE {vertex_id} = {dest_vertex}
-				AND {sssp_table}.parent IS NULL
-			""".format(**locals()))
-
-		out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
-		if out.nrows() == 0:
-			plpy.error(
-				"Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
-				format(dest_vertex,sssp_table))
-
-		plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
-			format(**locals()))
-
-	return None
+    """
+    with MinWarning("warning"):
+        _validate_get_path(sssp_table, dest_vertex, path_table)
+
+        temp1_name = unique_string(desp='temp1')
+        temp2_name = unique_string(desp='temp2')
+
+        select_grps = ""
+        check_grps_t1 = ""
+        check_grps_t2 = ""
+        grp_comma = ""
+        tmp = ""
+
+        summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
+        vertex_id = summary[0]['vertex_id']
+        source_vertex = summary[0]['source_vertex']
+
+        if vertex_id == "NULL":
+            vertex_id = "id"
+
+        grouping_cols = summary[0]['grouping_cols']
+        if grouping_cols == "NULL":
+            grouping_cols = None
+
+        if grouping_cols is not None:
+            glist = split_quoted_delimited_str(grouping_cols)
+            select_grps = _grp_from_table(sssp_table, glist) + " , "
+            check_grps_t1 = " AND " + _check_groups(
+                sssp_table, temp1_name, glist)
+            check_grps_t2 = " AND " + _check_groups(
+                sssp_table, temp2_name, glist)
+
+            grp_comma = grouping_cols + " , "
+
+        if source_vertex == dest_vertex:
+            plpy.execute("""
+                CREATE TABLE {path_table} AS
+                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+                FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
+                """.format(**locals()))
+            return
+
+        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                     format(temp1_name, temp2_name))
+        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+                SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
+                    ARRAY[{dest_vertex}] AS path
+                FROM {sssp_table}
+                WHERE {vertex_id} = {dest_vertex}
+                    AND {sssp_table}.parent IS NOT NULL
+            """.format(**locals()))
+
+        plpy.execute("""
+            CREATE TEMP TABLE {temp2_name} AS
+                SELECT * FROM {temp1_name} LIMIT 0
+            """.format(**locals()))
+
+        # Follow the 'parent' chain until you reach the source.
+        while out.nrows() > 0:
+
+            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+            # If the vertex id is not the source vertex,
+            # add it to the path and move to its parent.
+            out = plpy.execute(
+                """ INSERT INTO {temp2_name}
+                SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
+                    {sssp_table}.{vertex_id} || {temp1_name}.path AS path
+                FROM {sssp_table} INNER JOIN {temp1_name} ON
+                    ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
+                        {check_grps_t1})
+                WHERE {source_vertex} <> {sssp_table}.{vertex_id}
+                """.format(**locals()))
+
+            tmp = temp2_name
+            temp2_name = temp1_name
+            temp1_name = tmp
+
+            tmp = check_grps_t1
+            check_grps_t1 = check_grps_t2
+            check_grps_t2 = tmp
+
+        # Add the source vertex to the beginning of every path and
+        # add the empty arrays for the groups that don't have a path to reach
+        # the destination vertex
+        plpy.execute("""
+            CREATE TABLE {path_table} AS
+            SELECT {grp_comma} {source_vertex} || path AS path
+            FROM {temp2_name}
+            UNION
+            SELECT {grp_comma} '{{}}'::INT[] AS path
+            FROM {sssp_table}
+            WHERE {vertex_id} = {dest_vertex}
+                AND {sssp_table}.parent IS NULL
+            """.format(**locals()))
+
+        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+        if out.nrows() == 0:
+            plpy.error(
+                "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
+                format(dest_vertex, sssp_table))
+
+        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp2_name}".
+                     format(**locals()))
+    return None
 
 
 def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
-	source_vertex, out_table, glist, **kwargs):
+                   source_vertex, out_table, glist, **kwargs):
 
-	validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-		out_table,'SSSP')
+    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+                          out_table, 'SSSP')
 
-	_assert(isinstance(source_vertex,int),
-		"""Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
-		format(**locals()))
-	src_exists = plpy.execute("""
-		SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
-		""".format(**locals()))
+    _assert(isinstance(source_vertex, int),
+            """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
+            format(**locals()))
+    src_exists = plpy.execute("""
+        SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
+        """.format(**locals()))
 
-	if src_exists.nrows() == 0:
-		plpy.error(
-			"""Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""".
-			format(**locals()))
+    if src_exists.nrows() == 0:
+        plpy.error("Graph SSSP: Source vertex {source_vertex} is not present "
+                   "in the vertex table {vertex_table}.".format(**locals()))
 
-	vt_error = plpy.execute(
-		""" SELECT {vertex_id}
-			FROM {vertex_table}
-			WHERE {vertex_id} IS NOT NULL
-			GROUP BY {vertex_id}
-			HAVING count(*) > 1 """.format(**locals()))
+    vt_error = plpy.execute("""
+        SELECT {vertex_id}
+        FROM {vertex_table}
+        WHERE {vertex_id} IS NOT NULL
+        GROUP BY {vertex_id}
+        HAVING count(*) > 1
+        """.format(**locals()))
 
-	if vt_error.nrows() != 0:
-		plpy.error(
-			"""Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
-			format(**locals()))
+    if vt_error.nrows() != 0:
+        plpy.error("Graph SSSP: Source vertex table {vertex_table} "
+                   "contains duplicate vertex ids.".format(**locals()))
 
-	_assert(not table_exists(out_table+"_summary"),
-		"Graph SSSP: Output summary table already exists!")
+    _assert(not table_exists(out_table + "_summary"),
+            "Graph SSSP: Output summary table already exists!")
 
-	if glist is not None:
-		_assert(columns_exist_in_table(edge_table, glist),
-			"""Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
-			format(**locals()))
+    if glist is not None:
+        _assert(columns_exist_in_table(edge_table, glist),
+                "Graph SSSP: Not all columns from {glist} are present in "
+                "edge table ({edge_table}).".format(**locals()))
+    return None
 
-	return None
 
 def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
 
-	_assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
-		"Graph SSSP: Invalid SSSP table name!")
-	_assert(table_exists(sssp_table),
-		"Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
-	_assert(not table_is_empty(sssp_table),
-		"Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
+    _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
+            "Graph SSSP: Invalid SSSP table name!")
+    _assert(table_exists(sssp_table),
+            "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
+    _assert(not table_is_empty(sssp_table),
+            "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
 
-	summary = sssp_table+"_summary"
-	_assert(table_exists(summary),
-		"Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
-	_assert(not table_is_empty(summary),
-		"Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
+    summary = sssp_table + "_summary"
+    _assert(table_exists(summary),
+            "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
+    _assert(not table_is_empty(summary),
+            "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
 
-	_assert(not table_exists(path_table),
-		"Graph SSSP: Output path table already exists!")
+    _assert(not table_exists(path_table),
+            "Graph SSSP: Output path table already exists!")
+
+    return None
 
-	return None
 
 def graph_sssp_help(schema_madlib, message, **kwargs):
-	"""
-	Help function for graph_sssp and graph_sssp_get_path
-
-	Args:
-		@param schema_madlib
-		@param message: string, Help message string
-		@param kwargs
-
-	Returns:
-	    String. Help/usage information
-	"""
-	if not message:
-		help_string = """
+    """
+    Help function for graph_sssp and graph_sssp_get_path
+
+    Args:
+        @param schema_madlib
+        @param message: string, Help message string
+        @param kwargs
+
+    Returns:
+        String. Help/usage information
+    """
+    if not message:
+        help_string = """
 -----------------------------------------------------------------------
                             SUMMARY
 -----------------------------------------------------------------------
@@ -640,8 +639,8 @@ weights of its constituent edges is minimized.
 For more details on function usage:
     SELECT {schema_madlib}.graph_sssp('usage')
             """
-	elif message.lower() in ['usage', 'help', '?']:
-		help_string = """
+    elif message.lower() in ['usage', 'help', '?']:
+        help_string = """
 Given a graph and a source vertex, single source shortest path (SSSP)
 algorithm finds a path for every vertex such that the sum of the
 weights of its constituent edges is minimized.
@@ -651,8 +650,8 @@ weights of its constituent edges is minimized.
 To retrieve the path for a specific vertex:
 
  SELECT {schema_madlib}.graph_sssp_get_path(
-    sssp_table	TEXT, -- Name of the table that contains the SSSP output.
-    dest_vertex	INT,  -- The vertex that will be the destination of the
+    sssp_table  TEXT, -- Name of the table that contains the SSSP output.
+    dest_vertex INT,  -- The vertex that will be the destination of the
                       -- desired path.
     path_table  TEXT  -- Name of the output table that contains the path.
 );
@@ -679,8 +678,8 @@ every group and has the following columns:
   - path (ARRAY)  : The shortest path from the source vertex (as specified
                   in the SSSP execution) to the destination vertex.
 """
-	elif message.lower() in ("example", "examples"):
-		help_string = """
+    elif message.lower() in ("example", "examples"):
+        help_string = """
 ----------------------------------------------------------------------------
                                 EXAMPLES
 ----------------------------------------------------------------------------
@@ -723,12 +722,12 @@ INSERT INTO edge VALUES
 -- Compute the SSSP:
 DROP TABLE IF EXISTS out;
 SELECT madlib.graph_sssp(
-	'vertex',                            -- Vertex table
-	'id',                                -- Vertix id column
-	'edge',                              -- Edge table
-	'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments
-	 0,                                  -- The source vertex
-	'out'                                -- Output table of SSSP
+    'vertex',                            -- Vertex table
+    'id',                                -- Vertex id column
+    'edge',                              -- Edge table
+    'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
+     0,                                  -- The source vertex
+    'out'                                -- Output table of SSSP
 );
 -- View the SSSP costs for every vertex:
 SELECT * FROM out ORDER BY id;
@@ -752,12 +751,14 @@ INSERT INTO edge_gr VALUES
 DROP TABLE IF EXISTS out_gr, out_gr_summary;
 SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
 """
-	else:
-		help_string = "No such option. Use {schema_madlib}.graph_sssp()"
-
-	return help_string.format(schema_madlib=schema_madlib,
-		graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
-    """source_vertex INT,  -- The source vertex id for the algorithm to start.
-    out_table     TEXT, -- Name of the table to store the result of SSSP.
-    grouping_cols TEXT  -- The list of grouping columns."""))
+    else:
+        help_string = "No such option. Use {schema_madlib}.graph_sssp()"
+
+    common_usage_string = get_graph_usage(
+        schema_madlib, 'graph_sssp',
+        """source_vertex INT,  -- The source vertex id for the algorithm to start.
+            out_table     TEXT, -- Name of the table to store the result of SSSP.
+            grouping_cols TEXT  -- The list of grouping columns.""")
+    return help_string.format(schema_madlib=schema_madlib,
+                              graph_usage=common_usage_string)
 # ---------------------------------------------------------------------
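
The iteration above is the textbook Bellman-Ford relaxation, expressed
set-wise in SQL: each pass relaxes only the edges whose source vertex was
updated in the previous pass, and an update surviving |V| passes signals a
negative cycle. As a reference point, here is a minimal in-memory sketch of
the same scheme in plain Python (illustrative only, not part of this commit;
the function and variable names are hypothetical):

    INF = float('inf')

    def bellman_ford(n_vertices, edges, source):
        # edges: iterable of (src, dest, weight); vertices are 0..n_vertices-1
        dist = [INF] * n_vertices
        parent = [None] * n_vertices
        dist[source] = 0
        parent[source] = source
        for _ in range(n_vertices):
            updated = False
            for src, dest, w in edges:
                # Row-by-row version of the set-wise UPDATE ... WHERE
                # out_table.weight > oldupdate.weight step above.
                if dist[src] + w < dist[dest]:
                    dist[dest] = dist[src] + w
                    parent[dest] = src
                    updated = True
            if not updated:
                break            # converged, like "if ret.nrows() == 0: break"
        else:
            # Updates survived |V| passes, so a reachable negative cycle
            # exists; this mirrors the "if i == v_cnt" check in graph_sssp.
            raise ValueError("negative cycle detected")
        return dist, parent

The SQL version additionally matches candidate sums against the
per-destination minimum with an EPSILON tolerance, because it has to
re-identify which edge produced the min under floating-point arithmetic; the
in-memory sketch sidesteps that by updating dist and parent in one statement.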

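graph_sssp_get_path then walks the parent column backwards from the
destination until it reaches the source. The same walk over the parent array
from the sketch above (again illustrative only):

    def shortest_path(parent, source, dest):
        # The SQL prepends each vertex as it walks towards the source; here
        # we append and reverse at the end.
        if parent[dest] is None:
            return []            # unreachable: the SQL emits an empty array
        path = [dest]
        while path[-1] != source:
            path.append(parent[path[-1]])
        return path[::-1]
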
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/wcc.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in
index 02cceeb..1f6a81f 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -31,34 +31,37 @@ import plpy
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string, split_quoted_delimited_str
-from utilities.validate_args import columns_exist_in_table, get_cols_and_types
-from graph_utils import *
+from utilities.validate_args import columns_exist_in_table
+from utilities.utilities import is_platform_pg, is_platform_hawq
+from graph_utils import validate_graph_coding, get_graph_usage
 
-m4_changequote(`<!', `!>')
 
 def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, grouping_cols_list, module_name):
+                      edge_params, out_table, grouping_cols_list, module_name):
     """
     Function to validate input parameters for wcc
     """
     validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-        out_table, module_name)
+                          out_table, module_name)
     if grouping_cols_list:
         # validate the grouping columns. We currently only support grouping_cols
         # to be column names in the edge_table, and not expressions!
         _assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
-                "Weakly Connected Components error: One or more grouping columns specified do not exist!")
+                "Weakly Connected Components error: "
+                "One or more grouping columns specified do not exist!")
 
 
 def prefix_tablename_to_colnames(table, cols_list):
     return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list])
 
+
 def get_where_condition(table1, table2, cols_list):
     return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col)
-            for col in cols_list])
+                        for col in cols_list])
+
 
 def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
-    out_table, grouping_cols, **kwargs):
+        out_table, grouping_cols, **kwargs):
     """
     Function that computes the wcc
 
@@ -78,8 +81,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     plpy.execute('SET client_min_messages TO warning')
     params_types = {'src': str, 'dest': str}
     default_args = {'src': 'src', 'dest': 'dest'}
-    edge_params = extract_keyvalue_params(edge_args,
-            params_types, default_args)
+    edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
 
     # populate default values for optional params if null
     if vertex_id is None:
@@ -89,7 +91,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
 
     grouping_cols_list = split_quoted_delimited_str(grouping_cols)
     validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, grouping_cols_list, 'Weakly Connected Components')
+                      edge_params, out_table, grouping_cols_list,
+                      'Weakly Connected Components')
     src = edge_params["src"]
     dest = edge_params["dest"]
 
@@ -99,21 +102,22 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     toupdate = unique_string(desp='toupdate')
     temp_out_table = unique_string(desp='tempout')
 
-    distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+    distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
     subq_prefixed_grouping_cols = ''
     comma_toupdate_prefixed_grouping_cols = ''
     comma_oldupdate_prefixed_grouping_cols = ''
     old_new_update_where_condition = ''
     new_to_update_where_condition = ''
     edge_to_update_where_condition = ''
-    is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+    is_hawq = is_platform_hawq()
 
     INT_MAX = 2147483647
     component_id = 'component_id'
+    grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
+
     if grouping_cols:
-        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>)
+        distribution = ('' if is_platform_pg() else
+                        "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, vertex_id))
         # Update some variables useful for grouping based query strings
         subq = unique_string(desp='subquery')
         distinct_grp_table = unique_string(desp='grptable')
@@ -121,18 +125,20 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                 CREATE TABLE {distinct_grp_table} AS
                 SELECT DISTINCT {grouping_cols} FROM {edge_table}
             """.format(**locals()))
-        comma_toupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(toupdate,
-            grouping_cols_list)
-        comma_oldupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(
-            oldupdate, grouping_cols_list)
-        subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq,
-            grouping_cols_list)
-        old_new_update_where_condition = ' AND ' + get_where_condition(
-            oldupdate, newupdate, grouping_cols_list)
-        new_to_update_where_condition = ' AND ' + get_where_condition(
-            newupdate, toupdate, grouping_cols_list)
-        edge_to_update_where_condition = ' AND ' + get_where_condition(
-            edge_table, toupdate, grouping_cols_list)
+
+        pttc = prefix_tablename_to_colnames
+        gwc = get_where_condition
+
+        comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, grouping_cols_list)
+        comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, grouping_cols_list)
+        subq_prefixed_grouping_cols = pttc(subq, grouping_cols_list)
+        old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, grouping_cols_list)
+        new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, grouping_cols_list)
+        edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, grouping_cols_list)
+        join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list)
+        group_by_clause = ('' if not grouping_cols else
+                           '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
+                                                 subq, vertex_id))
         plpy.execute("""
                 CREATE TABLE {newupdate} AS
                 SELECT {subq}.{vertex_id},
@@ -148,13 +154,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                 ON {join_grouping_cols}
                 GROUP BY {group_by_clause}
                 {distribution}
-            """.format(select_grouping_cols=','+subq_prefixed_grouping_cols,
-                join_grouping_cols=get_where_condition(subq,
-                    distinct_grp_table, grouping_cols_list),
-                group_by_clause='' if not grouping_cols else
-                    subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, vertex_id),
-                select_grouping_cols_clause='' if not grouping_cols else
-                    grouping_cols+', ', **locals()))
+            """.format(select_grouping_cols=',' + subq_prefixed_grouping_cols,
+                       select_grouping_cols_clause=grouping_cols_comma,
+                       **locals()))
         plpy.execute("""
                 CREATE TEMP TABLE {message} AS
                 SELECT {vertex_id},
@@ -162,8 +164,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                         {select_grouping_cols_clause}
                 FROM {newupdate}
                 {distribution}
-            """.format(select_grouping_cols_clause='' if not grouping_cols else
-                    ', '+grouping_cols, **locals()))
+            """.format(select_grouping_cols_clause=grouping_cols_comma,
+                       **locals()))
     else:
         plpy.execute("""
                 CREATE TABLE {newupdate} AS
@@ -186,13 +188,14 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             """.format(**locals()))
     nodes_to_update = 1
     while nodes_to_update > 0:
-        # This idea here is simple. Look at all the neighbors of a node, and
-        # assign the smallest node id among the neighbors as its component_id.
-        # The next table starts off with very high component_id (INT_MAX). The
-        # component_id of all nodes which obtain a smaller component_id after
-        # looking at its neighbors are updated in the next table. At every
-        # iteration update only those nodes whose component_id in the previous
-        # iteration are greater than what was found in the current iteration.
+        # Look at all the neighbors of a node, and assign the smallest node id
+        # among the neighbors as its component_id. The next table starts off
+        # with a very high component_id (INT_MAX). The component_id of every
+        # node that obtains a smaller component_id after looking at its
+        # neighbors is updated in the next table. At every iteration, update
+        # only those nodes whose component_id in the previous iteration is
+        # greater than what was found in the current iteration.
+
         plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
         plpy.execute("""
             CREATE TEMP TABLE {oldupdate} AS
@@ -202,10 +205,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             FROM {message}
             GROUP BY {group_by_clause} {vertex_id}
             {distribution}
-        """.format(grouping_cols_select='' if not grouping_cols else
-                ', {0}'.format(grouping_cols), group_by_clause=''
-                if not grouping_cols else '{0}, '.format(grouping_cols),
-                **locals()))
+        """.format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols),
+                   group_by_clause=grouping_cols_comma,
+                   **locals()))
 
         plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
         plpy.execute("""
@@ -236,8 +238,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                         SELECT * FROM {toupdate};
                 """.format(**locals()))
             plpy.execute("DROP TABLE {0}".format(newupdate))
-            plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table,
-                            newupdate))
+            plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                         format(temp_out_table, newupdate))
             plpy.execute("""
                     CREATE TABLE {temp_out_table} AS
                     SELECT * FROM {newupdate}
@@ -275,9 +277,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             ) AS t
             GROUP BY {group_by_clause} {vertex_id}
         """.format(select_grouping_cols='' if not grouping_cols
-                else ', {0}'.format(grouping_cols), group_by_clause=''
-                if not grouping_cols else ' {0}, '.format(grouping_cols),
-                **locals()))
+                   else ', {0}'.format(grouping_cols), group_by_clause=''
+                   if not grouping_cols else ' {0}, '.format(grouping_cols),
+                   **locals()))
 
         plpy.execute("DROP TABLE {0}".format(oldupdate))
         if grouping_cols:
@@ -300,6 +302,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     if is_hawq:
         plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table))
 
+
 def wcc_help(schema_madlib, message, **kwargs):
     """
     Help function for wcc
@@ -315,11 +318,13 @@ def wcc_help(schema_madlib, message, **kwargs):
     if message is not None and \
             message.lower() in ("usage", "help", "?"):
         help_string = "Get from method below"
-        help_string = get_graph_usage(schema_madlib, 'Weakly Connected Components',
+        help_string = get_graph_usage(
+            schema_madlib,
+            'Weakly Connected Components',
             """out_table     TEXT, -- Output table of weakly connected components
-    grouping_col  TEXT -- Comma separated column names to group on
-                       -- (DEFAULT = NULL, no grouping)
-""")
+            grouping_col  TEXT -- Comma separated column names to group on
+                               -- (DEFAULT = NULL, no grouping)
+            """)
     else:
         if message is not None and \
                 message.lower() in ("example", "examples"):
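
The min-label propagation described in the wcc comments above is easier to
see outside SQL. Here is a minimal in-memory sketch of the same idea in
Python; the function name and the edge-list input are illustrative only,
not MADlib's actual SQL-based implementation:

    def wcc_sketch(edges):
        # Build an undirected adjacency map from (src, dest) pairs.
        adj = {}
        for src, dest in edges:
            adj.setdefault(src, set()).add(dest)
            adj.setdefault(dest, set()).add(src)
        # Every node starts with its own id as its component_id. (The SQL
        # version seeds the next table with INT_MAX instead and keeps the
        # minimum label seen so far.)
        comp = {v: v for v in adj}
        changed = True
        while changed:
            changed = False
            for v, neighbors in adj.items():
                # Smallest component id among the node and its neighbors.
                best = min([comp[v]] + [comp[n] for n in neighbors])
                if best < comp[v]:
                    comp[v] = best
                    changed = True
        return comp

    # Two components are expected: {1, 2, 3} and {10, 11}.
    print(wcc_sketch([(1, 2), (2, 3), (10, 11)]))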

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/utilities/utilities.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in
index 6a5e8f9..b28a5f3 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -14,32 +14,43 @@ if __name__ != "__main__":
 m4_changequote(`<!', `!>')
 
 
+def has_function_properties():
+    """ __HAS_FUNCTION_PROPERTIES__ variable defined during configure """
+    return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+
+
+def is_platform_pg():
+    """ __POSTGRESQL__ variable defined during configure """
+    return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
+def is_platform_hawq():
+    """ __HAWQ__ variable defined during configure """
+    return m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
 def get_seg_number():
     """ Find out how many primary segments exist in the distribution
         Might be useful for partitioning data.
     """
-    m4_ifdef(<!__POSTGRESQL__!>, <!return 1!>, <!
-    return plpy.execute(
-        """
-        SELECT count(*) from gp_segment_configuration
-        WHERE role = 'p'
-        """)[0]['count']
-    !>)
+    if is_platform_pg():
+        return 1
+    else:
+        return plpy.execute("""
+            SELECT count(*) from gp_segment_configuration
+            WHERE role = 'p'
+            """)[0]['count']
 # ------------------------------------------------------------------------------
 
 
 def is_orca():
-    m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!
-    optimizer = plpy.execute("show optimizer")[0]["optimizer"]
-    return True if optimizer == 'on' else False
-    !>, <!
+    if has_function_properties():
+        optimizer = plpy.execute("show optimizer")[0]["optimizer"]
+        if optimizer == 'on':
+            return True
     return False
-    !>)
-# ------------------------------------------------------------------------------
-
-
-def is_platform_pg():
-    return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
 # ------------------------------------------------------------------------------
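
The change above replaces inline m4_ifdef blocks with the is_platform_pg()
and is_platform_hawq() helpers, so platform checks read as ordinary Python.
A hypothetical caller might look like this (the function and its parameter
are invented for illustration):

    from utilities.utilities import is_platform_pg

    def build_distribution_clause(dist_cols):
        # Postgres has no DISTRIBUTED BY clause; Greenplum and HAWQ do.
        if is_platform_pg():
            return ''
        return "DISTRIBUTED BY ({0})".format(dist_cols)

This is the same pattern pagerank.py_in below uses when it constructs its
DISTRIBUTED BY strings.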
 
 


[2/3] incubator-madlib git commit: Graph: Update Python code to follow PEP-8

Posted by ri...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/pagerank.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in
index 4ef5876..95bc4b4 100644
--- a/src/ports/postgres/modules/graph/pagerank.py_in
+++ b/src/ports/postgres/modules/graph/pagerank.py_in
@@ -32,36 +32,38 @@ from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string, split_quoted_delimited_str
+from utilities.utilities import is_platform_pg
+
 from utilities.validate_args import columns_exist_in_table, get_cols_and_types
 from graph_utils import *
 
-m4_changequote(`<!', `!>')
 
 def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, damping_factor, max_iter, threshold,
-        grouping_cols_list, module_name):
+                           edge_params, out_table, damping_factor, max_iter,
+                           threshold, grouping_cols_list):
     """
     Function to validate input parameters for PageRank
     """
     validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-        out_table, module_name)
+                          out_table, 'PageRank')
     _assert(damping_factor >= 0.0 and damping_factor <= 1.0,
-        """PageRank: Invalid damping factor value ({0}), must be between 0 and 1.""".
-        format(damping_factor))
+            "PageRank: Invalid damping factor value ({0}), must be between 0 and 1.".
+            format(damping_factor))
     _assert(not threshold or (threshold >= 0.0 and threshold <= 1.0),
-        """PageRank: Invalid threshold value ({0}), must be between 0 and 1.""".
-        format(threshold))
+            "PageRank: Invalid threshold value ({0}), must be between 0 and 1.".
+            format(threshold))
     _assert(max_iter > 0,
-        """PageRank: Invalid max_iter value ({0}), must be a positive integer.""".
-        format(max_iter))
+            """PageRank: Invalid max_iter value ({0}), must be a positive integer.""".
+            format(max_iter))
     if grouping_cols_list:
         # validate the grouping columns. We currently only support grouping_cols
         # to be column names in the edge_table, and not expressions!
         _assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
                 "PageRank error: One or more grouping columns specified do not exist!")
 
+
 def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
-    out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
+             out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
     """
     Function that computes the PageRank
 
@@ -76,458 +78,451 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         @param max_iter
         @param threshold
     """
-    old_msg_level = plpy.execute("""
-                                  SELECT setting
-                                  FROM pg_settings
-                                  WHERE name='client_min_messages'
-                                  """)[0]['setting']
-    plpy.execute('SET client_min_messages TO warning')
-    params_types = {'src': str, 'dest': str}
-    default_args = {'src': 'src', 'dest': 'dest'}
-    edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
-
-    # populate default values for optional params if null
-    if damping_factor is None:
-        damping_factor = 0.85
-    if max_iter is None:
-        max_iter = 100
-    if vertex_id is None:
-        vertex_id = "id"
-    if not grouping_cols:
-        grouping_cols = ''
-
-    grouping_cols_list = split_quoted_delimited_str(grouping_cols)
-    validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, damping_factor, max_iter, threshold,
-        grouping_cols_list, 'PageRank')
-    summary_table = out_table + "_summary"
-    _assert(not table_exists(summary_table),
-        "Graph PageRank: Output summary table ({summary_table}) already exists."
-        .format(**locals()))
-    src = edge_params["src"]
-    dest = edge_params["dest"]
-    nvertices = plpy.execute("""
-                SELECT COUNT({0}) AS cnt
-                FROM {1}
-            """.format(vertex_id, vertex_table))[0]["cnt"]
-    # A fixed threshold value, of say 1e-5, might not work well when the
-    # number of vertices is a billion, since the initial pagerank value
-    # of all nodes would then be 1/1e-9. So, assign default threshold
-    # value based on number of nodes in the graph.
-    # NOTE: The heuristic below is not based on any scientific evidence.
-    if threshold is None:
-        threshold = 1.0/(nvertices*1000)
-
-    # table/column names used when grouping_cols is set.
-    distinct_grp_table = ''
-    vertices_per_group = ''
-    vpg = ''
-    grouping_where_clause = ''
-    group_by_clause = ''
-    random_prob = ''
-
-    edge_temp_table = unique_string(desp='temp_edge')
-    distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
-            if grouping_cols else '', dest)!>)
-    plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
-    plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
-        SELECT * FROM {edge_table}
-        {distribution}
-        """.format(**locals()))
-    # GPDB and HAWQ have distributed by clauses to help them with indexing.
-    # For Postgres we add the index explicitly.
-    sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-        <!"""CREATE INDEX ON {edge_temp_table} ({src});
-        """.format(**locals())!>,
-        <!''!>)
-    plpy.execute(sql_index)
-
-    # Intermediate tables required.
-    cur = unique_string(desp='cur')
-    message = unique_string(desp='message')
-    cur_unconv = unique_string(desp='cur_unconv')
-    message_unconv = unique_string(desp='message_unconv')
-    out_cnts = unique_string(desp='out_cnts')
-    out_cnts_cnt = unique_string(desp='cnt')
-    v1 = unique_string(desp='v1')
-
-    cur_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
-            if grouping_cols else '', vertex_id)!>)
-    cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-            <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
-            if grouping_cols else '', vertex_id)!>)
-    cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
-        """.format(**locals())
-    out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
-        """.format(**locals())
-    v1_join_clause = """{v1}.{vertex_id}={edge_temp_table}.{src}
-        """.format(**locals())
-
-    random_probability = (1.0-damping_factor)/nvertices
-    ######################################################################
-    # Create several strings that will be used to construct required
-    # queries. These strings will be required only during grouping.
-    random_jump_prob = random_probability
-    ignore_group_clause_first = ''
-    limit = ' LIMIT 1 '
-
-    grouping_cols_select_pr = ''
-    vertices_per_group_inner_join_pr = ''
-    ignore_group_clause_pr= ''
-
-    grouping_cols_select_ins = ''
-    vpg_from_clause_ins = ''
-    vpg_where_clause_ins = ''
-    message_grp_where_ins = ''
-    ignore_group_clause_ins = ''
-
-    nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
-    ignore_group_clause_ins_noincoming = ''
-
-    grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
-    group_by_grouping_cols_conv = ''
-    message_grp_clause_conv = ''
-    ignore_group_clause_conv = ''
-    ######################################################################
-
-    # Queries when groups are involved need a lot more conditions in
-    # various clauses, so populating the required variables. Some intermediate
-    # tables are unnecessary when no grouping is involved, so create some
-    # tables and certain columns only during grouping.
-    if grouping_cols:
-        distinct_grp_table = unique_string(desp='grp')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
-        plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
-                SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
-            """.format(**locals()))
-        vertices_per_group = unique_string(desp='nvert_grp')
-        init_pr = unique_string(desp='init')
-        random_prob = unique_string(desp='rand')
-        subq = unique_string(desp='subquery')
-        rand_damp = 1-damping_factor
-        grouping_where_clause = ' AND '.join(
-            [distinct_grp_table+'.'+col+'='+subq+'.'+col
-            for col in grouping_cols_list])
-        group_by_clause = ', '.join([distinct_grp_table+'.'+col
-            for col in grouping_cols_list])
-        # Find number of vertices in each group, this is the normalizing factor
-        # for computing the random_prob
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
-        plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
-                SELECT {distinct_grp_table}.*,
-                1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
-                {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
-                FROM {distinct_grp_table} INNER JOIN (
-                    SELECT {grouping_cols}, {src} AS __vertices__
-                    FROM {edge_temp_table}
-                    UNION
-                    SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
-                ){subq}
-                ON {grouping_where_clause}
-                GROUP BY {group_by_clause}
-            """.format(**locals()))
-
-        grouping_where_clause = ' AND '.join(
-            [vertices_per_group+'.'+col+'='+subq+'.'+col
-            for col in grouping_cols_list])
-        group_by_clause = ', '.join([vertices_per_group+'.'+col
-            for col in grouping_cols_list])
+    with MinWarning('warning'):
+        params_types = {'src': str, 'dest': str}
+        default_args = {'src': 'src', 'dest': 'dest'}
+        edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
+
+        # populate default values for optional params if null
+        if damping_factor is None:
+            damping_factor = 0.85
+        if max_iter is None:
+            max_iter = 100
+        if vertex_id is None:
+            vertex_id = "id"
+        if not grouping_cols:
+            grouping_cols = ''
+
+        grouping_cols_list = split_quoted_delimited_str(grouping_cols)
+        validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
+                               edge_params, out_table, damping_factor,
+                               max_iter, threshold, grouping_cols_list)
+        summary_table = out_table + "_summary"
+        _assert(not table_exists(summary_table),
+                "Graph PageRank: Output summary table ({summary_table}) already exists."
+                .format(**locals()))
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        n_vertices = plpy.execute("""
+                    SELECT COUNT({0}) AS cnt
+                    FROM {1}
+                """.format(vertex_id, vertex_table))[0]["cnt"]
+        # A fixed threshold value of, say, 1e-5 might not work well when the
+        # number of vertices is a billion, since the initial pagerank value
+        # of every node would then be 1e-9 (1/1e9). So, assign a default
+        # threshold value based on the number of nodes in the graph.
+        # NOTE: The heuristic below is not based on any scientific evidence.
+        if threshold is None:
+            threshold = 1.0 / (n_vertices * 1000)
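+        # (For example, with n_vertices = 1e6 the default threshold is 1e-9,
+        #  i.e. one thousandth of the initial per-node pagerank of 1e-6.)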
+
+        # table/column names used when grouping_cols is set.
+        distinct_grp_table = ''
+        vertices_per_group = ''
+        vpg = ''
+        grouping_where_clause = ''
+        group_by_clause = ''
+        random_prob = ''
+
+        edge_temp_table = unique_string(desp='temp_edge')
+        grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
+        distribution = ('' if is_platform_pg() else
+                        "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, dest))
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
         plpy.execute("""
-                CREATE TEMP TABLE {cur} AS
-                SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
-                       {init_pr} AS pagerank
-                FROM {vertices_per_group} INNER JOIN (
-                    SELECT {grouping_cols}, {src} AS __vertices__
-                    FROM {edge_temp_table}
-                    UNION
-                    SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
-                ){subq}
-                ON {grouping_where_clause}
-                {cur_distribution}
-            """.format(**locals()))
-        vpg = unique_string(desp='vpg')
-        # Compute the out-degree of every node in the group-based subgraphs.
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
-        plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
-            SELECT {grouping_cols_select} {src} AS {vertex_id},
-                   COUNT({dest}) AS {out_cnts_cnt}
-            FROM {edge_temp_table}
-            GROUP BY {grouping_cols_select} {src}
-            {cnts_distribution}
-            """.format(grouping_cols_select=grouping_cols+','
-                if grouping_cols else '', **locals()))
-
-        message_grp = ' AND '.join(
-            ["{cur}.{col}={message}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
-            ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
-            ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
-            ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        vpg_join_clause = ' AND '.join(
-            ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        vpg_t1_join_clause = ' AND '.join(
-            ["__t1__.{col}={vpg}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        # join clause specific to populating random_prob for nodes without any
-        # incoming edges.
-        edge_grouping_cols_select = ', '.join(
-            ["{edge_temp_table}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        cur_grouping_cols_select = ', '.join(
-            ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
-        # Create output summary table:
-        cols_names_types = get_cols_and_types(edge_table)
-        grouping_cols_clause = ', '.join([c_name+" "+c_type
-            for (c_name, c_type) in cols_names_types
-            if c_name in grouping_cols_list])
-        plpy.execute("""
-                CREATE TABLE {summary_table} (
-                    {grouping_cols_clause},
-                    __iterations__ INTEGER
-                )
-            """.format(**locals()))
-        # Create output table. This will be updated whenever a group converges
-        # Note that vertex_id is assumed to be an integer (as described in
-        # documentation)
-        plpy.execute("""
-                CREATE TABLE {out_table} (
-                    {grouping_cols_clause},
-                    {vertex_id} INTEGER,
-                    pagerank DOUBLE PRECISION
-                )
-            """.format(**locals()))
-        temp_summary_table = unique_string(desp='temp_summary')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
-        plpy.execute("""
-                CREATE TABLE {temp_summary_table} (
-                    {grouping_cols_clause}
-                )
+            CREATE TEMP TABLE {edge_temp_table} AS
+                SELECT * FROM {edge_table}
+                {distribution}
             """.format(**locals()))
+
+        # GPDB and HAWQ use DISTRIBUTED BY clauses to help with indexing.
+        # For Postgres, we add the index explicitly.
+        if is_platform_pg():
+            plpy.execute("CREATE INDEX ON {0}({1})".format(edge_temp_table, src))
+
+        # Intermediate tables required.
+        cur = unique_string(desp='cur')
+        message = unique_string(desp='message')
+        cur_unconv = unique_string(desp='cur_unconv')
+        message_unconv = unique_string(desp='message_unconv')
+        out_cnts = unique_string(desp='out_cnts')
+        out_cnts_cnt = unique_string(desp='cnt')
+        v1 = unique_string(desp='v1')
+
+        if is_platform_pg():
+            cur_distribution = cnts_distribution = ''
+        else:
+            cur_distribution = cnts_distribution = \
+                "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, vertex_id)
+        cur_join_clause = "{edge_temp_table}.{dest} = {cur}.{vertex_id}".format(**locals())
+        out_cnts_join_clause = "{out_cnts}.{vertex_id} = {edge_temp_table}.{src}".format(**locals())
+        v1_join_clause = "{v1}.{vertex_id} = {edge_temp_table}.{src}".format(**locals())
+
+        random_probability = (1.0 - damping_factor) / n_vertices
+        ######################################################################
+        # Create several strings that will be used to construct required
+        # queries. These strings will be required only during grouping.
+        random_jump_prob = random_probability
+        ignore_group_clause_first = ''
+        limit = ' LIMIT 1 '
+
+        grouping_cols_select_pr = ''
+        vertices_per_group_inner_join_pr = ''
+        ignore_group_clause_pr = ''
+
+        grouping_cols_select_ins = ''
+        vpg_from_clause_ins = ''
+        vpg_where_clause_ins = ''
+        message_grp_where_ins = ''
+        ignore_group_clause_ins = ''
+
+        nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
+        ignore_group_clause_ins_noincoming = ''
+
+        grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
+        group_by_grouping_cols_conv = ''
+        message_grp_clause_conv = ''
+        ignore_group_clause_conv = ''
         ######################################################################
-        # Strings required for the main PageRank computation query
-        grouping_cols_select_pr = edge_grouping_cols_select+', '
-        random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
-        vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
-            AS {vpg} ON {vpg_join_clause}""".format(**locals())
-        ignore_group_clause_pr=' WHERE '+get_ignore_groups(summary_table,
-            edge_temp_table, grouping_cols_list)
-        ignore_group_clause_ins_noincoming = ' WHERE '+get_ignore_groups(
-            summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
-        # Strings required for updating PageRank scores of vertices that have
-        # no incoming edges
-        grouping_cols_select_ins = cur_grouping_cols_select+','
-        vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
-            **locals())
-        vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
-            **locals())
-        message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
-        ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table,
-            cur, grouping_cols_list)
-        # Strings required for convergence test query
-        grouping_cols_select_conv = cur_grouping_cols_select
-        group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
-            cur_grouping_cols_select)
-        message_grp_clause_conv = '{0} AND '.format(message_grp)
-        ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table,
-            cur, grouping_cols_list)
-        limit = ''
-
-        # Find all nodes, in each group, that have no incoming edges. The PageRank
-        # value of these nodes are not updated using the first query in the
-        # following for loop. They must be explicitly plugged back in to the
-        # message table, with their corresponding group's random_prob as their
-        # PageRank values.
-        plpy.execute("""
-                CREATE TABLE {nodes_with_no_incoming_edges} AS
-                SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
-                        {vpg}.{random_prob} AS pagerank
-                FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
-                WHERE NOT EXISTS (
-                    SELECT 1
-                    FROM {edge_temp_table} AS __t2__
-                    WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
-                ) {vpg_where_clause_ins}
-            """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
-                        for col in grouping_cols_list]),
-                    where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
-                        for col in grouping_cols_list]),
-                    **locals()))
-    else:
-        # cur and out_cnts tables can be simpler when no grouping is involved.
-        init_value = 1.0/nvertices
-        plpy.execute("""
-                CREATE TEMP TABLE {cur} AS
-                SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
-                FROM {vertex_table}
-                {cur_distribution}
-            """.format(**locals()))
 
-        # Compute the out-degree of every node in the graph.
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
-        plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
-            SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
-            FROM {edge_temp_table}
-            GROUP BY {src}
-            {cnts_distribution}
-            """.format(**locals()))
-        # The summary table when there is no grouping will contain only
-        # the iteration column. We don't need to create the out_table
-        # when no grouping is used since the 'cur' table will be renamed
-        # to out_table after pagerank computation is completed.
-        plpy.execute("""
-                CREATE TABLE {summary_table} (
-                    __iterations__ INTEGER
-                )
-            """.format(**locals()))
-        # Find all nodes in the graph that don't have any incoming edges and
-        # assign random_prob as their pagerank values.
-        plpy.execute("""
-                CREATE TABLE {nodes_with_no_incoming_edges} AS
-                SELECT DISTINCT({src}), {random_probability} AS pagerank
-                FROM {edge_temp_table}
-                EXCEPT
-                    (SELECT DISTINCT({dest}), {random_probability} AS pagerank
-                    FROM {edge_temp_table})
-            """.format(**locals()))
-    unconverged = 0
-    iteration_num = 0
-    for iteration_num in range(max_iter):
-        #####################################################################
-        # PageRank for node 'A' at any given iteration 'i' is given by:
-        # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
-        #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
-        # where 'N' is the number of vertices in the graph,
-        # B, C are nodes that have edges to node A, and
-        # degree(node) represents the number of outgoing edges from 'node'
-        #####################################################################
-        # Essentially, the pagerank for a node is based on an aggregate of a
-        # fraction of the pagerank values of all the nodes that have incoming
-        # edges to it, along with a small random probability.
-        # More information can be found at:
-        # https://en.wikipedia.org/wiki/PageRank#Damping_factor
-
-        # The query below computes the PageRank of each node using the above
-        # formula. A small explanatory note on ignore_group_clause:
-        # This is used only when grouping is set. This essentially will have
-        # the condition that will help skip the PageRank computation on groups
-        # that have converged.
-        plpy.execute("""
-                CREATE TABLE {message} AS
-                SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
-                        SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+        # Queries that involve groups need many more conditions in various
+        # clauses, so populate the required variables here. Some intermediate
+        # tables are unnecessary when no grouping is involved, so create those
+        # tables and certain columns only during grouping.
+        if grouping_cols:
+            distinct_grp_table = unique_string(desp='grp')
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+            plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
+                    SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
+                """.format(**locals()))
+            vertices_per_group = unique_string(desp='nvert_grp')
+            init_pr = unique_string(desp='init')
+            random_prob = unique_string(desp='rand')
+            subq = unique_string(desp='subquery')
+            rand_damp = 1 - damping_factor
+            grouping_where_clause = ' AND '.join(
+                [distinct_grp_table + '.' + col + '=' + subq + '.' + col
+                 for col in grouping_cols_list])
+            group_by_clause = ', '.join([distinct_grp_table + '.' + col
+                                         for col in grouping_cols_list])
+            # Find the number of vertices in each group; this is the
+            # normalizing factor for computing the random_prob.
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
+            plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
+                    SELECT {distinct_grp_table}.*,
+                    1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
+                    {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
+                    FROM {distinct_grp_table} INNER JOIN (
+                        SELECT {grouping_cols}, {src} AS __vertices__
+                        FROM {edge_temp_table}
+                        UNION
+                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+                    ){subq}
+                    ON {grouping_where_clause}
+                    GROUP BY {group_by_clause}
+                """.format(**locals()))
+
+            grouping_where_clause = ' AND '.join(
+                [vertices_per_group + '.' + col + '=' + subq + '.' + col
+                 for col in grouping_cols_list])
+            group_by_clause = ', '.join([vertices_per_group + '.' + col
+                                         for col in grouping_cols_list])
+            plpy.execute("""
+                    CREATE TEMP TABLE {cur} AS
+                    SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
+                           {init_pr} AS pagerank
+                    FROM {vertices_per_group} INNER JOIN (
+                        SELECT {grouping_cols}, {src} AS __vertices__
+                        FROM {edge_temp_table}
+                        UNION
+                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+                    ){subq}
+                    ON {grouping_where_clause}
+                    {cur_distribution}
+                """.format(**locals()))
+            vpg = unique_string(desp='vpg')
+            # Compute the out-degree of every node in the group-based subgraphs.
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+            plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+                SELECT {grouping_cols_select} {src} AS {vertex_id},
+                       COUNT({dest}) AS {out_cnts_cnt}
                 FROM {edge_temp_table}
-                    INNER JOIN {cur} ON {cur_join_clause}
-                    INNER JOIN {out_cnts} ON {out_cnts_join_clause}
-                    INNER JOIN {cur} AS {v1} ON {v1_join_clause}
-                    {vertices_per_group_inner_join_pr}
-                {ignore_group_clause}
-                GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
-                {cur_distribution}
-            """.format(ignore_group_clause=ignore_group_clause_pr
-                    if iteration_num>0 else ignore_group_clause_first,
-                **locals()))
-        # If there are nodes that have no incoming edges, they are not
-        # captured in the message table. Insert entries for such nodes,
-        # with random_prob.
-        plpy.execute("""
-                INSERT INTO {message}
-                SELECT *
-                FROM {nodes_with_no_incoming_edges}
-                {ignore_group_clause}
-            """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
-                    if iteration_num>0 else ignore_group_clause_first,
-                    **locals()))
-        # Check for convergence:
-        ## Check for convergence only if threshold != 0.
-        if threshold != 0:
-            # message_unconv and cur_unconv will contain the unconverged groups
-            # after current # and previous iterations respectively. Groups that
-            # are missing in message_unconv but appear in cur_unconv are the
-            # groups that have converged after this iteration's computations.
-            # If no grouping columns are specified, then we check if there is
-            # at least one unconverged node (limit 1 is used in the query).
+                GROUP BY {grouping_cols_select} {src}
+                {cnts_distribution}
+                """.format(grouping_cols_select=grouping_cols + ','
+                           if grouping_cols else '', **locals()))
+
+            message_grp = ' AND '.join(
+                ["{cur}.{col}={message}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
+                ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
+                ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
+                ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            vpg_join_clause = ' AND '.join(
+                ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            vpg_t1_join_clause = ' AND '.join(
+                ["__t1__.{col}={vpg}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            # join clause specific to populating random_prob for nodes without any
+            # incoming edges.
+            edge_grouping_cols_select = ', '.join(
+                ["{edge_temp_table}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            cur_grouping_cols_select = ', '.join(
+                ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
+            # Create output summary table:
+            cols_names_types = get_cols_and_types(edge_table)
+            grouping_cols_clause = ', '.join([c_name + " " + c_type
+                                              for (c_name, c_type) in cols_names_types
+                                              if c_name in grouping_cols_list])
             plpy.execute("""
-                    CREATE TEMP TABLE {message_unconv} AS
-                    SELECT {grouping_cols_select_conv}
-                    FROM {message}
-                    INNER JOIN {cur}
-                    ON {cur}.{vertex_id}={message}.{vertex_id}
-                    WHERE {message_grp_clause_conv}
-                        ABS({cur}.pagerank-{message}.pagerank) > {threshold}
-                    {ignore_group_clause}
-                    {group_by_grouping_cols_conv}
-                    {limit}
-                """.format(ignore_group_clause=ignore_group_clause_ins
-                        if iteration_num>0 else ignore_group_clause_conv,
-                    **locals()))
-            unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
-                """.format(message_unconv))[0]["cnt"]
-            if iteration_num > 0 and grouping_cols:
-                # Update result and summary tables for groups that have
-                # converged
-                # since the last iteration.
-                update_result_tables(temp_summary_table, iteration_num,
-                    summary_table, out_table, message, grouping_cols_list,
-                    cur_unconv, message_unconv)
-            plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
-            plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
-                {cur_unconv} """.format(**locals()))
+                    CREATE TABLE {summary_table} (
+                        {grouping_cols_clause},
+                        __iterations__ INTEGER
+                    )
+                """.format(**locals()))
+            # Create output table. This will be updated whenever a group
+            # converges. Note that vertex_id is assumed to be an integer
+            # (as described in the documentation).
+            plpy.execute("""
+                    CREATE TABLE {out_table} (
+                        {grouping_cols_clause},
+                        {vertex_id} INTEGER,
+                        pagerank DOUBLE PRECISION
+                    )
+                """.format(**locals()))
+            temp_summary_table = unique_string(desp='temp_summary')
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
+            plpy.execute("""
+                    CREATE TABLE {temp_summary_table} (
+                        {grouping_cols_clause}
+                    )
+                """.format(**locals()))
+            ######################################################################
+            # Strings required for the main PageRank computation query
+            grouping_cols_select_pr = edge_grouping_cols_select + ', '
+            random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
+            vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
+                AS {vpg} ON {vpg_join_clause}""".format(**locals())
+            ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(summary_table,
+                                                                   edge_temp_table, grouping_cols_list)
+            ignore_group_clause_ins_noincoming = ' WHERE ' + get_ignore_groups(
+                summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
+            # Strings required for updating PageRank scores of vertices that have
+            # no incoming edges
+            grouping_cols_select_ins = cur_grouping_cols_select + ','
+            vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
+                **locals())
+            vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
+                **locals())
+            message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
+            ignore_group_clause_ins = ' AND ' + get_ignore_groups(summary_table,
+                                                                  cur, grouping_cols_list)
+            # Strings required for convergence test query
+            grouping_cols_select_conv = cur_grouping_cols_select
+            group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
+                cur_grouping_cols_select)
+            message_grp_clause_conv = '{0} AND '.format(message_grp)
+            ignore_group_clause_conv = ' AND ' + get_ignore_groups(summary_table,
+                                                                   cur, grouping_cols_list)
+            limit = ''
+
+            # Find all nodes, in each group, that have no incoming edges. The
+            # PageRank values of these nodes are not updated by the first query
+            # in the following for loop. They must be explicitly plugged back
+            # into the message table, with their corresponding group's
+            # random_prob as their PageRank values.
+            plpy.execute("""
+                    CREATE TABLE {nodes_with_no_incoming_edges} AS
+                    SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
+                            {vpg}.{random_prob} AS pagerank
+                    FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {edge_temp_table} AS __t2__
+                        WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
+                    ) {vpg_where_clause_ins}
+                """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
+                                                       for col in grouping_cols_list]),
+                           where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
+                                                            for col in grouping_cols_list]),
+                           **locals()))
         else:
-            # Do not run convergence test if threshold=0, since that implies
-            # the user wants to run max_iter iterations.
-            unconverged = 1
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
-        plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+            # cur and out_cnts tables can be simpler when no grouping is involved.
+            init_value = 1.0 / n_vertices
+            plpy.execute("""
+                    CREATE TEMP TABLE {cur} AS
+                    SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
+                    FROM {vertex_table}
+                    {cur_distribution}
                 """.format(**locals()))
-        if unconverged == 0:
-            break
 
-    # If there still are some unconverged groups/(entire table),
-    # update results.
-    if grouping_cols:
-        if unconverged > 0:
+            # Compute the out-degree of every node in the graph.
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+            plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+                SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
+                FROM {edge_temp_table}
+                GROUP BY {src}
+                {cnts_distribution}
+                """.format(**locals()))
+            # When there is no grouping, the summary table will contain only
+            # the iteration column. We don't need to create the out_table
+            # when no grouping is used, since the 'cur' table will be renamed
+            # to out_table after the pagerank computation is completed.
+            plpy.execute("""
+                    CREATE TABLE {summary_table} (
+                        __iterations__ INTEGER
+                    )
+                """.format(**locals()))
+            # Find all nodes in the graph that don't have any incoming edges and
+            # assign random_prob as their pagerank values.
+            plpy.execute("""
+                    CREATE TABLE {nodes_with_no_incoming_edges} AS
+                    SELECT DISTINCT({src}), {random_probability} AS pagerank
+                    FROM {edge_temp_table}
+                    EXCEPT
+                        (SELECT DISTINCT({dest}), {random_probability} AS pagerank
+                        FROM {edge_temp_table})
+                """.format(**locals()))
+        unconverged = 0
+        iteration_num = 0
+        for iteration_num in range(max_iter):
+            #####################################################################
+            # PageRank for node 'A' at any given iteration 'i' is given by:
+            # PR_i(A) = damping_factor * (PR_i-1(B)/degree(B) +
+            #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
+            # where 'N' is the number of vertices in the graph,
+            # B, C are nodes that have edges to node A, and
+            # degree(node) represents the number of outgoing edges from 'node'
+            #####################################################################
+            # Essentially, the pagerank for a node is based on an aggregate of a
+            # fraction of the pagerank values of all the nodes that have incoming
+            # edges to it, along with a small random probability.
+            # More information can be found at:
+            # https://en.wikipedia.org/wiki/PageRank#Damping_factor
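+            # (Worked example: with damping_factor = 0.85, N = 4, and node A
+            #  receiving edges from B (degree 2, PR 0.25) and C (degree 1,
+            #  PR 0.25): PR(A) = 0.85*(0.25/2 + 0.25/1) + 0.15/4 = 0.356.)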
+
+            # The query below computes the PageRank of each node using the above
+            # formula. A short note on ignore_group_clause: it is used only when
+            # grouping is set, and holds the condition that skips the PageRank
+            # computation for groups that have already converged.
+            plpy.execute("""
+                    CREATE TABLE {message} AS
+                    SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
+                            SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+                    FROM {edge_temp_table}
+                        INNER JOIN {cur} ON {cur_join_clause}
+                        INNER JOIN {out_cnts} ON {out_cnts_join_clause}
+                        INNER JOIN {cur} AS {v1} ON {v1_join_clause}
+                        {vertices_per_group_inner_join_pr}
+                    {ignore_group_clause}
+                    GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
+                    {cur_distribution}
+                """.format(ignore_group_clause=ignore_group_clause_pr
+                           if iteration_num > 0 else ignore_group_clause_first,
+                           **locals()))
+            # If there are nodes that have no incoming edges, they are not
+            # captured in the message table. Insert entries for such nodes,
+            # with random_prob.
+            plpy.execute("""
+                    INSERT INTO {message}
+                    SELECT *
+                    FROM {nodes_with_no_incoming_edges}
+                    {ignore_group_clause}
+                """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
+                           if iteration_num > 0 else ignore_group_clause_first,
+                           **locals()))
+            # Check for convergence:
+            # Check for convergence only if threshold != 0.
             if threshold != 0:
-                # We completed max_iters, but there are still some unconverged
-                # groups # Update the result and summary tables for unconverged
-                # groups.
-                update_result_tables(temp_summary_table, iteration_num,
-                    summary_table, out_table, cur, grouping_cols_list,
-                    cur_unconv)
+                # message_unconv and cur_unconv will contain the unconverged
+                # groups after the current and previous iterations respectively.
+                # Groups that are missing in message_unconv but appear in
+                # cur_unconv are the groups that have converged after this
+                # iteration's computations. If no grouping columns are
+                # specified, then we check if there is at least one unconverged
+                # node (LIMIT 1 is used in the query).
+                plpy.execute("""
+                        CREATE TEMP TABLE {message_unconv} AS
+                        SELECT {grouping_cols_select_conv}
+                        FROM {message}
+                        INNER JOIN {cur}
+                        ON {cur}.{vertex_id}={message}.{vertex_id}
+                        WHERE {message_grp_clause_conv}
+                            ABS({cur}.pagerank-{message}.pagerank) > {threshold}
+                        {ignore_group_clause}
+                        {group_by_grouping_cols_conv}
+                        {limit}
+                    """.format(ignore_group_clause=ignore_group_clause_ins
+                               if iteration_num > 0 else ignore_group_clause_conv,
+                               **locals()))
+                unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
+                    """.format(message_unconv))[0]["cnt"]
+                if iteration_num > 0 and grouping_cols:
+                    # Update result and summary tables for groups that have
+                    # converged since the last iteration.
+                    update_result_tables(temp_summary_table, iteration_num,
+                                         summary_table, out_table, message, grouping_cols_list,
+                                         cur_unconv, message_unconv)
+                plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
+                plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
+                    {cur_unconv} """.format(**locals()))
             else:
-                # No group has converged. List of all group values are in
-                # distinct_grp_table.
-                update_result_tables(temp_summary_table, iteration_num,
-                    summary_table, out_table, cur, grouping_cols_list,
-                    distinct_grp_table)
-    else:
-        plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
-            """.format(table_name=cur, **locals()))
-        plpy.execute("""
+                # Do not run convergence test if threshold=0, since that implies
+                # the user wants to run max_iter iterations.
+                unconverged = 1
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
+            plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+                    """.format(**locals()))
+            if unconverged == 0:
+                break
+
+        # If there still are some unconverged groups/(entire table),
+        # update results.
+        if grouping_cols:
+            if unconverged > 0:
+                if threshold != 0:
+                    # We completed max_iter iterations, but there are still
+                    # some unconverged groups. Update the result and summary
+                    # tables for the unconverged groups.
+                    update_result_tables(temp_summary_table, iteration_num,
+                                         summary_table, out_table, cur, grouping_cols_list,
+                                         cur_unconv)
+                else:
+                    # No group has converged. The list of all group values is
+                    # in distinct_grp_table.
+                    update_result_tables(temp_summary_table, iteration_num,
+                                         summary_table, out_table, cur, grouping_cols_list,
+                                         distinct_grp_table)
+        else:
+            plpy.execute("""
+                ALTER TABLE {table_name}
+                RENAME TO {out_table}
+                """.format(table_name=cur, **locals()))
+            plpy.execute("""
                 INSERT INTO {summary_table} VALUES
-                ({iteration_num}+1);
-            """.format(**locals()))
+                ({iteration_num}+1)
+                """.format(**locals()))
+
+        # Step 4: Cleanup
+        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
+            """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
+                       message_unconv, nodes_with_no_incoming_edges))
+        if grouping_cols:
+            plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
+                """.format(vertices_per_group, temp_summary_table,
+                           distinct_grp_table))
 
-    ## Step 4: Cleanup
-    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6};
-        """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
-                    message_unconv, nodes_with_no_incoming_edges))
-    if grouping_cols:
-        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
-            """.format(vertices_per_group, temp_summary_table,
-                distinct_grp_table))
-    plpy.execute("SET client_min_messages TO %s" % old_msg_level)
 
 def update_result_tables(temp_summary_table, i, summary_table, out_table,
-    res_table, grouping_cols_list, cur_unconv, message_unconv=None):
+                         res_table, grouping_cols_list, cur_unconv,
+                         message_unconv=None):
     """
         This function updates the summary and output tables only for those
         groups that have converged. This is found out by looking at groups
@@ -556,7 +551,7 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
             FROM {cur_unconv}
             WHERE {join_condition}
             """.format(join_condition=get_ignore_groups(
-                message_unconv, cur_unconv, grouping_cols_list), **locals()))
+            message_unconv, cur_unconv, grouping_cols_list), **locals()))
     plpy.execute("""
         INSERT INTO {summary_table}
         SELECT *, {i}+1 AS __iteration__
@@ -569,21 +564,24 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
         INNER JOIN {temp_summary_table}
         ON {join_condition}
         """.format(join_condition=' AND '.join(
-                ["{res_table}.{col}={temp_summary_table}.{col}".format(
-                    **locals())
-                for col in grouping_cols_list]), **locals()))
+        ["{res_table}.{col}={temp_summary_table}.{col}".format(
+            **locals())
+         for col in grouping_cols_list]), **locals()))
+
 
 def get_ignore_groups(first_table, second_table, grouping_cols_list):
     """
         This function generates the necessary clause to only select the
         groups that appear in second_table and not in first_table.
     """
-    return """({second_table_cols}) NOT IN (SELECT {grouping_cols} FROM
-    {first_table}) """.format(second_table_cols=', '.join(
-            ["{second_table}.{col}".format(**locals())
-            for col in grouping_cols_list]),
-        grouping_cols=', '.join([col for col in grouping_cols_list]),
-        **locals())
+    second_table_cols = ', '.join(["{0}.{1}".format(second_table, col)
+                                   for col in grouping_cols_list])
+    grouping_cols = ', '.join([col for col in grouping_cols_list])
+    return """({second_table_cols}) NOT IN
+                (SELECT {grouping_cols}
+                 FROM {first_table})
+           """.format(**locals())
+
 
 def pagerank_help(schema_madlib, message, **kwargs):
     """
@@ -601,7 +599,7 @@ def pagerank_help(schema_madlib, message, **kwargs):
             message.lower() in ("usage", "help", "?"):
         help_string = "Get from method below"
         help_string = get_graph_usage(schema_madlib, 'PageRank',
-            """out_table     TEXT, -- Name of the output table for PageRank
+                                      """out_table     TEXT, -- Name of the output table for PageRank
     damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
                                      -- (DEFAULT = 0.85)
     max_iter      INTEGER, -- Maximum iteration number (DEFAULT = 100)

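For reference, the clause built by the refactored get_ignore_groups above can
be exercised in isolation. A minimal standalone sketch of the same
string-building logic (the table names 'msg' and 'cur' and the group columns
below are hypothetical, used only for illustration):

    # Standalone sketch of the NOT IN clause produced by get_ignore_groups.
    # 'first_table', 'second_table' and the column list are plain strings;
    # nothing here touches the database.
    def ignore_groups_clause(first_table, second_table, grouping_cols_list):
        second_table_cols = ', '.join('{0}.{1}'.format(second_table, col)
                                      for col in grouping_cols_list)
        grouping_cols = ', '.join(grouping_cols_list)
        return '({0}) NOT IN (SELECT {1} FROM {2})'.format(
            second_table_cols, grouping_cols, first_table)

    # ignore_groups_clause('msg', 'cur', ['g1', 'g2']) returns:
    # '(cur.g1, cur.g2) NOT IN (SELECT g1, g2 FROM msg)'

This mirrors how the module keeps only the groups present in second_table but
absent from first_table when updating results.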

[3/3] incubator-madlib git commit: Graph: Update Python code to follow PEP-8

Posted by ri...@apache.org.
Graph: Update Python code to follow PEP-8

- Changed indentation to use spaces instead of tabs
- Updated to PEP-8 guidelines
- Updated to follow style guide conventions
- Refactored a few functions to clean up code and design

Closes #148


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/d487df3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/d487df3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/d487df3c

Branch: refs/heads/master
Commit: d487df3c46f984fad6c15b8a1d3e50dc30f6b6f6
Parents: 8c9b955
Author: Rahul Iyer <ri...@apache.org>
Authored: Fri Jul 7 22:23:18 2017 -0700
Committer: Rahul Iyer <ri...@apache.org>
Committed: Thu Jul 20 14:23:22 2017 -0700

----------------------------------------------------------------------
 src/ports/postgres/modules/graph/apsp.py_in     | 1294 +++++++++---------
 .../postgres/modules/graph/graph_utils.py_in    |  170 +--
 src/ports/postgres/modules/graph/pagerank.py_in |  916 +++++++------
 src/ports/postgres/modules/graph/sssp.py_in     | 1125 +++++++--------
 src/ports/postgres/modules/graph/wcc.py_in      |  119 +-
 .../postgres/modules/utilities/utilities.py_in  |   45 +-
 6 files changed, 1842 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/apsp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/apsp.py_in b/src/ports/postgres/modules/graph/apsp.py_in
index 1331301..ab8a566 100644
--- a/src/ports/postgres/modules/graph/apsp.py_in
+++ b/src/ports/postgres/modules/graph/apsp.py_in
@@ -38,17 +38,16 @@ from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
 from utilities.utilities import split_quoted_delimited_str
-from utilities.validate_args import get_cols
+from utilities.utilities import is_platform_pg, is_platform_hawq
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 from utilities.validate_args import get_expr_type
 
-m4_changequote(`<!', `!>')
 
 def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
-		edge_args, out_table, grouping_cols, **kwargs):
-	"""
+               edge_args, out_table, grouping_cols, **kwargs):
+    """
     All Pairs shortest path function for graphs using the matrix
     multiplication based algorithm [1].
     [1] http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml
@@ -57,655 +56,656 @@ def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
         @param vertex_id        Name of the column containing the vertex ids.
         @param edge_table       Name of the table that contains the edge data.
         @param edge_args        A comma-delimited string containing multiple
-        						named arguments of the form "name=value".
-        @param out_table   	    Name of the table to store the result of APSP.
+                                named arguments of the form "name=value".
+        @param out_table        Name of the table to store the result of APSP.
         @param grouping_cols   The list of grouping columns.
 
     """
+    with MinWarning("warning"):
+
+        INT_MAX = 2147483647
+        INFINITY = "'Infinity'"
+        EPSILON = 0.000001
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
+
+        # Prepare the input for recording in the summary table
+        if vertex_id is None:
+            v_st = "NULL"
+            vertex_id = "id"
+        else:
+            v_st = vertex_id
+        if edge_args is None:
+            e_st = "NULL"
+        else:
+            e_st = edge_args
+        if grouping_cols is None:
+            g_st = "NULL"
+            glist = None
+        else:
+            g_st = grouping_cols
+            glist = split_quoted_delimited_str(grouping_cols)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(src)
+        is_hawq = is_platform_hawq()
+
+        _validate_apsp(vertex_table, vertex_id, edge_table,
+                       edge_params, out_table, glist)
+
+        out_table_1 = unique_string(desp='out_table_1')
+        out_table_2 = unique_string(desp='out_table_2')
+        tmp_view = unique_string(desp='tmp_view')
+        v1 = unique_string(desp='v1')
+        v2 = unique_string(desp='v2')
+        message = unique_string(desp='message')
+
+        # Initialize grouping related variables
+        comma_grp = ""
+        comma_grp_e = ""
+        comma_grp_m = ""
+        grp_comma = ""
+        grp_v1_comma = ""
+        grp_o1_comma = ""
+        grp_o_comma = ""
+        checkg_eo = ""
+        checkg_eout = ""
+        checkg_ex = ""
+        checkg_om = ""
+        checkg_o1t_sub = ""
+        checkg_ot_sub = ""
+        checkg_ot = ""
+        checkg_o1t = ""
+        checkg_vv = ""
+        checkg_o2v = ""
+        checkg_oy = ""
+        checkg_vv_sub = "TRUE"
+        grp_by = ""
+
+        if grouping_cols is not None:
+
+            # We use actual table names in some cases and aliases in others.
+            # In some cases, we swap the table names so use of an alias is
+            # necessary. In other cases, they are used to simplify debugging.
+
+            comma_grp = " , " + grouping_cols
+            comma_grp_e = " , " + _grp_from_table("edge", glist)
+            comma_grp_m = " , " + _grp_from_table(message, glist)
+            grp_comma = grouping_cols + " , "
+            grp_v1_comma = _grp_from_table("v1", glist) + " , "
+            grp_o1_comma = _grp_from_table(out_table_1, glist) + " , "
+            grp_o_comma = _grp_from_table("out", glist) + " , "
+
+            checkg_eo = " AND " + _check_groups(edge_table, out_table, glist)
+            checkg_eout = " AND " + _check_groups("edge", "out", glist)
+            checkg_ex = " AND " + _check_groups("edge", "x", glist)
+            checkg_om = " AND " + _check_groups(out_table, message, glist)
+            checkg_o1t_sub = _check_groups("out", tmp_view, glist)
+            checkg_ot_sub = _check_groups(out_table, tmp_view, glist)
+            checkg_ot = " AND " + _check_groups(out_table, tmp_view, glist)
+            checkg_o1t = " AND " + _check_groups("out", "t", glist)
+            checkg_vv = " AND " + _check_groups("v1", "v2", glist)
+            checkg_o2v = " AND " + _check_groups(out_table_2, "v2", glist)
+            checkg_oy = " AND " + _check_groups("out", "y", glist)
+            checkg_vv_sub = _check_groups("v1", "v2", glist)
+            grp_by = " GROUP BY " + grouping_cols
+
+        w_type = get_expr_type(weight, edge_table).lower()
+        init_w = INT_MAX
+        if w_type in ['real', 'double precision', 'float8']:
+            init_w = INFINITY
+
+        # We keep a summary table to keep track of the parameters used for this
+        # APSP run. This table is used in the path finding function to eliminate
+        # the need for repetition.
+        plpy.execute(""" CREATE TABLE {out_table}_summary  (
+            vertex_table            TEXT,
+            vertex_id               TEXT,
+            edge_table              TEXT,
+            edge_args               TEXT,
+            out_table               TEXT,
+            grouping_cols           TEXT)
+            """.format(**locals()))
+        plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+            ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+            '{out_table}', '{g_st}') """.format(**locals()))
+
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+        # Find all of the vertices involved with a given group
+        plpy.execute(""" CREATE VIEW {tmp_view} AS
+            SELECT {src} AS {vertex_id} {comma_grp}
+            FROM {edge_table} WHERE {src} IS NOT NULL
+            UNION
+            SELECT {dest} AS {vertex_id} {comma_grp}
+            FROM {edge_table} WHERE {dest} IS NOT NULL
+            """.format(**locals()))
+
+        # Don't use the unnecessary rows of the output table during joins.
+        ot_sql = """ SELECT * FROM {out_table}
+            WHERE {weight} != {init_w} AND {src} != {dest} """
+
+        # HAWQ does not support UPDATE so the initialization has to differ.
+        if is_hawq:
+
+            plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
+                out_table_1, out_table_2))
+            # Create 2 identical tables to swap at every iteration.
+            plpy.execute(""" CREATE TABLE {out_table_1} AS
+                SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
+                FROM {edge_table} LIMIT 0 {distribution}
+                """.format(**locals()))
+            plpy.execute(""" CREATE TABLE {out_table_2} AS
+                SELECT * FROM {out_table_1} LIMIT 0 {distribution}
+                """.format(**locals()))
+
+            # The source can be reached with 0 cost and next is itself.
+            plpy.execute(""" INSERT INTO {out_table_2}
+                SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
+                    0 AS {weight}, {vertex_id} AS parent
+                FROM {tmp_view} """.format(**locals()))
+            # Distance = 1: every edge means there is a path from src to dest
+            plpy.execute(""" INSERT INTO {out_table_2}
+                SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
+                FROM {edge_table} """.format(**locals()))
+
+            # Fill the rest of the possible pairs with infinite initial weights
+            fill_sql = """ INSERT INTO {out_table_1}
+                SELECT {grp_v1_comma}
+                    v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM {tmp_view} v1, {tmp_view} v2
+                WHERE NOT EXISTS
+                    (SELECT 1 FROM {out_table_2}
+                        WHERE v1.{vertex_id} = {src} AND
+                            v2.{vertex_id} = {dest}
+                            {checkg_vv} {checkg_o2v})
+                    {checkg_vv}
+                UNION
+                SELECT * FROM {out_table_2}
+                """.format(**locals())
+            plpy.execute(fill_sql)
+
+            ot_sql1 = ot_sql.format(out_table=out_table_1, init_w=init_w,
+                                    weight=weight, src=src, dest=dest)
+            ot_sql2 = ot_sql.format(out_table=out_table_2, init_w=init_w,
+                                    weight=weight, src=src, dest=dest)
+
+        # PostgreSQL & GPDB initialization
+        else:
+
+            plpy.execute(""" CREATE TABLE {out_table} AS
+                (SELECT {grp_comma} {src}, {dest}, {weight},
+                    {src} AS parent FROM {edge_table} LIMIT 0)
+                {distribution} """.format(**locals()))
+
+            plpy.execute(""" INSERT INTO {out_table}
+                SELECT {grp_v1_comma}
+                    v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM
+                    {tmp_view} AS v1 INNER JOIN
+                    {tmp_view} AS v2 ON ({checkg_vv_sub})
+                    """.format(**locals()))
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+            # GPDB and HAWQ have distributed by clauses to help them with indexing.
+            # For Postgres we add the indices manually.
+            if is_platform_pg():
+                sql_index = "CREATE INDEX ON {0}({1})".format(out_table, src)
+            else:
+                sql_index = ''
+
+            plpy.execute(sql_index)
+
+            # The source can be reached with 0 cost and next is itself.
+            plpy.execute(
+                """ UPDATE {out_table} SET
+                {weight} = 0, parent = {vertex_id}
+                FROM {vertex_table}
+                WHERE {out_table}.{src} = {vertex_id}
+                AND {out_table}.{dest} = {vertex_id}
+                """.format(**locals()))
+
+            # Distance = 1: every edge means there is a path from src to dest
+
+            # There may be multiple edges defined as a->b,
+            # we only need the minimum weighted one.
+
+            plpy.execute(
+                """ CREATE VIEW {tmp_view} AS
+                    SELECT {grp_comma} {src}, {dest},
+                        min({weight}) AS {weight}
+                    FROM {edge_table}
+                    GROUP BY {grp_comma} {src}, {dest}
+                """.format(**locals()))
+            plpy.execute(
+                """ UPDATE {out_table} SET
+                {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
+                FROM {tmp_view}
+                WHERE {out_table}.{src} = {tmp_view}.{src}
+                AND {out_table}.{dest} = {tmp_view}.{dest}
+                AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
+                """.format(**locals()))
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+            ot_sql1 = ot_sql.format(**locals())
+            out_table_1 = out_table
+
+        # Find the maximum number of iterations to try
+        # If not done by v_cnt iterations, there is a negative cycle.
+        v_cnt = plpy.execute(
+            """ SELECT max(count) as max FROM (
+                    SELECT count(DISTINCT {src}) AS count
+                    FROM {out_table_1}
+                    {grp_by}) x
+            """.format(**locals()))[0]['max']
+
+        if v_cnt < 2:
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+            plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                         format(out_table, out_table + "_summary"))
+            if is_hawq:
+                plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+                    out_table_1, out_table_2))
+            if grouping_cols:
+                plpy.error(("Graph APSP: {0} has less than 2 vertices in " +
+                            "every group.").format(edge_table))
+            else:
+                plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
+                    vertex_table))
+
+        for i in range(0, v_cnt + 1):
+
+            """
+            Create a view that will be used to update the output table
+
+            The implementation is based on the matrix multiplication idea.
+            The initial output table consists of 3 sets of vertex pairs.
+            1) for every vertex v: v -> v, weight 0, parent v
+            2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
+            3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
+                parent NULL
+
+            The algorithm "relaxes" the paths: finds alternate paths with less
+            weights
+            At every step, we look at every combination of non-infinite
+            existing paths and edges to see if we can relax a path.
+
+            Assume the graph is a chain: 1->2->3->...
+            The initial output table will have a finite weighted path for 1->2
+            and infinite for the rest. At the ith iteration, the output table
+            will have the 1->i path and will relax the 1->i+1 path from
+            infinite to a finite value (weight of the 1->i path + weight of
+            the i->i+1 edge), assigning i as the parent.
+
+            Since using '=' with floats is dangerous, we use an epsilon value
+            for comparison.
+            """
+
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+            update_sql = """ CREATE VIEW {tmp_view} AS
+                SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
+                    {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
+                FROM  {out_table_1} AS out
+                INNER JOIN
+                    (SELECT x.{src}, x.{dest}, x.{weight},
+                        out.{dest} as parent {comma_grp_e}
+                    FROM
+                        ({ot_sql1}) AS out
+                    INNER JOIN
+                        {edge_table} AS edge
+                    ON (out.{dest} = edge.{src} {checkg_eout})
+                    INNER JOIN
+                        (SELECT out.{src}, edge.{dest},
+                            min(out.{weight}+edge.{weight}) AS {weight}
+                            {comma_grp_e}
+                        FROM
+                            ({ot_sql1}) AS out,
+                            {edge_table} AS edge
+                        WHERE out.{dest} = edge.{src} {checkg_eout}
+                        GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
+                    ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
+                    WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
+                        < {EPSILON}) y
+                ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
+                WHERE y.{weight} < out.{weight}
+                """.format(**locals())
+            plpy.execute(update_sql)
+
+            # HAWQ employs alternating tables and has to be handled separately
+            if is_hawq:
+
+                # Stop if there are no more updates
+                if table_is_empty(tmp_view):
+
+                    plpy.execute("DROP VIEW {0}".format(tmp_view))
+                    plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
+                        out_table_1, out_table))
+                    break
+
+                # The new output table will still have the old values for
+                # every vertex pair that does not appear on the update view.
+
+                plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
+                fill_sql = """ INSERT INTO {out_table_2}
+                    SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
+                        (SELECT 1 FROM {tmp_view} AS t
+                        WHERE t.{src} = out.{src} AND
+                            t.{dest} = out.{dest} {checkg_o1t})
+                    UNION
+                    SELECT * FROM {tmp_view}""".format(**locals())
+                plpy.execute(fill_sql)
+
+                # Swap the table names and the SQL command we use for filtering
+                tmpname = out_table_1
+                out_table_1 = out_table_2
+                out_table_2 = tmpname
+
+                tmpname = ot_sql1
+                ot_sql1 = ot_sql2
+                ot_sql2 = tmpname
+
+            else:
+
+                updates = plpy.execute(
+                    """ UPDATE {out_table}
+                        SET {weight} = {tmp_view}.{weight},
+                            parent = {tmp_view}.parent
+                        FROM {tmp_view}
+                        WHERE {tmp_view}.{src} = {out_table}.{src} AND
+                            {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
+                    """.format(**locals()))
+                if updates.nrows() == 0:
+                    break
+
+        # The algorithm should have reached a break command by this point.
+        # This check handles the existence of a negative cycle.
+        if i == v_cnt:
+
+            # If there are no groups, clean up and give error.
+            if grouping_cols is None:
+                plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+                plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                             format(out_table, out_table + "_summary"))
+                if is_hawq:
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+                        out_table_1, out_table_2))
+                plpy.error("Graph APSP: Detected a negative cycle in the graph.")
+
+            # It is possible that not all groups have negative cycles.
+            else:
+                # negs is the string created by collating grouping columns.
+                # By looking at the update view, we can see which groups
+                # are in a negative cycle.
+
+                negs = plpy.execute(
+                    """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+                        FROM {tmp_view}
+                    """.format(**locals()))[0]['grp']
+
+                # Delete the groups with negative cycles from the output table.
+
+                # HAWQ doesn't support DELETE so we have to copy the valid results.
+                if is_hawq:
+                    sql_del = """ CREATE TABLE {out_table} AS
+                        SELECT *
+                        FROM {out_table_1} AS out
+                        WHERE NOT EXISTS(
+                            SELECT 1
+                            FROM {tmp_view}
+                            WHERE {checkg_o1t_sub}
+                            );"""
+                    plpy.execute(sql_del.format(**locals()))
+                    plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+                        out_table_1, out_table_2))
+                else:
+                    sql_del = """ DELETE FROM {out_table}
+                        USING {tmp_view}
+                        WHERE {checkg_ot_sub}"""
+                    plpy.execute(sql_del.format(**locals()))
+
+                plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+                # If every group has a negative cycle,
+                # drop the output table as well.
+                if table_is_empty(out_table):
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                                 format(out_table, out_table + "_summary"))
+                plpy.warning(
+                    """Graph APSP: Detected a negative cycle in the """ +
+                    """sub-graphs of following groups: {0}.""".
+                    format(str(negs)[1:-1]))
+
+        else:
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+            if is_hawq:
+                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
+
+    return None
 
-	with MinWarning("warning"):
-
-		INT_MAX = 2147483647
-		INFINITY = "'Infinity'"
-		EPSILON = 0.000001
-
-		params_types = {'src': str, 'dest': str, 'weight': str}
-		default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
-		edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-		# Prepare the input for recording in the summary table
-		if vertex_id is None:
-			v_st= "NULL"
-			vertex_id = "id"
-		else:
-			v_st = vertex_id
-		if edge_args is None:
-			e_st = "NULL"
-		else:
-			e_st = edge_args
-		if grouping_cols is None:
-			g_st = "NULL"
-			glist = None
-		else:
-			g_st = grouping_cols
-			glist = split_quoted_delimited_str(grouping_cols)
-
-		src = edge_params["src"]
-		dest = edge_params["dest"]
-		weight = edge_params["weight"]
-
-		distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-			<!"DISTRIBUTED BY ({0})".format(src)!>)
-
-		is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-
-		_validate_apsp(vertex_table, vertex_id, edge_table,
-			edge_params, out_table, glist)
-
-		out_table_1 = unique_string(desp='out_table_1')
-		out_table_2 = unique_string(desp='out_table_2')
-		tmp_view = unique_string(desp='tmp_view')
-		v1 = unique_string(desp='v1')
-		v2 = unique_string(desp='v2')
-		message = unique_string(desp='message')
-
-		# Initialize grouping related variables
-		comma_grp = ""
-		comma_grp_e = ""
-		comma_grp_m = ""
-		grp_comma = ""
-		grp_v1_comma = ""
-		grp_o1_comma = ""
-		grp_o_comma = ""
-		checkg_eo = ""
-		checkg_eout = ""
-		checkg_ex = ""
-		checkg_om = ""
-		checkg_o1t_sub = ""
-		checkg_ot_sub = ""
-		checkg_ot = ""
-		checkg_o1t = ""
-		checkg_vv = ""
-		checkg_o2v = ""
-		checkg_oy = ""
-		checkg_vv_sub = "TRUE"
-		grp_by = ""
-
-		if grouping_cols is not None:
-
-			# We use actual table names in some cases and aliases in others
-			# In some cases, we swap the table names so use of an alias is
-			# necessary. In other cases, they are used to simplify debugging.
-
-			comma_grp = " , " + grouping_cols
-			comma_grp_e = " , " + _grp_from_table("edge",glist)
-			comma_grp_m = " , " + _grp_from_table(message,glist)
-			grp_comma = grouping_cols + " , "
-			grp_v1_comma = _grp_from_table("v1",glist) + " , "
-			grp_o1_comma = _grp_from_table(out_table_1,glist) + " , "
-			grp_o_comma = _grp_from_table("out",glist) + " , "
-
-			checkg_eo = " AND " + _check_groups(edge_table,out_table,glist)
-			checkg_eout = " AND " + _check_groups("edge","out",glist)
-			checkg_ex = " AND " + _check_groups("edge","x",glist)
-			checkg_om = " AND " + _check_groups(out_table,message,glist)
-			checkg_o1t_sub = _check_groups("out",tmp_view,glist)
-			checkg_ot_sub = _check_groups(out_table,tmp_view,glist)
-			checkg_ot = " AND " + _check_groups(out_table,tmp_view,glist)
-			checkg_o1t = " AND " + _check_groups("out","t",glist)
-			checkg_vv = " AND " + _check_groups("v1","v2",glist)
-			checkg_o2v = " AND " + _check_groups(out_table_2,"v2",glist)
-			checkg_oy = " AND " + _check_groups("out","y",glist)
-			checkg_vv_sub = _check_groups("v1","v2",glist)
-			grp_by = " GROUP BY " + grouping_cols
-
-		w_type = get_expr_type(weight,edge_table).lower()
-		init_w = INT_MAX
-		if w_type in ['real','double precision','float8']:
-			init_w = INFINITY
-
-		# We keep a summary table to keep track of the parameters used for this
-		# APSP run. This table is used in the path finding function to eliminate
-		# the need for repetition.
-		plpy.execute( """ CREATE TABLE {out_table}_summary  (
-			vertex_table            TEXT,
-			vertex_id               TEXT,
-			edge_table              TEXT,
-			edge_args               TEXT,
-			out_table               TEXT,
-			grouping_cols           TEXT)
-			""".format(**locals()))
-		plpy.execute( """ INSERT INTO {out_table}_summary VALUES
-			('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
-			'{out_table}', '{g_st}') """.format(**locals()))
-
-		plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-		# Find all of the vertices involved with a given group
-		plpy.execute(""" CREATE VIEW {tmp_view} AS
-			SELECT {src} AS {vertex_id} {comma_grp}
-			FROM {edge_table} WHERE {src} IS NOT NULL
-			UNION
-			SELECT {dest} AS {vertex_id} {comma_grp}
-			FROM {edge_table} WHERE {dest} IS NOT NULL
-			""".format(**locals()))
-
-		# Don't use the unnecessary rows of the output table during joins.
-		ot_sql = """ SELECT * FROM {out_table}
-			WHERE {weight} != {init_w} AND {src} != {dest} """
-
-		# HAWQ does not support UPDATE so the initialization has to differ.
-		if is_hawq:
-
-			plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
-				out_table_1,out_table_2))
-			# Create 2 identical tables to swap at every iteration.
-			plpy.execute(""" CREATE TABLE {out_table_1} AS
-				SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
-				FROM {edge_table} LIMIT 0 {distribution}
-				""".format(**locals()))
-			plpy.execute(""" CREATE TABLE {out_table_2} AS
-				SELECT * FROM {out_table_1} LIMIT 0 {distribution}
-				""".format(**locals()))
-
-			# The source can be reached with 0 cost and next is itself.
-			plpy.execute(""" INSERT INTO {out_table_2}
-				SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
-					0 AS {weight}, {vertex_id} AS parent
-				FROM {tmp_view} """.format(**locals()))
-			# Distance = 1: every edge means there is a path from src to dest
-			plpy.execute(""" INSERT INTO {out_table_2}
-				SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
-				FROM {edge_table} """.format(**locals()))
-
-			# Fill the rest of the possible pairs with infinite initial weights
-			fill_sql = """ INSERT INTO {out_table_1}
-				SELECT {grp_v1_comma}
-					v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
-					{init_w} AS {weight}, NULL::INT AS parent
-				FROM {tmp_view} v1, {tmp_view} v2
-				WHERE NOT EXISTS
-					(SELECT 1 FROM {out_table_2}
-						WHERE v1.{vertex_id} = {src} AND
-							v2.{vertex_id} = {dest}
-							{checkg_vv} {checkg_o2v})
-					{checkg_vv}
-				UNION
-				SELECT * FROM {out_table_2}
-				""".format(**locals())
-			plpy.execute(fill_sql)
-
-			ot_sql1 = ot_sql.format(out_table = out_table_1, init_w = init_w,
-				weight = weight, src = src, dest = dest)
-			ot_sql2 = ot_sql.format(out_table = out_table_2, init_w = init_w,
-				weight = weight, src = src, dest = dest)
-
-		# PostgreSQL & GPDB initialization
-		else:
-
-			plpy.execute( """ CREATE TABLE {out_table} AS
-				(SELECT {grp_comma} {src}, {dest}, {weight},
-					{src} AS parent FROM {edge_table} LIMIT 0)
-				{distribution} """.format(**locals()))
-
-			plpy.execute( """ INSERT INTO {out_table}
-				SELECT {grp_v1_comma}
-					v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
-					{init_w} AS {weight}, NULL::INT AS parent
-				FROM
-					{tmp_view} AS v1 INNER JOIN
-					{tmp_view} AS v2 ON ({checkg_vv_sub})
-					""".format(**locals()))
-			plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-			# GPDB and HAWQ have distributed by clauses to help them with indexing.
-			# For Postgres we add the indices manually.
-			sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-				<!""" CREATE INDEX ON {out_table} ({src});
-				""".format(**locals())!>, <!''!>)
-			plpy.execute(sql_index)
-
-			# The source can be reached with 0 cost and next is itself.
-			plpy.execute(
-				""" UPDATE {out_table} SET
-				{weight} = 0, parent = {vertex_id}
-				FROM {vertex_table}
-				WHERE {out_table}.{src} = {vertex_id}
-				AND {out_table}.{dest} = {vertex_id}
-				""".format(**locals()))
-
-			# Distance = 1: every edge means there is a path from src to dest
-
-			# There may be multiple edges defined as a->b,
-			# we only need the minimum weighted one.
-
-			plpy.execute(
-				""" CREATE VIEW {tmp_view} AS
-					SELECT {grp_comma} {src}, {dest},
-						min({weight}) AS {weight}
-					FROM {edge_table}
-					GROUP BY {grp_comma} {src}, {dest}
-				""".format(**locals()))
-			plpy.execute(
-				""" UPDATE {out_table} SET
-				{weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
-				FROM {tmp_view}
-				WHERE {out_table}.{src} = {tmp_view}.{src}
-				AND {out_table}.{dest} = {tmp_view}.{dest}
-				AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
-				""".format(**locals()))
-			plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-			ot_sql1 = ot_sql.format(**locals())
-			out_table_1 = out_table
-
-		# Find the maximum number of iterations to try
-		# If not done by v_cnt iterations, there is a negative cycle.
-		v_cnt = plpy.execute(
-			""" SELECT max(count) as max FROM (
-					SELECT count(DISTINCT {src}) AS count
-					FROM {out_table_1}
-					{grp_by}) x
-			""".format(**locals()))[0]['max']
-
-		if v_cnt < 2:
-			plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-			plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-					format(out_table, out_table+"_summary"))
-			if is_hawq:
-				plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
-					out_table_1,out_table_2))
-			if grouping_cols:
-				plpy.error(("Graph APSP: {0} has less than 2 vertices in "+
-					"every group.").format(edge_table))
-			else:
-				plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
-					vertex_table))
-
-		for i in range(0,v_cnt+1):
-
-			"""
-			Create a view that will be used to update the output table
-
-			The implementation is based on the matrix multiplication idea.
-			The initial output table consists of 3 sets of vertex pairs.
-			1) for every vervex v: v -> v, weight 0, parent v
-			2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
-			3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
-				parent NULL
-
-			The algorithm "relaxes" the paths: finds alternate paths with less
-			weights
-			At every step, we look at every combination of non-infinite
-			existing paths and edges to see if we can relax a path.
-
-			Assume the graph is a chain: 1->2->3->...
-			The initial output table will have a finite weighted path for 1->2
-			and infinite for the rest. At ith iteration, the output table will
-			have 1->i path and will relax 1->i+1 path from infinite to a
-			finite value (weight of 1->i path + weight of i->i+1 edge) and
-			assign i as the parent.
-
-			Since using '=' with floats is dangerous we use an epsilon value
-			for comparison.
-			"""
-
-			plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-			update_sql = """ CREATE VIEW {tmp_view} AS
-				SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
-					{grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
-				FROM  {out_table_1} AS out
-				INNER JOIN
-					(SELECT x.{src}, x.{dest}, x.{weight},
-						out.{dest} as parent {comma_grp_e}
-					FROM
-						({ot_sql1}) AS out
-					INNER JOIN
-						{edge_table} AS edge
-					ON (out.{dest} = edge.{src} {checkg_eout})
-					INNER JOIN
-						(SELECT out.{src}, edge.{dest},
-							min(out.{weight}+edge.{weight}) AS {weight}
-							{comma_grp_e}
-						FROM
-							({ot_sql1}) AS out,
-							{edge_table} AS edge
-						WHERE out.{dest} = edge.{src} {checkg_eout}
-						GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
-					ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
-					WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
-						< {EPSILON}) y
-				ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
-				WHERE y.{weight} < out.{weight}
-				""".format(**locals())
-			plpy.execute(update_sql)
-
-			# HAWQ employs alternating tables and has to be handled separately
-			if is_hawq:
-
-				# Stop if therea re no more updates
-				if table_is_empty(tmp_view):
-
-					plpy.execute("DROP VIEW {0}".format(tmp_view))
-					plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
-						out_table_1,out_table))
-					break
-
-				# The new output table will still have the old values for
-				# every vertex pair that does not appear on the update view.
-
-				plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
-				fill_sql = """ INSERT INTO {out_table_2}
-					SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
-						(SELECT 1 FROM {tmp_view} AS t
-						WHERE t.{src} = out.{src} AND
-							t.{dest} = out.{dest} {checkg_o1t})
-					UNION
-					SELECT * FROM {tmp_view}""".format(**locals())
-				plpy.execute(fill_sql)
-
-				# Swap the table names and the sql command we use for filtering
-				tmpname = out_table_1
-				out_table_1 = out_table_2
-				out_table_2 = tmpname
-
-				tmpname = ot_sql1
-				ot_sql1 = ot_sql2
-				ot_sql2 = tmpname
-
-			else:
-
-				updates = plpy.execute(
-					"""	UPDATE {out_table}
-						SET {weight} = {tmp_view}.{weight},
-							parent = {tmp_view}.parent
-						FROM {tmp_view}
-						WHERE {tmp_view}.{src} = {out_table}.{src} AND
-							{tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
-					""".format(**locals()))
-				if updates.nrows() == 0:
-					break
-
-		# The algorithm should have reached a break command by this point.
-		# This check handles the existence of a negative cycle.
-		if i == v_cnt:
-
-			# If there are no groups, clean up and give error.
-			if grouping_cols is None:
-				plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-				plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-					format(out_table, out_table+"_summary"))
-				if is_hawq:
-					plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
-						out_table_1,out_table_2))
-				plpy.error("Graph APSP: Detected a negative cycle in the graph.")
-
-			# It is possible that not all groups has negative cycles.
-			else:
-				# negs is the string created by collating grouping columns.
-				# By looking at the update view, we can see which groups
-				# are in a negative cycle.
-
-				negs = plpy.execute(
-					""" SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
-						FROM {tmp_view}
-					""".format(**locals()))[0]['grp']
-
-				# Delete the groups with negative cycles from the output table.
-
-				# HAWQ doesn't support DELETE so we have to copy the valid results.
-				if is_hawq:
-					sql_del = """ CREATE TABLE {out_table} AS
-						SELECT *
-						FROM {out_table_1} AS out
-						WHERE NOT EXISTS(
-							SELECT 1
-							FROM {tmp_view}
-							WHERE {checkg_o1t_sub}
-							);"""
-					plpy.execute(sql_del.format(**locals()))
-					plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-					plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
-						out_table_1,out_table_2))
-				else:
-					sql_del = """ DELETE FROM {out_table}
-						USING {tmp_view}
-						WHERE {checkg_ot_sub}"""
-					plpy.execute(sql_del.format(**locals()))
-
-				plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-				# If every group has a negative cycle,
-				# drop the output table as well.
-				if table_is_empty(out_table):
-					plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-						format(out_table,out_table+"_summary"))
-				plpy.warning(
-					"""Graph APSP: Detected a negative cycle in the """ +
-					"""sub-graphs of following groups: {0}.""".
-					format(str(negs)[1:-1]))
-
-		else:
-			plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-			if is_hawq:
-				plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
-
-	return None
 
 def graph_apsp_get_path(schema_madlib, apsp_table,
-	source_vertex, dest_vertex, path_table, **kwargs):
-	"""
-	Helper function that can be used to get the shortest path between any 2
-	vertices
+                        source_vertex, dest_vertex, path_table, **kwargs):
+    """
+    Helper function that can be used to get the shortest path between any 2
+    vertices
     Args:
-    	@param apsp_table		Name of the table that contains the APSP
-    							output.
-        @param source_vertex	The vertex that will be the source of the
-            					desired path.
-        @param dest_vertex		The vertex that will be the destination of the
-            					desired path.
-	"""
-
-	with MinWarning("warning"):
-		_validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
-
-		temp1_name = unique_string(desp='temp1')
-		temp2_name = unique_string(desp='temp2')
-
-		summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
-		vertex_id = summary[0]['vertex_id']
-		edge_args = summary[0]['edge_args']
-		grouping_cols = summary[0]['grouping_cols']
-
-		params_types = {'src': str, 'dest': str, 'weight': str}
-		default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
-		edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-		src = edge_params["src"]
-		dest = edge_params["dest"]
-		weight = edge_params["weight"]
-
-		if vertex_id == "NULL":
-			vertex_id = "id"
-
-		if grouping_cols == "NULL":
-			grouping_cols = None
-
-		select_grps = ""
-		check_grps_t1 = ""
-		check_grps_t2 = ""
-		grp_comma = ""
-		tmp = ""
-
-		if grouping_cols is not None:
-			glist = split_quoted_delimited_str(grouping_cols)
-			select_grps = _grp_from_table(apsp_table,glist) + " , "
-			check_grps_t1 = " AND " + _check_groups(
-				apsp_table,temp1_name,glist)
-			check_grps_t2 = " AND " + _check_groups(
-				apsp_table,temp2_name,glist)
-
-			grp_comma = grouping_cols + " , "
-
-		# If the source and destination is the same vertex.
-		# There is no need to check the paths for any group.
-		if source_vertex == dest_vertex:
-			plpy.execute("""
-				CREATE TABLE {path_table} AS
-				SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
-				FROM {apsp_table} WHERE {src} = {source_vertex} AND
-					{dest} = {dest_vertex}
-				""".format(**locals()))
-			return
-
-		plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
-			format(temp1_name,temp2_name));
-
-		# Initialize the temporary tables
-		out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
-				SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
-					ARRAY[{dest_vertex}] AS path
-				FROM {apsp_table}
-				WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
-					AND {apsp_table}.parent IS NOT NULL
-			""".format(**locals()))
-
-		plpy.execute("""
-			CREATE TEMP TABLE {temp2_name} AS
-				SELECT * FROM {temp1_name} LIMIT 0
-			""".format(**locals()))
-
-		# Follow the 'parent' chain until you reach the case where the parent
-		# is the same as destination. This means it is the last vertex before
-		# the source.
-		while out.nrows() > 0:
-
-			plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
-			# If the parent id is not the same as dest,
-			# that means we have to follow the chain:
-			# 	Add it to the path and move to its parent
-			out = plpy.execute(
-				""" INSERT INTO {temp2_name}
-				SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
-					{apsp_table}.{dest} || {temp1_name}.path AS path
-				FROM {apsp_table} INNER JOIN {temp1_name} ON
-					({apsp_table}.{dest} = {temp1_name}.{vertex_id}
-						{check_grps_t1})
-				WHERE {src} = {source_vertex} AND
-					{apsp_table}.parent <> {apsp_table}.{dest}
-				""".format(**locals()))
-
-			tmp = temp2_name
-			temp2_name = temp1_name
-			temp1_name = tmp
-
-			tmp = check_grps_t1
-			check_grps_t1 = check_grps_t2
-			check_grps_t2 = tmp
-
-		# We have to consider 3 cases.
-		# 1) The path has more than 2 vertices:
-		# 	Add the current parent and the source vertex
-		# 2) The path has exactly 2 vertices (an edge between src and dest is
-		# the shortest path).
-		#	Add the source vertex
-		# 3) The path has 0 vertices (unreachable)
-		#	Add an empty array.
-
-		# Path with 1 vertex (src == dest) has been handled before
-		plpy.execute("""
-			CREATE TABLE {path_table} AS
-			SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
-			FROM {temp2_name}
-			WHERE {vertex_id} <> {dest_vertex}
-			UNION
-			SELECT {grp_comma} {source_vertex} || path AS path
-			FROM {temp2_name}
-			WHERE {vertex_id} = {dest_vertex}
-			UNION
-			SELECT {grp_comma} '{{}}'::INT[] AS path
-			FROM {apsp_table}
-			WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
-				AND {apsp_table}.parent IS NULL
-			""".format(**locals()))
-
-		out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
-		if out.nrows() == 0:
-			plpy.error( ("Graph APSP: Vertex {0} and/or {1} is not present"+
-				" in the APSP table {1}").format(
-				source_vertex,dest_vertex,apsp_table))
-
-		plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
-			format(**locals()))
-
-	return None
+        @param apsp_table       Name of the table that contains the APSP
+                                output.
+        @param source_vertex    The vertex that will be the source of the
+                                desired path.
+        @param dest_vertex      The vertex that will be the destination of the
+                                desired path.
+    """
+
+    with MinWarning("warning"):
+        _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
+
+        temp1_name = unique_string(desp='temp1')
+        temp2_name = unique_string(desp='temp2')
+
+        summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
+        vertex_id = summary[0]['vertex_id']
+        edge_args = summary[0]['edge_args']
+        grouping_cols = summary[0]['grouping_cols']
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args,
+                                              params_types,
+                                              default_args)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        if vertex_id == "NULL":
+            vertex_id = "id"
+
+        if grouping_cols == "NULL":
+            grouping_cols = None
+
+        select_grps = ""
+        check_grps_t1 = ""
+        check_grps_t2 = ""
+        grp_comma = ""
+        tmp = ""
+
+        if grouping_cols is not None:
+            glist = split_quoted_delimited_str(grouping_cols)
+            select_grps = _grp_from_table(apsp_table, glist) + " , "
+            check_grps_t1 = " AND " + _check_groups(
+                apsp_table, temp1_name, glist)
+            check_grps_t2 = " AND " + _check_groups(
+                apsp_table, temp2_name, glist)
+
+            grp_comma = grouping_cols + " , "
+
+        # If the source and destination are the same vertex,
+        # there is no need to check the paths for any group.
+        if source_vertex == dest_vertex:
+            plpy.execute("""
+                CREATE TABLE {path_table} AS
+                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+                FROM {apsp_table} WHERE {src} = {source_vertex} AND
+                    {dest} = {dest_vertex}
+                """.format(**locals()))
+            return
+
+        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                     format(temp1_name, temp2_name))
+
+        # Initialize the temporary tables
+        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+                SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
+                    ARRAY[{dest_vertex}] AS path
+                FROM {apsp_table}
+                WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+                    AND {apsp_table}.parent IS NOT NULL
+            """.format(**locals()))
+
+        plpy.execute("""
+            CREATE TEMP TABLE {temp2_name} AS
+                SELECT * FROM {temp1_name} LIMIT 0
+            """.format(**locals()))
+
+        # Follow the 'parent' chain until you reach the case where the parent
+        # is the same as the destination. This means it is the last vertex
+        # before the source.
+        while out.nrows() > 0:
+
+            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+            # If the parent id is not the same as dest,
+            # that means we have to follow the chain:
+            #   Add it to the path and move to its parent
+            out = plpy.execute(
+                """ INSERT INTO {temp2_name}
+                SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
+                    {apsp_table}.{dest} || {temp1_name}.path AS path
+                FROM {apsp_table} INNER JOIN {temp1_name} ON
+                    ({apsp_table}.{dest} = {temp1_name}.{vertex_id}
+                        {check_grps_t1})
+                WHERE {src} = {source_vertex} AND
+                    {apsp_table}.parent <> {apsp_table}.{dest}
+                """.format(**locals()))
+
+            tmp = temp2_name
+            temp2_name = temp1_name
+            temp1_name = tmp
+
+            tmp = check_grps_t1
+            check_grps_t1 = check_grps_t2
+            check_grps_t2 = tmp
+
+        # We have to consider 3 cases.
+        # 1) The path has more than 2 vertices:
+        #   Add the current parent and the source vertex
+        # 2) The path has exactly 2 vertices (an edge between src and dest is
+        # the shortest path).
+        #   Add the source vertex
+        # 3) The path has 0 vertices (unreachable)
+        #   Add an empty array.
+
+        # Path with 1 vertex (src == dest) has been handled before
+        plpy.execute("""
+            CREATE TABLE {path_table} AS
+            SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
+            FROM {temp2_name}
+            WHERE {vertex_id} <> {dest_vertex}
+            UNION
+            SELECT {grp_comma} {source_vertex} || path AS path
+            FROM {temp2_name}
+            WHERE {vertex_id} = {dest_vertex}
+            UNION
+            SELECT {grp_comma} '{{}}'::INT[] AS path
+            FROM {apsp_table}
+            WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+                AND {apsp_table}.parent IS NULL
+            """.format(**locals()))
+
+        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+        if out.nrows() == 0:
+            plpy.error(("Graph APSP: Vertex {0} and/or {1} is not present" +
+                        " in the APSP table {1}").format(
+                source_vertex, dest_vertex, apsp_table))
+
+        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
+                     format(**locals()))
+
+    return None
+
 
 def _validate_apsp(vertex_table, vertex_id, edge_table, edge_params,
-	out_table, glist, **kwargs):
+                   out_table, glist, **kwargs):
+
+    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+                          out_table, 'APSP')
 
-	validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-		out_table,'APSP')
+    vt_error = plpy.execute(
+        """ SELECT {vertex_id}
+            FROM {vertex_table}
+            WHERE {vertex_id} IS NOT NULL
+            GROUP BY {vertex_id}
+            HAVING count(*) > 1 """.format(**locals()))
 
-	vt_error = plpy.execute(
-		""" SELECT {vertex_id}
-			FROM {vertex_table}
-			WHERE {vertex_id} IS NOT NULL
-			GROUP BY {vertex_id}
-			HAVING count(*) > 1 """.format(**locals()))
+    if vt_error.nrows() != 0:
+        plpy.error(
+            """Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
+            format(**locals()))
 
-	if vt_error.nrows() != 0:
-		plpy.error(
-			"""Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
-			format(**locals()))
+    _assert(not table_exists(out_table + "_summary"),
+            "Graph APSP: Output summary table already exists!")
 
-	_assert(not table_exists(out_table+"_summary"),
-		"Graph APSP: Output summary table already exists!")
+    if glist is not None:
+        _assert(columns_exist_in_table(edge_table, glist),
+                """Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
+                format(**locals()))
 
-	if glist is not None:
-		_assert(columns_exist_in_table(edge_table, glist),
-			"""Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
-			format(**locals()))
 
 def _validate_get_path(apsp_table, source_vertex, dest_vertex,
-	path_table, **kwargs):
+                       path_table, **kwargs):
 
-	_assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
-		"Graph APSP: Invalid APSP table name!")
-	_assert(table_exists(apsp_table),
-		"Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
-	_assert(not table_is_empty(apsp_table),
-		"Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
+    _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
+            "Graph APSP: Invalid APSP table name!")
+    _assert(table_exists(apsp_table),
+            "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
+    _assert(not table_is_empty(apsp_table),
+            "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
 
-	summary = apsp_table+"_summary"
-	_assert(table_exists(summary),
-		"Graph APSP: APSP summary table ({0}) is missing!".format(summary))
-	_assert(not table_is_empty(summary),
-		"Graph APSP: APSP summary table ({0}) is empty!".format(summary))
+    summary = apsp_table + "_summary"
+    _assert(table_exists(summary),
+            "Graph APSP: APSP summary table ({0}) is missing!".format(summary))
+    _assert(not table_is_empty(summary),
+            "Graph APSP: APSP summary table ({0}) is empty!".format(summary))
 
-	_assert(not table_exists(path_table),
-		"Graph APSP: Output path table already exists!")
+    _assert(not table_exists(path_table),
+            "Graph APSP: Output path table already exists!")
+
+    return None
 
-	return None
 
 def graph_apsp_help(schema_madlib, message, **kwargs):
-	"""
-	Help function for graph_apsp and graph_apsp_get_path
-
-	Args:
-		@param schema_madlib
-		@param message: string, Help message string
-		@param kwargs
-
-	Returns:
-	    String. Help/usage information
-	"""
-	if not message:
-		help_string = """
+    """
+    Help function for graph_apsp and graph_apsp_get_path
+
+    Args:
+        @param schema_madlib
+        @param message: string, Help message string
+        @param kwargs
+
+    Returns:
+        String. Help/usage information
+    """
+    if not message:
+        help_string = """
 -----------------------------------------------------------------------
                             SUMMARY
 -----------------------------------------------------------------------
@@ -717,8 +717,8 @@ edges is minimized.
 For more details on function usage:
     SELECT {schema_madlib}.graph_apsp('usage')
             """
-	elif message.lower() in ['usage', 'help', '?']:
-		help_string = """
+    elif message.lower() in ['usage', 'help', '?']:
+        help_string = """
 Given a graph, all pairs shortest path (apsp) algorithm finds a path for
 every vertex pair such that the sum of the weights of its constituent
 edges is minimized.
@@ -728,10 +728,10 @@ edges is minimized.
 To retrieve the path for a specific vertex pair:
 
  SELECT {schema_madlib}.graph_apsp_get_path(
-    apsp_table	 TEXT, -- Name of the table that contains the apsp output.
+    apsp_table   TEXT, -- Name of the table that contains the apsp output.
     source_vertex INT, -- The vertex that will be the source of the
                        -- desired path.
-    dest_vertex	  INT, -- The vertex that will be the destination of the
+    dest_vertex   INT, -- The vertex that will be the destination of the
                        -- desired path.
     path_table   TEXT  -- Name of the output table that contains the path.
 );
@@ -762,8 +762,8 @@ every group and has the following columns:
   - path (ARRAY)  : The shortest path from the source vertex to the
                   destination vertex.
 """
-	elif message.lower() in ("example", "examples"):
-		help_string = """
+    elif message.lower() in ("example", "examples"):
+        help_string = """
 ----------------------------------------------------------------------------
                                 EXAMPLES
 ----------------------------------------------------------------------------
@@ -806,11 +806,11 @@ INSERT INTO edge VALUES
 -- Compute the apsp:
 DROP TABLE IF EXISTS out;
 SELECT madlib.graph_apsp(
-	'vertex',                            -- Vertex table
-	'id',                                -- Vertix id column
-	'edge',                              -- Edge table
-	'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
-	'out'                                -- Output table of apsp
+    'vertex',                            -- Vertex table
+    'id',                                -- Vertex id column
+    'edge',                              -- Edge table
+    'src=src, dest=dest, weight=weight', -- Comma-delimited string of edge arguments
+    'out'                                -- Output table of apsp
 );
 -- View the apsp costs for every vertex pair:
 SELECT * FROM out ORDER BY src, dest;
@@ -834,11 +834,11 @@ INSERT INTO edge_gr VALUES
 DROP TABLE IF EXISTS out_gr, out_gr_summary;
 SELECT madlib.graph_apsp('vertex',NULL,'edge_gr',NULL,'out_gr','grp');
 """
-	else:
-		help_string = "No such option. Use {schema_madlib}.graph_apsp()"
+    else:
+        help_string = "No such option. Use {schema_madlib}.graph_apsp()"
 
-	return help_string.format(schema_madlib=schema_madlib,
-		graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
-    """out_table     TEXT, -- Name of the table to store the result of apsp.
+    return help_string.format(schema_madlib=schema_madlib,
+                              graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
+                                                          """out_table     TEXT, -- Name of the table to store the result of apsp.
     grouping_cols TEXT  -- The list of grouping columns."""))
 # ---------------------------------------------------------------------
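
The edge_args string used in the examples above ('src=src, dest=dest,
weight=weight') is parsed into a parameter dict by extract_keyvalue_params
before validation. A minimal sketch of that behavior, assuming simple
comma-and-equals splitting (the real parser additionally type-checks values
and handles quoting):

    # Rough sketch of turning an edge_args string into a parameter dict.
    # The real implementation is utilities.utilities.extract_keyvalue_params;
    # this simplified version ignores quoting and type validation.
    def parse_edge_args(edge_args, defaults):
        params = dict(defaults)
        if edge_args:
            for pair in edge_args.split(','):
                name, sep, value = pair.partition('=')
                if sep:  # skip malformed entries that lack '='
                    params[name.strip()] = value.strip()
        return params

    print(parse_edge_args('src=src, dest=dest, weight=cost',
                          {'src': 'src', 'dest': 'dest', 'weight': 'weight'}))
    # {'src': 'src', 'dest': 'dest', 'weight': 'cost'}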

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/graph_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in
index 944c301..9d31345 100644
--- a/src/ports/postgres/modules/graph/graph_utils.py_in
+++ b/src/ports/postgres/modules/graph/graph_utils.py_in
@@ -27,115 +27,115 @@
 @namespace graph
 """
 
-import plpy
-from utilities.control import MinWarning
 from utilities.utilities import _assert
-from utilities.utilities import extract_keyvalue_params
-from utilities.utilities import unique_string
 from utilities.validate_args import get_cols
 from utilities.validate_args import unquote_ident
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 
+
 def _grp_null_checks(grp_list):
 
     """
-    Helper function for generating NULL checks for grouping columns 
+    Helper function for generating NULL checks for grouping columns
     to be used within a WHERE clause
     Args:
         @param grp_list   The list of grouping columns
     """
     return ' AND '.join([" {i} IS NOT NULL ".format(**locals())
-        for i in grp_list])
+                        for i in grp_list])
+
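
For illustration, a standalone sketch of what this helper emits; the explicit
i=i keyword stands in for the **locals() trick, and the column names are
made up:

    # Standalone sketch of _grp_null_checks; 'grp1'/'grp2' are hypothetical.
    def grp_null_checks(grp_list):
        # One IS NOT NULL test per grouping column, AND-ed together.
        return ' AND '.join(" {i} IS NOT NULL ".format(i=i) for i in grp_list)

    print(grp_null_checks(['grp1', 'grp2']))
    # Output:  grp1 IS NOT NULL  AND  grp2 IS NOT NULL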
 
 def _check_groups(tbl1, tbl2, grp_list):
+    """
+    Helper function for joining tables with groups.
+    Args:
+            @param tbl1       Name of the first table
+            @param tbl2       Name of the second table
+            @param grp_list   The list of grouping columns
+    """
 
-	"""
-	Helper function for joining tables with groups.
-	Args:
-		@param tbl1       Name of the first table
-		@param tbl2       Name of the second table
-		@param grp_list   The list of grouping columns
-	"""
+    return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
+                         for i in grp_list])
 
-	return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
-		for i in grp_list])
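
The same pattern, sketched standalone (the table and column names here are
invented):

    # Standalone sketch of _check_groups; 'out'/'message'/'grp*' are made up.
    def check_groups(tbl1, tbl2, grp_list):
        # Equi-join predicate over every grouping column.
        return ' AND '.join(
            " {t1}.{i} = {t2}.{i} ".format(t1=tbl1, t2=tbl2, i=i)
            for i in grp_list)

    print(check_groups('out', 'message', ['grp1', 'grp2']))
    # Output:  out.grp1 = message.grp1  AND  out.grp2 = message.grp2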
 
 def _grp_from_table(tbl, grp_list):
+    """
+    Helper function for selecting grouping columns of a table
+    Args:
+            @param tbl        Name of the table
+            @param grp_list   The list of grouping columns
+    """
+    return ' , '.join([" {tbl}.{i} ".format(**locals())
+                       for i in grp_list])
 
-	"""
-	Helper function for selecting grouping columns of a table
-	Args:
-		@param tbl        Name of the table
-		@param grp_list   The list of grouping columns
-	"""
-	return ' , '.join([" {tbl}.{i} ".format(**locals())
-		for i in grp_list])
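
And its select-list counterpart, sketched the same way (names are
hypothetical):

    # Standalone sketch of _grp_from_table.
    def grp_from_table(tbl, grp_list):
        # Table-qualified select-list fragment for the grouping columns.
        return ' , '.join(" {tbl}.{i} ".format(tbl=tbl, i=i) for i in grp_list)

    print(grp_from_table('edge_gr', ['grp1', 'grp2']))
    # Output:  edge_gr.grp1  ,  edge_gr.grp2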
 
 def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-	out_table, func_name, **kwargs):
-	"""
-	Validates graph tables (vertex and edge) as well as the output table.
-	"""
-	_assert(out_table and out_table.strip().lower() not in ('null', ''),
-		"Graph {func_name}: Invalid output table name!".format(**locals()))
-	_assert(not table_exists(out_table),
-		"Graph {func_name}: Output table already exists!".format(**locals()))
-
-	_assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
-		"Graph {func_name}: Invalid vertex table name!".format(**locals()))
-	_assert(table_exists(vertex_table),
-		"Graph {func_name}: Vertex table ({vertex_table}) is missing!".format(
-			**locals()))
-	_assert(not table_is_empty(vertex_table),
-		"Graph {func_name}: Vertex table ({vertex_table}) is empty!".format(
-			**locals()))
-
-	_assert(edge_table and edge_table.strip().lower() not in ('null', ''),
-		"Graph {func_name}: Invalid edge table name!".format(**locals()))
-	_assert(table_exists(edge_table),
-		"Graph {func_name}: Edge table ({edge_table}) is missing!".format(
-			**locals()))
-	_assert(not table_is_empty(edge_table),
-		"Graph {func_name}: Edge table ({edge_table}) is empty!".format(
-			**locals()))
-
-	existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
-	_assert(vertex_id in existing_cols,
-		"""Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """.
-		format(**locals()))
-	_assert(columns_exist_in_table(edge_table, edge_params.values()),
-		"""Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""".
-		format(cols=edge_params.values(), **locals()))
-
-	return None
+                          out_table, func_name, **kwargs):
+    """
+    Validates graph tables (vertex and edge) as well as the output table.
+    """
+    _assert(out_table and out_table.strip().lower() not in ('null', ''),
+            "Graph {func_name}: Invalid output table name!".format(**locals()))
+    _assert(not table_exists(out_table),
+            "Graph {func_name}: Output table already exists!".format(**locals()))
+
+    _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
+            "Graph {func_name}: Invalid vertex table name!".format(**locals()))
+    _assert(table_exists(vertex_table),
+            "Graph {func_name}: Vertex table ({vertex_table}) is missing!".format(
+        **locals()))
+    _assert(not table_is_empty(vertex_table),
+            "Graph {func_name}: Vertex table ({vertex_table}) is empty!".format(
+        **locals()))
+
+    _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
+            "Graph {func_name}: Invalid edge table name!".format(**locals()))
+    _assert(table_exists(edge_table),
+            "Graph {func_name}: Edge table ({edge_table}) is missing!".format(
+        **locals()))
+    _assert(not table_is_empty(edge_table),
+            "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
+        **locals()))
+
+    existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
+    _assert(vertex_id in existing_cols,
+            """Graph {func_name}: The vertex column {vertex_id} is not present in vertex table ({vertex_table}) """.
+            format(**locals()))
+    _assert(columns_exist_in_table(edge_table, edge_params.values()),
+            """Graph {func_name}: Not all columns from {cols} are present in edge table ({edge_table})""".
+            format(cols=edge_params.values(), **locals()))
+
+    return None
+
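
All of these checks report through _assert; as a sketch of the pattern only,
since the real helper signals the failure through the database error
machinery rather than a plain Python exception, and the table name below is
invented:

    # Minimal sketch of the _assert pattern used by the validators above.
    def _assert(condition, msg):
        if not condition:
            raise ValueError(msg)

    edge_table_exists = False  # stand-in for the real catalog lookup
    try:
        _assert(edge_table_exists,
                "Graph SSSP: Edge table (edge) is missing!")
    except ValueError as exc:
        print(exc)  # Graph SSSP: Edge table (edge) is missing!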
 
 def get_graph_usage(schema_madlib, func_name, other_text):
 
-	usage = """
-----------------------------------------------------------------------------
-                            USAGE
-----------------------------------------------------------------------------
- SELECT {schema_madlib}.{func_name}(
-    vertex_table  TEXT, -- Name of the table that contains the vertex data.
-    vertex_id     TEXT, -- Name of the column containing the vertex ids.
-    edge_table    TEXT, -- Name of the table that contains the edge data.
-    edge_args     TEXT{comma} -- A comma-delimited string containing multiple
-                        -- named arguments of the form "name=value".
-    {other_text}
-);
-
-The following parameters are supported for edge table arguments ('edge_args'
-	above):
-
-src (default = 'src')		: Name of the column containing the source
-				vertex ids in the edge table.
-dest (default = 'dest')		: Name of the column containing the destination
-				vertex ids in the edge table.
-weight (default = 'weight')	: Name of the column containing the weight of
-				edges in the edge table.
-""".format(schema_madlib=schema_madlib, func_name=func_name,
-	other_text=other_text, comma = ',' if other_text is not None else ' ')
-
-	return usage
+    usage = """
+    ----------------------------------------------------------------------------
+                                USAGE
+    ----------------------------------------------------------------------------
+     SELECT {schema_madlib}.{func_name}(
+        vertex_table  TEXT, -- Name of the table that contains the vertex data.
+        vertex_id     TEXT, -- Name of the column containing the vertex ids.
+        edge_table    TEXT, -- Name of the table that contains the edge data.
+        edge_args     TEXT{comma} -- A comma-delimited string containing multiple
+                            -- named arguments of the form "name=value".
+        {other_text}
+    );
+
+    The following parameters are supported for edge table arguments
+    ('edge_args' above):
+
+    src (default = 'src'): Name of the column containing the source
+                           vertex ids in the edge table.
+    dest (default = 'dest'): Name of the column containing the destination
+                             vertex ids in the edge table.
+    weight (default = 'weight'): Name of the column containing the weight of
+                                 edges in the edge table.
+    """.format(schema_madlib=schema_madlib,
+               func_name=func_name,
+               other_text=other_text,
+               comma=',' if other_text is not None else ' ')
+    return usage
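
A small point worth noting in the final format() call: the comma after the
edge_args line only appears when other_text is supplied. Sketched in
isolation with a placeholder argument block:

    # Sketch of the optional-comma selection; the other_text value is made up.
    other_text = "grouping_cols TEXT  -- The list of grouping columns."
    comma = ',' if other_text is not None else ' '
    print("edge_args     TEXT{comma} -- named arguments".format(comma=comma))
    # edge_args     TEXT, -- named arguments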