You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ri...@apache.org on 2016/04/01 03:21:22 UTC
[03/11] incubator-madlib git commit: Build: Add support for HAWQ 2.0
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/linalg/linalg.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/linalg/linalg.sql_in b/src/ports/postgres/modules/linalg/linalg.sql_in
index 80b9660..331dbb5 100644
--- a/src/ports/postgres/modules/linalg/linalg.sql_in
+++ b/src/ports/postgres/modules/linalg/linalg.sql_in
@@ -452,18 +452,7 @@ m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!NO SQL!>, <!!>);
* column of \f$ M \f$ and \f$ x \f$. That is,
* \f$ \min_{i=0,\dots,l-1} \operatorname{dist}(\vec{m_i}, \vec x) \f$.
*/
-m4_ifdef(<!__HAWQ__!>, <!!>, <!
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
- M DOUBLE PRECISION[],
- x DOUBLE PRECISION[],
- dist REGPROC /*+ DEFAULT 'squared_dist_norm2' */
-) RETURNS MADLIB_SCHEMA.closest_column_result AS $$
- SELECT MADLIB_SCHEMA._closest_column($1, $2, $3, textin(regprocout($3)))
-$$ LANGUAGE SQL IMMUTABLE STRICT
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
-!>)
-
-m4_ifdef(<!__HAWQ__!>, <!
+m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
M DOUBLE PRECISION[],
x DOUBLE PRECISION[],
@@ -472,8 +461,17 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
IMMUTABLE
STRICT
LANGUAGE C
-AS 'MODULE_PATHNAME', 'closest_column_hawq'
+AS 'MODULE_PATHNAME', 'closest_column_fixed'
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL');
+!>, <!
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
+ M DOUBLE PRECISION[],
+ x DOUBLE PRECISION[],
+ dist REGPROC /*+ DEFAULT 'squared_dist_norm2' */
+) RETURNS MADLIB_SCHEMA.closest_column_result AS $$
+ SELECT MADLIB_SCHEMA._closest_column($1, $2, $3, textin(regprocout($3)))
+$$ LANGUAGE SQL IMMUTABLE STRICT
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
!>)
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
@@ -526,19 +524,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA._closest_columns(
LANGUAGE C IMMUTABLE STRICT
m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!NO SQL!>, <!!>);
-m4_ifdef(<!__HAWQ__!>, <!!>, <!
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
- M DOUBLE PRECISION[],
- x DOUBLE PRECISION[],
- num INTEGER,
- dist REGPROC
-) RETURNS MADLIB_SCHEMA.closest_columns_result AS $$
- SELECT MADLIB_SCHEMA._closest_columns($1, $2, $3, $4, textin(regprocout($4)))
-$$ LANGUAGE SQL IMMUTABLE STRICT
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
-!>)
-
-m4_ifdef(<!__HAWQ__!>, <!
+m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
M DOUBLE PRECISION[],
x DOUBLE PRECISION[],
@@ -548,8 +534,18 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
IMMUTABLE
STRICT
LANGUAGE C
-AS 'MODULE_PATHNAME', 'closest_columns_hawq'
+AS 'MODULE_PATHNAME', 'closest_columns_fixed'
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL');
+!>, <!
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
+ M DOUBLE PRECISION[],
+ x DOUBLE PRECISION[],
+ num INTEGER,
+ dist REGPROC
+) RETURNS MADLIB_SCHEMA.closest_columns_result AS $$
+ SELECT MADLIB_SCHEMA._closest_columns($1, $2, $3, $4, textin(regprocout($4)))
+$$ LANGUAGE SQL IMMUTABLE STRICT
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
!>)
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/__init__.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/__init__.py_in b/src/ports/postgres/modules/svd_mf/__init__.py_in
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/svdmf.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/svdmf.py_in b/src/ports/postgres/modules/svd_mf/svdmf.py_in
deleted file mode 100644
index 081dd7c..0000000
--- a/src/ports/postgres/modules/svd_mf/svdmf.py_in
+++ /dev/null
@@ -1,258 +0,0 @@
-import plpy
-import datetime
-from math import floor, log, pow, sqrt
-
-"""@file svdmf.py_in
-
-Implementation of partial SVD decomposition of a sparse matrix into U and V components.
-"""
-
-# ----------------------------------------
-# Logging: forward msg to plpy.info
-# ----------------------------------------
-def info( msg):
- #plpy.info( datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' : ' + msg);
- plpy.info( msg);
-
-# ----------------------------------------
-# Wrapper: svdmf_run_full with NUM_ITERATIONS = 1000 and MIN_IMPROVEMENT = .0001
-# ----------------------------------------
-def svdmf_run( madlib_schema, input_matrix, col_name, row_name, value, num_features):
- return(svdmf_run_full( madlib_schema, input_matrix, col_name, row_name, value, num_features, 1000, .0001))
-
-# ----------------------------------------
-# Main: svdmf_run_full
-# ----------------------------------------
-def svdmf_run_full( madlib_schema, input_matrix, col_name, row_name, value, num_features, NUM_ITERATIONS, MIN_IMPROVEMENT):
- """
- These constants are important for the execution of the algorithm and may need to be changed for some types of data.
-
- ORIGINAL_STEP - is the initial size of the step in the direction of the gradient.
- (Scaling it by the input size is currently commented out in the body.) Too large a step
- may cause the algorithm to fail to converge and to overflow.
- Too small a step will make execution longer than necessary.
-
- SPEEDUP_CONST, FAST_SPEEDUP_CONST and SLOWDOWN_CONST -
- The step size changes dynamically during the execution:
- as long as the algorithm is making progress, the step size is increased
- at each iteration by multiplying by SPEEDUP_CONST (speed-up constant > 1).
- When the algorithm starts to fail to converge, the step is made smaller abruptly
- to avoid overflow by multiplying by SLOWDOWN_CONST.
- In some instances the initial step size is very small;
- to quickly get it into the suitable range FAST_SPEEDUP_CONST (> 1) is used.
- This means that generally FAST_SPEEDUP_CONST >> SPEEDUP_CONST.
- Those values may be changed if necessary.
-
- NUM_ITERATIONS is the maximum number of iterations to perform.
- Generally this should not be the cause for termination, but on some data it is
- possible that the algorithm is making progress, but at a very slow pace;
- in this case termination will occur when this number of iterations is reached.
-
- MIN_NUM_ITERATIONS - sometimes the algorithm starts with very small progress
- which then increases. To avoid the case when it exits prematurely one can
- set a minimum number of iterations after which progress will be checked.
-
- MIN_IMPROVEMENT - minimum improvement that has to be sustained for the
- algorithm to continue. It is compared with the difference of consecutive error terms.
- Sometimes it needs to be lowered
- in instances when initial progress may be too small, or increased
- if execution is taking too long and the needed level of precision is surpassed.
-
- INIT_VALUE - initial value that new rows are starting with.
- Value is almost irrelevant in most cases, but it should not be 0,
- since this value is used as a multiple in subsequent steps.
- (In this body INIT_VALUE is assigned but never read; the initial vectors are filled with random().)
-
- EARLY_TEMINATE - tells the algorithm to terminate early if after a given
- dimension (< K) the error is less than MIN_IMPROVEMENT. It should generally be TRUE,
- since one should not expect any contribution from adding subsequent dimensions.
- """
-
- # Record the time
- start = datetime.datetime.now();
-
- ORIGINAL_STEP = .001;
- SPEEDUP_CONST = 1.1;
- FAST_SPEEDUP_CONST = 10.0;
- SLOWDOWN_CONST = .1;
- #NUM_ITERATIONS = 1000;
- #MIN_IMPROVEMENT = 0.000001;
- MIN_NUM_ITERATIONS = 1;
- IMPROVEMENT_REACHED = True;
- INIT_VALUE = 0.1;
- EARLY_TEMINATE = True;
-
- error=0;
- keep_ind = 1;
- SD_ind = 1;
-
- # Find sizes of the input and number of elements in the input
- # NOTE(review): table and column names are concatenated into the SQL text unquoted; presumably callers pass trusted identifiers - confirm
- res = plpy.execute('SELECT count(distinct ' + col_name + ') AS c FROM ' + input_matrix + ';');
- feature_y = res[0]['c'];
- res = plpy.execute('SELECT count(distinct ' + row_name + ') AS c FROM ' + input_matrix + ';');
- feature_x = res[0]['c'];
- res = plpy.execute('SELECT count(*) AS c FROM ' + input_matrix + ';');
- cells = res[0]['c'];
-
- # Adjust step size based on the size
- #ORIGINAL_STEP = ORIGINAL_STEP/(feature_x+feature_y);
-
- # Parameters summary:
- info( 'Started svdmf_run() with parameters:');
- info( ' * input_matrix = %s' % input_matrix);
- info( ' * col_name = %s' % col_name);
- info( ' * row_name = %s' % row_name);
- info( ' * value = %s' % value);
- info( ' * num_features = %s' % str(num_features));
- info('got there...');
-
- # Create output and intermediate (temp) tables necessary for the execution
- # matrix_u and matrix_v are for end results. tables e1, e2, e are for residual errors
- # tables S1, S2, D1, D2 are for estimates of row values of u and v.
- sql = '''
- DROP TABLE IF EXISTS ''' + madlib_schema + '''.matrix_u;
- CREATE TABLE ''' + madlib_schema + '''.matrix_u(
- col_num INT,
- row_num INT,
- val FLOAT
- );
- DROP TABLE IF EXISTS ''' + madlib_schema + '''.matrix_v;
- CREATE TABLE ''' + madlib_schema + '''.matrix_v(
- row_num INT,
- col_num INT,
- val FLOAT
- );
- DROP TABLE IF EXISTS e1;
- CREATE TEMP TABLE e1(
- row_num INT,
- col_num INT,
- val FLOAT
- ) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num, col_num)');
- DROP TABLE IF EXISTS S1;
- CREATE TEMP TABLE S1(
- col_num INT,
- row_num INT,
- val FLOAT
- ) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (col_num)');
- DROP TABLE IF EXISTS S2;
- CREATE TEMP TABLE S2(
- col_num INT,
- row_num INT,
- val FLOAT
- ) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (col_num)');
- DROP TABLE IF EXISTS D1;
- CREATE TEMP TABLE D1(
- row_num INT,
- col_num INT,
- val FLOAT
- ) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num)');
- DROP TABLE IF EXISTS D2;
- CREATE TEMP TABLE D2(
- row_num INT,
- col_num INT,
- val FLOAT
- ) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num)');
- DROP TABLE IF EXISTS e;
- CREATE TABLE e(
- row_num INT,
- col_num INT,
- val FLOAT
- ) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num,col_num)');
- ''';
- plpy.execute(sql);
-
- # Copy original data into a temp table
- info( 'Copying the source data into a temporary table...');
- plpy.execute('INSERT INTO e1 SELECT ' + row_name + ', ' + col_name + ', ' + str(value) + ' FROM ' + input_matrix + ';');
-
- # Create tables to keep most of the execution data
- sql = '''
- TRUNCATE TABLE S1;
- TRUNCATE TABLE S2;
- TRUNCATE TABLE D1;
- TRUNCATE TABLE D2;
- ''';
- plpy.execute(sql);
-
- # populate initial vectors
- plpy.execute('INSERT INTO S1 (row_num, col_num, val) SELECT (g.a-1)/' +str(num_features)+ '+1, (g.a-1)%' +str(num_features)+ '+1, random() FROM generate_series(1,'+str(feature_x*num_features)+') AS g(a);');
- plpy.execute('INSERT INTO D1 (col_num, row_num, val) SELECT (g.a-1)/' +str(num_features)+ '+1, (g.a-1)%' +str(num_features)+ '+1, random() FROM generate_series(1,'+str(feature_y*num_features)+') AS g(a);');
-
- SD_ind = 1;
- i = 0;
- step = ORIGINAL_STEP;
- imp_reached = False;
-
- while(True):
-
- i = i + 1;
-
- sql = '''
- TRUNCATE TABLE e;
- ''';
- plpy.execute(sql);
-
- # Create current predictions for the values and compute error per term
- #plpy.execute('INSERT INTO e SELECT a.row_num, a.col_num, a.val-o.prod FROM (SELECT s.row_num, d.col_num, '+madlib_schema+'.array_dot(d.val,s.val) AS prod FROM (SELECT row_num, array_agg(val ORDER BY col_num) AS val FROM S'+str(SD_ind)+' GROUP BY row_num) AS s CROSS JOIN (SELECT col_num, array_agg(val ORDER BY row_num) AS val FROM D'+str(SD_ind)+' GROUP BY col_num) AS d) as o, e1 as a WHERE a.row_num=o.row_num AND a.col_num=o.col_num;');
- plpy.execute('INSERT INTO e SELECT a.row_num, a.col_num, a.val-o.prod FROM (SELECT s.row_num, d.col_num, sum(s.val*d.val) AS prod FROM S'+str(SD_ind)+' AS s JOIN D'+str(SD_ind)+' AS d ON d.row_num = s.col_num GROUP BY s.row_num, d.col_num) AS o, e1 AS a WHERE a.row_num=o.row_num AND a.col_num=o.col_num');
- old_error = error;
-
- res = plpy.execute('SELECT sqrt(sum(val*val)) AS c FROM e;');
- error = res[0]['c'];
-
- info( '...Iteration ' + str(i) + ': residual_error = '+ str(error) +', step_size = ' + str(step) + ', min_improvement = ' + str(MIN_IMPROVEMENT));
-
- # Check if progress is being made or max number of iterations reached
- if(((abs(error - old_error) < MIN_IMPROVEMENT) and (i >= MIN_NUM_ITERATIONS) and ((error < MIN_IMPROVEMENT) or (not IMPROVEMENT_REACHED) or (imp_reached))) or (NUM_ITERATIONS < i)):
- break;
-
- if((abs(error - old_error) >= MIN_IMPROVEMENT) and (old_error > 0)):
- imp_reached = True;
-
- # Check if step size need to be increased or decreased
- if((error > old_error) and (old_error != 0)) :
- error = 0;
- step = step*SLOWDOWN_CONST;
- SD_ind = SD_ind%2+1;
- continue;
- elif(sqrt((error - old_error)*(error - old_error)) < .1*MIN_IMPROVEMENT):
- step = step*FAST_SPEEDUP_CONST;
- else:
- step = step*SPEEDUP_CONST;
-
- # Empty intermediate tables
- plpy.execute('TRUNCATE TABLE S'+str(SD_ind%2+1)+';');
- plpy.execute('TRUNCATE TABLE D'+str(SD_ind%2+1)+';');
-
- # Update values of the vectors
- # The commented-out lines are an implementation that uses ordered aggregates, which we chose to avoid for compatibility reasons
- #plpy.execute('INSERT INTO S'+str(SD_ind%2+1)+' SELECT s.col_num, s.row_num, s.val+'+str(step)+'*o.prod FROM (SELECT e.row_num, d.row_num as col_num, '+madlib_schema+'.array_dot(d.val,e.val) AS prod FROM (SELECT row_num, array_agg(val ORDER BY col_num) AS val FROM e GROUP BY row_num) AS e CROSS JOIN (SELECT row_num, array_agg(val ORDER BY col_num) AS val FROM D'+str(SD_ind)+' GROUP BY row_num) AS d) as o, S'+str(SD_ind)+' as s WHERE s.col_num = o.col_num AND s.row_num = o.row_num;');
- #plpy.execute('INSERT INTO D'+str(SD_ind%2+1)+' SELECT d.row_num, d.col_num, d.val+'+str(step)+'*o.prod FROM (SELECT s.col_num AS row_num, e.col_num, '+madlib_schema+'.array_dot(e.val,s.val) AS prod FROM (SELECT col_num, array_agg(val ORDER BY row_num) AS val FROM S'+str(SD_ind)+' GROUP BY col_num) AS s CROSS JOIN (SELECT col_num, array_agg(val ORDER BY row_num) AS val FROM e GROUP BY col_num) AS e) as o, D'+str(SD_ind)+' as d WHERE d.col_num = o.col_num AND d.row_num = o.row_num;');
-
- plpy.execute('INSERT INTO S'+str(SD_ind%2+1)+' SELECT s.col_num, s.row_num, s.val+'+str(step)+'*o.prod FROM (SELECT e.row_num, d.row_num AS col_num, sum(e.val*d.val) AS prod FROM e JOIN D'+str(SD_ind)+' AS d ON e.col_num = d.col_num GROUP BY e.row_num, d.row_num) AS o, S'+str(SD_ind)+' AS s WHERE s.row_num=o.row_num AND s.col_num=o.col_num');
- plpy.execute('INSERT INTO D'+str(SD_ind%2+1)+' SELECT d.row_num, d.col_num, d.val+'+str(step)+'*o.prod FROM (SELECT s.col_num AS row_num, e.col_num, sum(e.val*s.val) AS prod FROM e JOIN S'+str(SD_ind)+' AS s ON s.row_num = e.row_num GROUP BY e.col_num, s.col_num) AS o, D'+str(SD_ind)+' AS d WHERE d.row_num=o.row_num AND d.col_num=o.col_num');
-
- SD_ind = SD_ind%2+1;
-
- # Save previous vector and prepare for computing another one
- keep_ind = keep_ind%2+1;
-
- if((error < MIN_IMPROVEMENT) and (EARLY_TEMINATE)):
- break;
-
- plpy.execute('INSERT INTO ' + madlib_schema + '.matrix_u SELECT * FROM S'+str(SD_ind)+';');
- plpy.execute('INSERT INTO ' + madlib_schema + '.matrix_v SELECT * FROM D'+str(SD_ind)+';');
-
- # Runtime evaluation
- # NOTE(review): timedelta.seconds excludes whole days, and '%d.%d' prints microseconds without zero padding (86839 us renders as '.86839')
- end = datetime.datetime.now();
- minutes, seconds = divmod( (end - start).seconds, 60)
- microsec = (end - start).microseconds
- return ('''
- Finished SVD matrix factorisation for %s (%s, %s, %s).
- Results:
- * total error = %s
- Output:
- * table : ''' + madlib_schema + '''.matrix_u
- * table : ''' + madlib_schema + '''.matrix_v
- Time elapsed: %d minutes %d.%d seconds.
- ''') % (input_matrix, row_name, col_name, value, str(error), minutes, seconds, microsec)
-
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/svdmf.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/svdmf.sql_in b/src/ports/postgres/modules/svd_mf/svdmf.sql_in
deleted file mode 100644
index d85b7a8..0000000
--- a/src/ports/postgres/modules/svd_mf/svdmf.sql_in
+++ /dev/null
@@ -1,199 +0,0 @@
-/* ----------------------------------------------------------------------- *//**
- *
- * @file svdmf.sql_in
- *
- * @brief SQL functions for SVD Matrix Factorization
- * @date January 2011
- *
- * @sa For a brief introduction to SVD Matrix Factorization, see the module
- * description \ref grp_svd.
- *
- *//* ----------------------------------------------------------------------- */
-
-m4_include(`SQLCommon.m4')
-
-/**
-@addtogroup grp_svdmf
-
-\warning <em> This is an old implementation of Matrix Decomposition and
-has been deprecated. For SVD decomposition, please see \ref grp_svd;
-for the latest version of low-rank approximation, please see \ref grp_lmf</em>
-
-<div class="toc"><b>Contents</b>
-<ul>
-<li><a href="#syntax">SVD Function Syntax</a></li>
-<li><a href="#examples">Examples</a></li>
-<li><a href="#literature">Literature</a></li>
-<li><a href="#related">Related Topics</a></li>
-</ul>
-</div>
-
-@brief Computes low-rank approximation of a sparse matrix.
-
-This module implements "partial SVD decomposition" method for representing a sparse matrix using a low-rank approximation.
-Mathematically, this algorithm seeks to find matrices U and V that, for any given A, minimize:\n
-\f[ ||\boldsymbol A - \boldsymbol UV ||_F
-\f]
-subject to \f$ rank(\boldsymbol UV) \leq k \f$, where \f$ ||\cdot||_F \f$ denotes the Frobenius norm and \f$ k \leq rank(\boldsymbol A)\f$.
-If A is \f$ m \times n \f$, then U will be \f$ m \times k \f$ and V will be \f$ k \times n \f$.
-
-This algorithm is not intended to do the full decomposition, or to be used as part of
-inverse procedure. It effectively computes the SVD of a low-rank approximation of A (preferably sparse), with the singular values absorbed in U and V.
-Code is based on the write-up as appears at [1], with some modifications.
-
-
-
-@anchor syntax
-@par Function Syntax
-
-The SVD function is called as follows:
-<pre class="syntax">
-svdmf_run( input_table,
- col_name,
- row_name,
- value, num_features)
-</pre>
-
-The <b>input matrix</b> is expected to be of the following form:
-<pre>{TABLE|VIEW} <em>input_table</em> (
- <em>col_num</em> INTEGER,
- <em>row_num</em> INTEGER,
- <em>value</em> FLOAT
-)</pre>
-
-Input is contained in a table where column number and row number for each cell
-are sequential; that is to say that if the data was written as a matrix, those values would be the
-actual row and column numbers and not some random identifiers. All rows and columns must be associated with a value.
-There should not be any missing row, columns or values.
-
-The function creates two tables, \c matrix_u and \c matrix_v, which represent the matrices U and V in table format, and returns a text summary of the run.
-
-@anchor examples
-@examp
-
--# Prepare an input table/view.
-<pre class="example">
-CREATE TABLE svd_test ( col INT,
- row INT,
- val FLOAT
- );
-</pre>
--# Populate the input table with some data.
-<pre class="example">
-INSERT INTO svd_test SELECT ( g.a%1000)+1, g.a/1000+1, random()
- FROM generate_series(1,1000) AS g(a);
-</pre>
--# Call the svdmf_run() stored procedure.
-<pre class="example">
-SELECT madlib.svdmf_run( 'svd_test',
- 'col',
- 'row',
- 'val',
- 3);
-</pre>
-Example result:
-<pre class="result">
-INFO: ('Started svdmf_run() with parameters:',)
-INFO: (' * input_matrix = madlib_svdsparse_test.test',)
-INFO: (' * col_name = col_num',)
-INFO: (' * row_name = row_num',)
-INFO: (' * value = val',)
-INFO: (' * num_features = 3',)
-INFO: ('Copying the source data into a temporary table...',)
-INFO: ('Estimating feature: 1',)
-INFO: ('...Iteration 1: residual_error = 33345014611.1, step_size = 4.9997500125e-10, min_improvement = 1.0',)
-INFO: ('...Iteration 2: residual_error = 33345014557.6, step_size = 5.49972501375e-10, min_improvement = 1.0',)
-INFO: ('...Iteration 3: residual_error = 33345014054.3, step_size = 6.04969751512e-10, min_improvement = 1.0',)
-...
-INFO: ('...Iteration 78: residual_error = 2.02512133868, step_size = 5.78105354457e-10, min_improvement = 1.0',)
-INFO: ('...Iteration 79: residual_error = 0.893810181282, step_size = 6.35915889903e-10, min_improvement = 1.0',)
-INFO: ('...Iteration 80: residual_error = 0.34496773222, step_size = 6.99507478893e-10, min_improvement = 1.0',)
-INFO: ('Swapping residual error matrix...',)
- svdmf_run
- -------------------------------------------------------------------------------------------
-
- Finished SVD matrix factorisation for madlib_svdsparse_test.test (row_num, col_num, val).
- Results:
- * total error = 0.34496773222
- * number of estimated features = 1
- Output:
- * table : madlib.matrix_u
- * table : madlib.matrix_v
- Time elapsed: 4 minutes 47.86839 seconds.
-</pre>
-
-@anchor literature
-@literature
-
-[1] Simon Funk, Netflix Update: Try This at Home, December 11 2006,
- http://sifter.org/~simon/journal/20061211.html
-
-
-@anchor related
-@par Related Topics
-File svdmf.sql_in documenting the SQL functions.
-
-@internal
-@sa namespace svdmf (documenting the implementation in Python)
-@endinternal
-
-*/
-
-/**
- * @brief Partial SVD decomposition of a sparse matrix into U and V components
- *
- * This function takes as input the table representation of a sparse matrix and
- * decomposes it into matrices U and V restricted to the requested number of
- * features. The iteration limit and minimum improvement use built-in defaults.
- *
- * @param input_table Name of the table/view with the source data
- * @param col_name Name of the column containing cell column number
- * @param row_name Name of the column containing cell row number
- * @param value Name of the column containing cell value
- * @param num_features Rank of desired approximation
- * @return Text summary of the run
- */
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.svdmf_run(
- input_table TEXT, col_name TEXT, row_name TEXT, value TEXT, num_features INT
-)
-RETURNS TEXT
-AS $$
-
- PythonFunctionBodyOnly(`svd_mf', `svdmf')
-
- # schema_madlib comes from PythonFunctionBodyOnly
- return svdmf.svdmf_run( schema_madlib, input_table, col_name, row_name, value, num_features);
-
-$$ LANGUAGE plpythonu VOLATILE
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
-
-/**
- * @brief Partial SVD decomposition of a sparse matrix into U and V components
- *
- * This function takes as input the table representation of a sparse matrix and
- * decomposes it into matrices U and V restricted to the requested number of
- * features, with a caller-supplied iteration limit and improvement threshold.
- *
- * @param input_table Name of the table/view with the source data
- * @param col_name Name of the column containing cell column number
- * @param row_name Name of the column containing cell row number
- * @param value Name of the column containing cell value
- * @param num_features Rank of desired approximation
- * @param num_iterations Maximum number of iterations to perform regardless of convergence
- * @param min_error Minimum improvement threshold (passed to svdmf_run_full as MIN_IMPROVEMENT)
- * @return Text summary of the run
- */
-
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.svdmf_run(
- input_table TEXT, col_name TEXT, row_name TEXT, value TEXT, num_features INT, num_iterations INT, min_error FLOAT
-)
-RETURNS TEXT
-AS $$
-
- PythonFunctionBodyOnly(`svd_mf', `svdmf')
-
- # schema_madlib comes from PythonFunctionBodyOnly
- return svdmf.svdmf_run_full( schema_madlib, input_table, col_name, row_name, value, num_features, num_iterations, min_error);
-
-$$ LANGUAGE plpythonu VOLATILE
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in b/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in
deleted file mode 100644
index c77e633..0000000
--- a/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in
+++ /dev/null
@@ -1,79 +0,0 @@
----------------------------------------------------------------------------
--- Rules:
--- ------
--- 1) Any DB objects should be created w/o schema prefix,
--- since this file is executed in a separate schema context.
--- 2) There should be no DROP statements in this script, since
--- all objects created in the default schema will be cleaned-up outside.
----------------------------------------------------------------------------
-
----------------------------------------------------------------------------
--- Setup:
----------------------------------------------------------------------------
---------------------------------------------------------------------------------
--- Generate_sparse:
--- Creates a table representing a sparse matrix whose cell values are the
--- product of the row number and the column number.
--- Used for testing general convergence on data that has a discoverable structure
---
--- $1 - number of rows in the matrix
--- $2 - number of columns in the matrix
--- $3 - number of empty cells per single cell containing a value
---------------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION Generate_Sparse(INT, INT, INT) RETURNS void AS $$
-declare
- i INT;
-begin
--- DROP TABLE IF EXISTS test;
-CREATE TABLE test(
- row_num INT,
- col_num INT,
- val FLOAT
-);
-
-FOR i IN 1..$1 LOOP
- EXECUTE 'INSERT INTO test SELECT '||i||', gen.a, CAST((gen.a*'||i||') AS FLOAT) FROM (SELECT CAST((random()*'||$2||'+1) AS INT) AS a FROM generate_series(1,'||$2/$3||')) as gen';
-END LOOP;
-end
-$$ LANGUAGE plpgsql;
-
-
---------------------------------------------------------------------------------
--- Generate_random:
--- Creates a table representing a sparse matrix with random values.
--- Used for testing general convergence properties on random data,
--- where convergence rate should be minimal
---
--- $1 - number of rows in the matrix
--- $2 - number of columns in the matrix
--- $3 - number of empty cells per single cell containing a value
---------------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION Generate_Random(INT, INT, INT) RETURNS void AS $$
-declare
- i INT;
-begin
--- DROP TABLE IF EXISTS test;
-CREATE TABLE test(
- row_num INT,
- col_num INT,
- val FLOAT
-);
-
-FOR i IN 1..$1 LOOP
- EXECUTE 'INSERT INTO test SELECT '||i||', gen.a, random() FROM (SELECT CAST((random()*'||$2||'+1) AS INT) AS a FROM generate_series(1,'||$2/$3||')) as gen';
-END LOOP;
-end
-$$ LANGUAGE plpgsql;
-
----------------------------------------------------------------
--- Test
----------------------------------------------------------------
--- Pick a test to run: Random or Sequential Sparse; and create a test table
--- SELECT Generate_Random(10000, 10000, 100);
-SELECT Generate_Sparse(10, 10, 5);
-
--- Run SVD decomposition on 3 main features
-SELECT MADLIB_SCHEMA.svdmf_run('test'::text, 'col_num'::text, 'row_num'::text, 'val'::text, 3);
-
--- Display portion of the results
-SELECT * FROM MADLIB_SCHEMA.matrix_u ORDER BY col_num, row_num LIMIT 10;
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/utilities/control.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/control.py_in b/src/ports/postgres/modules/utilities/control.py_in
index eccc61f..a0c3d55 100644
--- a/src/ports/postgres/modules/utilities/control.py_in
+++ b/src/ports/postgres/modules/utilities/control.py_in
@@ -19,6 +19,11 @@ from utilities import unique_string
_unique_string = unique_string
+STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>)
+
+
class EnableOptimizer(object):
"""
@@ -35,11 +40,12 @@ class EnableOptimizer(object):
<!True!>, <!False!>)
def __enter__(self):
+ # we depend on the fact that all GPDB/HAWQ versions that have the ORCA
+ # optimizer also define function properties
if self.guc_exists:
optimizer = plpy.execute("show optimizer")[0]["optimizer"]
self.optimizer_enabled = True if optimizer == 'on' else False
plpy.execute("set optimizer={0}".format(('off', 'on')[self.to_enable]))
- return self
def __exit__(self, *args):
if args and args[0]:
@@ -161,8 +167,7 @@ class IterationController:
self.inWith = False
self.iteration = -1
self.initialize_state = initialize_state
- self.is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
- if self.is_hawq:
+ if STATE_IN_MEM:
self.new_state={"_iteration": 0, "_state": None}
self.old_state={"_iteration": 0, "_state": None}
@@ -186,12 +191,13 @@ class IterationController:
def __exit__(self, type, value, tb):
self.inWith = False
- if self.is_hawq:
+ if STATE_IN_MEM:
insert_plan = plpy.prepare("""
INSERT INTO {rel_state}
SELECT $1, $2
""".format(**self.kwargs), ["INTEGER", "DOUBLE PRECISION[]"])
- plpy.execute(insert_plan, [self.new_state['_iteration'], self.new_state['_state']])
+ plpy.execute(insert_plan,
+ [self.new_state['_iteration'], self.new_state['_state']])
def runSQL(self, sql):
if self.verbose:
@@ -215,38 +221,31 @@ class IterationController:
# For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
# since currently ORCA has a bug for left outer joins (MPP-21868).
# This should be removed when the issue is fixed in ORCA.
- if version_wrapper.is_gp43() or version_wrapper.is_hawq():
- optimizer = plpy.execute("SHOW optimizer")[0]['optimizer']
- plpy.execute("SET optimizer = off")
-
- if self.is_hawq:
- eval_plan= plpy.prepare("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT $1 AS _state
- ) AS _state ON True
- """.format(expression=expression).
- format(iteration=self.iteration,
- **self.kwargs), ["DOUBLE PRECISION[]"])
-
- resultObject = plpy.execute(eval_plan, [[]
- if self.new_state['_state'] is None else self.new_state['_state']])
- else:
- resultObject= self.runSQL("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT _state
- FROM {{rel_state}} AS _state
- WHERE _state._iteration = {{iteration}}
- ) AS _state ON True
- """.format(expression=expression).format(
- iteration=self.iteration,
- **self.kwargs))
-
- if version_wrapper.is_gp43() or version_wrapper.is_hawq():
- plpy.execute("SET optimizer = " + optimizer)
+ with EnableOptimizer(False):
+ if STATE_IN_MEM:
+ eval_plan = plpy.prepare("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT $1 AS _state
+ ) AS _state ON True
+ """.format(expression=expression).
+ format(iteration=self.iteration, **self.kwargs),
+ ["DOUBLE PRECISION[]"])
+ resultObject = plpy.execute(eval_plan,
+ [[] if self.new_state['_state'] is None
+ else self.new_state['_state']])
+ else:
+ resultObject = self.runSQL("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT _state
+ FROM {{rel_state}} AS _state
+ WHERE _state._iteration = {{iteration}}
+ ) AS _state ON True
+ """.format(expression=expression).
+ format(iteration=self.iteration, **self.kwargs))
if resultObject.nrows() == 0:
return None
@@ -266,10 +265,7 @@ class IterationController:
@return None if \c condition evaluates to NULL, otherwise the Boolean
value of \c condition
"""
-
- return self.evaluate("""
- CAST(({condition}) AS BOOLEAN)
- """.format(condition=condition))
+ return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))
def update(self, newState, **updateKwargs):
"""
@@ -292,24 +288,22 @@ class IterationController:
is kept.
"""
updateKwargs.update(**self.kwargs)
- newState = newState.format(
- iteration=self.iteration,
- **updateKwargs)
+ newState = newState.format(iteration=self.iteration, **updateKwargs)
self.iteration = self.iteration + 1
- if self.is_hawq:
+ if STATE_IN_MEM:
self.old_state = self.new_state
- update_plan= plpy.prepare("""
+ update_plan = plpy.prepare("""
SELECT
{iteration} AS _iteration,
({newState}) AS _state
""".format(iteration=self.iteration,
- newState=newState,
- **self.kwargs).format(__state__='$1'), ["DOUBLE PRECISION[]"])
+ newState=newState, **self.kwargs).
+ format(__state__='$1'),
+ ["DOUBLE PRECISION[]"])
self.new_state = plpy.execute(update_plan,
- [None if self.new_state['_state'] is None else self.new_state['_state']])[0]
- # Deal with possible double underflow
- # self.new_state['_state'] = map(lambda r: 1e-307 if abs(r) < 1e-307 else r, self.new_state['_state'])
+ [None if self.new_state['_state'] is None
+ else self.new_state['_state']])[0]
else:
self.runSQL("""
INSERT INTO {rel_state}
@@ -317,15 +311,13 @@ class IterationController:
{iteration},
({newState})
""".format(
- iteration=self.iteration,
- newState=newState,
- **self.kwargs))
-
- if not self.is_hawq:
+ iteration=self.iteration,
+ newState=newState,
+ **self.kwargs))
if self.truncAfterIteration:
self.runSQL("""
- DELETE FROM {rel_state} AS _state
- WHERE _state._iteration < {iteration}
+ DELETE FROM {rel_state} AS _state
+ WHERE _state._iteration < {iteration}
""".format(iteration=self.iteration, **self.kwargs))
@@ -337,12 +329,14 @@ class IterationController2D(IterationController):
def __exit__(self, type, value, tb):
self.inWith = False
- if self.is_hawq:
- insert_plan= plpy.prepare("""
- INSERT INTO {rel_state}
- SELECT $1, {schema_madlib}.array_to_2d($2)
- """.format(**self.kwargs), ["INTEGER", "DOUBLE PRECISION[]"])
- plpy.execute(insert_plan, [self.new_state['_iteration'], self.new_state['_state']])
+ if STATE_IN_MEM:
+ insert_plan = plpy.prepare("""
+ INSERT INTO {rel_state}
+ SELECT $1, {schema_madlib}.array_to_2d($2)
+ """.format(**self.kwargs),
+ ["INTEGER", "DOUBLE PRECISION[]"])
+ plpy.execute(insert_plan,
+ [self.new_state['_iteration'], self.new_state['_state']])
def evaluate(self, expression):
"""
@@ -357,40 +351,35 @@ class IterationController2D(IterationController):
@return None if \c expression evaluates to NULL, otherwise the value of
\c expression
"""
-
- # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
- # since currently ORCA has a bug for left outer joins (MPP-21868).
+ # We disable the optimizer (ORCA) for the query planning
+ # since ORCA has a bug for left outer joins (MPP-21868).
# This should be removed when the issue is fixed in ORCA.
- if version_wrapper.is_gp43() or version_wrapper.is_hawq():
- optimizer = plpy.execute("SHOW optimizer")[0]['optimizer']
- plpy.execute("SET optimizer = off")
-
- if self.is_hawq:
- eval_plan= plpy.prepare("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT {{schema_madlib}}.array_to_2d($1) AS _state
- ) AS _state ON True
- """.format(expression=expression).format(
- **self.kwargs), ["DOUBLE PRECISION[]"])
-
- resultObject = plpy.execute(eval_plan,
- [[] if self.new_state['_state'] is None else self.new_state['_state']])
- else:
- resultObject= self.runSQL("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT *
- FROM {{rel_state}} AS _state
- WHERE _state._iteration = {{iteration}}
- ) AS _state ON True
- """.format(expression=expression).format(iteration=self.iteration, **self.kwargs))
-
- if version_wrapper.is_gp43() or version_wrapper.is_hawq():
- plpy.execute("SET optimizer = " + optimizer)
-
+ with EnableOptimizer(False):
+ if STATE_IN_MEM:
+ eval_plan = plpy.prepare("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT {{schema_madlib}}.array_to_2d($1) AS _state
+ ) AS _state ON True
+ """.format(expression=expression).format(
+ **self.kwargs), ["DOUBLE PRECISION[]"])
+
+ resultObject = plpy.execute(
+ eval_plan,
+ [[] if self.new_state['_state'] is None else self.new_state['_state']])
+ else:
+ resultObject = self.runSQL("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT *
+ FROM {{rel_state}} AS _state
+ WHERE _state._iteration = {{iteration}}
+ ) AS _state ON True
+ """.format(expression=expression).
+ format(iteration=self.iteration,
+ **self.kwargs))
if resultObject.nrows() == 0:
return None
else:
@@ -422,19 +411,18 @@ class IterationController2D(IterationController):
**updateKwargs)
self.iteration = self.iteration + 1
- if self.is_hawq:
- self.old_state= self.new_state
- update_plan= plpy.prepare("""
- SELECT
- {iteration} AS _iteration,
- {schema_madlib}.array_to_1d(({newState})) AS _state
- """.format(
- iteration=self.iteration,
- newState=newState,
- **self.kwargs), ["DOUBLE PRECISION[]"])
- self.new_state= plpy.execute(update_plan,
- [None if self.new_state['_state'] is None
- else self.new_state['_state']])[0]
+ if STATE_IN_MEM:
+ self.old_state = self.new_state
+ update_plan = plpy.prepare("""
+ SELECT
+ {iteration} AS _iteration,
+ {schema_madlib}.array_to_1d(({newState})) AS _state
+ """.format(iteration=self.iteration,
+ newState=newState, **self.kwargs),
+ ["DOUBLE PRECISION[]"])
+ self.new_state = plpy.execute(update_plan,
+ [None if self.new_state['_state'] is None
+ else self.new_state['_state']])[0]
else:
self.runSQL("""
INSERT INTO {rel_state}
@@ -442,16 +430,14 @@ class IterationController2D(IterationController):
{iteration},
({newState})
""".format(
- iteration=self.iteration,
- newState=newState,
- **self.kwargs))
-
- if not self.is_hawq:
+ iteration=self.iteration,
+ newState=newState,
+ **self.kwargs))
if self.truncAfterIteration:
self.runSQL("""
- DELETE FROM {rel_state} AS _state
- WHERE _state._iteration < {iteration}
- """.format(iteration=self.iteration, **self.kwargs))
+ DELETE FROM {rel_state} AS _state
+ WHERE _state._iteration < {iteration}
+ """.format(iteration=self.iteration, **self.kwargs))
class IterationController2S(IterationController):
@@ -462,8 +448,8 @@ class IterationController2S(IterationController):
"""
def evaluate(self, expression):
- if self.is_hawq:
- eval_plan= plpy.prepare("""
+ if STATE_IN_MEM:
+ eval_plan = plpy.prepare("""
SELECT ({expression}) AS expression
FROM {{rel_args}} AS _args
LEFT OUTER JOIN (
@@ -471,45 +457,46 @@ class IterationController2S(IterationController):
$1 AS _state_previous,
$2 AS _state_current
) AS _state ON True
- """.format(expression=expression).format(
- iteration=self.iteration,
- **self.kwargs), ["DOUBLE PRECISION[]", "DOUBLE PRECISION[]"])
+ """.format(expression=expression).
+ format(iteration=self.iteration, **self.kwargs),
+ ["DOUBLE PRECISION[]", "DOUBLE PRECISION[]"])
- resultObject= plpy.execute(eval_plan,
- [self.old_state['_state'],
- self.new_state['_state']])
+ resultObject = plpy.execute(
+ eval_plan,
+ [self.old_state['_state'], self.new_state['_state']])
else:
- resultObject=self.runSQL("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT
- _state_previous, _state_current
- FROM
- (
- SELECT _state AS _state_previous
- FROM {{rel_state}}
- WHERE _iteration = {{iteration}} - 1
- ) sub1,
- (
- SELECT _state AS _state_current
- FROM {{rel_state}}
- WHERE _iteration = {{iteration}}
- ) sub2
- ) AS _state ON True
- """.format(expression=expression).format(
- iteration=self.iteration,
- **self.kwargs))
+ resultObject = self.runSQL("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT
+ _state_previous, _state_current
+ FROM
+ (
+ SELECT _state AS _state_previous
+ FROM {{rel_state}}
+ WHERE _iteration = {{iteration}} - 1
+ ) sub1,
+ (
+ SELECT _state AS _state_current
+ FROM {{rel_state}}
+ WHERE _iteration = {{iteration}}
+ ) sub2
+ ) AS _state ON True
+ """.format(expression=expression).
+ format(iteration=self.iteration,
+ **self.kwargs))
if resultObject.nrows() == 0:
return None
else:
return resultObject[0]['expression']
- def get_state_size(self):
- return len(self.new_state) if self.is_hawq else 0
+ if STATE_IN_MEM:
+ def get_state_size(self):
+ return len(self.new_state)
- def get_state_value(self, index):
- return self.new_state['_state'][index] if self.is_hawq else 0
+ def get_state_value(self, index):
+ return self.new_state['_state'][index]
m4_changequote(<!`!> , <!'!>)
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/utilities/control_composite.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/control_composite.py_in b/src/ports/postgres/modules/utilities/control_composite.py_in
index bfe2f9d..a584845 100644
--- a/src/ports/postgres/modules/utilities/control_composite.py_in
+++ b/src/ports/postgres/modules/utilities/control_composite.py_in
@@ -16,7 +16,12 @@ import plpy
from utilities import __mad_version
version_wrapper = __mad_version()
from utilities import unique_string
-from control import MinWarning
+_unique_string = unique_string
+from control import MinWarning, EnableOptimizer
+
+STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>)
class IterationControllerComposite:
@@ -58,10 +63,9 @@ class IterationControllerComposite:
self.verbose = verbose
self.inWith = False
self.iteration = -1
- m4_ifdef(<!__HAWQ__!>, <!
- self.new_state = None
- self.old_state = None
- !>)
+ if STATE_IN_MEM:
+ self.new_state = None
+ self.old_state = None
def __enter__(self):
with MinWarning('warning'):
@@ -79,18 +83,18 @@ class IterationControllerComposite:
def __exit__(self, type, value, tb):
self.inWith = False
- m4_ifdef(<!__HAWQ__!>, <!
- insert_plan=plpy.prepare("""
- INSERT INTO {rel_state}
- SELECT $1, CAST( ({schema_madlib}.array_to_2d($2), $3, $4, $5) AS {schema_madlib}.kmeans_state)
- """.format(**self.kwargs), [ "INTEGER", "DOUBLE PRECISION[]",
- "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" ])
- plpy.execute(insert_plan, [ self.new_state['_iteration'],
- self.new_state['centroids'],
- self.new_state['old_centroid_ids'],
- self.new_state['objective_fn'],
- self.new_state['frac_reassigned'] ])
- !>)
+ if STATE_IN_MEM:
+ insert_plan=plpy.prepare("""
+ INSERT INTO {rel_state}
+ SELECT $1, CAST( ({schema_madlib}.array_to_2d($2), $3, $4, $5) AS {schema_madlib}.kmeans_state)
+ """.format(**self.kwargs), ["INTEGER", "DOUBLE PRECISION[]",
+ "INTEGER[]", "DOUBLE PRECISION",
+ "DOUBLE PRECISION"])
+ plpy.execute(insert_plan, [self.new_state['_iteration'],
+ self.new_state['centroids'],
+ self.new_state['old_centroid_ids'],
+ self.new_state['objective_fn'],
+ self.new_state['frac_reassigned']])
def runSQL(self, sql):
if self.verbose:
@@ -110,47 +114,39 @@ class IterationControllerComposite:
@return None if \c expression evaluates to NULL, otherwise the value of
\c expression
"""
-
- ## For GPDB 4.3 we disable the optimzer (ORCA) for the query planner
- ## since currently ORCA has a bug for left outer joins (MPP-21868).
- ## This should be removed when the issue is fixed in ORCA.
- if version_wrapper.is_gp43() or version_wrapper.is_hawq():
- optimizer = plpy.execute("SHOW optimizer")[0]['optimizer']
- plpy.execute("SET optimizer = off")
-
- m4_ifdef(<!__HAWQ__!>, <!
- cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4) AS {schema_madlib}.kmeans_state)".format(**self.kwargs)
- cast_type = [ "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" ]
- cast_para = [None if self.new_state is None else self.new_state[i]
- for i in ('centroids', 'old_centroid_ids',
- 'objective_fn', 'frac_reassigned')]
- eval_plan = plpy.prepare("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT {{schema_madlib}}.array_to_2d($1) AS _state
- ) AS _state ON True
- """.format(expression = expression).format(
- iteration = self.iteration,
- curr_state=cast_str, **self.kwargs), cast_type)
-
- resultObject = plpy.execute(eval_plan, cast_para)
- !>, <!
- resultObject = self.runSQL("""
- SELECT ({expression}) AS expression
- FROM {{rel_args}} AS _args
- LEFT OUTER JOIN (
- SELECT *
- FROM {{rel_state}} AS _state
- WHERE _state._iteration = {{iteration}}
- ) AS _state ON True
- """.format(expression = expression).format(
- iteration = self.iteration,
- **self.kwargs))
- !>)
-
- if version_wrapper.is_gp43() or version_wrapper.is_hawq():
- plpy.execute("SET optimizer = " + optimizer)
+ # We disable the optimizer (ORCA) for the query planning
+ # since ORCA has a bug for left outer joins (MPP-21868).
+ # This should be removed when the issue is fixed in ORCA.
+ with EnableOptimizer(False):
+ if STATE_IN_MEM:
+ cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4) AS {schema_madlib}.kmeans_state)".format(**self.kwargs)
+ cast_type = ["DOUBLE PRECISION[]", "INTEGER[]",
+ "DOUBLE PRECISION", "DOUBLE PRECISION"]
+ cast_para = [None if self.new_state is None else self.new_state[i]
+ for i in ('centroids', 'old_centroid_ids',
+ 'objective_fn', 'frac_reassigned')]
+ eval_plan = plpy.prepare("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT {{schema_madlib}}.array_to_2d($1) AS _state
+ ) AS _state ON True
+ """.format(expression=expression).
+ format(iteration=self.iteration,
+ curr_state=cast_str, **self.kwargs), cast_type)
+ resultObject = plpy.execute(eval_plan, cast_para)
+ else:
+ resultObject = self.runSQL("""
+ SELECT ({expression}) AS expression
+ FROM {{rel_args}} AS _args
+ LEFT OUTER JOIN (
+ SELECT *
+ FROM {{rel_state}} AS _state
+ WHERE _state._iteration = {{iteration}}
+ ) AS _state ON True
+ """.format(expression=expression).
+ format(iteration=self.iteration,
+ **self.kwargs))
if resultObject.nrows() == 0:
return None
@@ -170,10 +166,7 @@ class IterationControllerComposite:
@return None if \c condition evaluates to NULL, otherwise the Boolean
value of \c condition
"""
-
- return self.evaluate("""
- CAST(({condition}) AS BOOLEAN)
- """.format(condition = condition))
+ return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))
def update(self, newState, **updateKwargs):
"""
@@ -196,64 +189,56 @@ class IterationControllerComposite:
is kept.
"""
updateKwargs.update(**self.kwargs)
- newState = newState.format(
- iteration = self.iteration,
- **updateKwargs)
+ newState = newState.format(iteration=self.iteration, **updateKwargs)
self.iteration = self.iteration + 1
- m4_ifdef(<!__HAWQ__!>, <!
- cast_str = """CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4)
- AS {schema_madlib}.kmeans_state)""".format(**self.kwargs)
- cast_str_old = """CAST (({schema_madlib}.array_to_2d($5), $6, $7, $8)
+ if STATE_IN_MEM:
+ cast_str = """CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4)
AS {schema_madlib}.kmeans_state)""".format(**self.kwargs)
- cast_type = [
- "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION",
- "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION"
- ]
-
- cast_para = [None if self.new_state is None else self.new_state[i]
- for i in ('centroids', 'old_centroid_ids', 'objective_fn',
- 'frac_reassigned')]
- cast_para.extend([None if self.old_state is None else self.old_state[i]
- for i in ('centroids', 'old_centroid_ids', 'objective_fn',
- 'frac_reassigned')])
-
- updateKwargs.update(curr_state=cast_str, old_state=cast_str_old)
- self.old_state = self.new_state
-
- update_plan = plpy.prepare("""
- SELECT
- {iteration} AS _iteration,
- {schema_madlib}.array_to_1d((_state).centroids) AS centroids,
- (_state).old_centroid_ids,
- (_state).objective_fn,
- (_state).frac_reassigned
- FROM
- (
- SELECT ({newState}) AS _state
- ) q
- """.format(iteration=self.iteration,
- newState=newState, **self.kwargs).format(**updateKwargs),
- cast_type)
- self.new_state = plpy.execute(update_plan, cast_para)[0]
- !>, <!
- self.runSQL("""
- INSERT INTO {rel_state}
- SELECT
- {iteration},
- ({newState})
- """.format(
- iteration = self.iteration,
- newState = newState,
- **self.kwargs))
- !>)
-
- m4_ifdef(<!__HAWQ__!>, <!!>, <!
- if self.truncAfterIteration:
+ cast_str_old = """CAST (({schema_madlib}.array_to_2d($5), $6, $7, $8)
+ AS {schema_madlib}.kmeans_state)""".format(**self.kwargs)
+ cast_type = [
+ "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION",
+ "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION"
+ ]
+
+ cast_para = [None if self.new_state is None else self.new_state[i]
+ for i in ('centroids', 'old_centroid_ids', 'objective_fn',
+ 'frac_reassigned')]
+ cast_para.extend([None if self.old_state is None else self.old_state[i]
+ for i in ('centroids', 'old_centroid_ids', 'objective_fn',
+ 'frac_reassigned')])
+
+ updateKwargs.update(curr_state=cast_str, old_state=cast_str_old)
+ self.old_state = self.new_state
+
+ update_plan = plpy.prepare("""
+ SELECT
+ {iteration} AS _iteration,
+ {schema_madlib}.array_to_1d((_state).centroids) AS centroids,
+ (_state).old_centroid_ids,
+ (_state).objective_fn,
+ (_state).frac_reassigned
+ FROM
+ (
+ SELECT ({newState}) AS _state
+ ) q
+ """.format(iteration=self.iteration,
+ newState=newState, **self.kwargs).format(**updateKwargs),
+ cast_type)
+ self.new_state = plpy.execute(update_plan, cast_para)[0]
+ else:
self.runSQL("""
- DELETE FROM {rel_state} AS _state
- WHERE _state._iteration < {iteration}
- """.format(iteration = self.iteration, **self.kwargs))
- !>)
+ INSERT INTO {rel_state}
+ SELECT
+ {iteration},
+ ({newState})
+ """.format(iteration=self.iteration, newState=newState,
+ **self.kwargs))
+ if self.truncAfterIteration:
+ self.runSQL("""
+ DELETE FROM {rel_state} AS _state
+ WHERE _state._iteration < {iteration}
+ """.format(iteration=self.iteration, **self.kwargs))
m4_changequote(<!`!>, <!'!>)
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/utilities/group_control.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/group_control.py_in b/src/ports/postgres/modules/utilities/group_control.py_in
index ce9b7db..708a532 100644
--- a/src/ports/postgres/modules/utilities/group_control.py_in
+++ b/src/ports/postgres/modules/utilities/group_control.py_in
@@ -1,6 +1,4 @@
# coding=utf-8
-m4_changequote(`>>>', `<<<')
-
"""
@file control.py_in
@@ -20,7 +18,11 @@ _unique_string = unique_string
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
+m4_changequote(`<!', `!>')
+
+
class GroupIterationController:
+
"""
@brief Abstraction for implementing driver functions in PL/Python
@@ -48,13 +50,13 @@ class GroupIterationController:
"""
def __init__(self, rel_args, rel_state, stateType,
- temporaryTables = True,
- schema_madlib = "MADLIB_SCHEMA_MISSING",
- verbose = False,
- grouping_str = "NULL",
- col_grp_iteration="_iteration",
- col_grp_state="_state",
- **kwargs):
+ temporaryTables=True,
+ schema_madlib="MADLIB_SCHEMA_MISSING",
+ verbose=False,
+ grouping_str="NULL",
+ col_grp_iteration="_iteration",
+ col_grp_state="_state",
+ **kwargs):
self.temporaryTables = temporaryTables
self.verbose = verbose
self.inWith = False
@@ -62,38 +64,36 @@ class GroupIterationController:
self.grouping_str = grouping_str
self.kwargs = kwargs
self.kwargs.update(
- unqualified_rel_state = rel_state,
- rel_args = ('pg_temp.' if temporaryTables else '') + rel_args,
- rel_state = ('pg_temp.' if temporaryTables else '') + rel_state,
- stateType = stateType.format(schema_madlib = schema_madlib),
- schema_madlib = schema_madlib,
- grouping_str = self.grouping_str,
- col_grp_null = _unique_string(),
- col_grp_key = _unique_string(),
+ unqualified_rel_state=rel_state,
+ rel_args=('pg_temp.' if temporaryTables else '') + rel_args,
+ rel_state=('pg_temp.' if temporaryTables else '') + rel_state,
+ stateType=stateType.format(schema_madlib=schema_madlib),
+ schema_madlib=schema_madlib,
+ grouping_str=self.grouping_str,
+ col_grp_null=_unique_string(),
+ col_grp_key=_unique_string(),
col_grp_iteration=col_grp_iteration,
col_grp_state=col_grp_state
- )
- grouping_col = "Null" if kwargs["grouping_col"] is None \
- else kwargs["grouping_col"]
- using_str = "on True" if kwargs["grouping_col"] is None \
- else "using ({grouping_col})".format(**kwargs)
+ )
+ grouping_col = "Null" if kwargs["grouping_col"] is None else kwargs["grouping_col"]
+ using_str = "on True" if kwargs["grouping_col"] is None else "using ({grouping_col})".format(**kwargs)
self.is_group_null = True if kwargs["grouping_col"] is None else False
self.kwargs["grouping_col"] = grouping_col
self.kwargs["using_str"] = using_str
self.grouping_col = grouping_col
- m4_ifdef(>>>__HAWQ__<<<, >>>
- self.new_states = []
- self.old_states = []
- self.finished_states = []
- <<<)
+ self.hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+ if self.hawq:
+ self.new_states = []
+ self.old_states = []
+ self.finished_states = []
def __enter__(self):
with MinWarning('warning'):
# currently assuming that groups is passed as a valid array
- group_col = ("NULL::integer as {col_grp_null}" if self.is_group_null \
- else "{grouping_col}").format(**self.kwargs)
- groupby_str = ("{col_grp_null}" if self.is_group_null \
- else "{grouping_col}").format(**self.kwargs)
+ group_col = ("NULL::integer as {col_grp_null}" if self.is_group_null
+ else "{grouping_col}").format(**self.kwargs)
+ groupby_str = ("{col_grp_null}" if self.is_group_null
+ else "{grouping_col}").format(**self.kwargs)
primary_str = "" if self.is_group_null else ", {grouping_col}".format(**self.kwargs)
self.runSQL(
@@ -104,94 +104,93 @@ class GroupIterationController:
{group_col},
0::integer as {col_grp_iteration},
Null::{stateType} as {col_grp_state}
- m4_ifdef(>>>__HAWQ__<<<, >>>, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key}<<<)
+ m4_ifdef(<!__HAWQ__!>, <!, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key}!>)
from {rel_source}
group by {groupby_str}
);
- m4_ifdef(>>>__POSTGRESQL__<<<, >>><<<, >>>
+ m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!
alter table {rel_state} set distributed by ({col_grp_iteration} {primary_str});
- <<<)
+ !>)
""".format(group_col=group_col, groupby_str=groupby_str, primary_str=primary_str,
temp='TEMPORARY' if self.temporaryTables else '', **self.kwargs))
null_test = " or ".join([g.strip() + " is NULL" for g in
- self.kwargs['grouping_col'].split(",")])
+ self.kwargs['grouping_col'].split(",")])
null_count = plpy.execute(
"""
select count(*) from {rel_state} where {null_test}
""".format(null_test=null_test, **self.kwargs))[0]['count']
- if null_count != 0 and primary_str != "":
+ if null_count != 0 and primary_str:
plpy.error("Grouping error: at least one of the grouping columns contains NULL values!"
" Please filter out those NULL values.")
- self.runSQL("""
- m4_ifdef(>>>__HAWQ__<<<, >>><<<, >>>
- alter table {rel_state} add primary key ({col_grp_iteration} {primary_str})
- <<<)
- """.format(primary_str=primary_str, **self.kwargs))
- m4_ifdef(>>>__HAWQ__<<<, >>>
- self.new_states = plpy.execute("SELECT * FROM {rel_state}".format(**self.kwargs))
- if not self.is_group_null:
- converted_source_table = _unique_string()
- plpy.execute("""
- CREATE TEMP TABLE {converted_source_table} AS
- SELECT *, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key} FROM {rel_source}
- """.format(converted_source_table=converted_source_table, **self.kwargs))
- self.kwargs.update(rel_source=converted_source_table)
- <<<)
+ if not self.hawq:
+ self.runSQL("alter table {rel_state} add primary key "
+ "({col_grp_iteration} {primary_str})".
+ format(primary_str=primary_str, **self.kwargs))
+ else:
+ self.new_states = plpy.execute("SELECT * FROM {rel_state}".format(**self.kwargs))
+ if not self.is_group_null:
+ converted_source_table = _unique_string()
+ plpy.execute("""
+ CREATE TEMP TABLE {converted_source_table} AS
+ SELECT *, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key} FROM {rel_source}
+ """.format(converted_source_table=converted_source_table, **self.kwargs))
+ self.kwargs.update(rel_source=converted_source_table)
self.inWith = True
return self
def __exit__(self, type, value, tb):
self.inWith = False
- m4_ifdef(>>>__HAWQ__<<<, >>>
- insert_plan = plpy.prepare("""
- INSERT INTO {rel_state}
- SELECT
- {grouping_col},
- _rel_state.iteration,
- _rel_state.state,
- _rel_state.grp_key
- FROM
- (
- SELECT
- {grouping_col}, {col_grp_key}
- FROM {rel_state}
- WHERE {col_grp_iteration} = 0
- ) AS _src,
- (
+ if self.hawq:
+ insert_sql = """
+ INSERT INTO {rel_state}
SELECT
- grp_key, iteration, state
+ {grouping_col},
+ _rel_state.iteration,
+ _rel_state.state,
+ _rel_state.grp_key
FROM
- {schema_madlib}._gen_state($1, $2, $3)
- ) AS _rel_state
- WHERE _src.{col_grp_key} = _rel_state.grp_key
- """.format(**self.kwargs), ["text[]", "integer[]", "double precision[]"])
-
- grp_keys = []
- states = []
- iterations = []
- null_empty = []
- for state in self.finished_states:
- # the lenght of states for null_empty groups are different
- if state["{col_grp_state}".format(**self.kwargs)][-1] == 3:
- null_empty.append(state)
- continue
- iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
- grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
- states.extend(state["{col_grp_state}".format(**self.kwargs)])
- plpy.execute(insert_plan, [grp_keys, iterations, states])
-
- grp_keys = []
- states = []
- iterations = []
- for state in null_empty:
- iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
- grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
- states.extend(state["{col_grp_state}".format(**self.kwargs)])
- plpy.execute(insert_plan, [grp_keys, iterations, states])
-
- if not self.is_group_null:
- plpy.execute('DROP TABLE IF EXISTS ' + self.kwargs['rel_source'])
- <<<)
+ (
+ SELECT
+ {grouping_col}, {col_grp_key}
+ FROM {rel_state}
+ WHERE {col_grp_iteration} = 0
+ ) AS _src,
+ (
+ SELECT
+ grp_key, iteration, state
+ FROM
+ {schema_madlib}._gen_state($1, $2, $3)
+ ) AS _rel_state
+ WHERE _src.{col_grp_key} = _rel_state.grp_key
+ """.format(**self.kwargs)
+ insert_plan = plpy.prepare(insert_sql, ["text[]", "integer[]", "double precision[]"])
+
+ grp_keys = []
+ states = []
+ iterations = []
+ null_empty = []
+ for state in self.finished_states:
+ # the length of states for null_empty groups are different
+ if state["{col_grp_state}".format(**self.kwargs)][-1] == 3:
+ null_empty.append(state)
+ continue
+ iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
+ grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
+ states.extend(state["{col_grp_state}".format(**self.kwargs)])
+ plpy.execute(insert_plan, [grp_keys, iterations, states])
+
+ grp_keys = []
+ states = []
+ iterations = []
+ for state in null_empty:
+ iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
+ grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
+ states.extend(state["{col_grp_state}".format(**self.kwargs)])
+ insert_plan = plpy.prepare(insert_sql, ["text[]", "integer[]", "double precision[]"])
+ plpy.execute(insert_plan, [grp_keys, iterations, states])
+
+ if not self.is_group_null:
+ plpy.execute('DROP TABLE IF EXISTS ' + self.kwargs['rel_source'])
def runSQL(self, sql):
if self.verbose:
@@ -211,45 +210,44 @@ class GroupIterationController:
@return None if \c expression evaluates to NULL, otherwise the value of
\c expression
"""
- m4_ifdef(>>>__HAWQ__<<<, >>>
- eva_plan = plpy.prepare("""
- SELECT
- ({expression}) AS _expression,
- subq1.grp_key AS _groups
- FROM {{rel_args}} AS _args
- left outer join (
- (
- SELECT grp_key, state AS _state_previous
- FROM {schema_madlib}._gen_state($1, NULL, $2)
- ) sub1
- JOIN
- (
- SELECT grp_key, state AS _state_current
- FROM {schema_madlib}._gen_state($3, NULL, $4)
- ) sub2
- using (grp_key)
- ) AS subq1 ON True
- """.format(expression = expression, **self.kwargs).format(
- iteration=self.iteration, **self.kwargs),
- ["text[]", "double precision[]", "text[]", "double precision[]"])
-
- grp_keys_new = []
- states_new = []
- for state in self.new_states:
- if state["{col_grp_state}".format(**self.kwargs)][-1] < 2:
- grp_keys_new.append(state["{col_grp_key}".format(**self.kwargs)])
- states_new.extend(state["{col_grp_state}".format(**self.kwargs)])
- grp_keys_old = []
- states_old = []
- for state in self.old_states:
- grp_keys_old.append(state["{col_grp_key}".format(**self.kwargs)])
- if state["{col_grp_state}".format(**self.kwargs)] is not None:
- states_old.extend(state["{col_grp_state}".format(**self.kwargs)])
-
- resultObject = plpy.execute(eva_plan, [grp_keys_old, states_old, grp_keys_new, states_new])
- <<<,
- >>>resultObject = self.runSQL(
- """
+ if self.hawq:
+ eva_plan = plpy.prepare("""
+ SELECT
+ ({expression}) AS _expression,
+ subq1.grp_key AS _groups
+ FROM {{rel_args}} AS _args
+ left outer join (
+ (
+ SELECT grp_key, state AS _state_previous
+ FROM {schema_madlib}._gen_state($1, NULL, $2)
+ ) sub1
+ JOIN
+ (
+ SELECT grp_key, state AS _state_current
+ FROM {schema_madlib}._gen_state($3, NULL, $4)
+ ) sub2
+ using (grp_key)
+ ) AS subq1 ON True
+ """.format(expression=expression, **self.kwargs).
+ format(iteration=self.iteration, **self.kwargs),
+ ["text[]", "double precision[]", "text[]", "double precision[]"])
+
+ grp_keys_new = []
+ states_new = []
+ for state in self.new_states:
+ if state["{col_grp_state}".format(**self.kwargs)][-1] < 2:
+ grp_keys_new.append(state["{col_grp_key}".format(**self.kwargs)])
+ states_new.extend(state["{col_grp_state}".format(**self.kwargs)])
+ grp_keys_old = []
+ states_old = []
+ for state in self.old_states:
+ grp_keys_old.append(state["{col_grp_key}".format(**self.kwargs)])
+ if state["{col_grp_state}".format(**self.kwargs)] is not None:
+ states_old.extend(state["{col_grp_state}".format(**self.kwargs)])
+
+ resultObject = plpy.execute(eva_plan, [grp_keys_old, states_old, grp_keys_new, states_new])
+ else:
+ resultObject = self.runSQL("""
SELECT
({expression}) AS _expression,
ARRAY[{{grouping_str}}] AS _groups
@@ -268,35 +266,34 @@ class GroupIterationController:
) sub2
{using_str}
) AS subq1 ON True
- """.format(expression = expression, **self.kwargs).format(
- iteration=self.iteration, **self.kwargs))
- <<<)
+ """.format(expression=expression, **self.kwargs).
+ format(iteration=self.iteration, **self.kwargs))
if resultObject.nrows() == 0:
return None
else:
- m4_ifdef(>>>__HAWQ__<<<, >>>complete_grps = []<<<)
+ complete_grps = []
for each_elem in resultObject:
- m4_ifdef(>>>__HAWQ__<<<, >>><<<, >>>
- # update status for each group
- group_vector = mad_vec(each_elem["_groups"])
- groups_as_str = [None] * len(group_vector)
- # convert group values to string objects
- for index, each_grp in enumerate(group_vector):
- if not each_grp or each_grp.lower() == 'null':
- # NULL values should be outputed as NULL instead of
- # as a string 'NULL'
- groups_as_str[index] = "NULL::text"
- else:
- groups_as_str[index] = "'" + str(each_grp) + "'::text"
- array_str = "array[" + ",".join(groups_as_str) + "]"
- <<<)
+ if not self.hawq:
+ # update status for each group
+ group_vector = mad_vec(each_elem["_groups"])
+ groups_as_str = [None] * len(group_vector)
+ # convert group values to string objects
+ for index, each_grp in enumerate(group_vector):
+ if not each_grp or each_grp.lower() == 'null':
+ # NULL values should be output as NULL instead of
+ # as a string 'NULL'
+ groups_as_str[index] = "NULL::text"
+ else:
+ groups_as_str[index] = "'" + str(each_grp) + "'::text"
+ array_str = "array[" + ",".join(groups_as_str) + "]"
# update status for the group if it completed iterating
if each_elem['_expression']:
- m4_ifdef(>>>__HAWQ__<<<,
- >>>complete_grps.append(each_elem["_groups"])<<<,
- >>>self.runSQL(
- """UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1
+ if self.hawq:
+ complete_grps.append(each_elem["_groups"])
+ else:
+ self.runSQL("""
+ UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1
WHERE
ARRAY[{grouping_str}] = {_group_val} and
{col_grp_state}[array_upper({col_grp_state}, 1)] < 2 and
@@ -305,34 +302,32 @@ class GroupIterationController:
_group_val=array_str,
iteration=self.iteration,
**self.kwargs))
- <<<)
- m4_ifdef(>>>__HAWQ__<<<, >>>
- all_finish = True
- tmp = []
- for state in self.new_states:
- if state["{col_grp_key}".format(**self.kwargs)] in complete_grps \
- and state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
- state["{col_grp_state}".format(**self.kwargs)][-1] = 1
- if state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
- all_finish = False
- tmp.append(state)
- else:
- self.finished_states.append(state)
- self.new_states = tmp
- return all_finish
- <<<, >>>
- # return True only if all group combinations have finished iterating
- rv = self.runSQL(
- """
- select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst
- from {rel_state} as _state_table
- where _state_table.{col_grp_iteration} = {iteration}
- """.format(
- iteration=self.iteration,
- **self.kwargs))[0]["rst"]
- return rv
- <<<)
+ if self.hawq:
+ all_finish = True
+ tmp = []
+ for state in self.new_states:
+ if state["{col_grp_key}".format(**self.kwargs)] in complete_grps \
+ and state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
+ state["{col_grp_state}".format(**self.kwargs)][-1] = 1
+ if state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
+ all_finish = False
+ tmp.append(state)
+ else:
+ self.finished_states.append(state)
+ self.new_states = tmp
+ return all_finish
+ else:
+ # return True only if all group combinations have finished iterating
+ rv = self.runSQL(
+ """
+ select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst
+ from {rel_state} as _state_table
+ where _state_table.{col_grp_iteration} = {iteration}
+ """.format(
+ iteration=self.iteration,
+ **self.kwargs))[0]["rst"]
+ return rv
def test(self, condition):
"""
@@ -347,10 +342,7 @@ class GroupIterationController:
@return None if \c condition evaluates to NULL, otherwise the Boolean
value of \c condition
"""
- return self.evaluate(
- """
- CAST(({condition}) AS BOOLEAN)
- """.format(condition = condition))
+ return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))
def update(self, newState, **updateKwargs):
"""
@@ -374,76 +366,72 @@ class GroupIterationController:
"""
newState = newState.format(**self.kwargs)
self.iteration = self.iteration + 1
-
groupby_str = "" if self.is_group_null \
else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs)
-
- m4_ifdef(>>>__HAWQ__<<<, >>>
- groupby_str = "" if self.is_group_null \
- else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs)
- update_plan = plpy.prepare(
- """
- select
- {_grp_key} AS {col_grp_key},
- {grouping_col} {as_string},
- {iteration} AS {col_grp_iteration},
- ({newState}) AS {col_grp_state}
- from
- {rel_source} AS _src,
- (
- SELECT
- grp_key, state AS {col_grp_state}
- FROM
- {schema_madlib}._gen_state($1, NULL, $2)
- ) AS rel_state
- where
- {_grp_key} = grp_key
- {groupby_str}
- """.format(
- iteration=self.iteration,
- groupby_str=groupby_str,
- as_string='AS _grp' if self.is_group_null else '',
- _grp_key="array_to_string(ARRAY[{grouping_str}], ',')".format(**self.kwargs) if self.is_group_null else self.kwargs['col_grp_key'],
- newState = newState,
- **self.kwargs), ["text[]", "double precision[]"])
- grp_keys = []
- states = []
- for state in self.new_states:
- grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
- if state["{col_grp_state}".format(**self.kwargs)] is not None:
- states.extend(state["{col_grp_state}".format(**self.kwargs)])
-
- self.old_states = self.new_states
- self.new_states = plpy.execute(update_plan, [grp_keys, states])
- <<<,
- >>>
- groupby_str = "" if self.is_group_null \
- else "group by {grouping_col}".format(**self.kwargs)
- self.runSQL(
- """
- insert into {rel_state}
- (select
- {grouping_col},
- {iteration},
- ({newState})
+ if self.hawq:
+ groupby_str = "" if self.is_group_null \
+ else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs)
+ update_plan = plpy.prepare(
+ """
+ select
+ {_grp_key} AS {col_grp_key},
+ {grouping_col} {as_string},
+ {iteration} AS {col_grp_iteration},
+ ({newState}) AS {col_grp_state}
from
- ({rel_source} AS _src
- join
- {rel_state} AS rel_state
- {using_str})
+ {rel_source} AS _src,
+ (
+ SELECT
+ grp_key, state AS {col_grp_state}
+ FROM
+ {schema_madlib}._gen_state($1, NULL, $2)
+ ) AS rel_state
where
- rel_state.{col_grp_iteration} = {iteration} - 1 and
- (case when {iteration} = 1 then
- True
- else
- rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0
- end)
- {groupby_str})
- """.format(
- groupby_str = groupby_str,
- iteration = self.iteration,
- newState = newState,
- **self.kwargs))
- <<<)
-
-m4_changequote(>>>`<<<, >>>'<<<)
+ {_grp_key} = grp_key
+ {groupby_str}
+ """.format(
+ iteration=self.iteration,
+ groupby_str=groupby_str,
+ as_string='AS _grp' if self.is_group_null else '',
+ _grp_key="array_to_string(ARRAY[{grouping_str}], ',')".format(**self.kwargs) if self.is_group_null else self.kwargs['col_grp_key'],
+ newState=newState,
+ **self.kwargs), ["text[]", "double precision[]"])
+ grp_keys = []
+ states = []
+ for state in self.new_states:
+ grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
+ if state["{col_grp_state}".format(**self.kwargs)] is not None:
+ states.extend(state["{col_grp_state}".format(**self.kwargs)])
+
+ self.old_states = self.new_states
+ self.new_states = plpy.execute(update_plan, [grp_keys, states])
+ else:
+ groupby_str = "" if self.is_group_null \
+ else "group by {grouping_col}".format(**self.kwargs)
+ self.runSQL(
+ """
+ insert into {rel_state}
+ (select
+ {grouping_col},
+ {iteration},
+ ({newState})
+ from
+ ({rel_source} AS _src
+ join
+ {rel_state} AS rel_state
+ {using_str})
+ where
+ rel_state.{col_grp_iteration} = {iteration} - 1 and
+ (case when {iteration} = 1 then
+ True
+ else
+ rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0
+ end)
+ {groupby_str})
+ """.format(
+ groupby_str=groupby_str,
+ iteration=self.iteration,
+ newState=newState,
+ **self.kwargs))
+
+m4_changequote(<!`!>, <!'!>)