You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ri...@apache.org on 2018/02/09 04:07:57 UTC
[2/2] madlib git commit: KNN: Add window for distinct on + other
changes
KNN: Add window for distinct on + other changes
This commit includes a few fixes:
1. 'DISTINCT ON' requires a window with partition to give the rows
corresponding to a distinct value. This was added to compute the label
with highest weighted votes.
2. Zero distances led to divide-by-zero issues during the weighting
process. This was fixed by replacing the inverse of the distance with a
pre-defined large number.
3. Other changes include changing error messages and code cleanup.
Co-authored-by: Nandish Jayaram <nj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/madlib/commit/7c6fea20
Tree: http://git-wip-us.apache.org/repos/asf/madlib/tree/7c6fea20
Diff: http://git-wip-us.apache.org/repos/asf/madlib/diff/7c6fea20
Branch: refs/heads/master
Commit: 7c6fea20bac49ee75d1c68cef5f1248e6bcc78b1
Parents: f6aba42
Author: Rahul Iyer <ri...@apache.org>
Authored: Thu Feb 8 20:02:09 2018 -0800
Committer: Rahul Iyer <ri...@apache.org>
Committed: Thu Feb 8 20:02:09 2018 -0800
----------------------------------------------------------------------
src/ports/postgres/modules/knn/knn.py_in | 216 +++++++++++++++-----------
1 file changed, 121 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/madlib/blob/7c6fea20/src/ports/postgres/modules/knn/knn.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/knn/knn.py_in b/src/ports/postgres/modules/knn/knn.py_in
index b9e7916..cfd93d9 100644
--- a/src/ports/postgres/modules/knn/knn.py_in
+++ b/src/ports/postgres/modules/knn/knn.py_in
@@ -20,7 +20,7 @@
"""
@file knn.py_in
-@brief knn: Driver functions
+@brief knn: K-Nearest Neighbors for regression and classification
@namespace knn
@@ -32,9 +32,12 @@ from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import is_col_array
from utilities.validate_args import array_col_has_no_null
from utilities.validate_args import get_expr_type
+from utilities.utilities import _assert
from utilities.utilities import unique_string
from utilities.control import MinWarning
+MAX_WEIGHT_ZERO_DIST = 1e6
+
def knn_validate_src(schema_madlib, point_source, point_column_name, point_id,
label_column_name, test_source, test_column_name,
@@ -43,21 +46,22 @@ def knn_validate_src(schema_madlib, point_source, point_column_name, point_id,
input_tbl_valid(point_source, 'kNN')
input_tbl_valid(test_source, 'kNN')
output_tbl_valid(output_table, 'kNN')
- if label_column_name is not None and label_column_name != '':
- cols_in_tbl_valid(
- point_source,
- (label_column_name,
- point_column_name),
- 'kNN')
+
+ _assert(label_column_name or output_neighbors,
+ "kNN error: Either label_column_name or "
+ "output_neighbors has to be inputed.")
+
+ if label_column_name and label_column_name.strip():
+ cols_in_tbl_valid(point_source, [label_column_name], 'kNN')
cols_in_tbl_valid(point_source, (point_column_name, point_id), 'kNN')
cols_in_tbl_valid(test_source, (test_column_name, test_id), 'kNN')
if not is_col_array(point_source, point_column_name):
plpy.error("kNN Error: Feature column '{0}' in train table is not"
- " an array.").format(point_column_name)
+ " an array.".format(point_column_name))
if not is_col_array(test_source, test_column_name):
plpy.error("kNN Error: Feature column '{0}' in test table is not"
- " an array.").format(test_column_name)
+ " an array.".format(test_column_name))
if not array_col_has_no_null(point_source, point_column_name):
plpy.error("kNN Error: Feature column '{0}' in train table has some"
@@ -66,44 +70,46 @@ def knn_validate_src(schema_madlib, point_source, point_column_name, point_id,
plpy.error("kNN Error: Feature column '{0}' in test table has some"
" NULL values.".format(test_column_name))
- if k is None:
- k = 1
if k <= 0:
- plpy.error("kNN Error: k={0} is an invalid value, must be greater"
+ plpy.error("kNN Error: k={0} is an invalid value, must be greater "
"than 0.".format(k))
+
bound = plpy.execute("SELECT {k} <= count(*) AS bound FROM {tbl}".
format(k=k, tbl=point_source))[0]['bound']
if not bound:
plpy.error("kNN Error: k={0} is greater than number of rows in"
" training table.".format(k))
- if label_column_name is not None and label_column_name != '':
+ if label_column_name:
col_type = get_expr_type(label_column_name, point_source).lower()
if col_type not in ['integer', 'double precision', 'float', 'boolean']:
- plpy.error("kNN error: Data type '{0}' is not a valid type for"
- " column '{1}' in table '{2}'.".
- format(col_type, label_column_name, point_source))
+ plpy.error("kNN error: Invalid data type '{0}' for"
+ " label_column_name in table '{1}'.".
+ format(col_type, point_source))
col_type_test = get_expr_type(test_id, test_source).lower()
if col_type_test not in ['integer']:
- plpy.error("kNN Error: Data type '{0}' is not a valid type for"
- " column '{1}' in table '{2}'.".
- format(col_type_test, test_id, test_source))
+ plpy.error("kNN Error: Invalid data type '{0}' for"
+ " test_id column in table '{1}'.".
+ format(col_type_test, test_source))
if fn_dist:
fn_dist = fn_dist.lower().strip()
- dist_functions = set([schema_madlib + dist for dist in
- ('.dist_norm1', '.dist_norm2', '.squared_dist_norm2', '.dist_angle', '.dist_tanimoto')])
-
- is_invalid_func = plpy.execute(
- """select prorettype != 'DOUBLE PRECISION'::regtype
- OR proisagg = TRUE AS OUTPUT from pg_proc where
- oid='{fn_dist}(DOUBLE PRECISION[], DOUBLE PRECISION[])'::regprocedure;
- """.format(**locals()))[0]['output']
-
- if is_invalid_func or fn_dist not in dist_functions:
- plpy.error(
- "KNN error: Distance function has wrong signature or is not a simple function.")
+ dist_functions = set(["{0}.{1}".format(schema_madlib, dist) for dist in
+ ('dist_norm1', 'dist_norm2',
+ 'squared_dist_norm2', 'dist_angle',
+ 'dist_tanimoto')])
+
+ is_invalid_func = plpy.execute("""
+ SELECT prorettype != 'DOUBLE PRECISION'::regtype OR
+ proisagg = TRUE AS OUTPUT
+ FROM pg_proc
+ WHERE oid='{fn_dist}(DOUBLE PRECISION[], DOUBLE PRECISION[])'::regprocedure;
+ """.format(fn_dist=fn_dist))[0]['output']
+
+ if is_invalid_func or (fn_dist not in dist_functions):
+ plpy.error("KNN error: Distance function has invalid signature "
+ "or is not a simple function.")
return k
# ------------------------------------------------------------------------------
@@ -146,21 +152,21 @@ def knn(schema_madlib, point_source, point_column_name, point_id,
the weighted average method.
"""
with MinWarning('warning'):
- k_val = knn_validate_src(schema_madlib, point_source,
- point_column_name, point_id, label_column_name,
- test_source, test_column_name, test_id,
- output_table, k, output_neighbors, fn_dist)
+ output_neighbors = True if output_neighbors is None else output_neighbors
+ if k is None:
+ k = 1
+ knn_validate_src(schema_madlib, point_source,
+ point_column_name, point_id, label_column_name,
+ test_source, test_column_name, test_id,
+ output_table, k, output_neighbors, fn_dist)
x_temp_table = unique_string(desp='x_temp_table')
y_temp_table = unique_string(desp='y_temp_table')
label_col_temp = unique_string(desp='label_col_temp')
test_id_temp = unique_string(desp='test_id_temp')
- if output_neighbors is None:
- output_neighbors = True
-
if not fn_dist:
- fn_dist = schema_madlib + '.squared_dist_norm2'
+ fn_dist = '{0}.squared_dist_norm2'.format(schema_madlib)
fn_dist = fn_dist.lower().strip()
interim_table = unique_string(desp='interim_table')
@@ -173,98 +179,121 @@ def knn(schema_madlib, point_source, point_column_name, point_id,
view_join = ""
view_grp_by = ""
- if output_neighbors:
- knn_neighbors = (", array_agg(knn_temp.train_id ORDER BY "
- "knn_temp.dist ASC) AS k_nearest_neighbours ")
if label_column_name:
- is_classification = False
label_column_type = get_expr_type(
label_column_name, point_source).lower()
if label_column_type in ['boolean', 'integer', 'text']:
is_classification = True
cast_to_int = '::INTEGER'
- if weighted_avg:
- pred_out = ",sum( {label_col_temp} * 1/dist)/sum(1/dist)".format(
- label_col_temp=label_col_temp)
else:
- pred_out = ", avg({label_col_temp})".format(
- label_col_temp=label_col_temp)
+ is_classification = False
if is_classification:
if weighted_avg:
# This view is to calculate the max value of sum of the 1/distance grouped by label and Id.
# And this max value will be the prediction for the
# classification model.
- view_def = (""" WITH vw
- AS (SELECT distinct on ({test_id_temp}) {test_id_temp},
- max(data_sum) data_dist,
- {label_col_temp}
- FROM (SELECT {test_id_temp},
+ view_def = """
+ WITH vw AS (
+ SELECT DISTINCT ON({test_id_temp})
+ {test_id_temp},
+ last_value(data_sum) OVER (
+ PARTITION BY {test_id_temp}
+ ORDER BY data_sum, {label_col_temp}
+ ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ ) AS data_dist ,
+ last_value({label_col_temp}) OVER (
+ PARTITION BY {test_id_temp}
+ ORDER BY data_sum, {label_col_temp}
+ ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ ) AS {label_col_temp}
+ FROM (
+ SELECT
+ {test_id_temp},
{label_col_temp},
- sum(1 / dist) data_sum
- FROM pg_temp.{interim_table}
- GROUP BY {test_id_temp},
- {label_col_temp}) a
- GROUP BY {test_id_temp} , {label_col_temp})""").format(**locals())
+ sum(dist_inverse) data_sum
+ FROM pg_temp.{interim_table}
+ GROUP BY {test_id_temp},
+ {label_col_temp}
+ ) a
+ -- GROUP BY {test_id_temp} , {label_col_temp}
+ )
+ """.format(**locals())
# This join is needed to get the max value of predicion
# calculated above
- view_join = (" JOIN vw AS knn_vw "
- "ON knn_temp.{test_id_temp} = knn_vw.{test_id_temp}").format(
- test_id_temp=test_id_temp)
- view_grp_by = ", knn_vw.{label_col_temp}".format(
- label_col_temp=label_col_temp)
- pred_out = ", knn_vw.{label_col_temp}".format(
- label_col_temp=label_col_temp)
+ view_join = (" JOIN vw ON knn_temp.{0} = vw.{0}".
+ format(test_id_temp))
+ view_grp_by = ", vw.{0}".format(label_col_temp)
+ pred_out = ", vw.{0}".format(label_col_temp)
+ else:
+ pred_out = ", {0}.mode({1})".format(schema_madlib, label_col_temp)
+ else:
+ if weighted_avg:
+ pred_out = (", sum({0} * dist_inverse) / sum(dist_inverse)".
+ format(label_col_temp))
else:
- pred_out = (", {schema_madlib}.mode({label_col_temp})"
- ).format(**locals())
+ pred_out = ", avg({0})".format(label_col_temp)
pred_out += " AS prediction"
label_out = (", train.{label_column_name}{cast_to_int}"
" AS {label_col_temp}").format(**locals())
+ comma_label_out_alias = ', ' + label_col_temp
+ else:
+ pred_out = ""
+ label_out = ""
+ comma_label_out_alias = ""
- if not label_column_name and not output_neighbors:
-
- plpy.error("kNN error: Either label_column_name or "
- "output_neighbors has to be non-NULL.")
-
+ # interim_table picks the 'k' nearest neighbors for each test point
+ if output_neighbors:
+ knn_neighbors = (", array_agg(knn_temp.train_id ORDER BY "
+ "knn_temp.dist_inverse DESC) AS k_nearest_neighbours ")
+ else:
+ knn_neighbors = ''
plpy.execute("""
CREATE TEMP TABLE {interim_table} AS
SELECT * FROM (
SELECT row_number() over
(partition by {test_id_temp} order by dist) AS r,
- {x_temp_table}.*
+ {test_id_temp},
+ train_id,
+ CASE WHEN dist = 0.0 THEN {max_weight_zero_dist}
+ ELSE 1.0 / dist
+ END AS dist_inverse
+ {comma_label_out_alias}
FROM (
- SELECT test.{test_id} AS {test_id_temp} ,
- train.{point_id} as train_id ,
+ SELECT test.{test_id} AS {test_id_temp},
+ train.{point_id} as train_id,
{fn_dist}(
train.{point_column_name},
test.{test_column_name})
- AS dist {label_out}
+ AS dist
+ {label_out}
FROM {point_source} AS train,
- {test_source} AS test
+ {test_source} AS test
) {x_temp_table}
) {y_temp_table}
- WHERE {y_temp_table}.r <= {k_val}
- """.format(**locals()))
+ WHERE {y_temp_table}.r <= {k}
+ """.format(max_weight_zero_dist=MAX_WEIGHT_ZERO_DIST, **locals()))
- plpy.execute("""
+ sql = """
CREATE TABLE {output_table} AS
{view_def}
- SELECT knn_temp.{test_id_temp} AS id ,
- knn_test.data
+ SELECT
+ knn_temp.{test_id_temp} AS id,
+ knn_test.{test_column_name}
{pred_out}
{knn_neighbors}
- FROM {interim_table} AS knn_temp
- JOIN {test_source} AS knn_test
- ON knn_temp.{test_id_temp} = knn_test.id
- {view_join}
- GROUP BY knn_temp.{test_id_temp},
- knn_test.{test_column_name}
- {view_grp_by}
- ORDER BY knn_temp.{test_id_temp}
- """.format(**locals()))
-
+ FROM
+ pg_temp.{interim_table} AS knn_temp
+ JOIN
+ {test_source} AS knn_test
+ ON knn_temp.{test_id_temp} = knn_test.{test_id}
+ {view_join}
+ GROUP BY knn_temp.{test_id_temp},
+ knn_test.{test_column_name}
+ {view_grp_by}
+ """
+ plpy.execute(sql.format(**locals()))
plpy.execute("DROP TABLE IF EXISTS {0}".format(interim_table))
return
@@ -394,9 +423,6 @@ to furthest from the corresponding test point.
DROP TABLE IF EXISTS knn_result_regression;
SELECT * FROM {schema_madlib}.knn(
'knn_train_data_reg', -- Table of training data
- 'data', -- Col name of training data
- 'id', -- Col Name of id in train data
- 'label', -- Training labels
'knn_test_data', -- Table of test data
'data', -- Col name of test data
'id', -- Col name of id in test data