You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2017/08/29 20:41:54 UTC

[17/50] [abbrv] incubator-madlib git commit: Graph: Update Python code to follow PEP-8

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/sssp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.py_in b/src/ports/postgres/modules/graph/sssp.py_in
index 4839d2d..93497c4 100644
--- a/src/ports/postgres/modules/graph/sssp.py_in
+++ b/src/ports/postgres/modules/graph/sssp.py_in
@@ -33,21 +33,22 @@ from graph_utils import get_graph_usage
 from graph_utils import _grp_from_table
 from graph_utils import _check_groups
 from utilities.control import MinWarning
+
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
 from utilities.utilities import split_quoted_delimited_str
+from utilities.utilities import is_platform_pg, is_platform_hawq
+
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 from utilities.validate_args import get_expr_type
 
-m4_changequote(`<!', `!>')
 
 def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
-		edge_args, source_vertex, out_table, grouping_cols, **kwargs):
-
-	"""
+               edge_args, source_vertex, out_table, grouping_cols, **kwargs):
+    """
     Single source shortest path function for graphs using the Bellman-Ford
     algorhtm [1].
     Args:
@@ -63,383 +64,382 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
     """
 
-	with MinWarning("warning"):
-
-		INT_MAX = 2147483647
-		INFINITY = "'Infinity'"
-		EPSILON = 0.000001
-
-		message = unique_string(desp='message')
-
-		oldupdate = unique_string(desp='oldupdate')
-		newupdate = unique_string(desp='newupdate')
-
-		params_types = {'src': str, 'dest': str, 'weight': str}
-		default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
-		edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-		# Prepare the input for recording in the summary table
-		if vertex_id is None:
-			v_st= "NULL"
-			vertex_id = "id"
-		else:
-			v_st = vertex_id
-		if edge_args is None:
-			e_st = "NULL"
-		else:
-			e_st = edge_args
-		if grouping_cols is None:
-			g_st = "NULL"
-			glist = None
-		else:
-			g_st = grouping_cols
-			glist = split_quoted_delimited_str(grouping_cols)
-
-		src = edge_params["src"]
-		dest = edge_params["dest"]
-		weight = edge_params["weight"]
-
-		distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-			<!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
-		local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-			<!"DISTRIBUTED BY (id)"!>)
-
-		is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-		_validate_sssp(vertex_table, vertex_id, edge_table,
-			edge_params, source_vertex, out_table, glist)
-
-		plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
-			message,oldupdate,newupdate))
-
-		# Initialize grouping related variables
-		comma_grp = ""
-		comma_grp_e = ""
-		comma_grp_m = ""
-		grp_comma = ""
-		checkg_oo = ""
-		checkg_eo = ""
-		checkg_ex = ""
-		checkg_om = ""
-		group_by = ""
-
-		if grouping_cols is not None:
-			comma_grp = " , " + grouping_cols
-			group_by = " , " + _grp_from_table(edge_table,glist)
-			comma_grp_e = " , " + _grp_from_table(edge_table,glist)
-			comma_grp_m = " , " + _grp_from_table("message",glist)
-			grp_comma = grouping_cols + " , "
-
-			checkg_oo_sub = _check_groups(out_table,"oldupdate",glist)
-			checkg_oo = " AND " + checkg_oo_sub
-			checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist)
-			checkg_ex = " AND " + _check_groups(edge_table,"x",glist)
-			checkg_om = " AND " + _check_groups("out_table","message",glist)
-
-		w_type = get_expr_type(weight,edge_table).lower()
-		init_w = INT_MAX
-		if w_type in ['real','double precision','float8']:
-			init_w = INFINITY
-
-		# We keep a table of every vertex, the minimum cost to that destination
-		# seen so far and the parent to this vertex in the associated shortest
-		# path. This table will be updated throughout the execution.
-		plpy.execute(
-			""" CREATE TABLE {out_table} AS ( SELECT
-					{grp_comma} {src} AS {vertex_id}, {weight},
-					{src} AS parent FROM {edge_table} LIMIT 0)
-				{distribution} """.format(**locals()))
-
-		# We keep a summary table to keep track of the parameters used for this
-		# SSSP run. This table is used in the path finding function to eliminate
-		# the need for repetition.
-		plpy.execute( """ CREATE TABLE {out_table}_summary  (
-			vertex_table            TEXT,
-			vertex_id               TEXT,
-			edge_table              TEXT,
-			edge_args               TEXT,
-			source_vertex           INTEGER,
-			out_table               TEXT,
-			grouping_cols           TEXT)
-			""".format(**locals()))
-		plpy.execute( """ INSERT INTO {out_table}_summary VALUES
-			('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
-			{source_vertex}, '{out_table}', '{g_st}')
-			""".format(**locals()))
-
-		# We keep 2 update tables and alternate them during the execution.
-		# This is necessary since we need to know which vertices are updated in
-		# the previous iteration to calculate the next set of updates.
-		plpy.execute(
-			""" CREATE TEMP TABLE {oldupdate} AS ( SELECT
-					{src} AS id, {weight},
-					{src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
-				{local_distribution}
-				""".format(**locals()))
-		plpy.execute(
-			""" CREATE TEMP TABLE {newupdate} AS ( SELECT
-					{src} AS id, {weight},
-					{src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
-				{local_distribution}
-				""".format(**locals()))
-
-		# Since HAWQ does not allow us to update, we create a new table and
-		# rename at every iteration.
-		if is_hawq:
-			temp_table = unique_string(desp='temp')
-			sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} )
-				{distribution} """
-			plpy.execute(sql.format(**locals()))
-
-		# GPDB and HAWQ have distributed by clauses to help them with indexing.
-		# For Postgres we add the indices manually.
-		sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-			<!""" CREATE INDEX ON {out_table} ({vertex_id});
-				CREATE INDEX ON {oldupdate} (id);
-				CREATE INDEX ON {newupdate} (id);
-			""".format(**locals())!>,
-			<!''!>)
-		plpy.execute(sql_index)
-
-		# The initialization step is quite different when grouping is involved
-		# since not every group (subgraph) will have the same set of vertices.
-
-		# Example:
-		# Assume there are two grouping columns g1 and g2
-		# g1 values are 0 and 1. g2 values are 5 and 6
-		if grouping_cols is not None:
-
-			distinct_grp_table = unique_string(desp='grp')
-			plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """.
-				format(**locals()))
-			plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS
-				SELECT DISTINCT {grouping_cols} FROM {edge_table} """.
-				format(**locals()))
-			subq = unique_string(desp='subquery')
-
-			checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist)
-			grp_d_comma = _grp_from_table(distinct_grp_table,glist) +","
-
-			plpy.execute(
-				""" INSERT INTO {out_table}
-				SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
-					{init_w} AS {weight}, NULL::INT AS parent
-				FROM {distinct_grp_table} INNER JOIN
-					(
-					SELECT {src} AS {vertex_id} {comma_grp}
-					FROM {edge_table}
-					UNION
-					SELECT {dest} AS {vertex_id} {comma_grp}
-					FROM {edge_table}
-					) {subq} ON ({checkg_ds_sub})
-				WHERE {vertex_id} IS NOT NULL
-				""".format(**locals()))
-
-			plpy.execute(
-				""" INSERT INTO {oldupdate}
-					SELECT {source_vertex}, 0, {source_vertex},
-					{grouping_cols}
-					FROM {distinct_grp_table}
-				""".format(**locals()))
-
-			# The maximum number of vertices for any group.
-			# Used for determining negative cycles.
-			v_cnt = plpy.execute(
-				""" SELECT max(count) as max FROM (
-						SELECT count({vertex_id}) AS count
-						FROM {out_table}
-						GROUP BY {grouping_cols}) x
-				""".format(**locals()))[0]['max']
-			plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
-		else:
-			plpy.execute(
-				""" INSERT INTO {out_table}
-				SELECT {vertex_id} AS {vertex_id},
-					{init_w} AS {weight},
-					NULL AS parent
-				FROM {vertex_table}
-				WHERE {vertex_id} IS NOT NULL
-				 """.format(**locals()))
-
-			# The source can be reached with 0 cost and it has itself as the
-			# parent.
-			plpy.execute(
-				""" INSERT INTO {oldupdate}
-					VALUES({source_vertex},0,{source_vertex})
-				""".format(**locals()))
-
-			v_cnt = plpy.execute(
-				""" SELECT count(*) FROM {vertex_table}
-				WHERE {vertex_id} IS NOT NULL
-				""".format(**locals()))[0]['count']
-
-		for i in range(0,v_cnt+1):
-
-			# Apply the updates calculated in the last iteration.
-			if is_hawq:
-				sql = """
-				TRUNCATE TABLE {temp_table};
-				INSERT INTO {temp_table}
-					SELECT *
-					FROM {out_table}
-					WHERE NOT EXISTS (
-						SELECT 1
-						FROM {oldupdate} as oldupdate
-						WHERE {out_table}.{vertex_id} = oldupdate.id
-						{checkg_oo})
-					UNION
-					SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
-				"""
-				plpy.execute(sql.format(**locals()))
-				plpy.execute("DROP TABLE {0}".format(out_table))
-				plpy.execute("ALTER TABLE {0} RENAME TO {1}".
-					format(temp_table,out_table))
-				sql = """ CREATE TABLE {temp_table} AS (
-					SELECT * FROM {out_table} LIMIT 0)
-					{distribution};"""
-				plpy.execute(sql.format(**locals()))
-				ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
-					format(oldupdate))
-			else:
-				sql = """
-				UPDATE {out_table} SET
-				{weight}=oldupdate.{weight},
-				parent=oldupdate.parent
-				FROM
-				{oldupdate} AS oldupdate
-				WHERE
-				{out_table}.{vertex_id}=oldupdate.id AND
-				{out_table}.{weight}>oldupdate.{weight} {checkg_oo}
-				"""
-				ret = plpy.execute(sql.format(**locals()))
-
-			if ret.nrows() == 0:
-				break
-
-			plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
-
-			# 'oldupdate' table has the update info from the last iteration
-
-			# Consider every edge that has an updated source
-			# From these edges:
-			# For every destination vertex, find the min total cost to reach.
-			# Note that, just calling an aggregate function with group by won't
-			# let us store the src field of the edge (needed for the parent).
-			# This is why we need the 'x'; it gives a list of destinations and
-			# associated min values. Using these values, we identify which edge
-			# is selected.
-
-			# Since using '=' with floats is dangerous we use an epsilon value
-			# for comparison.
-
-			# Once we have a list of edges and values (stores as 'message'),
-			# we check if these values are lower than the existing shortest
-			# path values.
-
-			sql = (""" INSERT INTO {newupdate}
-				SELECT DISTINCT ON (message.id {comma_grp})
-					message.id AS id,
-					message.{weight} AS {weight},
-					message.parent AS parent {comma_grp_m}
-				FROM {out_table} AS out_table INNER JOIN
-					(
-					SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
-						oldupdate.id AS parent {comma_grp_e}
-					FROM {oldupdate} AS oldupdate INNER JOIN
-						{edge_table}  ON
-							({edge_table}.{src} = oldupdate.id {checkg_eo})
-						INNER JOIN
-						(
-						SELECT {edge_table}.{dest} AS id,
-							min(oldupdate.{weight} +
-								{edge_table}.{weight}) AS {weight} {comma_grp_e}
-						FROM {oldupdate} AS oldupdate INNER JOIN
-							{edge_table}  ON
-							({edge_table}.{src}=oldupdate.id {checkg_eo})
-						GROUP BY {edge_table}.{dest} {comma_grp_e}
-						) x
-						ON ({edge_table}.{dest} = x.id {checkg_ex} )
-					WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
-								- x.{weight}) < {EPSILON}
-					) message
-					ON (message.id = out_table.{vertex_id} {checkg_om})
-				WHERE message.{weight}<out_table.{weight}
-				""".format(**locals()))
-
-			plpy.execute(sql)
-
-			# Swap the update tables for the next iteration.
-			tmp = oldupdate
-			oldupdate = newupdate
-			newupdate = tmp
-
-		plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
-		# The algorithm should converge in less than |V| iterations.
-		# Otherwise there is a negative cycle in the graph.
-		if i == v_cnt:
-			if grouping_cols is None:
-				plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
-					format(out_table, out_table+"_summary", oldupdate))
-				if is_hawq:
-					plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
-				plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
-
-			# It is possible that not all groups has negative cycles.
-			else:
-
-				# negs is the string created by collating grouping columns.
-				# By looking at the oldupdate table we can see which groups
-				# are in a negative cycle.
-
-				negs = plpy.execute(
-					""" SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
-						FROM {oldupdate}
-					""".format(**locals()))[0]['grp']
-
-				# Delete the groups with negative cycles from the output table.
-				if is_hawq:
-					sql_del = """
-						TRUNCATE TABLE {temp_table};
-						INSERT INTO {temp_table}
-							SELECT *
-							FROM {out_table}
-							WHERE NOT EXISTS(
-								SELECT 1
-								FROM {oldupdate} as oldupdate
-								WHERE {checkg_oo_sub}
-								);"""
-					plpy.execute(sql_del.format(**locals()))
-					plpy.execute("DROP TABLE {0}".format(out_table))
-					plpy.execute("ALTER TABLE {0} RENAME TO {1}".
-						format(temp_table,out_table))
-				else:
-					sql_del = """ DELETE FROM {out_table}
-						USING {oldupdate} AS oldupdate
-						WHERE {checkg_oo_sub}"""
-					plpy.execute(sql_del.format(**locals()))
-
-				# If every group has a negative cycle,
-				# drop the output table as well.
-				if table_is_empty(out_table):
-					plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-						format(out_table,out_table+"_summary"))
-
-				plpy.warning(
-					"""Graph SSSP: Detected a negative cycle in the """ +
-					"""sub-graphs of following groups: {0}.""".
-					format(str(negs)[1:-1]))
-
-		plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
-		if is_hawq:
-			plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
-				format(**locals()))
-
-	return None
+    with MinWarning("warning"):
+
+        INT_MAX = 2147483647
+        INFINITY = "'Infinity'"
+        EPSILON = 0.000001
+
+        message = unique_string(desp='message')
+
+        oldupdate = unique_string(desp='oldupdate')
+        newupdate = unique_string(desp='newupdate')
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args,
+                                              params_types,
+                                              default_args)
+
+        # Prepare the input for recording in the summary table
+        if vertex_id is None:
+            v_st = "NULL"
+            vertex_id = "id"
+        else:
+            v_st = vertex_id
+        if edge_args is None:
+            e_st = "NULL"
+        else:
+            e_st = edge_args
+        if grouping_cols is None:
+            g_st = "NULL"
+            glist = None
+        else:
+            g_st = grouping_cols
+            glist = split_quoted_delimited_str(grouping_cols)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
+        local_distribution = '' if is_platform_pg() else "DISTRIBUTED BY (id)"
+
+        is_hawq = is_platform_hawq()
+
+        _validate_sssp(vertex_table, vertex_id, edge_table,
+                       edge_params, source_vertex, out_table, glist)
+
+        plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
+            message, oldupdate, newupdate))
+
+        # Initialize grouping related variables
+        comma_grp = ""
+        comma_grp_e = ""
+        comma_grp_m = ""
+        grp_comma = ""
+        checkg_oo = ""
+        checkg_eo = ""
+        checkg_ex = ""
+        checkg_om = ""
+        group_by = ""
+
+        if grouping_cols is not None:
+            comma_grp = " , " + grouping_cols
+            group_by = " , " + _grp_from_table(edge_table, glist)
+            comma_grp_e = " , " + _grp_from_table(edge_table, glist)
+            comma_grp_m = " , " + _grp_from_table("message", glist)
+            grp_comma = grouping_cols + " , "
+
+            checkg_oo_sub = _check_groups(out_table, "oldupdate", glist)
+            checkg_oo = " AND " + checkg_oo_sub
+            checkg_eo = " AND " + _check_groups(edge_table, "oldupdate", glist)
+            checkg_ex = " AND " + _check_groups(edge_table, "x", glist)
+            checkg_om = " AND " + _check_groups("out_table", "message", glist)
+
+        w_type = get_expr_type(weight, edge_table).lower()
+        init_w = INT_MAX
+        if w_type in ['real', 'double precision', 'float8']:
+            init_w = INFINITY
+
+        # We keep a table of every vertex, the minimum cost to that destination
+        # seen so far and the parent to this vertex in the associated shortest
+        # path. This table will be updated throughout the execution.
+        plpy.execute(
+            """ CREATE TABLE {out_table} AS (SELECT
+                    {grp_comma} {src} AS {vertex_id}, {weight},
+                    {src} AS parent FROM {edge_table} LIMIT 0)
+                {distribution} """.format(**locals()))
+
+        # We keep a summary table to keep track of the parameters used for this
+        # SSSP run. This table is used in the path finding function to eliminate
+        # the need for repetition.
+        plpy.execute(""" CREATE TABLE {out_table}_summary  (
+            vertex_table            TEXT,
+            vertex_id               TEXT,
+            edge_table              TEXT,
+            edge_args               TEXT,
+            source_vertex           INTEGER,
+            out_table               TEXT,
+            grouping_cols           TEXT)
+            """.format(**locals()))
+        plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+            ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+            {source_vertex}, '{out_table}', '{g_st}')
+            """.format(**locals()))
+
+        # We keep 2 update tables and alternate them during the execution.
+        # This is necessary since we need to know which vertices are updated in
+        # the previous iteration to calculate the next set of updates.
+        plpy.execute(
+            """ CREATE TEMP TABLE {oldupdate} AS (SELECT
+                    {src} AS id, {weight},
+                    {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+                {local_distribution}
+                """.format(**locals()))
+        plpy.execute(
+            """ CREATE TEMP TABLE {newupdate} AS (SELECT
+                    {src} AS id, {weight},
+                    {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+                {local_distribution}
+                """.format(**locals()))
+
+        # Since HAWQ does not allow us to update, we create a new table and
+        # rename at every iteration.
+        if is_hawq:
+            temp_table = unique_string(desp='temp')
+            sql = """ CREATE TABLE {temp_table} AS (SELECT * FROM {out_table} )
+                {distribution} """
+            plpy.execute(sql.format(**locals()))
+
+        # GPDB and HAWQ have distributed by clauses to help them with indexing.
+        # For Postgres we add the indices manually.
+        if is_platform_pg():
+            plpy.execute("""
+                CREATE INDEX ON {out_table} ({vertex_id});
+                CREATE INDEX ON {oldupdate} (id);
+                CREATE INDEX ON {newupdate} (id);
+            """.format(**locals()))
+
+        # The initialization step is quite different when grouping is involved
+        # since not every group (subgraph) will have the same set of vertices.
+
+        # Example:
+        # Assume there are two grouping columns g1 and g2
+        # g1 values are 0 and 1. g2 values are 5 and 6
+        if grouping_cols is not None:
+
+            distinct_grp_table = unique_string(desp='grp')
+            plpy.execute("DROP TABLE IF EXISTS {distinct_grp_table}".
+                         format(**locals()))
+            plpy.execute("""
+                CREATE TEMP TABLE {distinct_grp_table} AS
+                SELECT DISTINCT {grouping_cols} FROM {edge_table}
+                """.format(**locals()))
+            subq = unique_string(desp='subquery')
+
+            checkg_ds_sub = _check_groups(distinct_grp_table, subq, glist)
+            grp_d_comma = _grp_from_table(distinct_grp_table, glist) + ","
+
+            plpy.execute("""
+                INSERT INTO {out_table}
+                SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM {distinct_grp_table} INNER JOIN
+                    (
+                    SELECT {src} AS {vertex_id} {comma_grp}
+                    FROM {edge_table}
+                    UNION
+                    SELECT {dest} AS {vertex_id} {comma_grp}
+                    FROM {edge_table}
+                    ) {subq} ON ({checkg_ds_sub})
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))
+
+            plpy.execute("""
+                INSERT INTO {oldupdate}
+                SELECT {source_vertex}, 0, {source_vertex},
+                       {grouping_cols}
+                FROM {distinct_grp_table}
+                """.format(**locals()))
+
+            # The maximum number of vertices for any group.
+            # Used for determining negative cycles.
+            v_cnt = plpy.execute("""
+                SELECT max(count) as max FROM (
+                    SELECT count({vertex_id}) AS count
+                    FROM {out_table}
+                    GROUP BY {grouping_cols}) x
+                """.format(**locals()))[0]['max']
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+        else:
+            plpy.execute("""
+                INSERT INTO {out_table}
+                SELECT {vertex_id} AS {vertex_id},
+                    {init_w} AS {weight},
+                    NULL AS parent
+                FROM {vertex_table}
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))
+
+            # The source can be reached with 0 cost and it has itself as the
+            # parent.
+            plpy.execute("""
+                INSERT INTO {oldupdate}
+                VALUES({source_vertex},0,{source_vertex})
+                """.format(**locals()))
+
+            v_cnt = plpy.execute("""
+                SELECT count(*) FROM {vertex_table}
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))[0]['count']
+
+        for i in range(0, v_cnt + 1):
+
+            # Apply the updates calculated in the last iteration.
+            if is_hawq:
+                sql = """
+                TRUNCATE TABLE {temp_table};
+                INSERT INTO {temp_table}
+                    SELECT *
+                    FROM {out_table}
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {oldupdate} as oldupdate
+                        WHERE {out_table}.{vertex_id} = oldupdate.id
+                        {checkg_oo})
+                    UNION
+                    SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
+                """
+                plpy.execute(sql.format(**locals()))
+                plpy.execute("DROP TABLE {0}".format(out_table))
+                plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                             format(temp_table, out_table))
+                sql = """ CREATE TABLE {temp_table} AS (
+                    SELECT * FROM {out_table} LIMIT 0)
+                    {distribution};"""
+                plpy.execute(sql.format(**locals()))
+                ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
+                                   format(oldupdate))
+            else:
+                sql = """
+                UPDATE {out_table} SET
+                    {weight} = oldupdate.{weight},
+                    parent = oldupdate.parent
+                FROM
+                    {oldupdate} AS oldupdate
+                WHERE
+                    {out_table}.{vertex_id} = oldupdate.id AND
+                    {out_table}.{weight} > oldupdate.{weight} {checkg_oo}
+                """
+                ret = plpy.execute(sql.format(**locals()))
+
+            if ret.nrows() == 0:
+                break
+
+            plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
+
+            # 'oldupdate' table has the update info from the last iteration
+
+            # Consider every edge that has an updated source
+            # From these edges:
+            # For every destination vertex, find the min total cost to reach.
+            # Note that, just calling an aggregate function with group by won't
+            # let us store the src field of the edge (needed for the parent).
+            # This is why we need the 'x'; it gives a list of destinations and
+            # associated min values. Using these values, we identify which edge
+            # is selected.
+
+            # Since using '=' with floats is dangerous we use an epsilon value
+            # for comparison.
+
+            # Once we have a list of edges and values (stored as 'message'),
+            # we check if these values are lower than the existing shortest
+            # path values.
+
+            sql = (""" INSERT INTO {newupdate}
+                SELECT DISTINCT ON (message.id {comma_grp})
+                    message.id AS id,
+                    message.{weight} AS {weight},
+                    message.parent AS parent {comma_grp_m}
+                FROM {out_table} AS out_table INNER JOIN
+                    (
+                    SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
+                        oldupdate.id AS parent {comma_grp_e}
+                    FROM {oldupdate} AS oldupdate INNER JOIN
+                        {edge_table}  ON
+                            ({edge_table}.{src} = oldupdate.id {checkg_eo})
+                        INNER JOIN
+                        (
+                        SELECT {edge_table}.{dest} AS id,
+                            min(oldupdate.{weight} +
+                                {edge_table}.{weight}) AS {weight} {comma_grp_e}
+                        FROM {oldupdate} AS oldupdate INNER JOIN
+                            {edge_table}  ON
+                            ({edge_table}.{src}=oldupdate.id {checkg_eo})
+                        GROUP BY {edge_table}.{dest} {comma_grp_e}
+                        ) x
+                        ON ({edge_table}.{dest} = x.id {checkg_ex} )
+                    WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
+                                - x.{weight}) < {EPSILON}
+                    ) message
+                    ON (message.id = out_table.{vertex_id} {checkg_om})
+                WHERE message.{weight}<out_table.{weight}
+                """.format(**locals()))
+
+            plpy.execute(sql)
+
+            # Swap the update tables for the next iteration.
+            tmp = oldupdate
+            oldupdate = newupdate
+            newupdate = tmp
+
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
+        # The algorithm should converge in less than |V| iterations.
+        # Otherwise there is a negative cycle in the graph.
+        if i == v_cnt:
+            if grouping_cols is None:
+                plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
+                             format(out_table, out_table + "_summary", oldupdate))
+                if is_hawq:
+                    plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
+                plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
+
+            # It is possible that not all groups have negative cycles.
+            else:
+
+                # negs is the string created by collating grouping columns.
+                # By looking at the oldupdate table we can see which groups
+                # are in a negative cycle.
+
+                negs = plpy.execute(
+                    """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+                        FROM {oldupdate}
+                    """.format(**locals()))[0]['grp']
+
+                # Delete the groups with negative cycles from the output table.
+                if is_hawq:
+                    sql_del = """
+                        TRUNCATE TABLE {temp_table};
+                        INSERT INTO {temp_table}
+                            SELECT *
+                            FROM {out_table}
+                            WHERE NOT EXISTS(
+                                SELECT 1
+                                FROM {oldupdate} as oldupdate
+                                WHERE {checkg_oo_sub}
+                                );"""
+                    plpy.execute(sql_del.format(**locals()))
+                    plpy.execute("DROP TABLE {0}".format(out_table))
+                    plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                                 format(temp_table, out_table))
+                else:
+                    sql_del = """ DELETE FROM {out_table}
+                        USING {oldupdate} AS oldupdate
+                        WHERE {checkg_oo_sub}"""
+                    plpy.execute(sql_del.format(**locals()))
+
+                # If every group has a negative cycle,
+                # drop the output table as well.
+                if table_is_empty(out_table):
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                                 format(out_table, out_table + "_summary"))
+
+                plpy.warning(
+                    """Graph SSSP: Detected a negative cycle in the """ +
+                    """sub-graphs of following groups: {0}.""".
+                    format(str(negs)[1:-1]))
+
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
+        if is_hawq:
+            plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
+                         format(**locals()))
+    return None
+
 
 def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
-	**kwargs):
-	"""
+                        **kwargs):
+    """
     Helper function that can be used to get the shortest path for a vertex
     Args:
         @param sssp_table   Name of the table that contains the SSSP output.
@@ -447,188 +447,187 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
                             desired path.
         @param path_table   Name of the output table that contains the path.
 
-	"""
-	with MinWarning("warning"):
-		_validate_get_path(sssp_table, dest_vertex, path_table)
-
-		temp1_name = unique_string(desp='temp1')
-		temp2_name = unique_string(desp='temp2')
-
-		select_grps = ""
-		check_grps_t1 = ""
-		check_grps_t2 = ""
-		grp_comma = ""
-		tmp = ""
-
-		summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
-		vertex_id = summary[0]['vertex_id']
-		source_vertex = summary[0]['source_vertex']
-
-		if vertex_id == "NULL":
-			vertex_id = "id"
-
-		grouping_cols = summary[0]['grouping_cols']
-		if grouping_cols == "NULL":
-			grouping_cols = None
-
-		if grouping_cols is not None:
-			glist = split_quoted_delimited_str(grouping_cols)
-			select_grps = _grp_from_table(sssp_table,glist) + " , "
-			check_grps_t1 = " AND " + _check_groups(
-				sssp_table,temp1_name,glist)
-			check_grps_t2 = " AND " + _check_groups(
-				sssp_table,temp2_name,glist)
-
-			grp_comma = grouping_cols + " , "
-
-		if source_vertex == dest_vertex:
-			plpy.execute("""
-				CREATE TABLE {path_table} AS
-				SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
-				FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
-				""".format(**locals()))
-			return
-
-		plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
-			format(temp1_name,temp2_name));
-		out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
-				SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
-					ARRAY[{dest_vertex}] AS path
-				FROM {sssp_table}
-				WHERE {vertex_id} = {dest_vertex}
-					AND {sssp_table}.parent IS NOT NULL
-			""".format(**locals()))
-
-		plpy.execute("""
-			CREATE TEMP TABLE {temp2_name} AS
-				SELECT * FROM {temp1_name} LIMIT 0
-			""".format(**locals()))
-
-		# Follow the 'parent' chain until you reach the source.
-		while out.nrows() > 0:
-
-			plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
-			# If the vertex id is not the source vertex,
-			# Add it to the path and move to its parent
-			out = plpy.execute(
-				""" INSERT INTO {temp2_name}
-				SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
-					{sssp_table}.{vertex_id} || {temp1_name}.path AS path
-				FROM {sssp_table} INNER JOIN {temp1_name} ON
-					({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
-						{check_grps_t1})
-				WHERE {source_vertex} <> {sssp_table}.{vertex_id}
-				""".format(**locals()))
-
-			tmp = temp2_name
-			temp2_name = temp1_name
-			temp1_name = tmp
-
-			tmp = check_grps_t1
-			check_grps_t1 = check_grps_t2
-			check_grps_t2 = tmp
-
-		# Add the source vertex to the beginning of every path and
-		# add the empty arrays for the groups that don't have a path to reach
-		# the destination vertex
-		plpy.execute("""
-			CREATE TABLE {path_table} AS
-			SELECT {grp_comma} {source_vertex} || path AS path
-			FROM {temp2_name}
-			UNION
-			SELECT {grp_comma} '{{}}'::INT[] AS path
-			FROM {sssp_table}
-			WHERE {vertex_id} = {dest_vertex}
-				AND {sssp_table}.parent IS NULL
-			""".format(**locals()))
-
-		out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
-
-		if out.nrows() == 0:
-			plpy.error(
-				"Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
-				format(dest_vertex,sssp_table))
-
-		plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
-			format(**locals()))
-
-	return None
+    """
+    with MinWarning("warning"):
+        _validate_get_path(sssp_table, dest_vertex, path_table)
+
+        temp1_name = unique_string(desp='temp1')
+        temp2_name = unique_string(desp='temp2')
+
+        select_grps = ""
+        check_grps_t1 = ""
+        check_grps_t2 = ""
+        grp_comma = ""
+        tmp = ""
+
+        summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
+        vertex_id = summary[0]['vertex_id']
+        source_vertex = summary[0]['source_vertex']
+
+        if vertex_id == "NULL":
+            vertex_id = "id"
+
+        grouping_cols = summary[0]['grouping_cols']
+        if grouping_cols == "NULL":
+            grouping_cols = None
+
+        if grouping_cols is not None:
+            glist = split_quoted_delimited_str(grouping_cols)
+            select_grps = _grp_from_table(sssp_table, glist) + " , "
+            check_grps_t1 = " AND " + _check_groups(
+                sssp_table, temp1_name, glist)
+            check_grps_t2 = " AND " + _check_groups(
+                sssp_table, temp2_name, glist)
+
+            grp_comma = grouping_cols + " , "
+
+        if source_vertex == dest_vertex:
+            plpy.execute("""
+                CREATE TABLE {path_table} AS
+                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+                FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
+                """.format(**locals()))
+            return
+
+        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                     format(temp1_name, temp2_name))
+        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+                SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
+                    ARRAY[{dest_vertex}] AS path
+                FROM {sssp_table}
+                WHERE {vertex_id} = {dest_vertex}
+                    AND {sssp_table}.parent IS NOT NULL
+            """.format(**locals()))
+
+        plpy.execute("""
+            CREATE TEMP TABLE {temp2_name} AS
+                SELECT * FROM {temp1_name} LIMIT 0
+            """.format(**locals()))
+
+        # Follow the 'parent' chain until you reach the source.
+        while out.nrows() > 0:
+
+            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+            # If the vertex id is not the source vertex,
+            # Add it to the path and move to its parent
+            out = plpy.execute(
+                """ INSERT INTO {temp2_name}
+                SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
+                    {sssp_table}.{vertex_id} || {temp1_name}.path AS path
+                FROM {sssp_table} INNER JOIN {temp1_name} ON
+                    ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
+                        {check_grps_t1})
+                WHERE {source_vertex} <> {sssp_table}.{vertex_id}
+                """.format(**locals()))
+
+            tmp = temp2_name
+            temp2_name = temp1_name
+            temp1_name = tmp
+
+            tmp = check_grps_t1
+            check_grps_t1 = check_grps_t2
+            check_grps_t2 = tmp
+
+        # Add the source vertex to the beginning of every path and
+        # add the empty arrays for the groups that don't have a path to reach
+        # the destination vertex
+        plpy.execute("""
+            CREATE TABLE {path_table} AS
+            SELECT {grp_comma} {source_vertex} || path AS path
+            FROM {temp2_name}
+            UNION
+            SELECT {grp_comma} '{{}}'::INT[] AS path
+            FROM {sssp_table}
+            WHERE {vertex_id} = {dest_vertex}
+                AND {sssp_table}.parent IS NULL
+            """.format(**locals()))
+
+        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+        if out.nrows() == 0:
+            plpy.error(
+                "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
+                format(dest_vertex, sssp_table))
+
+        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
+                     format(**locals()))
+    return None
 
 
 def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
-	source_vertex, out_table, glist, **kwargs):
+                   source_vertex, out_table, glist, **kwargs):
 
-	validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-		out_table,'SSSP')
+    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+                          out_table, 'SSSP')
 
-	_assert(isinstance(source_vertex,int),
-		"""Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
-		format(**locals()))
-	src_exists = plpy.execute("""
-		SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
-		""".format(**locals()))
+    _assert(isinstance(source_vertex, int),
+            """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
+            format(**locals()))
+    src_exists = plpy.execute("""
+        SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
+        """.format(**locals()))
 
-	if src_exists.nrows() == 0:
-		plpy.error(
-			"""Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""".
-			format(**locals()))
+    if src_exists.nrows() == 0:
+        plpy.error("Graph SSSP: Source vertex {source_vertex} is not present "
+                   "in the vertex table {vertex_table}.".format(**locals()))
 
-	vt_error = plpy.execute(
-		""" SELECT {vertex_id}
-			FROM {vertex_table}
-			WHERE {vertex_id} IS NOT NULL
-			GROUP BY {vertex_id}
-			HAVING count(*) > 1 """.format(**locals()))
+    vt_error = plpy.execute("""
+        SELECT {vertex_id}
+        FROM {vertex_table}
+        WHERE {vertex_id} IS NOT NULL
+        GROUP BY {vertex_id}
+        HAVING count(*) > 1
+        """.format(**locals()))
 
-	if vt_error.nrows() != 0:
-		plpy.error(
-			"""Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
-			format(**locals()))
+    if vt_error.nrows() != 0:
+        plpy.error("Graph SSSP: Source vertex table {vertex_table} "
+                   "contains duplicate vertex id's.".format(**locals()))
 
-	_assert(not table_exists(out_table+"_summary"),
-		"Graph SSSP: Output summary table already exists!")
+    _assert(not table_exists(out_table + "_summary"),
+            "Graph SSSP: Output summary table already exists!")
 
-	if glist is not None:
-		_assert(columns_exist_in_table(edge_table, glist),
-			"""Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
-			format(**locals()))
+    if glist is not None:
+        _assert(columns_exist_in_table(edge_table, glist),
+                "Graph SSSP: Not all columns from {glist} are present in "
+                "edge table ({edge_table}).".format(**locals()))
+    return None
 
-	return None
 
 def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
 
-	_assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
-		"Graph SSSP: Invalid SSSP table name!")
-	_assert(table_exists(sssp_table),
-		"Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
-	_assert(not table_is_empty(sssp_table),
-		"Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
+    _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
+            "Graph SSSP: Invalid SSSP table name!")
+    _assert(table_exists(sssp_table),
+            "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
+    _assert(not table_is_empty(sssp_table),
+            "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
 
-	summary = sssp_table+"_summary"
-	_assert(table_exists(summary),
-		"Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
-	_assert(not table_is_empty(summary),
-		"Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
+    summary = sssp_table + "_summary"
+    _assert(table_exists(summary),
+            "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
+    _assert(not table_is_empty(summary),
+            "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
 
-	_assert(not table_exists(path_table),
-		"Graph SSSP: Output path table already exists!")
+    _assert(not table_exists(path_table),
+            "Graph SSSP: Output path table already exists!")
+
+    return None
 
-	return None
 
 def graph_sssp_help(schema_madlib, message, **kwargs):
-	"""
-	Help function for graph_sssp and graph_sssp_get_path
-
-	Args:
-		@param schema_madlib
-		@param message: string, Help message string
-		@param kwargs
-
-	Returns:
-	    String. Help/usage information
-	"""
-	if not message:
-		help_string = """
+    """
+    Help function for graph_sssp and graph_sssp_get_path
+
+    Args:
+        @param schema_madlib
+        @param message: string, Help message string
+        @param kwargs
+
+    Returns:
+        String. Help/usage information
+    """
+    if not message:
+        help_string = """
 -----------------------------------------------------------------------
                             SUMMARY
 -----------------------------------------------------------------------
@@ -640,8 +639,8 @@ weights of its constituent edges is minimized.
 For more details on function usage:
     SELECT {schema_madlib}.graph_sssp('usage')
             """
-	elif message.lower() in ['usage', 'help', '?']:
-		help_string = """
+    elif message.lower() in ['usage', 'help', '?']:
+        help_string = """
 Given a graph and a source vertex, single source shortest path (SSSP)
 algorithm finds a path for every vertex such that the sum of the
 weights of its constituent edges is minimized.
@@ -651,8 +650,8 @@ weights of its constituent edges is minimized.
 To retrieve the path for a specific vertex:
 
  SELECT {schema_madlib}.graph_sssp_get_path(
-    sssp_table	TEXT, -- Name of the table that contains the SSSP output.
-    dest_vertex	INT,  -- The vertex that will be the destination of the
+    sssp_table  TEXT, -- Name of the table that contains the SSSP output.
+    dest_vertex INT,  -- The vertex that will be the destination of the
                       -- desired path.
     path_table  TEXT  -- Name of the output table that contains the path.
 );
@@ -679,8 +678,8 @@ every group and has the following columns:
   - path (ARRAY)  : The shortest path from the source vertex (as specified
                   in the SSSP execution) to the destination vertex.
 """
-	elif message.lower() in ("example", "examples"):
-		help_string = """
+    elif message.lower() in ("example", "examples"):
+        help_string = """
 ----------------------------------------------------------------------------
                                 EXAMPLES
 ----------------------------------------------------------------------------
@@ -723,12 +722,12 @@ INSERT INTO edge VALUES
 -- Compute the SSSP:
 DROP TABLE IF EXISTS out;
 SELECT madlib.graph_sssp(
-	'vertex',                            -- Vertex table
-	'id',                                -- Vertix id column
-	'edge',                              -- Edge table
-	'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments
-	 0,                                  -- The source vertex
-	'out'                                -- Output table of SSSP
+    'vertex',                            -- Vertex table
+    'id',                                -- Vertex id column
+    'edge',                              -- Edge table
+    'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
+     0,                                  -- The source vertex
+    'out'                                -- Output table of SSSP
 );
 -- View the SSSP costs for every vertex:
 SELECT * FROM out ORDER BY id;
@@ -752,12 +751,14 @@ INSERT INTO edge_gr VALUES
 DROP TABLE IF EXISTS out_gr, out_gr_summary;
 SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
 """
-	else:
-		help_string = "No such option. Use {schema_madlib}.graph_sssp()"
-
-	return help_string.format(schema_madlib=schema_madlib,
-		graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
-    """source_vertex INT,  -- The source vertex id for the algorithm to start.
-    out_table     TEXT, -- Name of the table to store the result of SSSP.
-    grouping_cols TEXT  -- The list of grouping columns."""))
+    else:
+        help_string = "No such option. Use {schema_madlib}.graph_sssp()"
+
+    common_usage_string = get_graph_usage(
+        schema_madlib, 'graph_sssp',
+        """source_vertex INT,  -- The source vertex id for the algorithm to start.
+            out_table     TEXT, -- Name of the table to store the result of SSSP.
+            grouping_cols TEXT  -- The list of grouping columns.""")
+    return help_string.format(schema_madlib=schema_madlib,
+                              graph_usage=common_usage_string)
 # ---------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/wcc.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in
index 02cceeb..1f6a81f 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -31,34 +31,37 @@ import plpy
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string, split_quoted_delimited_str
-from utilities.validate_args import columns_exist_in_table, get_cols_and_types
-from graph_utils import *
+from utilities.validate_args import columns_exist_in_table
+from utilities.utilities import is_platform_pg, is_platform_hawq
+from graph_utils import validate_graph_coding, get_graph_usage
 
-m4_changequote(`<!', `!>')
 
 def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, grouping_cols_list, module_name):
+                      edge_params, out_table, grouping_cols_list, module_name):
     """
     Function to validate input parameters for wcc
     """
     validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-        out_table, module_name)
+                          out_table, module_name)
     if grouping_cols_list:
         # validate the grouping columns. We currently only support grouping_cols
         # to be column names in the edge_table, and not expressions!
         _assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
-                "Weakly Connected Components error: One or more grouping columns specified do not exist!")
+                "Weakly Connected Components error: "
+                "One or more grouping columns specified do not exist!")
 
 
 def prefix_tablename_to_colnames(table, cols_list):
     return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list])
 
+
 def get_where_condition(table1, table2, cols_list):
     return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col)
-            for col in cols_list])
+                        for col in cols_list])
+
 
 def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
-    out_table, grouping_cols, **kwargs):
+        out_table, grouping_cols, **kwargs):
     """
     Function that computes the wcc
 
@@ -78,8 +81,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     plpy.execute('SET client_min_messages TO warning')
     params_types = {'src': str, 'dest': str}
     default_args = {'src': 'src', 'dest': 'dest'}
-    edge_params = extract_keyvalue_params(edge_args,
-            params_types, default_args)
+    edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
 
     # populate default values for optional params if null
     if vertex_id is None:
@@ -89,7 +91,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
 
     grouping_cols_list = split_quoted_delimited_str(grouping_cols)
     validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, grouping_cols_list, 'Weakly Connected Components')
+                      edge_params, out_table, grouping_cols_list,
+                      'Weakly Connected Components')
     src = edge_params["src"]
     dest = edge_params["dest"]
 
@@ -99,21 +102,22 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     toupdate = unique_string(desp='toupdate')
     temp_out_table = unique_string(desp='tempout')
 
-    distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+    distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id)
     subq_prefixed_grouping_cols = ''
     comma_toupdate_prefixed_grouping_cols = ''
     comma_oldupdate_prefixed_grouping_cols = ''
     old_new_update_where_condition = ''
     new_to_update_where_condition = ''
     edge_to_update_where_condition = ''
-    is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+    is_hawq = is_platform_hawq()
 
     INT_MAX = 2147483647
     component_id = 'component_id'
+    grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
+
     if grouping_cols:
-        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>)
+        distribution = ('' if is_platform_pg() else
+                        "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, vertex_id))
         # Update some variables useful for grouping based query strings
         subq = unique_string(desp='subquery')
         distinct_grp_table = unique_string(desp='grptable')
@@ -121,18 +125,20 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                 CREATE TABLE {distinct_grp_table} AS
                 SELECT DISTINCT {grouping_cols} FROM {edge_table}
             """.format(**locals()))
-        comma_toupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(toupdate,
-            grouping_cols_list)
-        comma_oldupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(
-            oldupdate, grouping_cols_list)
-        subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq,
-            grouping_cols_list)
-        old_new_update_where_condition = ' AND ' + get_where_condition(
-            oldupdate, newupdate, grouping_cols_list)
-        new_to_update_where_condition = ' AND ' + get_where_condition(
-            newupdate, toupdate, grouping_cols_list)
-        edge_to_update_where_condition = ' AND ' + get_where_condition(
-            edge_table, toupdate, grouping_cols_list)
+
+        pttc = prefix_tablename_to_colnames
+        gwc = get_where_condition
+
+        comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, grouping_cols_list)
+        comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, grouping_cols_list)
+        subq_prefixed_grouping_cols = pttc(subq, grouping_cols_list)
+        old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, grouping_cols_list)
+        new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, grouping_cols_list)
+        edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, grouping_cols_list)
+        join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list)
+        group_by_clause = ('' if not grouping_cols else
+                           '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
+                                                 subq, vertex_id))
         plpy.execute("""
                 CREATE TABLE {newupdate} AS
                 SELECT {subq}.{vertex_id},
@@ -148,13 +154,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                 ON {join_grouping_cols}
                 GROUP BY {group_by_clause}
                 {distribution}
-            """.format(select_grouping_cols=','+subq_prefixed_grouping_cols,
-                join_grouping_cols=get_where_condition(subq,
-                    distinct_grp_table, grouping_cols_list),
-                group_by_clause='' if not grouping_cols else
-                    subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, vertex_id),
-                select_grouping_cols_clause='' if not grouping_cols else
-                    grouping_cols+', ', **locals()))
+            """.format(select_grouping_cols=',' + subq_prefixed_grouping_cols,
+                       select_grouping_cols_clause=grouping_cols_comma,
+                       **locals()))
         plpy.execute("""
                 CREATE TEMP TABLE {message} AS
                 SELECT {vertex_id},
@@ -162,8 +164,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                         {select_grouping_cols_clause}
                 FROM {newupdate}
                 {distribution}
-            """.format(select_grouping_cols_clause='' if not grouping_cols else
-                    ', '+grouping_cols, **locals()))
+            """.format(select_grouping_cols_clause=grouping_cols_comma,
+                       **locals()))
     else:
         plpy.execute("""
                 CREATE TABLE {newupdate} AS
@@ -186,13 +188,14 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             """.format(**locals()))
     nodes_to_update = 1
     while nodes_to_update > 0:
-        # This idea here is simple. Look at all the neighbors of a node, and
-        # assign the smallest node id among the neighbors as its component_id.
-        # The next table starts off with very high component_id (INT_MAX). The
-        # component_id of all nodes which obtain a smaller component_id after
-        # looking at its neighbors are updated in the next table. At every
-        # iteration update only those nodes whose component_id in the previous
-        # iteration are greater than what was found in the current iteration.
+        # Look at all the neighbors of a node, and assign the smallest node id
+        # among the neighbors as its component_id. The next table starts off
+        # with a very high component_id (INT_MAX). The component_id of every
+        # node that obtains a smaller component_id after looking at its
+        # neighbors is updated in the next table. At every iteration, update
+        # only those nodes whose component_id in the previous iteration is
+        # greater than what was found in the current iteration.
+
         plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
         plpy.execute("""
             CREATE TEMP TABLE {oldupdate} AS
@@ -202,10 +205,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             FROM {message}
             GROUP BY {group_by_clause} {vertex_id}
             {distribution}
-        """.format(grouping_cols_select='' if not grouping_cols else
-                ', {0}'.format(grouping_cols), group_by_clause=''
-                if not grouping_cols else '{0}, '.format(grouping_cols),
-                **locals()))
+        """.format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols),
+                   group_by_clause=grouping_cols_comma,
+                   **locals()))
 
         plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
         plpy.execute("""
@@ -236,8 +238,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                         SELECT * FROM {toupdate};
                 """.format(**locals()))
             plpy.execute("DROP TABLE {0}".format(newupdate))
-            plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table,
-                            newupdate))
+            plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                         format(temp_out_table, newupdate))
             plpy.execute("""
                     CREATE TABLE {temp_out_table} AS
                     SELECT * FROM {newupdate}
@@ -275,9 +277,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             ) AS t
             GROUP BY {group_by_clause} {vertex_id}
         """.format(select_grouping_cols='' if not grouping_cols
-                else ', {0}'.format(grouping_cols), group_by_clause=''
-                if not grouping_cols else ' {0}, '.format(grouping_cols),
-                **locals()))
+                   else ', {0}'.format(grouping_cols), group_by_clause=''
+                   if not grouping_cols else ' {0}, '.format(grouping_cols),
+                   **locals()))
 
         plpy.execute("DROP TABLE {0}".format(oldupdate))
         if grouping_cols:
@@ -300,6 +302,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     if is_hawq:
         plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table))
 
+
 def wcc_help(schema_madlib, message, **kwargs):
     """
     Help function for wcc
@@ -315,11 +318,13 @@ def wcc_help(schema_madlib, message, **kwargs):
     if message is not None and \
             message.lower() in ("usage", "help", "?"):
         help_string = "Get from method below"
-        help_string = get_graph_usage(schema_madlib, 'Weakly Connected Components',
+        help_string = get_graph_usage(
+            schema_madlib,
+            'Weakly Connected Components',
             """out_table     TEXT, -- Output table of weakly connected components
-    grouping_col  TEXT -- Comma separated column names to group on
-                       -- (DEFAULT = NULL, no grouping)
-""")
+            grouping_col  TEXT -- Comma separated column names to group on
+                               -- (DEFAULT = NULL, no grouping)
+            """)
     else:
         if message is not None and \
                 message.lower() in ("example", "examples"):

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/utilities/utilities.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in
index 6a5e8f9..b28a5f3 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -14,32 +14,43 @@ if __name__ != "__main__":
 m4_changequote(`<!', `!>')
 
 
+def has_function_properties():
+    """ __HAS_FUNCTION_PROPERTIES__ variable defined during configure """
+    return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+
+
+def is_platform_pg():
+    """ __POSTGRESQL__ variable defined during configure """
+    return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
+def is_platform_hawq():
+    """ __HAWQ__ variable defined during configure """
+    return m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+# ------------------------------------------------------------------------------
+
+
 def get_seg_number():
     """ Find out how many primary segments exist in the distribution
         Might be useful for partitioning data.
     """
-    m4_ifdef(<!__POSTGRESQL__!>, <!return 1!>, <!
-    return plpy.execute(
-        """
-        SELECT count(*) from gp_segment_configuration
-        WHERE role = 'p'
-        """)[0]['count']
-    !>)
+    if is_platform_pg():
+        return 1
+    else:
+        return plpy.execute("""
+            SELECT count(*) from gp_segment_configuration
+            WHERE role = 'p'
+            """)[0]['count']
 # ------------------------------------------------------------------------------
 
 
 def is_orca():
-    m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!
-    optimizer = plpy.execute("show optimizer")[0]["optimizer"]
-    return True if optimizer == 'on' else False
-    !>, <!
+    if has_function_properties():
+        optimizer = plpy.execute("show optimizer")[0]["optimizer"]
+        if optimizer == 'on':
+            return True
     return False
-    !>)
-# ------------------------------------------------------------------------------
-
-
-def is_platform_pg():
-    return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
 # ------------------------------------------------------------------------------