Posted to dev@madlib.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:32:45 UTC

[GitHub] [madlib] orhankislal commented on a change in pull request #505: DBSCAN: Add indexing optimizations to improve the runtime

orhankislal commented on a change in pull request #505:
URL: https://github.com/apache/madlib/pull/505#discussion_r461897855



##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
+
+    # Accumulate this leaf's points in SD (per-segment session memory);
+    # keys are suffixed with leaf_id so leaves sharing a segment don't collide.
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    # The rtree is built only once the last row of this leaf arrives;
+    # every earlier call returns the sentinel value above.
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)
+        idx = index.Index(properties=p)
+        ret = []
+
+        if metric == 'dist_norm1':
+            fn_dist = distance.cityblock
+        elif metric == 'dist_norm2':
+            fn_dist = distance.euclidean
+        else:
+            fn_dist = distance.sqeuclidean
+
+        for key1, value1 in data.items():
+            # A point's bounding box is the point itself, so value1+value1
+            # gives the interleaved (min..., max...) coordinates rtree expects.
+            idx.add(key1,value1+value1,key1)

Review comment:
       That would require transferring the rtree structure from one transition step to the next. We could use pickle to get a bytea, but I think pickling/unpickling the whole tree for each row would add a noticeable amount of time.
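   As a minimal sketch of the cost pattern being discussed (the function name `bytea_state_transition` and its signature are hypothetical, not MADlib code), note that every row pays a serialization cost proportional to the state accumulated so far:

       import pickle

       def bytea_state_transition(state, id_in, point):
           # Hypothetical transition function that keeps all accumulated
           # points in the aggregate's bytea state instead of SD: every
           # row must deserialize and reserialize the full structure.
           data = pickle.loads(state) if state else {}
           data[id_in] = point
           return pickle.dumps(data)   # O(len(data)) work on every single row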

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -26,81 +26,288 @@ from utilities.utilities import add_postfix
 from utilities.utilities import NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc
 
+from math import log
+from math import floor
+from math import sqrt
+
+from scipy.spatial import distance
+
+# rtree is an optional dependency; record whether it is available.
+try:
+    from rtree import index
+    from rtree.index import Rtree
+except ImportError:
+    RTREE_ENABLED=0
+else:
+    RTREE_ENABLED=1
+
 BRUTE_FORCE = 'brute_force'
 KD_TREE = 'kd_tree'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_KD_DEPTH = 3
+DEFAULT_METRIC = 'squared_dist_norm2'
 
-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, depth, **kwargs):
 
     with MinWarning("warning"):
 
-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        algorithm = BRUTE_FORCE if not algorithm else algorithm
+        depth = DEFAULT_KD_DEPTH if not depth else depth
 
         algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
             [BRUTE_FORCE, KD_TREE], 'DBSCAN')
 
         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, depth)
 
         dist_src_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
         dist_id_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
         dist_reach_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_leaf_sql = ''  if is_platform_pg() else 'DISTRIBUTED BY (__leaf_id__)'
 
-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(
+            core_points_table, core_edge_table, distance_table))
+
+        if algorithm == KD_TREE:
+            cur_source_table, border_table1, border_table2 = dbscan_kd(
+                schema_madlib, source_table, id_column, expr_point, eps,
+                min_samples, metric, depth)
+
+            kd_join_clause = "AND __t1__.__leaf_id__ = __t2__.__leaf_id__ "
+
+            sql = """
+                SELECT count(*), __leaf_id__ FROM {cur_source_table} GROUP BY __leaf_id__
+                """.format(**locals())
+            result = plpy.execute(sql)
+            rt_counts_dict = {}
+            for i in result:
+                rt_counts_dict[i['__leaf_id__']] = int(i['count'])
+            rt_counts_list = []
+            for i in sorted(rt_counts_dict):
+                rt_counts_list.append(rt_counts_dict[i])
+
+            leaf_id_start = pow(2,depth)-1
+
+            rtree_step_table = unique_string(desp='rtree_step_table')
+            rt_edge_table = unique_string(desp='rt_edge_table')
+            rt_core_points_table = unique_string(desp='rt_core_points_table')
+            border_core_points_table = unique_string(desp='border_core_points_table')
+            border_edge_table = unique_string(desp='border_edge_table')
+            plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}, {4}".format(
+                rtree_step_table, rt_edge_table, rt_core_points_table,
+                border_core_points_table, border_edge_table))
+
+            sql = """
+            CREATE TABLE {rtree_step_table} AS
+            SELECT __leaf_id__,
+                   {schema_madlib}.rtree_step( {id_column},
+                                               {expr_point},
+                                               {eps},
+                                               {min_samples},
+                                               '{metric}',
+                                               ARRAY{rt_counts_list},
+                                               __leaf_id__
+                                               )
+            FROM {cur_source_table} GROUP BY __leaf_id__

Review comment:
       That would effectively make the kd-tree unusable. We group by the leaf_id because we assume that we don't have to calculate the distances across leaves (except for the points on the borders); the same cannot be said if we group by gp_segment_id. If you want to group by segments, you'll need a completely new merge function. I did consider it, but I believe the potential complexity is very high. The invariant the leaf grouping relies on is illustrated below.
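   A small self-contained illustration of that invariant, with made-up coordinates and eps:

       # After a kd-tree cut at x = 10.0, a point in the left leaf whose x
       # coordinate is at most cut - eps cannot be within eps of any point
       # in the right leaf, so those pairwise distances never need computing.
       eps = 1.0
       cut = 10.0
       left_interior = [3.2, 5.0]   # x <= cut - eps: handled entirely in its leaf
       right_point = [10.4, 5.0]    # lives in the other leaf
       assert abs(left_interior[0] - right_point[0]) > eps
       # Points with cut - eps < x <= cut go to the border tables instead,
       # which is why cross-leaf distances only matter for border points.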

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):
+
+    # Accumulate this leaf's points in SD (per-segment session memory);
+    # keys are suffixed with leaf_id so leaves sharing a segment don't collide.
+    SD = kwargs['SD']
+    if not state:
+        data = {}
+        SD['counter{0}'.format(leaf_id)] = 0
+    else:
+        data = SD['data{0}'.format(leaf_id)]
+
+    data[id_in] = expr_points
+    SD['counter{0}'.format(leaf_id)] = SD['counter{0}'.format(leaf_id)]+1
+    SD['data{0}'.format(leaf_id)] = data
+    ret = [[-1,-1],[-1,-1]]
+
+    my_n_rows = n_rows[leaf_id]
+
+    # The rtree is built only once the last row of this leaf arrives;
+    # every earlier call returns the sentinel value above.
+    if SD['counter{0}'.format(leaf_id)] == my_n_rows:
+
+        core_counts = {}
+        core_lists = {}
+        p = index.Property()
+        p.dimension = len(expr_points)

Review comment:
       We use `expr_points` in a number of modules and wanted to keep it the same for consistency. I think changing the name just for the transition function would be somewhat confusing.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -209,8 +521,187 @@ def dbscan_predict(schema_madlib, dbscan_table, source_table, id_column,
             """.format(**locals())
         result = plpy.execute(sql)
 
+def rtree_transition(state, id_in, expr_points, eps, min_samples, metric, n_rows, leaf_id, **kwargs):

Review comment:
       Going through the points in BFS order might help with caching; we can try it as a further improvement (see the sketch below).
   Regarding pruning: the rtree is irrelevant at this point; two points are either linked or not. We could merge these two operations, but there are two drawbacks:
   1. We don't have the information from the border points; they might connect multiple clusters that seemed separate.
   2. The existing snowflake implementation is completely parallel, while this one would be dependent on the number of leaves.
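   A rough sketch of the spatial-ordering idea, using only documented rtree calls; nearest-first traversal stands in for a true BFS over the tree, and the data, eps, and seed choice are made up rather than taken from the module:

       from rtree import index
       from scipy.spatial import distance

       data = {1: [0.0, 0.0], 2: [0.3, 0.1], 3: [5.0, 5.0], 4: [5.2, 4.9]}
       eps = 0.5

       p = index.Property()
       p.dimension = 2
       idx = index.Index(properties=p)
       for key, pt in data.items():
           idx.add(key, pt + pt)   # a point's bbox is the point itself

       # Visit points nearest-first from a seed so successive eps-box
       # queries tend to touch tree pages that are already warm in cache.
       seed = data[1]
       for key in idx.nearest(seed + seed, len(data)):
           pt = data[key]
           bounds = [c - eps for c in pt] + [c + eps for c in pt]
           neighbors = [n for n in idx.intersection(bounds)
                        if distance.euclidean(pt, data[n]) <= eps]
           # a core-point check would compare len(neighbors) to min_samples here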



