You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ri...@apache.org on 2016/04/01 03:21:22 UTC

[03/11] incubator-madlib git commit: Build: Add support for HAWQ 2.0

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/linalg/linalg.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/linalg/linalg.sql_in b/src/ports/postgres/modules/linalg/linalg.sql_in
index 80b9660..331dbb5 100644
--- a/src/ports/postgres/modules/linalg/linalg.sql_in
+++ b/src/ports/postgres/modules/linalg/linalg.sql_in
@@ -452,18 +452,7 @@ m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!NO SQL!>, <!!>);
  *     column of \f$ M \f$ and \f$ x \f$. That is,
  *     \f$ \min_{i=0,\dots,l-1} \operatorname{dist}(\vec{m_i}, \vec x) \f$.
  */
-m4_ifdef(<!__HAWQ__!>, <!!>, <!
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
-    M DOUBLE PRECISION[],
-    x DOUBLE PRECISION[],
-    dist REGPROC /*+ DEFAULT 'squared_dist_norm2' */
-) RETURNS MADLIB_SCHEMA.closest_column_result AS $$
-    SELECT MADLIB_SCHEMA._closest_column($1, $2, $3, textin(regprocout($3)))
-$$ LANGUAGE SQL IMMUTABLE STRICT
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
-!>)
-
-m4_ifdef(<!__HAWQ__!>, <!
+m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
     M    DOUBLE PRECISION[],
     x    DOUBLE PRECISION[],
@@ -472,8 +461,17 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
 IMMUTABLE
 STRICT
 LANGUAGE C
-AS 'MODULE_PATHNAME', 'closest_column_hawq'
+AS 'MODULE_PATHNAME', 'closest_column_fixed'
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL');
+!>, <!
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
+    M DOUBLE PRECISION[],
+    x DOUBLE PRECISION[],
+    dist REGPROC /*+ DEFAULT 'squared_dist_norm2' */
+) RETURNS MADLIB_SCHEMA.closest_column_result AS $$
+    SELECT MADLIB_SCHEMA._closest_column($1, $2, $3, textin(regprocout($3)))
+$$ LANGUAGE SQL IMMUTABLE STRICT
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
 !>)
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_column(
@@ -526,19 +524,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA._closest_columns(
 LANGUAGE C IMMUTABLE STRICT
 m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!NO SQL!>, <!!>);
 
-m4_ifdef(<!__HAWQ__!>, <!!>, <!
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
-    M DOUBLE PRECISION[],
-    x DOUBLE PRECISION[],
-    num INTEGER,
-    dist REGPROC
-) RETURNS MADLIB_SCHEMA.closest_columns_result AS $$
-    SELECT MADLIB_SCHEMA._closest_columns($1, $2, $3, $4, textin(regprocout($4)))
-$$ LANGUAGE SQL IMMUTABLE STRICT
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
-!>)
-
-m4_ifdef(<!__HAWQ__!>, <!
+m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
     M DOUBLE PRECISION[],
     x DOUBLE PRECISION[],
@@ -548,8 +534,18 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
 IMMUTABLE
 STRICT
 LANGUAGE C
-AS 'MODULE_PATHNAME', 'closest_columns_hawq'
+AS 'MODULE_PATHNAME', 'closest_columns_fixed'
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL');
+!>, <!
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(
+    M DOUBLE PRECISION[],
+    x DOUBLE PRECISION[],
+    num INTEGER,
+    dist REGPROC
+) RETURNS MADLIB_SCHEMA.closest_columns_result AS $$
+    SELECT MADLIB_SCHEMA._closest_columns($1, $2, $3, $4, textin(regprocout($4)))
+$$ LANGUAGE SQL IMMUTABLE STRICT
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL');
 !>)
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.closest_columns(

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/__init__.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/__init__.py_in b/src/ports/postgres/modules/svd_mf/__init__.py_in
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/svdmf.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/svdmf.py_in b/src/ports/postgres/modules/svd_mf/svdmf.py_in
deleted file mode 100644
index 081dd7c..0000000
--- a/src/ports/postgres/modules/svd_mf/svdmf.py_in
+++ /dev/null
@@ -1,258 +0,0 @@
-import plpy
-import datetime
-from math import floor, log, pow, sqrt
-
-"""@file svdmf.py_in
-
-Implementation of partial SVD decomposition of a sparse matrix into U and V components.
-"""
-
-# ----------------------------------------
-# Logging
-# ----------------------------------------
-def info( msg):
-    #plpy.info( datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' : ' + msg);
-	plpy.info( msg);
-	
-# ----------------------------------------
-# Logging
-# ----------------------------------------
-def svdmf_run( madlib_schema, input_matrix, col_name, row_name, value, num_features):
-   return(svdmf_run_full( madlib_schema, input_matrix, col_name, row_name, value, num_features, 1000, .0001))
-
-# ----------------------------------------
-# Main: svdmf_run
-# ----------------------------------------
-def svdmf_run_full( madlib_schema, input_matrix, col_name, row_name, value, num_features, NUM_ITERATIONS, MIN_IMPROVEMENT):
-	"""
-	These constants are important for the execution of the algorithms and may need to be changed for some types of data. 
-
-	ORIGINAL_STEP - is a size of the step in the direction of gradient. 
-	It is divided by ~M*N*(M+N). Too large of a step 
-	may cause algorithm failure to converge and overflows. 
-	Too small of the step will make execution longer than necessary.
-
-	SPEEDUP_CONST, FAST_SPEEDUP_CONST and SLOWDOWN_CONST - 
-	Size of step is changing dynamically during the execution, 
-	as long as algorithm making a progress step size is increased 
-	at each iteration by multiplying by SPEEDUP_CONST (speed-up constant > 1). 
-	When algorithm starts to fail to converge step is made smaller abruptly 
-	to avoid overflow by multiplying by SLOWDOWN_CONST. 
-	In some instances initial step size is very small, 
-	to quickly get it into the suitable range FAST_SPEEDUP_CONST (> 1) is used.
-	This means that generally FAST_SPEEDUP_CONST >> SPEEDUP_CONST. 
-	Those values may be changed if necessary.
-
-	NUM_ITERATIONS is the maximum number of iterations to perform. 
-	Generally this should not be the cause for termination, but on some data it is 
-	possible that algorithm is making progress, but at a very slow pace, 
-	in this case termination will occur when this number of iterations is reached.
-
-	MIN_NUM_ITERATIONS - sometimes algorithm starts with very small progress 
-	and it then increases. To avoid the case when it exits prematurely one can 
-	set a min number of iterations after which progress will be checked
-
-	MIN_IMPROVEMENT - minimum improvement that has to be sustained for 
-	algorithm to continue. It is compared with consecutive error terms. 
-	Sometimes it needs to be lowered 
-	in instances when initial progress may be too small, or increased 
-	if execution it taking too long, and level of needed precision is surpassed.   
-
-	INIT_VALUE - initial value that new rows are starting with. 
-	Value is almost irrelevant in most cases, but it should not be 0, 
-	since this value is used as a multiple in subsequent steps.  
-
-	EARLY_TEMINATE - tells algorithm to terminate early if after a given 
-	dimension (< K) error is less than MIN_IMPROVEMENT. It should generally be TRUE. 
-	Since there is no contribution that one should expect from adding consequent dimensions. 
-	"""
-
-	# Record the time
-	start = datetime.datetime.now();
-
-	ORIGINAL_STEP = .001; 
-	SPEEDUP_CONST = 1.1;
-	FAST_SPEEDUP_CONST = 10.0;
-	SLOWDOWN_CONST = .1;
-	#NUM_ITERATIONS = 1000;
-	#MIN_IMPROVEMENT = 0.000001;
-	MIN_NUM_ITERATIONS = 1;
-	IMPROVEMENT_REACHED = True;
-	INIT_VALUE = 0.1;
-	EARLY_TEMINATE = True;
-
-	error=0;
-	keep_ind = 1;
-	SD_ind = 1;
-
-	# Find sizes of the input and number of elements in the input
-	res = plpy.execute('SELECT count(distinct ' + col_name + ') AS c FROM ' + input_matrix + ';'); 
-	feature_y = res[0]['c'];
-	res = plpy.execute('SELECT count(distinct ' + row_name + ') AS c FROM ' + input_matrix + ';'); 
-	feature_x = res[0]['c'];
-	res = plpy.execute('SELECT count(*) AS c FROM ' + input_matrix + ';'); 
-	cells = res[0]['c'];
-
-	# Adjust step size based on the size  
-	#ORIGINAL_STEP = ORIGINAL_STEP/(feature_x+feature_y);
-
-	# Parameters summary:
-	info( 'Started svdmf_run() with parameters:');
-	info( ' * input_matrix = %s' % input_matrix);
-	info( ' * col_name = %s' % col_name);
-	info( ' * row_name = %s' % row_name);
-	info( ' * value = %s' % value);
-	info( ' * num_features = %s' % str(num_features));
-	info('got there...');
-
-	# Create output and intermediate (temp) tables necessary for the execution
-	# matrix_u and matrix_v are for end results. tables e1, e2, e are for residual errors
-	# tables S1, S2, D1, D2 are for estimates of row values of u and v.
-	sql = '''
-	DROP TABLE IF EXISTS ''' + madlib_schema + '''.matrix_u;
-	CREATE TABLE ''' + madlib_schema + '''.matrix_u(
-		col_num INT, 
-		row_num INT,
-		val FLOAT
-	);
-	DROP TABLE IF EXISTS ''' + madlib_schema + '''.matrix_v;
-	CREATE TABLE ''' + madlib_schema + '''.matrix_v(
-		row_num INT,
-		col_num INT, 
-		val FLOAT
-	);
-	DROP TABLE IF EXISTS e1;
-	CREATE TEMP TABLE e1(
-		row_num INT, 
-		col_num INT,
-		val FLOAT
-	) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num, col_num)');
-	DROP TABLE IF EXISTS S1;
-	CREATE TEMP TABLE S1(
-		col_num INT,
-		row_num INT,
-		val FLOAT
-	) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (col_num)');
-	DROP TABLE IF EXISTS S2;
-	CREATE TEMP TABLE S2(
-		col_num INT,
-		row_num INT,
-		val FLOAT
-	) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (col_num)');
-	DROP TABLE IF EXISTS D1;
-	CREATE TEMP TABLE D1(
-		row_num INT,
-		col_num INT,
-		val FLOAT
-	) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num)');
-	DROP TABLE IF EXISTS D2;
-	CREATE TEMP TABLE D2(
-		row_num INT,
-		col_num INT,
-		val FLOAT
-	) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num)');
-	DROP TABLE IF EXISTS e;
-	CREATE TABLE e(
-		row_num INT, 
-		col_num INT,
-		val FLOAT
-	) m4_ifdef( `__POSTGRESQL__', `', `DISTRIBUTED BY (row_num,col_num)');
-	''';
-	plpy.execute(sql);
-
-	# Copy original data into a temp table
-	info( 'Copying the source data into a temporary table...');
-	plpy.execute('INSERT INTO e1 SELECT ' + row_name + ', ' + col_name + ', ' + str(value) + ' FROM ' + input_matrix + ';');
-    
-	# Create tables to keep most of the execution data
-	sql = '''
-		TRUNCATE TABLE S1;
-		TRUNCATE TABLE S2;
-		TRUNCATE TABLE D1;
-		TRUNCATE TABLE D2;
-		''';
-	plpy.execute(sql);
-	
-	# populate initial vectors
-	plpy.execute('INSERT INTO S1 (row_num, col_num, val) SELECT (g.a-1)/' +str(num_features)+ '+1, (g.a-1)%' +str(num_features)+ '+1, random() FROM generate_series(1,'+str(feature_x*num_features)+') AS g(a);');
-	plpy.execute('INSERT INTO D1 (col_num, row_num, val) SELECT (g.a-1)/' +str(num_features)+ '+1, (g.a-1)%' +str(num_features)+ '+1, random() FROM generate_series(1,'+str(feature_y*num_features)+') AS g(a);');
-	
-	SD_ind = 1;
-	i = 0;
-	step = ORIGINAL_STEP;
-	imp_reached = False;
-        
-	while(True):
-		
-		i = i + 1;
-		
-		sql = '''            
-			TRUNCATE TABLE e; 
-			''';
-		plpy.execute(sql);
-		
-		# Create current predictions for the values and compute error per term
-		#plpy.execute('INSERT INTO e SELECT a.row_num, a.col_num, a.val-o.prod FROM (SELECT s.row_num, d.col_num, '+madlib_schema+'.array_dot(d.val,s.val) AS prod FROM (SELECT row_num, array_agg(val ORDER BY col_num) AS val FROM S'+str(SD_ind)+' GROUP BY row_num) AS s CROSS JOIN (SELECT col_num, array_agg(val ORDER BY row_num) AS val FROM D'+str(SD_ind)+' GROUP BY col_num) AS d) as o, e1 as a WHERE a.row_num=o.row_num AND a.col_num=o.col_num;');
-		plpy.execute('INSERT INTO e SELECT a.row_num, a.col_num, a.val-o.prod FROM (SELECT s.row_num, d.col_num, sum(s.val*d.val) AS prod FROM S'+str(SD_ind)+' AS s JOIN D'+str(SD_ind)+' AS d ON d.row_num = s.col_num GROUP BY s.row_num, d.col_num) AS o, e1 AS a WHERE a.row_num=o.row_num AND a.col_num=o.col_num');
-		old_error = error;
-				
-		res = plpy.execute('SELECT sqrt(sum(val*val)) AS c FROM e;');
-		error = res[0]['c'];
-		
-		info( '...Iteration ' + str(i) + ': residual_error = '+ str(error) +', step_size = ' + str(step) + ', min_improvement = ' + str(MIN_IMPROVEMENT));
-		
-		# Check if progress is being made or max number of iterations reached
-		if(((abs(error - old_error) < MIN_IMPROVEMENT) and (i >= MIN_NUM_ITERATIONS) and ((error < MIN_IMPROVEMENT) or (not IMPROVEMENT_REACHED) or (imp_reached))) or (NUM_ITERATIONS < i)):
-			break;
-		
-		if((abs(error - old_error) >= MIN_IMPROVEMENT) and (old_error > 0)):
-			imp_reached = True;
-		
-		# Check if step size need to be increased or decreased
-		if((error > old_error) and (old_error != 0)) :
-			error = 0;
-			step = step*SLOWDOWN_CONST;
-			SD_ind = SD_ind%2+1;
-			continue;
-		elif(sqrt((error - old_error)*(error - old_error)) < .1*MIN_IMPROVEMENT):
-			step = step*FAST_SPEEDUP_CONST;
-		else:
-			step = step*SPEEDUP_CONST;
-			
-		# Empty intermediate tables    
-		plpy.execute('TRUNCATE TABLE S'+str(SD_ind%2+1)+';');
-		plpy.execute('TRUNCATE TABLE D'+str(SD_ind%2+1)+';');
-	
-		# Update values of the vectors   
-		# The commented out line are an implementation that uses ordered aggrigates that we chose to avoid for compatibility reasons 
-		#plpy.execute('INSERT INTO S'+str(SD_ind%2+1)+' SELECT s.col_num, s.row_num, s.val+'+str(step)+'*o.prod FROM (SELECT e.row_num, d.row_num as col_num, '+madlib_schema+'.array_dot(d.val,e.val) AS prod FROM (SELECT row_num, array_agg(val ORDER BY col_num) AS val FROM e GROUP BY row_num) AS e CROSS JOIN (SELECT row_num, array_agg(val ORDER BY col_num) AS val FROM D'+str(SD_ind)+' GROUP BY row_num) AS d) as o, S'+str(SD_ind)+' as s WHERE s.col_num = o.col_num AND s.row_num = o.row_num;');
-		#plpy.execute('INSERT INTO D'+str(SD_ind%2+1)+' SELECT d.row_num, d.col_num, d.val+'+str(step)+'*o.prod FROM (SELECT s.col_num AS row_num, e.col_num, '+madlib_schema+'.array_dot(e.val,s.val) AS prod FROM (SELECT col_num, array_agg(val ORDER BY row_num) AS val FROM S'+str(SD_ind)+' GROUP BY col_num) AS s CROSS JOIN (SELECT col_num, array_agg(val ORDER BY row_num) AS val FROM e GROUP BY col_num) AS e) as o, D'+str(SD_ind)+' as d WHERE d.col_num = o.col_num AND d.row_num = o.row_num;');
-		
-		plpy.execute('INSERT INTO S'+str(SD_ind%2+1)+' SELECT s.col_num, s.row_num, s.val+'+str(step)+'*o.prod FROM (SELECT e.row_num, d.row_num AS col_num, sum(e.val*d.val) AS prod FROM e JOIN D'+str(SD_ind)+' AS d ON e.col_num = d.col_num GROUP BY e.row_num, d.row_num) AS o, S'+str(SD_ind)+' AS s WHERE s.row_num=o.row_num AND s.col_num=o.col_num');  
-		plpy.execute('INSERT INTO D'+str(SD_ind%2+1)+' SELECT d.row_num, d.col_num, d.val+'+str(step)+'*o.prod FROM (SELECT s.col_num AS row_num, e.col_num, sum(e.val*s.val) AS prod FROM e JOIN S'+str(SD_ind)+' AS s ON s.row_num = e.row_num GROUP BY e.col_num, s.col_num) AS o, D'+str(SD_ind)+' AS d WHERE d.row_num=o.row_num AND d.col_num=o.col_num');
-		
-		SD_ind = SD_ind%2+1;
-		
-		# Save previous vector and prepare for computing another one
-		keep_ind = keep_ind%2+1;
-		
-		if((error < MIN_IMPROVEMENT) and (EARLY_TEMINATE)):
-			break;
-	
-	plpy.execute('INSERT INTO ' + madlib_schema + '.matrix_u SELECT * FROM S'+str(SD_ind)+';'); 
-	plpy.execute('INSERT INTO ' + madlib_schema + '.matrix_v SELECT * FROM D'+str(SD_ind)+';'); 
-
-	# Runtime evaluation
-	end = datetime.datetime.now();
-	minutes, seconds = divmod( (end - start).seconds, 60)
-	microsec = (end - start).microseconds	            
-	return ('''
-		Finished SVD matrix factorisation for %s (%s, %s, %s). 
-		Results: 
-		 * total error = %s
-		Output:
-		 * table : ''' + madlib_schema + '''.matrix_u
-		 * table : ''' + madlib_schema + '''.matrix_v
-		Time elapsed: %d minutes %d.%d seconds.
-		''') % (input_matrix, row_name, col_name, value, str(error), minutes, seconds, microsec)
-    

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/svdmf.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/svdmf.sql_in b/src/ports/postgres/modules/svd_mf/svdmf.sql_in
deleted file mode 100644
index d85b7a8..0000000
--- a/src/ports/postgres/modules/svd_mf/svdmf.sql_in
+++ /dev/null
@@ -1,199 +0,0 @@
-/* ----------------------------------------------------------------------- *//**
- *
- * @file svdmf.sql_in
- *
- * @brief SQL functions for SVD Matrix Factorization
- * @date   January 2011
- *
- * @sa For a brief introduction to SVD Matrix Factorization, see the module
- *     description \ref grp_svd.
- *
- *//* ----------------------------------------------------------------------- */
-
-m4_include(`SQLCommon.m4')
-
-/**
-@addtogroup grp_svdmf 
-
-\warning <em> This is an old implementation of Matrix Decomposition and
-has been deprecated. For SVD decomposition, please see \ref grp_svd;
-for the latest version of low-rank approximation, please see \ref grp_lmf</em>
-
-<div class="toc"><b>Contents</b>
-<ul>
-<li><a href="#syntax">SVD Function Syntax</a></li>
-<li><a href="#xamples">Examples</a></li>
-<li><a href="#literature">Literature</a></li>
-<li><a href="#related">Related Topics</a></li>
-</ul>
-</div>
-
-@brief Computes low-rank approximation of a sparse matrix.
-
-This module implements "partial SVD decomposition" method for representing a sparse matrix using a low-rank approximation.
-Mathematically, this algorithm seeks to find matrices U and V that, for any given A, minimizes:\n
-\f[ ||\boldsymbol A - \boldsymbol UV ||_2
-\f]
-subject to \f$ rank(\boldsymbol UV) \leq k \f$, where \f$ ||\cdot||_2 \f$ denotes the Frobenius norm and \f$ k \leq rank(\boldsymbol A)\f$.
-If A is \f$ m \times n \f$, then U will be \f$ m \times k \f$ and V will be \f$ k \times n \f$.
-
-This algorithm is not intended to do the full decomposition, or to be used as part of
-inverse procedure. It effectively computes the SVD of a low-rank approximation of A (preferably sparse), with the singular values absorbed in U and V.
-Code is based on the write-up as appears at [1], with some modifications.
-
-
-
-@anchor syntax
-@par Function Syntax
-
-The SVD function is called as follows:
-<pre class="syntax">
-svdmf_run( input_table,
-           col_name,
-           row_name,
-           value, num_features)
-</pre>
-
-The <b>input matrix</b> is expected to be of the following form:
-<pre>{TABLE|VIEW} <em>input_table</em> (
-    <em>col_num</em> INTEGER,
-    <em>row_num</em> INTEGER,
-  <em>value</em> FLOAT
-)</pre>
-
-Input is contained in a table where column number and row number for each cell
-are sequential; that is to say that if the data was written as a matrix, those values would be the
-actual row and column numbers and not some random identifiers. All rows and columns must be associated with a value.
-There should not be any missing row, columns or values.
-
-The function returns two tables \c matrix_u and \c matrix_v, which represent the matrices U and V in table format.
-
-@anchor examples
-@examp
-
--# Prepare an input table/view.
-<pre class="example">
-CREATE TABLE svd_test ( col INT,
-                        row INT,
-                        val FLOAT
-                      );
-</pre>
--# Populate the input table with some data.
-<pre class="example">
-INSERT INTO svd_test SELECT ( g.a%1000)+1, g.a/1000+1, random()
-                     FROM generate_series(1,1000) AS g(a);
-</pre>
--# Call the svdmf_run() stored procedure.
-<pre class="example">
-SELECT madlib.svdmf_run( 'svd_test',
-                         'col',
-                         'row',
-                         'val',
-                         3);
-</pre>
-Example result:
-<pre class="result">
-INFO:  ('Started svdmf_run() with parameters:',)
-INFO:  (' * input_matrix = madlib_svdsparse_test.test',)
-INFO:  (' * col_name = col_num',)
-INFO:  (' * row_name = row_num',)
-INFO:  (' * value = val',)
-INFO:  (' * num_features = 3',)
-INFO:  ('Copying the source data into a temporary table...',)
-INFO:  ('Estimating feature: 1',)
-INFO:  ('...Iteration 1: residual_error = 33345014611.1, step_size = 4.9997500125e-10, min_improvement = 1.0',)
-INFO:  ('...Iteration 2: residual_error = 33345014557.6, step_size = 5.49972501375e-10, min_improvement = 1.0',)
-INFO:  ('...Iteration 3: residual_error = 33345014054.3, step_size = 6.04969751512e-10, min_improvement = 1.0',)
-...
-INFO:  ('...Iteration 78: residual_error = 2.02512133868, step_size = 5.78105354457e-10, min_improvement = 1.0',)
-INFO:  ('...Iteration 79: residual_error = 0.893810181282, step_size = 6.35915889903e-10, min_improvement = 1.0',)
-INFO:  ('...Iteration 80: residual_error = 0.34496773222, step_size = 6.99507478893e-10, min_improvement = 1.0',)
-INFO:  ('Swapping residual error matrix...',)
-                                         svdmf_run
-&nbsp;-------------------------------------------------------------------------------------------
-
- Finished SVD matrix factorisation for madlib_svdsparse_test.test (row_num, col_num, val).
- Results:
-  * total error = 0.34496773222
-  * number of estimated features = 1
- Output:
-  * table : madlib.matrix_u
-  * table : madlib.matrix_v
- Time elapsed: 4 minutes 47.86839 seconds.
-</pre>
-
-@anchor literature
-@literature
-
-[1] Simon Funk, Netflix Update: Try This at Home, December 11 2006,
-    http://sifter.org/~simon/journal/20061211.html
-
-
-@anchor related
-@par Related Topics
-File svdmf.sql_in documenting the SQL functions.
-
-@internal
-@sa namespace svdmf (documenting the implementation in Python)
-@endinternal
-
-*/
-
-/**
- * @brief Partial SVD decomposition of a sparse matrix into U and V components
- *
- * This function takes as input the table representation of a sparse matrix and
- * decomposes it into the specified set of most significant features of matrices
- * of U and V matrix.
- *
- *   @param input_table Name of the table/view with the source data
- *   @param col_name Name of the column containing cell column number
- *   @param row_name Name of the column containing cell row number
- *   @param value Name of the column containing cell value
- *   @param num_features Rank of desired approximation
- *
- */
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.svdmf_run(
-    input_table TEXT, col_name TEXT, row_name TEXT, value TEXT, num_features INT
-)
-RETURNS TEXT
-AS $$
-
-    PythonFunctionBodyOnly(`svd_mf', `svdmf')
-
-    # schema_madlib comes from PythonFunctionBodyOnly
-    return svdmf.svdmf_run( schema_madlib, input_table, col_name, row_name, value, num_features);
-
-$$ LANGUAGE plpythonu VOLATILE
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
-
-/**
- * @brief Partial SVD decomposition of a sparse matrix into U and V components
- *
- * This function takes as input the table representation of a sparse matrix and
- * decomposes it into the specified set of most significant features of matrices
- * of U and V matrix.
- *
- *   @param input_table Name of the table/view with the source data
- *   @param col_name Name of the column containing cell column number
- *   @param row_name Name of the column containing cell row number
- *   @param value Name of the column containing cell value
- *   @param num_features Rank of desired approximation
- *   @param num_iterations Maximum number if iterations to perform regardless of convergence
- *   @param min_error Acceptable level of error in convergence.
- *
- */
-
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.svdmf_run(
-    input_table TEXT, col_name TEXT, row_name TEXT, value TEXT, num_features INT, num_iterations INT, min_error FLOAT
-)
-RETURNS TEXT
-AS $$
-
-    PythonFunctionBodyOnly(`svd_mf', `svdmf')
-
-    # schema_madlib comes from PythonFunctionBodyOnly
-    return svdmf.svdmf_run_full( schema_madlib, input_table, col_name, row_name, value, num_features, num_iterations, min_error);
-
-$$ LANGUAGE plpythonu VOLATILE
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in b/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in
deleted file mode 100644
index c77e633..0000000
--- a/src/ports/postgres/modules/svd_mf/test/svdmf.sql_in
+++ /dev/null
@@ -1,79 +0,0 @@
----------------------------------------------------------------------------
--- Rules: 
--- ------
--- 1) Any DB objects should be created w/o schema prefix,
---    since this file is executed in a separate schema context.
--- 2) There should be no DROP statements in this script, since
---    all objects created in the default schema will be cleaned-up outside.
----------------------------------------------------------------------------
-
----------------------------------------------------------------------------
--- Setup: 
----------------------------------------------------------------------------
---------------------------------------------------------------------------------
--- Generate_sparse:
---	Creates a table represetint sparse matrix which is the result or products of two 
---	sequential valued rows.
---	Used for testing general convergence on data that has a discoverable structure
---
---  $1 - number of rows in the matrix
---	$2 - number of columns in the matrix
---	$3 - number of empty cells pre single cell containing a value
---------------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION Generate_Sparse(INT, INT, INT) RETURNS void AS $$
-declare
-	i INT;
-begin
--- DROP TABLE IF EXISTS test;
-CREATE TABLE test(
-	row_num INT, 
-	col_num INT,
-	val FLOAT
-);
-
-FOR i IN 1..$1 LOOP
-	EXECUTE 'INSERT INTO test SELECT '||i||', gen.a, CAST((gen.a*'||i||') AS FLOAT) FROM (SELECT CAST((random()*'||$2||'+1) AS INT) AS a FROM generate_series(1,'||$2/$3||')) as gen';
-END LOOP;
-end
-$$ LANGUAGE plpgsql;
-
-
---------------------------------------------------------------------------------
--- Generate_random:
---	Creates a table represetint sparse matrix with random values.
---	Used for testing general convergence poroperties on random data,
---	where convergence rate should be minimal
---
---      $1 - number of rows in the matrix
---	$2 - number of columns in the matrix
---	$3 - number of empty cells pre single cell containing a value
---------------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION Generate_Random(INT, INT, INT) RETURNS void AS $$
-declare
-	i INT;
-begin
--- DROP TABLE IF EXISTS test;
-CREATE TABLE test(
-	row_num INT, 
-	col_num INT,
-	val FLOAT
-);
-
-FOR i IN 1..$1 LOOP
-	EXECUTE 'INSERT INTO test SELECT '||i||', gen.a, random() FROM (SELECT CAST((random()*'||$2||'+1) AS INT) AS a FROM generate_series(1,'||$2/$3||')) as gen';
-END LOOP;
-end
-$$ LANGUAGE plpgsql;
-
----------------------------------------------------------------
--- Test
----------------------------------------------------------------
--- Pick a test to run: Random or Sequential Sparse; and creat a test table
--- SELECT Generate_Random(10000, 10000, 100);
-SELECT Generate_Sparse(10, 10, 5);
-
--- Run SVD decomposition on 3 main features
-SELECT MADLIB_SCHEMA.svdmf_run('test'::text, 'col_num'::text, 'row_num'::text, 'val'::text, 3);
-
--- Display portion of the results
-SELECT * FROM MADLIB_SCHEMA.matrix_u ORDER BY col_num, row_num LIMIT 10;

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/utilities/control.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/control.py_in b/src/ports/postgres/modules/utilities/control.py_in
index eccc61f..a0c3d55 100644
--- a/src/ports/postgres/modules/utilities/control.py_in
+++ b/src/ports/postgres/modules/utilities/control.py_in
@@ -19,6 +19,11 @@ from utilities import unique_string
 _unique_string = unique_string
 
 
+STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>)
+
+
 class EnableOptimizer(object):
 
     """
@@ -35,11 +40,12 @@ class EnableOptimizer(object):
                                    <!True!>, <!False!>)
 
     def __enter__(self):
+        # we depend on the fact that all GPDB/HAWQ versions that have the ORCA
+        # optimizer also define function properties
         if self.guc_exists:
             optimizer = plpy.execute("show optimizer")[0]["optimizer"]
             self.optimizer_enabled = True if optimizer == 'on' else False
             plpy.execute("set optimizer={0}".format(('off', 'on')[self.to_enable]))
-        return self
 
     def __exit__(self, *args):
         if args and args[0]:
@@ -161,8 +167,7 @@ class IterationController:
         self.inWith = False
         self.iteration = -1
         self.initialize_state = initialize_state
-        self.is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-        if self.is_hawq:
+        if STATE_IN_MEM:
             self.new_state={"_iteration": 0, "_state": None}
             self.old_state={"_iteration": 0, "_state": None}
 
@@ -186,12 +191,13 @@ class IterationController:
 
     def __exit__(self, type, value, tb):
         self.inWith = False
-        if self.is_hawq:
+        if STATE_IN_MEM:
             insert_plan = plpy.prepare("""
                 INSERT INTO {rel_state}
                 SELECT $1, $2
                 """.format(**self.kwargs), ["INTEGER", "DOUBLE PRECISION[]"])
-            plpy.execute(insert_plan, [self.new_state['_iteration'], self.new_state['_state']])
+            plpy.execute(insert_plan,
+                         [self.new_state['_iteration'], self.new_state['_state']])
 
     def runSQL(self, sql):
         if self.verbose:
@@ -215,38 +221,31 @@ class IterationController:
         # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
         # since currently ORCA has a bug for left outer joins (MPP-21868).
         # This should be removed when the issue is fixed in ORCA.
-        if version_wrapper.is_gp43() or version_wrapper.is_hawq():
-            optimizer = plpy.execute("SHOW optimizer")[0]['optimizer']
-            plpy.execute("SET optimizer = off")
-
-        if self.is_hawq:
-            eval_plan= plpy.prepare("""
-                SELECT ({expression}) AS expression
-                FROM {{rel_args}} AS _args
-                    LEFT OUTER JOIN (
-                        SELECT $1 AS _state
-                    ) AS _state ON True
-                """.format(expression=expression).
-                    format(iteration=self.iteration,
-                           **self.kwargs), ["DOUBLE PRECISION[]"])
-
-            resultObject = plpy.execute(eval_plan, [[]
-                                        if self.new_state['_state'] is None else self.new_state['_state']])
-        else:
-            resultObject= self.runSQL("""
-            SELECT ({expression}) AS expression
-            FROM {{rel_args}} AS _args
-                LEFT OUTER JOIN (
-                    SELECT _state
-                    FROM {{rel_state}} AS _state
-                    WHERE _state._iteration = {{iteration}}
-                ) AS _state ON True
-            """.format(expression=expression).format(
-                     iteration=self.iteration,
-                     **self.kwargs))
-
-        if version_wrapper.is_gp43() or version_wrapper.is_hawq():
-            plpy.execute("SET optimizer = " + optimizer)
+        with EnableOptimizer(False):
+            if STATE_IN_MEM:
+                eval_plan = plpy.prepare("""
+                    SELECT ({expression}) AS expression
+                    FROM {{rel_args}} AS _args
+                        LEFT OUTER JOIN (
+                            SELECT $1 AS _state
+                        ) AS _state ON True
+                    """.format(expression=expression).
+                        format(iteration=self.iteration, **self.kwargs),
+                        ["DOUBLE PRECISION[]"])
+                resultObject = plpy.execute(eval_plan,
+                                            [[] if self.new_state['_state'] is None
+                                             else self.new_state['_state']])
+            else:
+                resultObject = self.runSQL("""
+                    SELECT ({expression}) AS expression
+                    FROM {{rel_args}} AS _args
+                        LEFT OUTER JOIN (
+                            SELECT _state
+                            FROM {{rel_state}} AS _state
+                            WHERE _state._iteration = {{iteration}}
+                        ) AS _state ON True
+                    """.format(expression=expression).
+                        format(iteration=self.iteration, **self.kwargs))
 
         if resultObject.nrows() == 0:
             return None
@@ -266,10 +265,7 @@ class IterationController:
         @return None if \c condition evaluates to NULL, otherwise the Boolean
             value of \c condition
         """
-
-        return self.evaluate("""
-            CAST(({condition}) AS BOOLEAN)
-            """.format(condition=condition))
+        return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))
 
     def update(self, newState, **updateKwargs):
         """
@@ -292,24 +288,22 @@ class IterationController:
         is kept.
         """
         updateKwargs.update(**self.kwargs)
-        newState = newState.format(
-            iteration=self.iteration,
-            **updateKwargs)
+        newState = newState.format(iteration=self.iteration, **updateKwargs)
         self.iteration = self.iteration + 1
 
-        if self.is_hawq:
+        if STATE_IN_MEM:
             self.old_state = self.new_state
-            update_plan= plpy.prepare("""
+            update_plan = plpy.prepare("""
                 SELECT
                     {iteration} AS _iteration,
                     ({newState}) AS _state
                 """.format(iteration=self.iteration,
-                           newState=newState,
-                           **self.kwargs).format(__state__='$1'), ["DOUBLE PRECISION[]"])
+                           newState=newState, **self.kwargs).
+                    format(__state__='$1'),
+                    ["DOUBLE PRECISION[]"])
             self.new_state = plpy.execute(update_plan,
-                                          [None if self.new_state['_state'] is None else self.new_state['_state']])[0]
-            # Deal with possible double underflow
-            # self.new_state['_state'] = map(lambda r: 1e-307 if abs(r) < 1e-307 else r, self.new_state['_state'])
+                                          [None if self.new_state['_state'] is None
+                                           else self.new_state['_state']])[0]
         else:
             self.runSQL("""
                 INSERT INTO {rel_state}
@@ -317,15 +311,13 @@ class IterationController:
                     {iteration},
                     ({newState})
                 """.format(
-                          iteration=self.iteration,
-                          newState=newState,
-                          **self.kwargs))
-
-        if not self.is_hawq:
+                    iteration=self.iteration,
+                    newState=newState,
+                    **self.kwargs))
             if self.truncAfterIteration:
                 self.runSQL("""
-                   DELETE FROM {rel_state} AS _state
-                   WHERE _state._iteration < {iteration}
+                    DELETE FROM {rel_state} AS _state
+                    WHERE _state._iteration < {iteration}
                 """.format(iteration=self.iteration, **self.kwargs))
 
 
@@ -337,12 +329,14 @@ class IterationController2D(IterationController):
 
     def __exit__(self, type, value, tb):
         self.inWith = False
-        if self.is_hawq:
-            insert_plan= plpy.prepare("""
-                INSERT INTO {rel_state}
-                SELECT $1, {schema_madlib}.array_to_2d($2)
-                """.format(**self.kwargs), ["INTEGER", "DOUBLE PRECISION[]"])
-            plpy.execute(insert_plan, [self.new_state['_iteration'], self.new_state['_state']])
+        if STATE_IN_MEM:
+            insert_plan = plpy.prepare("""
+                    INSERT INTO {rel_state}
+                    SELECT $1, {schema_madlib}.array_to_2d($2)
+                """.format(**self.kwargs),
+                ["INTEGER", "DOUBLE PRECISION[]"])
+            plpy.execute(insert_plan,
+                         [self.new_state['_iteration'], self.new_state['_state']])
 
     def evaluate(self, expression):
         """
@@ -357,40 +351,35 @@ class IterationController2D(IterationController):
         @return None if \c expression evaluates to NULL, otherwise the value of
             \c expression
         """
-
-        # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
-        # since currently ORCA has a bug for left outer joins (MPP-21868).
+        # We disable the optimizer (ORCA) for the query planning
+        # since ORCA has a bug for left outer joins (MPP-21868).
         # This should be removed when the issue is fixed in ORCA.
-        if version_wrapper.is_gp43() or version_wrapper.is_hawq():
-            optimizer = plpy.execute("SHOW optimizer")[0]['optimizer']
-            plpy.execute("SET optimizer = off")
-
-        if self.is_hawq:
-            eval_plan= plpy.prepare("""
-               SELECT ({expression}) AS expression
-               FROM {{rel_args}} AS _args
-                   LEFT OUTER JOIN (
-                       SELECT {{schema_madlib}}.array_to_2d($1) AS _state
-                   ) AS _state ON True
-               """.format(expression=expression).format(
-                        **self.kwargs), ["DOUBLE PRECISION[]"])
-
-            resultObject = plpy.execute(eval_plan,
-                                        [[] if self.new_state['_state'] is None else self.new_state['_state']])
-        else:
-            resultObject= self.runSQL("""
-                SELECT ({expression}) AS expression
-                FROM {{rel_args}} AS _args
-                    LEFT OUTER JOIN (
-                        SELECT *
-                        FROM {{rel_state}} AS _state
-                        WHERE _state._iteration = {{iteration}}
-                    ) AS _state ON True
-                """.format(expression=expression).format(iteration=self.iteration, **self.kwargs))
-
-        if version_wrapper.is_gp43() or version_wrapper.is_hawq():
-            plpy.execute("SET optimizer = " + optimizer)
-
+        with EnableOptimizer(False):
+            if STATE_IN_MEM:
+                eval_plan = plpy.prepare("""
+                    SELECT ({expression}) AS expression
+                    FROM {{rel_args}} AS _args
+                        LEFT OUTER JOIN (
+                            SELECT {{schema_madlib}}.array_to_2d($1) AS _state
+                        ) AS _state ON True
+                    """.format(expression=expression).format(
+                               **self.kwargs), ["DOUBLE PRECISION[]"])
+
+                resultObject = plpy.execute(
+                    eval_plan,
+                    [[] if self.new_state['_state'] is None else self.new_state['_state']])
+            else:
+                resultObject = self.runSQL("""
+                    SELECT ({expression}) AS expression
+                    FROM {{rel_args}} AS _args
+                        LEFT OUTER JOIN (
+                            SELECT *
+                            FROM {{rel_state}} AS _state
+                            WHERE _state._iteration = {{iteration}}
+                        ) AS _state ON True
+                    """.format(expression=expression).
+                        format(iteration=self.iteration,
+                               **self.kwargs))
         if resultObject.nrows() == 0:
             return None
         else:
@@ -422,19 +411,18 @@ class IterationController2D(IterationController):
             **updateKwargs)
         self.iteration = self.iteration + 1
 
-        if self.is_hawq:
-            self.old_state= self.new_state
-            update_plan= plpy.prepare("""
-               SELECT
-                   {iteration} AS _iteration,
-                   {schema_madlib}.array_to_1d(({newState})) AS _state
-               """.format(
-                        iteration=self.iteration,
-                        newState=newState,
-                        **self.kwargs), ["DOUBLE PRECISION[]"])
-            self.new_state= plpy.execute(update_plan,
-                                         [None if self.new_state['_state'] is None
-                                          else self.new_state['_state']])[0]
+        if STATE_IN_MEM:
+            self.old_state = self.new_state
+            update_plan = plpy.prepare("""
+                SELECT
+                    {iteration} AS _iteration,
+                    {schema_madlib}.array_to_1d(({newState})) AS _state
+                """.format(iteration=self.iteration,
+                           newState=newState, **self.kwargs),
+                ["DOUBLE PRECISION[]"])
+            self.new_state = plpy.execute(update_plan,
+                                          [None if self.new_state['_state'] is None
+                                           else self.new_state['_state']])[0]
         else:
             self.runSQL("""
                 INSERT INTO {rel_state}
@@ -442,16 +430,14 @@ class IterationController2D(IterationController):
                     {iteration},
                     ({newState})
                 """.format(
-                         iteration=self.iteration,
-                         newState=newState,
-                         **self.kwargs))
-
-        if not self.is_hawq:
+                    iteration=self.iteration,
+                    newState=newState,
+                    **self.kwargs))
             if self.truncAfterIteration:
                 self.runSQL("""
-                        DELETE FROM {rel_state} AS _state
-                        WHERE _state._iteration < {iteration}
-                    """.format(iteration=self.iteration, **self.kwargs))
+                    DELETE FROM {rel_state} AS _state
+                    WHERE _state._iteration < {iteration}
+                """.format(iteration=self.iteration, **self.kwargs))
 
 
 class IterationController2S(IterationController):
@@ -462,8 +448,8 @@ class IterationController2S(IterationController):
     """
 
     def evaluate(self, expression):
-        if self.is_hawq:
-            eval_plan= plpy.prepare("""
+        if STATE_IN_MEM:
+            eval_plan = plpy.prepare("""
                 SELECT ({expression}) AS expression
                 FROM {{rel_args}} AS _args
                     LEFT OUTER JOIN (
@@ -471,45 +457,46 @@ class IterationController2S(IterationController):
                             $1 AS _state_previous,
                             $2 AS _state_current
                     ) AS _state ON True
-                """.format(expression=expression).format(
-                         iteration=self.iteration,
-                         **self.kwargs), ["DOUBLE PRECISION[]", "DOUBLE PRECISION[]"])
+                """.format(expression=expression).
+                    format(iteration=self.iteration, **self.kwargs),
+                    ["DOUBLE PRECISION[]", "DOUBLE PRECISION[]"])
 
-            resultObject= plpy.execute(eval_plan,
-                                       [self.old_state['_state'],
-                                        self.new_state['_state']])
+            resultObject = plpy.execute(
+                eval_plan,
+                [self.old_state['_state'], self.new_state['_state']])
         else:
-            resultObject=self.runSQL("""
-               SELECT ({expression}) AS expression
-               FROM {{rel_args}} AS _args
-                   LEFT OUTER JOIN (
-                       SELECT
-                           _state_previous, _state_current
-                       FROM
-                       (
-                           SELECT _state AS _state_previous
-                           FROM {{rel_state}}
-                           WHERE _iteration = {{iteration}} - 1
-                       ) sub1,
-                       (
-                           SELECT _state AS _state_current
-                           FROM {{rel_state}}
-                           WHERE _iteration = {{iteration}}
-                       ) sub2
-                   ) AS _state ON True
-               """.format(expression=expression).format(
-                        iteration=self.iteration,
-                        **self.kwargs))
+            resultObject = self.runSQL("""
+                SELECT ({expression}) AS expression
+                FROM {{rel_args}} AS _args
+                    LEFT OUTER JOIN (
+                        SELECT
+                            _state_previous, _state_current
+                        FROM
+                        (
+                            SELECT _state AS _state_previous
+                            FROM {{rel_state}}
+                            WHERE _iteration = {{iteration}} - 1
+                        ) sub1,
+                        (
+                            SELECT _state AS _state_current
+                            FROM {{rel_state}}
+                            WHERE _iteration = {{iteration}}
+                        ) sub2
+                    ) AS _state ON True
+                """.format(expression=expression).
+                    format(iteration=self.iteration,
+                           **self.kwargs))
 
         if resultObject.nrows() == 0:
             return None
         else:
             return resultObject[0]['expression']
 
-    def get_state_size(self):
-        return len(self.new_state) if self.is_hawq else 0
+    if STATE_IN_MEM:
+        def get_state_size(self):
+            return len(self.new_state)
 
-    def get_state_value(self, index):
-        return self.new_state['_state'][index] if self.is_hawq else 0
+        def get_state_value(self, index):
+            return self.new_state['_state'][index]
 
 m4_changequote(<!`!> , <!'!>)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/utilities/control_composite.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/control_composite.py_in b/src/ports/postgres/modules/utilities/control_composite.py_in
index bfe2f9d..a584845 100644
--- a/src/ports/postgres/modules/utilities/control_composite.py_in
+++ b/src/ports/postgres/modules/utilities/control_composite.py_in
@@ -16,7 +16,12 @@ import plpy
 from utilities import __mad_version
 version_wrapper = __mad_version()
 from utilities import unique_string
-from control import MinWarning
+_unique_string = unique_string
+from control import MinWarning, EnableOptimizer
+
+STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>)
 
 
 class IterationControllerComposite:
@@ -58,10 +63,9 @@ class IterationControllerComposite:
         self.verbose = verbose
         self.inWith = False
         self.iteration = -1
-        m4_ifdef(<!__HAWQ__!>, <!
-        self.new_state = None
-        self.old_state = None
-        !>)
+        if STATE_IN_MEM:
+            self.new_state = None
+            self.old_state = None
 
     def __enter__(self):
         with MinWarning('warning'):
@@ -79,18 +83,18 @@ class IterationControllerComposite:
 
     def __exit__(self, type, value, tb):
         self.inWith = False
-        m4_ifdef(<!__HAWQ__!>, <!
-        insert_plan=plpy.prepare("""
-            INSERT INTO {rel_state}
-            SELECT $1, CAST( ({schema_madlib}.array_to_2d($2), $3, $4, $5) AS {schema_madlib}.kmeans_state)
-            """.format(**self.kwargs), [ "INTEGER", "DOUBLE PRECISION[]",
-                "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" ])
-        plpy.execute(insert_plan, [ self.new_state['_iteration'],
-            self.new_state['centroids'],
-            self.new_state['old_centroid_ids'],
-            self.new_state['objective_fn'],
-            self.new_state['frac_reassigned'] ])
-        !>)
+        if STATE_IN_MEM:
+            insert_plan=plpy.prepare("""
+                INSERT INTO {rel_state}
+                SELECT $1, CAST( ({schema_madlib}.array_to_2d($2), $3, $4, $5) AS {schema_madlib}.kmeans_state)
+                """.format(**self.kwargs), ["INTEGER", "DOUBLE PRECISION[]",
+                                            "INTEGER[]", "DOUBLE PRECISION",
+                                            "DOUBLE PRECISION"])
+            plpy.execute(insert_plan, [self.new_state['_iteration'],
+                         self.new_state['centroids'],
+                         self.new_state['old_centroid_ids'],
+                         self.new_state['objective_fn'],
+                         self.new_state['frac_reassigned']])
 
     def runSQL(self, sql):
         if self.verbose:
@@ -110,47 +114,39 @@ class IterationControllerComposite:
         @return None if \c expression evaluates to NULL, otherwise the value of
             \c expression
         """
-
-        ## For GPDB 4.3 we disable the optimzer (ORCA) for the query planner
-        ## since currently ORCA has a bug for left outer joins (MPP-21868).
-        ## This should be removed when the issue is fixed in ORCA.
-        if version_wrapper.is_gp43() or version_wrapper.is_hawq():
-            optimizer = plpy.execute("SHOW optimizer")[0]['optimizer']
-            plpy.execute("SET optimizer = off")
-
-        m4_ifdef(<!__HAWQ__!>, <!
-        cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4) AS {schema_madlib}.kmeans_state)".format(**self.kwargs)
-        cast_type = [ "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" ]
-        cast_para = [None if self.new_state is None else self.new_state[i]
-                     for i in ('centroids', 'old_centroid_ids',
-                               'objective_fn', 'frac_reassigned')]
-        eval_plan = plpy.prepare("""
-            SELECT ({expression}) AS expression
-            FROM {{rel_args}} AS _args
-                LEFT OUTER JOIN (
-                    SELECT {{schema_madlib}}.array_to_2d($1) AS _state
-                ) AS _state ON True
-            """.format(expression = expression).format(
-                    iteration = self.iteration,
-                    curr_state=cast_str, **self.kwargs), cast_type)
-
-        resultObject = plpy.execute(eval_plan, cast_para)
-        !>, <!
-        resultObject = self.runSQL("""
-            SELECT ({expression}) AS expression
-            FROM {{rel_args}} AS _args
-                LEFT OUTER JOIN (
-                    SELECT *
-                    FROM {{rel_state}} AS _state
-                    WHERE _state._iteration = {{iteration}}
-                ) AS _state ON True
-            """.format(expression = expression).format(
-                iteration = self.iteration,
-                **self.kwargs))
-        !>)
-
-        if version_wrapper.is_gp43() or version_wrapper.is_hawq():
-            plpy.execute("SET optimizer = " + optimizer)
+        # We disable the optimizer (ORCA) for the query planning
+        # since ORCA has a bug for left outer joins (MPP-21868).
+        # This should be removed when the issue is fixed in ORCA.
+        with EnableOptimizer(False):
+            if STATE_IN_MEM:
+                cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4) AS {schema_madlib}.kmeans_state)".format(**self.kwargs)
+                cast_type = ["DOUBLE PRECISION[]", "INTEGER[]",
+                             "DOUBLE PRECISION", "DOUBLE PRECISION"]
+                cast_para = [None if self.new_state is None else self.new_state[i]
+                             for i in ('centroids', 'old_centroid_ids',
+                                       'objective_fn', 'frac_reassigned')]
+                eval_plan = plpy.prepare("""
+                    SELECT ({expression}) AS expression
+                    FROM {{rel_args}} AS _args
+                        LEFT OUTER JOIN (
+                            SELECT {{schema_madlib}}.array_to_2d($1) AS _state
+                        ) AS _state ON True
+                    """.format(expression=expression).
+                        format(iteration=self.iteration,
+                               curr_state=cast_str, **self.kwargs), cast_type)
+                resultObject = plpy.execute(eval_plan, cast_para)
+            else:
+                resultObject = self.runSQL("""
+                    SELECT ({expression}) AS expression
+                    FROM {{rel_args}} AS _args
+                        LEFT OUTER JOIN (
+                            SELECT *
+                            FROM {{rel_state}} AS _state
+                            WHERE _state._iteration = {{iteration}}
+                        ) AS _state ON True
+                    """.format(expression=expression).
+                        format(iteration=self.iteration,
+                               **self.kwargs))
 
         if resultObject.nrows() == 0:
             return None
@@ -170,10 +166,7 @@ class IterationControllerComposite:
         @return None if \c condition evaluates to NULL, otherwise the Boolean
             value of \c condition
         """
-
-        return self.evaluate("""
-            CAST(({condition}) AS BOOLEAN)
-            """.format(condition = condition))
+        return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))
 
     def update(self, newState, **updateKwargs):
         """
@@ -196,64 +189,56 @@ class IterationControllerComposite:
         is kept.
         """
         updateKwargs.update(**self.kwargs)
-        newState = newState.format(
-            iteration = self.iteration,
-            **updateKwargs)
+        newState = newState.format(iteration=self.iteration, **updateKwargs)
         self.iteration = self.iteration + 1
 
-        m4_ifdef(<!__HAWQ__!>, <!
-        cast_str = """CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4)
-                        AS {schema_madlib}.kmeans_state)""".format(**self.kwargs)
-        cast_str_old  = """CAST (({schema_madlib}.array_to_2d($5), $6, $7, $8)
+        if STATE_IN_MEM:
+            cast_str = """CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4)
                             AS {schema_madlib}.kmeans_state)""".format(**self.kwargs)
-        cast_type = [
-            "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION",
-            "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION"
-        ]
-
-        cast_para = [None if self.new_state is None else self.new_state[i]
-                     for i in ('centroids', 'old_centroid_ids', 'objective_fn',
-                               'frac_reassigned')]
-        cast_para.extend([None if self.old_state is None else self.old_state[i]
-                     for i in ('centroids', 'old_centroid_ids', 'objective_fn',
-                               'frac_reassigned')])
-
-        updateKwargs.update(curr_state=cast_str, old_state=cast_str_old)
-        self.old_state = self.new_state
-
-        update_plan = plpy.prepare("""
-            SELECT
-                {iteration} AS _iteration,
-                {schema_madlib}.array_to_1d((_state).centroids) AS centroids,
-                (_state).old_centroid_ids,
-                (_state).objective_fn,
-                (_state).frac_reassigned
-            FROM
-            (
-                SELECT ({newState}) AS _state
-            ) q
-            """.format(iteration=self.iteration,
-                       newState=newState, **self.kwargs).format(**updateKwargs),
-            cast_type)
-        self.new_state = plpy.execute(update_plan, cast_para)[0]
-        !>, <!
-        self.runSQL("""
-            INSERT INTO {rel_state}
-            SELECT
-                {iteration},
-                ({newState})
-            """.format(
-                iteration = self.iteration,
-                newState = newState,
-                **self.kwargs))
-        !>)
-
-        m4_ifdef(<!__HAWQ__!>, <!!>, <!
-        if self.truncAfterIteration:
+            cast_str_old = """CAST (({schema_madlib}.array_to_2d($5), $6, $7, $8)
+                                AS {schema_madlib}.kmeans_state)""".format(**self.kwargs)
+            cast_type = [
+                "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION",
+                "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION"
+            ]
+
+            cast_para = [None if self.new_state is None else self.new_state[i]
+                         for i in ('centroids', 'old_centroid_ids', 'objective_fn',
+                                   'frac_reassigned')]
+            cast_para.extend([None if self.old_state is None else self.old_state[i]
+                             for i in ('centroids', 'old_centroid_ids', 'objective_fn',
+                                       'frac_reassigned')])
+
+            updateKwargs.update(curr_state=cast_str, old_state=cast_str_old)
+            self.old_state = self.new_state
+
+            update_plan = plpy.prepare("""
+                SELECT
+                    {iteration} AS _iteration,
+                    {schema_madlib}.array_to_1d((_state).centroids) AS centroids,
+                    (_state).old_centroid_ids,
+                    (_state).objective_fn,
+                    (_state).frac_reassigned
+                FROM
+                (
+                    SELECT ({newState}) AS _state
+                ) q
+                """.format(iteration=self.iteration,
+                           newState=newState, **self.kwargs).format(**updateKwargs),
+                cast_type)
+            self.new_state = plpy.execute(update_plan, cast_para)[0]
+        else:
             self.runSQL("""
-                DELETE FROM {rel_state} AS _state
-                WHERE _state._iteration < {iteration}
-            """.format(iteration = self.iteration, **self.kwargs))
-        !>)
+                INSERT INTO {rel_state}
+                SELECT
+                    {iteration},
+                    ({newState})
+                """.format(iteration=self.iteration, newState=newState,
+                           **self.kwargs))
+            if self.truncAfterIteration:
+                self.runSQL("""
+                    DELETE FROM {rel_state} AS _state
+                    WHERE _state._iteration < {iteration}
+                """.format(iteration=self.iteration, **self.kwargs))
 
 m4_changequote(<!`!>, <!'!>)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/96f9ac04/src/ports/postgres/modules/utilities/group_control.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/group_control.py_in b/src/ports/postgres/modules/utilities/group_control.py_in
index ce9b7db..708a532 100644
--- a/src/ports/postgres/modules/utilities/group_control.py_in
+++ b/src/ports/postgres/modules/utilities/group_control.py_in
@@ -1,6 +1,4 @@
 # coding=utf-8
-m4_changequote(`>>>', `<<<')
-
 """
 @file control.py_in
 
@@ -20,7 +18,11 @@ _unique_string = unique_string
 version_wrapper = __mad_version()
 mad_vec = version_wrapper.select_vecfunc()
 
+m4_changequote(`<!', `!>')
+
+
 class GroupIterationController:
+
     """
     @brief Abstraction for implementing driver functions in PL/Python
 
@@ -48,13 +50,13 @@ class GroupIterationController:
     """
 
     def __init__(self, rel_args, rel_state, stateType,
-            temporaryTables = True,
-            schema_madlib = "MADLIB_SCHEMA_MISSING",
-            verbose = False,
-            grouping_str = "NULL",
-            col_grp_iteration="_iteration",
-            col_grp_state="_state",
-            **kwargs):
+                 temporaryTables=True,
+                 schema_madlib="MADLIB_SCHEMA_MISSING",
+                 verbose=False,
+                 grouping_str="NULL",
+                 col_grp_iteration="_iteration",
+                 col_grp_state="_state",
+                 **kwargs):
         self.temporaryTables = temporaryTables
         self.verbose = verbose
         self.inWith = False
@@ -62,38 +64,36 @@ class GroupIterationController:
         self.grouping_str = grouping_str
         self.kwargs = kwargs
         self.kwargs.update(
-            unqualified_rel_state = rel_state,
-            rel_args = ('pg_temp.' if temporaryTables else '') + rel_args,
-            rel_state = ('pg_temp.' if temporaryTables else '') + rel_state,
-            stateType = stateType.format(schema_madlib = schema_madlib),
-            schema_madlib = schema_madlib,
-            grouping_str = self.grouping_str,
-            col_grp_null = _unique_string(),
-            col_grp_key = _unique_string(),
+            unqualified_rel_state=rel_state,
+            rel_args=('pg_temp.' if temporaryTables else '') + rel_args,
+            rel_state=('pg_temp.' if temporaryTables else '') + rel_state,
+            stateType=stateType.format(schema_madlib=schema_madlib),
+            schema_madlib=schema_madlib,
+            grouping_str=self.grouping_str,
+            col_grp_null=_unique_string(),
+            col_grp_key=_unique_string(),
             col_grp_iteration=col_grp_iteration,
             col_grp_state=col_grp_state
-            )
-        grouping_col = "Null" if kwargs["grouping_col"] is None \
-                                else kwargs["grouping_col"]
-        using_str = "on True" if kwargs["grouping_col"] is None \
-                                else "using ({grouping_col})".format(**kwargs)
+        )
+        grouping_col = "Null" if kwargs["grouping_col"] is None else kwargs["grouping_col"]
+        using_str = "on True" if kwargs["grouping_col"] is None else "using ({grouping_col})".format(**kwargs)
         self.is_group_null = True if kwargs["grouping_col"] is None else False
         self.kwargs["grouping_col"] = grouping_col
         self.kwargs["using_str"] = using_str
         self.grouping_col = grouping_col
-        m4_ifdef(>>>__HAWQ__<<<, >>>
-        self.new_states = []
-        self.old_states = []
-        self.finished_states = []
-        <<<)
+        self.hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+        if self.hawq:
+            self.new_states = []
+            self.old_states = []
+            self.finished_states = []
 
     def __enter__(self):
         with MinWarning('warning'):
             # currently assuming that groups is passed as a valid array
-            group_col = ("NULL::integer as {col_grp_null}" if self.is_group_null \
-                        else "{grouping_col}").format(**self.kwargs)
-            groupby_str = ("{col_grp_null}" if self.is_group_null \
-                        else "{grouping_col}").format(**self.kwargs)
+            group_col = ("NULL::integer as {col_grp_null}" if self.is_group_null
+                         else "{grouping_col}").format(**self.kwargs)
+            groupby_str = ("{col_grp_null}" if self.is_group_null
+                           else "{grouping_col}").format(**self.kwargs)
             primary_str = "" if self.is_group_null else ", {grouping_col}".format(**self.kwargs)
 
             self.runSQL(
@@ -104,94 +104,93 @@ class GroupIterationController:
                         {group_col},
                         0::integer as {col_grp_iteration},
                         Null::{stateType} as {col_grp_state}
-                        m4_ifdef(>>>__HAWQ__<<<, >>>, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key}<<<)
+                        m4_ifdef(<!__HAWQ__!>, <!, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key}!>)
                     from {rel_source}
                     group by {groupby_str}
                 );
-				m4_ifdef(>>>__POSTGRESQL__<<<, >>><<<, >>>
+                m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!
                 alter table {rel_state} set distributed by ({col_grp_iteration} {primary_str});
-                <<<)
+                !>)
                 """.format(group_col=group_col, groupby_str=groupby_str, primary_str=primary_str,
                            temp='TEMPORARY' if self.temporaryTables else '', **self.kwargs))
             null_test = " or ".join([g.strip() + " is NULL" for g in
-                        self.kwargs['grouping_col'].split(",")])
+                                     self.kwargs['grouping_col'].split(",")])
             null_count = plpy.execute(
                 """
                 select count(*) from {rel_state} where {null_test}
                 """.format(null_test=null_test, **self.kwargs))[0]['count']
-            if null_count != 0 and primary_str != "":
+            if null_count != 0 and primary_str:
                 plpy.error("Grouping error: at least one of the grouping columns contains NULL values!"
                            " Please filter out those NULL values.")
-            self.runSQL("""
-                        m4_ifdef(>>>__HAWQ__<<<, >>><<<, >>>
-                        alter table {rel_state} add primary key ({col_grp_iteration} {primary_str})
-                        <<<)
-                        """.format(primary_str=primary_str, **self.kwargs))
-            m4_ifdef(>>>__HAWQ__<<<, >>>
-            self.new_states = plpy.execute("SELECT * FROM {rel_state}".format(**self.kwargs))
-            if not self.is_group_null:
-                converted_source_table = _unique_string()
-                plpy.execute("""
-                    CREATE TEMP TABLE {converted_source_table} AS
-                    SELECT *, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key} FROM {rel_source}
-                    """.format(converted_source_table=converted_source_table, **self.kwargs))
-                self.kwargs.update(rel_source=converted_source_table)
-            <<<)
+            if not self.hawq:
+                self.runSQL("alter table {rel_state} add primary key "
+                            "({col_grp_iteration} {primary_str})".
+                            format(primary_str=primary_str, **self.kwargs))
+            else:
+                self.new_states = plpy.execute("SELECT * FROM {rel_state}".format(**self.kwargs))
+                if not self.is_group_null:
+                    converted_source_table = _unique_string()
+                    plpy.execute("""
+                        CREATE TEMP TABLE {converted_source_table} AS
+                        SELECT *, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key} FROM {rel_source}
+                        """.format(converted_source_table=converted_source_table, **self.kwargs))
+                    self.kwargs.update(rel_source=converted_source_table)
         self.inWith = True
         return self
 
     def __exit__(self, type, value, tb):
         self.inWith = False
-        m4_ifdef(>>>__HAWQ__<<<, >>>
-        insert_plan = plpy.prepare("""
-            INSERT INTO {rel_state}
-            SELECT
-                {grouping_col},
-                _rel_state.iteration,
-                _rel_state.state,
-                _rel_state.grp_key
-            FROM
-            (
-                SELECT
-                    {grouping_col}, {col_grp_key}
-                FROM {rel_state}
-                WHERE {col_grp_iteration} = 0
-            ) AS _src,
-            (
+        if self.hawq:
+            insert_sql = """
+                INSERT INTO {rel_state}
                 SELECT
-                    grp_key, iteration, state
+                    {grouping_col},
+                    _rel_state.iteration,
+                    _rel_state.state,
+                    _rel_state.grp_key
                 FROM
-                    {schema_madlib}._gen_state($1, $2, $3)
-            ) AS _rel_state
-            WHERE _src.{col_grp_key} = _rel_state.grp_key
-            """.format(**self.kwargs), ["text[]", "integer[]", "double precision[]"])
-
-        grp_keys = []
-        states = []
-        iterations = []
-        null_empty = []
-        for state in self.finished_states:
-            # the lenght of states for null_empty groups are different
-            if state["{col_grp_state}".format(**self.kwargs)][-1] == 3:
-                null_empty.append(state)
-                continue
-            iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
-            grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
-            states.extend(state["{col_grp_state}".format(**self.kwargs)])
-        plpy.execute(insert_plan, [grp_keys, iterations, states])
-
-        grp_keys = []
-        states = []
-        iterations = []
-        for state in null_empty:
-            iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
-            grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
-            states.extend(state["{col_grp_state}".format(**self.kwargs)])
-        plpy.execute(insert_plan, [grp_keys, iterations, states])
-
-        if not self.is_group_null:
-            plpy.execute('DROP TABLE IF EXISTS ' + self.kwargs['rel_source'])
-        <<<)
+                (
+                    SELECT
+                        {grouping_col}, {col_grp_key}
+                    FROM {rel_state}
+                    WHERE {col_grp_iteration} = 0
+                ) AS _src,
+                (
+                    SELECT
+                        grp_key, iteration, state
+                    FROM
+                        {schema_madlib}._gen_state($1, $2, $3)
+                ) AS _rel_state
+                WHERE _src.{col_grp_key} = _rel_state.grp_key
+                """.format(**self.kwargs)
+            insert_plan = plpy.prepare(insert_sql, ["text[]", "integer[]", "double precision[]"])
+
+            grp_keys = []
+            states = []
+            iterations = []
+            null_empty = []
+            for state in self.finished_states:
+                # the length of states for null_empty groups are different
+                if state["{col_grp_state}".format(**self.kwargs)][-1] == 3:
+                    null_empty.append(state)
+                    continue
+                iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
+                grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
+                states.extend(state["{col_grp_state}".format(**self.kwargs)])
+            plpy.execute(insert_plan, [grp_keys, iterations, states])
+
+            grp_keys = []
+            states = []
+            iterations = []
+            for state in null_empty:
+                iterations.append(state["{col_grp_iteration}".format(**self.kwargs)])
+                grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
+                states.extend(state["{col_grp_state}".format(**self.kwargs)])
+            insert_plan = plpy.prepare(insert_sql, ["text[]", "integer[]", "double precision[]"])
+            plpy.execute(insert_plan, [grp_keys, iterations, states])
+
+            if not self.is_group_null:
+                plpy.execute('DROP TABLE IF EXISTS ' + self.kwargs['rel_source'])
 
     def runSQL(self, sql):
         if self.verbose:
@@ -211,45 +210,44 @@ class GroupIterationController:
         @return None if \c expression evaluates to NULL, otherwise the value of
             \c expression
         """
-        m4_ifdef(>>>__HAWQ__<<<, >>>
-        eva_plan = plpy.prepare("""
-            SELECT
-                ({expression}) AS _expression,
-                subq1.grp_key AS _groups
-            FROM {{rel_args}} AS _args
-            left outer join (
-                (
-                    SELECT grp_key, state AS _state_previous
-                    FROM {schema_madlib}._gen_state($1, NULL, $2)
-                ) sub1
-                JOIN
-                (
-                    SELECT grp_key, state AS _state_current
-                    FROM {schema_madlib}._gen_state($3, NULL, $4)
-                ) sub2
-                using (grp_key)
-            ) AS subq1 ON True
-            """.format(expression = expression, **self.kwargs).format(
-                    iteration=self.iteration, **self.kwargs),
-            ["text[]", "double precision[]", "text[]", "double precision[]"])
-
-        grp_keys_new = []
-        states_new = []
-        for state in self.new_states:
-            if state["{col_grp_state}".format(**self.kwargs)][-1] < 2:
-                grp_keys_new.append(state["{col_grp_key}".format(**self.kwargs)])
-                states_new.extend(state["{col_grp_state}".format(**self.kwargs)])
-        grp_keys_old = []
-        states_old = []
-        for state in self.old_states:
-            grp_keys_old.append(state["{col_grp_key}".format(**self.kwargs)])
-            if state["{col_grp_state}".format(**self.kwargs)] is not None:
-                states_old.extend(state["{col_grp_state}".format(**self.kwargs)])
-
-        resultObject = plpy.execute(eva_plan, [grp_keys_old, states_old, grp_keys_new, states_new])
-        <<<,
-        >>>resultObject = self.runSQL(
-            """
+        if self.hawq:
+            eva_plan = plpy.prepare("""
+                SELECT
+                    ({expression}) AS _expression,
+                    subq1.grp_key AS _groups
+                FROM {{rel_args}} AS _args
+                left outer join (
+                    (
+                        SELECT grp_key, state AS _state_previous
+                        FROM {schema_madlib}._gen_state($1, NULL, $2)
+                    ) sub1
+                    JOIN
+                    (
+                        SELECT grp_key, state AS _state_current
+                        FROM {schema_madlib}._gen_state($3, NULL, $4)
+                    ) sub2
+                    using (grp_key)
+                ) AS subq1 ON True
+                """.format(expression=expression, **self.kwargs).
+                    format(iteration=self.iteration, **self.kwargs),
+                    ["text[]", "double precision[]", "text[]", "double precision[]"])
+
+            grp_keys_new = []
+            states_new = []
+            for state in self.new_states:
+                if state["{col_grp_state}".format(**self.kwargs)][-1] < 2:
+                    grp_keys_new.append(state["{col_grp_key}".format(**self.kwargs)])
+                    states_new.extend(state["{col_grp_state}".format(**self.kwargs)])
+            grp_keys_old = []
+            states_old = []
+            for state in self.old_states:
+                grp_keys_old.append(state["{col_grp_key}".format(**self.kwargs)])
+                if state["{col_grp_state}".format(**self.kwargs)] is not None:
+                    states_old.extend(state["{col_grp_state}".format(**self.kwargs)])
+
+            resultObject = plpy.execute(eva_plan, [grp_keys_old, states_old, grp_keys_new, states_new])
+        else:
+            resultObject = self.runSQL("""
             SELECT
                 ({expression}) AS _expression,
                 ARRAY[{{grouping_str}}] AS _groups
@@ -268,35 +266,34 @@ class GroupIterationController:
                 ) sub2
                 {using_str}
             ) AS subq1 ON True
-            """.format(expression = expression, **self.kwargs).format(
-                    iteration=self.iteration, **self.kwargs))
-        <<<)
+            """.format(expression=expression, **self.kwargs).
+                format(iteration=self.iteration, **self.kwargs))
 
         if resultObject.nrows() == 0:
             return None
         else:
-            m4_ifdef(>>>__HAWQ__<<<, >>>complete_grps = []<<<)
+            complete_grps = []
             for each_elem in resultObject:
-                m4_ifdef(>>>__HAWQ__<<<, >>><<<, >>>
-                # update status for each group
-                group_vector = mad_vec(each_elem["_groups"])
-                groups_as_str = [None] * len(group_vector)
-                # convert group values to string objects
-                for index, each_grp in enumerate(group_vector):
-                    if not each_grp or each_grp.lower() == 'null':
-                        # NULL values should be outputed as NULL instead of
-                        # as a string 'NULL'
-                        groups_as_str[index] = "NULL::text"
-                    else:
-                        groups_as_str[index] = "'" + str(each_grp) + "'::text"
-                array_str = "array[" + ",".join(groups_as_str) + "]"
-                <<<)
+                if not self.hawq:
+                    # update status for each group
+                    group_vector = mad_vec(each_elem["_groups"])
+                    groups_as_str = [None] * len(group_vector)
+                    # convert group values to string objects
+                    for index, each_grp in enumerate(group_vector):
+                        if not each_grp or each_grp.lower() == 'null':
+                            # NULL values should be outputed as NULL instead of
+                            # as a string 'NULL'
+                            groups_as_str[index] = "NULL::text"
+                        else:
+                            groups_as_str[index] = "'" + str(each_grp) + "'::text"
+                    array_str = "array[" + ",".join(groups_as_str) + "]"
                 # update status for the group if it completed iterating
                 if each_elem['_expression']:
-                    m4_ifdef(>>>__HAWQ__<<<,
-                    >>>complete_grps.append(each_elem["_groups"])<<<,
-                    >>>self.runSQL(
-                        """UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1
+                    if self.hawq:
+                        complete_grps.append(each_elem["_groups"])
+                    else:
+                        self.runSQL("""
+                        UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1
                         WHERE
                             ARRAY[{grouping_str}] = {_group_val} and
                             {col_grp_state}[array_upper({col_grp_state}, 1)] < 2 and
@@ -305,34 +302,32 @@ class GroupIterationController:
                             _group_val=array_str,
                             iteration=self.iteration,
                             **self.kwargs))
-                    <<<)
 
-            m4_ifdef(>>>__HAWQ__<<<, >>>
-            all_finish = True
-            tmp = []
-            for state in self.new_states:
-                if state["{col_grp_key}".format(**self.kwargs)] in complete_grps \
-                    and state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
-                    state["{col_grp_state}".format(**self.kwargs)][-1] = 1
-                if state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
-                    all_finish = False
-                    tmp.append(state)
-                else:
-                    self.finished_states.append(state)
-            self.new_states = tmp
-            return all_finish
-            <<<, >>>
-            # return True only if all group combinations have finished iterating
-            rv = self.runSQL(
-                """
-                select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst
-                from {rel_state} as _state_table
-                where _state_table.{col_grp_iteration} = {iteration}
-                """.format(
-                    iteration=self.iteration,
-                    **self.kwargs))[0]["rst"]
-            return rv
-            <<<)
+            if self.hawq:
+                all_finish = True
+                tmp = []
+                for state in self.new_states:
+                    if state["{col_grp_key}".format(**self.kwargs)] in complete_grps \
+                            and state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
+                        state["{col_grp_state}".format(**self.kwargs)][-1] = 1
+                    if state["{col_grp_state}".format(**self.kwargs)][-1] < 1:
+                        all_finish = False
+                        tmp.append(state)
+                    else:
+                        self.finished_states.append(state)
+                self.new_states = tmp
+                return all_finish
+            else:
+                # return True only if all group combinations have finished iterating
+                rv = self.runSQL(
+                    """
+                    select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst
+                    from {rel_state} as _state_table
+                    where _state_table.{col_grp_iteration} = {iteration}
+                    """.format(
+                        iteration=self.iteration,
+                        **self.kwargs))[0]["rst"]
+                return rv
 
     def test(self, condition):
         """
@@ -347,10 +342,7 @@ class GroupIterationController:
         @return None if \c condition evaluates to NULL, otherwise the Boolean
             value of \c condition
         """
-        return self.evaluate(
-            """
-            CAST(({condition}) AS BOOLEAN)
-            """.format(condition = condition))
+        return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))
 
     def update(self, newState, **updateKwargs):
         """
@@ -374,76 +366,72 @@ class GroupIterationController:
         """
         newState = newState.format(**self.kwargs)
         self.iteration = self.iteration + 1
-
         groupby_str = "" if self.is_group_null \
                       else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs)
-
-        m4_ifdef(>>>__HAWQ__<<<, >>>
-        groupby_str = "" if self.is_group_null \
-                      else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs)
-        update_plan = plpy.prepare(
-            """
-            select
-                {_grp_key} AS {col_grp_key},
-                {grouping_col} {as_string},
-                {iteration} AS {col_grp_iteration},
-                ({newState}) AS {col_grp_state}
-            from
-                {rel_source} AS _src,
-                (
-                SELECT
-                    grp_key, state AS {col_grp_state}
-                FROM
-                    {schema_madlib}._gen_state($1, NULL, $2)
-                ) AS rel_state
-            where
-                {_grp_key} = grp_key
-            {groupby_str}
-            """.format(
-                iteration=self.iteration,
-                groupby_str=groupby_str,
-                as_string='AS _grp' if self.is_group_null else '',
-                _grp_key="array_to_string(ARRAY[{grouping_str}], ',')".format(**self.kwargs) if self.is_group_null else self.kwargs['col_grp_key'],
-                newState = newState,
-                **self.kwargs), ["text[]", "double precision[]"])
-        grp_keys = []
-        states = []
-        for state in self.new_states:
-            grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
-            if state["{col_grp_state}".format(**self.kwargs)] is not None:
-                states.extend(state["{col_grp_state}".format(**self.kwargs)])
-
-        self.old_states = self.new_states
-        self.new_states = plpy.execute(update_plan, [grp_keys, states])
-        <<<,
-        >>>
-        groupby_str = "" if self.is_group_null \
-                      else "group by {grouping_col}".format(**self.kwargs)
-        self.runSQL(
-            """
-            insert into {rel_state}
-                (select
-                    {grouping_col},
-                    {iteration},
-                    ({newState})
+        if self.hawq:
+            groupby_str = "" if self.is_group_null \
+                          else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs)
+            update_plan = plpy.prepare(
+                """
+                select
+                    {_grp_key} AS {col_grp_key},
+                    {grouping_col} {as_string},
+                    {iteration} AS {col_grp_iteration},
+                    ({newState}) AS {col_grp_state}
                 from
-                    ({rel_source} AS _src
-                    join
-                    {rel_state} AS rel_state
-                    {using_str})
+                    {rel_source} AS _src,
+                    (
+                    SELECT
+                        grp_key, state AS {col_grp_state}
+                    FROM
+                        {schema_madlib}._gen_state($1, NULL, $2)
+                    ) AS rel_state
                 where
-                    rel_state.{col_grp_iteration} = {iteration} - 1 and
-                    (case when {iteration} = 1 then
-                        True
-                    else
-                        rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0
-                    end)
-                {groupby_str})
-            """.format(
-                groupby_str = groupby_str,
-                iteration = self.iteration,
-                newState = newState,
-                **self.kwargs))
-        <<<)
-
-m4_changequote(>>>`<<<, >>>'<<<)
+                    {_grp_key} = grp_key
+                {groupby_str}
+                """.format(
+                    iteration=self.iteration,
+                    groupby_str=groupby_str,
+                    as_string='AS _grp' if self.is_group_null else '',
+                    _grp_key="array_to_string(ARRAY[{grouping_str}], ',')".format(**self.kwargs) if self.is_group_null else self.kwargs['col_grp_key'],
+                    newState=newState,
+                    **self.kwargs), ["text[]", "double precision[]"])
+            grp_keys = []
+            states = []
+            for state in self.new_states:
+                grp_keys.append(state["{col_grp_key}".format(**self.kwargs)])
+                if state["{col_grp_state}".format(**self.kwargs)] is not None:
+                    states.extend(state["{col_grp_state}".format(**self.kwargs)])
+
+            self.old_states = self.new_states
+            self.new_states = plpy.execute(update_plan, [grp_keys, states])
+        else:
+            groupby_str = "" if self.is_group_null \
+                          else "group by {grouping_col}".format(**self.kwargs)
+            self.runSQL(
+                """
+                insert into {rel_state}
+                    (select
+                        {grouping_col},
+                        {iteration},
+                        ({newState})
+                    from
+                        ({rel_source} AS _src
+                        join
+                        {rel_state} AS rel_state
+                        {using_str})
+                    where
+                        rel_state.{col_grp_iteration} = {iteration} - 1 and
+                        (case when {iteration} = 1 then
+                            True
+                        else
+                            rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0
+                        end)
+                    {groupby_str})
+                """.format(
+                    groupby_str=groupby_str,
+                    iteration=self.iteration,
+                    newState=newState,
+                    **self.kwargs))
+
+m4_changequote(<!`!>, <!'!>)