You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ok...@apache.org on 2017/08/25 22:02:13 UTC
incubator-madlib git commit: kNN: Refactor code for ease of use JIRA:
MADLIB-927
Repository: incubator-madlib
Updated Branches:
refs/heads/master 46795de02 -> 2fc98a6fe
kNN: Refactor code for ease of use
JIRA: MADLIB-927
Additional author: Orhan Kislal <ok...@apache.org>
Closes #168
Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/2fc98a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/2fc98a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/2fc98a6f
Branch: refs/heads/master
Commit: 2fc98a6fedb3a19f2157f60590b022a8c107c039
Parents: 46795de
Author: Himanshu Pandey <hp...@pivotal.io>
Authored: Fri Aug 25 14:55:28 2017 -0700
Committer: Orhan Kislal <ok...@pivotal.io>
Committed: Fri Aug 25 14:57:05 2017 -0700
----------------------------------------------------------------------
src/ports/postgres/modules/knn/knn.py_in | 173 ++++++++++++++++----
src/ports/postgres/modules/knn/knn.sql_in | 89 ++--------
src/ports/postgres/modules/knn/test/knn.sql_in | 10 +-
3 files changed, 157 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/2fc98a6f/src/ports/postgres/modules/knn/knn.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/knn/knn.py_in b/src/ports/postgres/modules/knn/knn.py_in
index c0d9cd7..b340d52 100644
--- a/src/ports/postgres/modules/knn/knn.py_in
+++ b/src/ports/postgres/modules/knn/knn.py_in
@@ -26,7 +26,6 @@ m4_changequote(`<!', `!>')
@namespace knn
-@brief knn: Driver functions
"""
import plpy
@@ -37,74 +36,81 @@ from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import is_col_array
from utilities.validate_args import array_col_has_no_null
from utilities.validate_args import get_cols_and_types
+from utilities.utilities import unique_string
+from utilities.control import MinWarning
-STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
-UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>)
-# ----------------------------------------------------------------------
+def knn_validate_src(schema_madlib, point_source, point_column_name,
+ label_column_name, test_source, test_column_name, id_column_name,
+ output_table, operation, k, **kwargs):
-def knn_validate_src(schema_madlib, point_source, point_column_name, label_column_name,
- test_source, test_column_name, id_column_name, output_table, operation, k, **kwargs):
if not operation or operation not in ['c', 'r']:
- plpy.error("kNN Error: operation='{0}' is an invalid value, has to be 'r' for regression OR 'c' for classification.".format(operation))
+ plpy.error(("kNN Error: operation='{0}' is an invalid value, has to be"
+ + " 'r' for regression OR 'c' for classification.").format(
+ operation))
if not point_source:
plpy.error("kNN Error: Invalid training table name.")
if not table_exists(point_source):
- plpy.error("kNN Error: Training table '{0}' does not exist.".format(point_source))
+ plpy.error("kNN Error: Training table '{0}' does not exist.".format(
+ point_source))
if table_is_empty(point_source):
- plpy.error("kNN Error: Training table '{0}' is empty.".format(point_source))
+ plpy.error("kNN Error: Training table '{0}' is empty.".format(
+ point_source))
if not test_source:
plpy.error("kNN Error: Invalid test table name.")
if not table_exists(test_source):
- plpy.error("kNN Error: Test table '{0}' does not exist.".format(test_source))
+ plpy.error("kNN Error: Test table '{0}' does not exist.".format(
+ test_source))
if table_is_empty(test_source):
- plpy.error("kNN Error: Test table '{0}' is empty.".format(test_source))
+ plpy.error("kNN Error: Test table '{0}' is empty.".format(
+ test_source))
for c in (label_column_name, point_column_name):
if not c:
plpy.error("kNN Error: Invalid column name in training table.")
if not columns_exist_in_table(point_source, [c]):
- plpy.error("kNN Error: " + \
- "Column '{0}' does not exist in {1}.".format(c, point_source))
+ plpy.error("kNN Error: Column '{0}' does not exist in {1}.".format(
+ c, point_source))
for c in (test_column_name, id_column_name):
if not c:
plpy.error("kNN Error: Invalid column name in test table.")
if not columns_exist_in_table(test_source, [c]):
- plpy.error("kNN Error: " + \
- "Column '{0}' does not exist in {1}.".format(c, test_source))
+ plpy.error("kNN Error: Column '{0}' does not exist in {1}.".format(
+ c, test_source))
if not is_col_array(point_source, point_column_name):
- plpy.error("kNN Error: " + \
- "Feature column '{0}' in train table is not an array.".format(point_column_name))
+ plpy.error(("kNN Error: Feature column '{0}' in train table is not"
+ + " an array.").format(point_column_name))
if not is_col_array(test_source, test_column_name):
- plpy.error("kNN Error: " + \
- "Feature column '{0}' in test table is not an array.".format(test_column_name))
+ plpy.error(("kNN Error: Feature column '{0}' in test table is not"
+ + " an array.").format(test_column_name))
if not array_col_has_no_null(point_source, point_column_name):
- plpy.error("kNN Error: " + \
- "Feature column '{0}' in train table has some NULL values.".format(point_column_name))
+ plpy.error(("kNN Error: Feature column '{0}' in train table has some"
+ + " NULL values.").format(point_column_name))
if not array_col_has_no_null(test_source, test_column_name):
- plpy.error("kNN Error: " + \
- "Feature column '{0}' in test table has some NULL values.".format(test_column_name))
+ plpy.error(("kNN Error: Feature column '{0}' in test table has some"
+ + " NULL values.").format(test_column_name))
if not output_table:
plpy.error("kNN Error: Invalid output table name")
if table_exists(output_table):
- plpy.error("kNN Error: Table '{0}' already exists, cannot use it as output table.".format(output_table))
+ plpy.error(("kNN Error: Table '{0}' already exists, cannot use it as"
+ + " output table.").format(output_table))
if k is None:
k = 1
if k<=0:
- plpy.error("kNN Error: k='{0}' is an invalid value, must be greater than 0.".format(k))
+ plpy.error(("kNN Error: k='{0}' is an invalid value, must be greater"
+ + " than 0.").format(k))
bound = plpy.execute("""SELECT {k} <= count(*)
AS bound FROM {tbl}""".format(k=k,
point_column_name=point_column_name, tbl=point_source))[0]['bound']
if not bound:
- plpy.error("kNN Error: " + \
- "k='{0}' is greater than number of rows in training table.".format(k))
+ plpy.error(("kNN Error: k='{0}' is greater than number of rows in"
+ + " training table.").format(k))
colTypesList = get_cols_and_types(point_source)
colType = ''
@@ -112,9 +118,11 @@ def knn_validate_src(schema_madlib, point_source, point_column_name, label_colum
if type[0] == label_column_name:
colType = type[1]
break
- if colType not in ['INTEGER','integer','double precision','DOUBLE PRECISION','float','FLOAT','boolean','BOOLEAN'] :
- plpy.error("kNN Error: " + \
- "Data type '{0}' is not a valid type for column '{1}' in table '{2}'.".format(colType, label_column_name, point_source))
+ if colType not in ['INTEGER','integer','double precision',
+ 'DOUBLE PRECISION','float','FLOAT','boolean','BOOLEAN']:
+ plpy.error(("kNN Error: Data type '{0}' is not a valid type for"
+ + " column '{1}' in table '{2}'.").format(
+ colType, label_column_name, point_source))
colTypesTestList = get_cols_and_types(test_source)
colType = ''
@@ -122,10 +130,103 @@ def knn_validate_src(schema_madlib, point_source, point_column_name, label_colum
if type[0] == id_column_name:
colType = type[1]
break
- if colType not in ['INTEGER','integer'] :
- plpy.error("kNN Error: " + \
- "Data type '{0}' is not a valid type for column '{1}' in table '{2}'.".format(colType, id_column_name, test_source))
+ if colType not in ['INTEGER','integer']:
+ plpy.error(("kNN Error: Data type '{0}' is not a valid type for"
+ + " column '{1}' in table '{2}'.").format(
+ colType, id_column_name, test_source))
return k
-# ----------------------------------------------------------------------
-m4_changequote(<!`!>, <!'!>)
+def knn(schema_madlib, point_source, point_column_name, label_column_name,
+ test_source, test_column_name, id_column_name, output_table, operation, k):
+
+ """
+ Compute k-nearest-neighbour predictions for every row of the test table
+ and write them to a newly created output table.
+
+ Every test row is paired with every training row, the pairs are ranked
+ per test id by distance, the k closest are kept, and their labels are
+ aggregated (mode for classification, average for regression).
+ Args:
+ @param schema_madlib Name of the Madlib Schema
+ @param point_source Training data table
+ @param point_column_name Name of the column with training data
+ points.
+ @param label_column_name Name of the column with labels/values
+ of training data points.
+ @param test_source Name of the table containing the test
+ data points.
+ @param test_column_name Name of the column with testing data
+ points.
+ @param id_column_name Name of the column having ids of data
+ points in test data table.
+ @param output_table Name of the table to store final
+ results.
+ @param operation Flag for the operation:
+ 'c' for classification and
+ 'r' for regression
+ @param k default: 1. Number of nearest
+ neighbors to consider
+ Returns:
+ None. This function has no return statement; the results are
+ stored in output_table with the columns id, <test_column_name>
+ and prediction.
+ """
+ # NOTE(review): MinWarning presumably lowers client_min_messages to
+ # 'warning' for the duration of the block, as the replaced PL/pgSQL body
+ # did by hand -- confirm against utilities.control.
+ with MinWarning('warning'):
+ # knn_validate_src raises via plpy.error on bad arguments and returns the
+ # k to use (it substitutes 1 when k is None).
+ k_val = knn_validate_src(schema_madlib, point_source,
+ point_column_name, label_column_name, test_source,
+ test_column_name, id_column_name,
+ output_table, operation, k)
+
+ # Aliases and column names used inside the generated SQL.
+ # NOTE(review): unique_string presumably yields collision-free identifiers
+ # so they cannot clash with user column names -- confirm.
+ # The SQL templates below look these locals up by name through
+ # format(**locals()); renaming a local requires renaming its placeholder.
+ x_temp_table = unique_string(desp='x_temp_table')
+ y_temp_table = unique_string(desp='y_temp_table')
+ label_column_name_unique = unique_string(
+ desp='label_column_name_unique')
+ test_id = unique_string(desp='test_id')
+
+ # For classification the label column is cast with ::INTEGER in the
+ # intermediate table; for regression it is used as is.
+ convert_boolean_to_int = ''
+ isClassification = False
+ if operation == 'c':
+ convert_boolean_to_int = '::INTEGER'
+ isClassification = True
+ madlib_knn_interm = unique_string(desp='madlib_knn_interm')
+ plpy.execute("""
+ DROP TABLE IF EXISTS {madlib_knn_interm}
+ """.format(**locals()))
+ # Intermediate temp table: cross join of training and test rows with the
+ # squared_dist_norm2 value of each pair, ranked per test id by that value
+ # and cut off at rank k_val.
+ plpy.execute(
+ """
+ CREATE TEMP TABLE {madlib_knn_interm} AS
+ SELECT * FROM
+ (
+ SELECT row_number() over
+ (partition by {test_id} order by dist) AS r,
+ {x_temp_table}.*
+ FROM
+ (
+ SELECT test.{id_column_name} AS {test_id} ,
+ {schema_madlib}.squared_dist_norm2(
+ train.{point_column_name},
+ test.{test_column_name})
+ AS dist,
+ train.{label_column_name} {convert_boolean_to_int}
+ AS {label_column_name_unique}
+ FROM {point_source} AS train, {test_source} AS test
+ ) {x_temp_table}
+ ) {y_temp_table}
+ WHERE {y_temp_table}.r <= {k_val}""".format(**locals()))
+
+ # Join the kept neighbours back to the test table to recover the test
+ # point column, then aggregate the neighbour labels per test id.
+ if isClassification:
+ # Classification: prediction is the mode of the neighbour labels.
+ plpy.execute(
+ """
+ CREATE TABLE {output_table} AS
+ SELECT {test_id} AS id, {test_column_name},
+ {schema_madlib}.mode({label_column_name_unique}) AS prediction
+ FROM {madlib_knn_interm} JOIN {test_source} ON
+ {test_id} = {id_column_name}
+ GROUP BY {test_id}, {test_column_name}""".format(**locals()))
+ else:
+ # Regression: prediction is the average of the neighbour values; unlike
+ # the classification query, this one also orders the rows by test id.
+ plpy.execute(
+ """
+ CREATE TABLE {output_table} AS
+ SELECT {test_id} AS id, {test_column_name},
+ AVG({label_column_name_unique}) AS prediction
+ FROM
+ {madlib_knn_interm} JOIN {test_source}
+ ON {test_id} = {id_column_name}
+ GROUP BY {test_id}, {test_column_name}
+ ORDER BY {test_id}""".format(**locals()))
+
+ # The intermediate table is no longer needed once the output exists.
+ plpy.execute("""DROP TABLE IF EXISTS {madlib_knn_interm}
+ """.format(**locals()))
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/2fc98a6f/src/ports/postgres/modules/knn/knn.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/knn/knn.sql_in b/src/ports/postgres/modules/knn/knn.sql_in
index d3c1929..ca5be88 100644
--- a/src/ports/postgres/modules/knn/knn.sql_in
+++ b/src/ports/postgres/modules/knn/knn.sql_in
@@ -358,6 +358,7 @@ $$ LANGUAGE plpgsql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `READS SQL DATA', `');
+
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.knn(
point_source VARCHAR,
point_column_name VARCHAR,
@@ -369,79 +370,20 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.knn(
operation VARCHAR,
k INTEGER
) RETURNS VARCHAR AS $$
-DECLARE
- l FLOAT;
- id INTEGER;
- vector DOUBLE PRECISION[];
- cur_pid integer;
- oldClientMinMessages VARCHAR;
- returnstring VARCHAR;
- x_temp_table VARCHAR;
- y_temp_table VARCHAR;
- k_val INTEGER;
- label_column_name_unique VARCHAR;
- test_id VARCHAR;
- convert_boolean_to_int VARCHAR;
-BEGIN
- oldClientMinMessages :=
- (SELECT setting FROM pg_settings WHERE name = 'client_min_messages');
- EXECUTE 'SET client_min_messages TO warning';
- SELECT * FROM MADLIB_SCHEMA.__knn_validate_src(point_source, point_column_name, label_column_name, test_source, test_column_name, id_column_name, output_table, operation, k) INTO k_val;
- PERFORM MADLIB_SCHEMA.create_schema_pg_temp();
- x_temp_table := 'knn_'||md5('knn_'||now()::text||random()::text)||'_temp';
- y_temp_table := 'knn_'||md5('knn_'||now()::text||random()::text)||'_temp';
- label_column_name_unique := 'label'||md5('knn_'||now()::text||random()::text)||'_name';
- test_id := 'id'||md5('knn_'||now()::text||random()::text)||'_name';
-
- convert_boolean_to_int := '';
- IF (operation = 'c') THEN
- convert_boolean_to_int := '::INTEGER';
- END IF;
-
- EXECUTE
- $sql$
- DROP TABLE IF EXISTS pg_temp.madlib_knn_interm;
- CREATE TABLE pg_temp.madlib_knn_interm AS
- SELECT *
- FROM
- (
- SELECT row_number() over (partition by $sql$ || test_id || $sql$ order by dist) AS r, $sql$ || x_temp_table || $sql$.*
- FROM
- (
- SELECT test.$sql$ || id_column_name || $sql$ AS $sql$ || test_id || $sql$, MADLIB_SCHEMA.squared_dist_norm2(train.$sql$ || point_column_name || $sql$,test.$sql$ || test_column_name || $sql$) AS dist, train.$sql$ || label_column_name || $sql$ $sql$ || convert_boolean_to_int || $sql$ AS $sql$ || label_column_name_unique || $sql$
- FROM $sql$ || textin(regclassout(point_source)) || $sql$ AS train, $sql$ || textin(regclassout(test_source)) || $sql$ AS test
- )$sql$ || x_temp_table || $sql$
- )$sql$ || y_temp_table || $sql$
- WHERE $sql$ || y_temp_table || $sql$.r <= $sql$ || k_val;
-
- IF (operation = 'c') THEN
- EXECUTE
- $sql$
- CREATE TABLE $sql$ || output_table || $sql$ AS
- SELECT $sql$ || test_id || $sql$ AS id, $sql$ || test_column_name || $sql$, MADLIB_SCHEMA.mode($sql$ || label_column_name_unique || $sql$) AS prediction
- FROM pg_temp.madlib_knn_interm join $sql$ || textin(regclassout(test_source)) || $sql$ ON $sql$ || test_id || $sql$=$sql$ || id_column_name || $sql$
- GROUP BY $sql$ || test_id || $sql$, $sql$ || test_column_name;
- ELSE
- EXECUTE
- $sql$
- CREATE TABLE $sql$ || output_table || $sql$ AS
- SELECT $sql$ || test_id || $sql$ AS id, $sql$ || test_column_name || $sql$, avg($sql$ || label_column_name_unique || $sql$) AS prediction
- FROM
- pg_temp.madlib_knn_interm join $sql$ || textin(regclassout(test_source)) || $sql$ on $sql$ || test_id || $sql$=$sql$ || id_column_name || $sql$
- GROUP BY $sql$ || test_id || $sql$, $sql$ || test_column_name || $sql$
- ORDER BY $sql$ || test_id || $sql$ $sql$;
- END IF;
-
- EXECUTE 'SET client_min_messages TO ' || oldClientMinMessages;
- IF (operation = 'c') THEN
- returnstring := 'The classification results have been written to output table '||output_table;
- ELSE
- returnstring := 'The regression results have been written to output table '||output_table;
- END IF;
- DROP TABLE pg_temp.madlib_knn_interm;
- RETURN returnstring;
-END;
-$$ LANGUAGE plpgsql VOLATILE
+ PythonFunctionBodyOnly(`knn', `knn')
+ return knn.knn(
+ schema_madlib,
+ point_source,
+ point_column_name,
+ label_column_name,
+ test_source,
+ test_column_name,
+ id_column_name,
+ output_table,
+ operation,
+ k
+ )
+$$ LANGUAGE plpythonu VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -463,4 +405,3 @@ BEGIN
END;
$$ LANGUAGE plpgsql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
-
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/2fc98a6f/src/ports/postgres/modules/knn/test/knn.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/knn/test/knn.sql_in b/src/ports/postgres/modules/knn/test/knn.sql_in
index e5427fd..c7d6798 100644
--- a/src/ports/postgres/modules/knn/test/knn.sql_in
+++ b/src/ports/postgres/modules/knn/test/knn.sql_in
@@ -28,9 +28,9 @@ m4_include(`SQLCommon.m4')
drop table if exists "KNN_TRAIN_DATA";
create table "KNN_TRAIN_DATA" (
-id integer,
-"DATA" integer[],
-label float);
+ id integer,
+ "DATA" integer[],
+ label float);
copy "KNN_TRAIN_DATA" (id, "DATA", label) from stdin delimiter '|';
1|{1,1}|1.0
2|{2,2}|1.0
@@ -44,8 +44,8 @@ copy "KNN_TRAIN_DATA" (id, "DATA", label) from stdin delimiter '|';
\.
drop table if exists knn_test_data;
create table knn_test_data (
-id integer,
-"DATA" integer[]);
+ id integer,
+ "DATA" integer[]);
copy knn_test_data (id, "DATA") from stdin delimiter '|';
1|{2,1}
2|{2,6}