You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2017/08/03 23:15:51 UTC

incubator-madlib git commit: Bugfix: Fix BFS performance issues

Repository: incubator-madlib
Updated Branches:
  refs/heads/master dc0a88ba4 -> 22fd88e42


Bugfix: Fix BFS performance issues

Some queries used in the BFS implementation did not scale well with
the input graph size. This commit rewrites those queries to improve
BFS performance.

Additional author: Orhan Kislal <ok...@apache.org>

Closes #159


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/22fd88e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/22fd88e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/22fd88e4

Branch: refs/heads/master
Commit: 22fd88e42383cb670ab30355d848a0b0d2ed86f4
Parents: dc0a88b
Author: Nandish Jayaram <nj...@apache.org>
Authored: Fri Jul 28 11:23:15 2017 -0700
Committer: Orhan Kislal <ok...@pivotal.io>
Committed: Thu Aug 3 12:02:04 2017 -0700

----------------------------------------------------------------------
 src/ports/postgres/modules/graph/bfs.py_in      | 160 ++++++++++---------
 src/ports/postgres/modules/graph/bfs.sql_in     |   3 +-
 .../postgres/modules/graph/test/bfs.sql_in      |   4 +-
 3 files changed, 88 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/22fd88e4/src/ports/postgres/modules/graph/bfs.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/bfs.py_in b/src/ports/postgres/modules/graph/bfs.py_in
index ea03cf1..c9aa981 100644
--- a/src/ports/postgres/modules/graph/bfs.py_in
+++ b/src/ports/postgres/modules/graph/bfs.py_in
@@ -30,12 +30,13 @@
 import plpy
 from graph_utils import validate_graph_coding
 from graph_utils import get_graph_usage
-from graph_utils import _grp_null_checks
+from graph_utils import _grp_from_table
+from graph_utils import _grp_null_checks, _check_groups
 from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import add_postfix
 from utilities.utilities import extract_keyvalue_params
-from utilities.utilities import split_quoted_delimited_str
+from utilities.utilities import unique_string, split_quoted_delimited_str
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 
@@ -168,13 +169,24 @@ def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
         _validate_bfs(vertex_table, vertex_id, edge_table,
             edge_params, source_vertex, out_table, max_distance, directed, glist)
 
+        subq = unique_string(desp='subq')
+        subq1 = unique_string(desp='subq1')
+        sube = unique_string(desp='edge')
+        sube1 = unique_string(desp='edge1')
+
         # Initialize grouping related variables
+        insert_qry_undirected_init = ""
         grp_comma = ""
         and_grp_null_checks = ""
+        grp_sube_comma = ""
+        grp_sube1_comma = ""
+        subq_grp_join = ''
 
         if grouping_cols and grouping_cols is not '':
             grp_comma = grouping_cols + ", "
             and_grp_null_checks = " AND " + _grp_null_checks(glist)
+            grp_sube_comma = _grp_from_table(sube, glist) + " , "
+            grp_sube1_comma = _grp_from_table(sube1, glist) + " , "
 
         # We keep a table of every vertex, the distance to that vertex from source
         # and the parent in the path to the vertex.
@@ -209,58 +221,24 @@ def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
                 max_distance            INTEGER,
                 directed                BOOLEAN,
                 grouping_cols           TEXT
-            )""".format(**locals()))
+            )
+        """.format(**locals()))
 
         plpy.execute("""
             INSERT INTO {summary_table} VALUES
                 ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
                 {source_vertex}, '{out_table}', {d_st}, {dir_st}, '{g_st}')
-            """.format(**locals()))
-
+        """.format(**locals()))
 
         # The queries for directed and undirected graphs share a common section.
         # There are additional clauses added to the undirected graph queries.
         # In the undirected case edges can be considered to go from {src} to
         # {dest} and {dest} to {src}
 
-        insert_qry_undirected_init = ""
-        count_qry_undirected = ""
-        insert_qry_undirected_loop = ""
-
         if not directed:
             insert_qry_undirected_init = """ OR {dest} = {source_vertex}
                 """.format(**locals())
 
-            count_qry_undirected = """ OR (
-                    ({grp_comma} {dest}) IN (
-                        SELECT {grp_comma} {vertex_id} FROM {out_table}
-                        WHERE {dist_col}={{curr_dist_val}}
-                    )
-                    AND
-                    ({grp_comma} {src}) NOT IN (
-                        SELECT {grp_comma} {vertex_id} FROM {out_table}
-                    )
-                )
-            """.format(**locals())
-
-            insert_qry_undirected_loop = """ UNION
-                SELECT {grp_comma}
-                    {src} AS {vertex_id},
-                    {{curr_dist_val}}+1 AS {dist_col},
-                    {dest} AS {parent_col}
-                FROM {edge_table}
-                WHERE (
-                    ({grp_comma} {dest}) IN (
-                        SELECT {grp_comma} {vertex_id} FROM {out_table}
-                        WHERE {dist_col}={{curr_dist_val}}
-                    )
-                    AND
-                    ({grp_comma} {src}) NOT IN (
-                        SELECT {grp_comma} {vertex_id} FROM {out_table}
-                    )
-                )
-            """.format(**locals())
-
         # This step inserts into the output table the source vertex for each
         # group in which it is present. Grouping behavior is not predictable
         # when there are NULLs in any grouping column. Therefore those rows
@@ -278,51 +256,79 @@ def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
         """.format(**locals())
         plpy.execute(insert_qry_init.format(**locals()))
 
+        # Create a table that will hold the new vertices to be explored next.
+        message = unique_string(desp='message')
+        plpy.execute("""
+            CREATE TEMP TABLE {message} AS
+            SELECT {grp_comma} {vertex_id}, {parent_col}
+            FROM {out_table}
+        """.format(**locals()))
+
         # After initialization of the output table, number of nodes connected
         # by edges to the source vertex in each group is counted. This is also used
         # below in the BFS iteration while-loop
-        count_qry = """
-            SELECT COUNT(*)
-            FROM {edge_table}
-            WHERE (
-                ({grp_comma} {src}) IN (
-                    SELECT {grp_comma} {vertex_id} FROM {out_table}
-                    WHERE {dist_col}={{curr_dist_val}}
-                )
-                AND
-                ({grp_comma} {dest}) NOT IN (
-                    SELECT {grp_comma} {vertex_id} FROM {out_table}
-                )
-            ) {count_qry_undirected}
-        """.format(**locals())
-
+        edge_grp_join = ""
+        subq1_grp_join = ""
+        if grouping_cols:
+            subq_grp_join = ' AND ' + _check_groups(subq, sube, glist)
+            subq1_grp_join = ' AND ' + _check_groups(subq1, sube1, glist)
+            edge_grp_join = ' AND ' + _check_groups(edge_table, out_table, glist)
+
+        count_qry = """ SELECT count(*) AS count FROM {message}
+            """.format(**locals())
         vct = plpy.execute(count_qry.format(**locals()))[0]['count']
 
         # This insert statement is executed within the BFS iteration while-loop
         # below. It is used to discover and store all nodes (not already found)
-        # connected to those found in the immediate previous iteration.
-        insert_qry_loop = """
-            INSERT INTO {out_table}
-            SELECT {grp_comma} {vertex_id}, {dist_col}, min({parent_col})
+        # connected to those found in the immediate previous iteration, which
+        # are stored in the {message} table.
+        toupdate = unique_string(desp='toupdate')
+        insert_toupdate_table = """
+            CREATE TEMP TABLE {toupdate} AS
+            SELECT {grp_sube_comma} {dest} AS {vertex_id}, {src} AS {parent_col}
             FROM (
-                SELECT {grp_comma}
-                    {dest} AS {vertex_id},
-                    {{curr_dist_val}}+1 AS {dist_col},
-                    {src} AS {parent_col}
+                SELECT {grp_comma} {src}, {dest}
                 FROM {edge_table}
-                WHERE (
-                    ({grp_comma} {src}) IN (
-                        SELECT {grp_comma} {vertex_id} FROM {out_table}
-                        WHERE {dist_col}={{curr_dist_val}}
-                    )
-                    AND
-                    ({grp_comma} {dest}) NOT IN (
-                        SELECT {grp_comma} {vertex_id} FROM {out_table}
-                    )
+                WHERE NOT EXISTS (
+                    SELECT 1
+                    FROM {out_table}
+                    WHERE {out_table}.{vertex_id} = {edge_table}.{dest}
+                            {edge_grp_join}
                 )
-                {insert_qry_undirected_loop}
-            ) t1
-            GROUP BY {grp_comma} {vertex_id}, {dist_col}
+            ) AS {sube}
+            INNER JOIN {message} AS {subq}
+            ON ({sube}.{src}={subq}.{vertex_id} {subq_grp_join})
+        """
+        if not directed:
+            insert_toupdate_table += """
+                UNION ALL
+                SELECT {grp_sube1_comma} {src} AS {vertex_id},
+                        {dest} AS {parent_col}
+                FROM (
+                    SELECT {grp_comma} {src}, {dest}
+                    FROM {edge_table}
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {out_table}
+                        WHERE {out_table}.{vertex_id} = {edge_table}.{src}
+                            {edge_grp_join}
+                    )
+                ) AS {sube1}
+                INNER JOIN {message} AS {subq1}
+                ON ({sube1}.{dest}={subq1}.{vertex_id} {subq1_grp_join})
+            """.format(**locals())
+
+        insert_message_loop = """
+            CREATE TEMP TABLE {message} AS
+            SELECT {grp_comma} {vertex_id}, {{curr_dist_val}}+1 AS {dist_col},
+                    MIN({parent_col})
+            FROM {toupdate}
+            GROUP BY {grp_comma} {vertex_id}
+        """.format(**locals())
+
+        insert_qry_loop = """
+            INSERT INTO {out_table}
+            SELECT * FROM {message}
         """.format(**locals())
 
         # Main loop for traversing the graph
@@ -353,6 +359,12 @@ def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
 
             # Discover and store all nodes (not already found) connected to
             # those found in the immediate previous iteration
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
+            plpy.execute(insert_toupdate_table.format(**locals()))
+
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(message))
+            plpy.execute(insert_message_loop.format(**locals()))
+
             plpy.execute(insert_qry_loop.format(**locals()))
 
             # Update distance value for next iteration

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/22fd88e4/src/ports/postgres/modules/graph/bfs.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/bfs.sql_in b/src/ports/postgres/modules/graph/bfs.sql_in
index f4e8edc..067c8b6 100644
--- a/src/ports/postgres/modules/graph/bfs.sql_in
+++ b/src/ports/postgres/modules/graph/bfs.sql_in
@@ -178,8 +178,7 @@ INSERT INTO edge VALUES
 (8, 9),
 (9, 10),
 (9, 11),
-(10, 8)
-;
+(10, 8);
 </pre>
 
 -# Traverse undirected graph from vertex 3:

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/22fd88e4/src/ports/postgres/modules/graph/test/bfs.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/test/bfs.sql_in b/src/ports/postgres/modules/graph/test/bfs.sql_in
index 223ab84..a035eb3 100644
--- a/src/ports/postgres/modules/graph/test/bfs.sql_in
+++ b/src/ports/postgres/modules/graph/test/bfs.sql_in
@@ -96,9 +96,7 @@ INSERT INTO edge_grp VALUES
 (NULL, 'b', 8, 9, 1),
 (NULL, 'b', 9, 10, 1),
 (NULL, 'b', 9, 11, 1),
-(NULL, 'b', 10, 8, 1)
-;
-;
+(NULL, 'b', 10, 8, 1);
 
 ---------------------
 -- Undirected Graph