Posted to commits@madlib.apache.org by ok...@apache.org on 2019/12/02 19:02:15 UTC
[madlib] branch master updated: DL: Add support for asymmetric segment distribution to preprocessor
This is an automated email from the ASF dual-hosted git repository.
okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
The following commit(s) were added to refs/heads/master by this push:
new ef11e2b DL: Add support for asymmetric segment distribution to preprocessor
ef11e2b is described below
commit ef11e2b523423872311991b2162dccbdbdb9e37c
Author: Orhan Kislal <ok...@apache.org>
AuthorDate: Mon Dec 2 14:01:05 2019 -0500
DL: Add support for asymmetric segment distribution to preprocessor
JIRA: MADLIB-1392
This commit adds a new parameter, `distribution_rules`, to the functions
`training_preprocessor_dl()` and `validation_preprocessor_dl()`. This
parameter specifies how to distribute the `output_table` on the cluster,
with `all_segments` as the default value. The change also adds a new
column, `gpu_config`, to the output summary table, containing values
based on the input `distribution_rules`.
Closes #459
Co-authored-by: Ekta Khanna <ek...@pivotal.io>
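For context, a minimal call using the new argument looks as follows (a sketch
mirroring the documentation examples added below; the image_data table and its
columns are illustrative):

    -- Pack training data, distributing the output only to segments on GPU hosts
    DROP TABLE IF EXISTS image_data_packed, image_data_packed_summary;
    SELECT madlib.training_preprocessor_dl('image_data',        -- source table
                                           'image_data_packed', -- output table
                                           'species',           -- dependent variable
                                           'rgb',               -- independent variable
                                           NULL,                -- buffer size
                                           255,                 -- normalizing constant
                                           NULL,                -- number of classes
                                           'gpu_segments'       -- distribution rules
                                          );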
---
.../deep_learning/input_data_preprocessor.py_in | 163 +++++++++++--
.../deep_learning/input_data_preprocessor.sql_in | 269 +++++++++++++++++----
.../madlib_keras_fit_multiple_model.py_in | 4 +-
.../deep_learning/madlib_keras_helper.py_in | 1 +
.../test/input_data_preprocessor.sql_in | 58 ++++-
.../unit_tests/test_input_data_preprocessor.py_in | 88 ++++++-
6 files changed, 502 insertions(+), 81 deletions(-)
diff --git a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
index 1f833eb..1acd136 100644
--- a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
+++ b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
@@ -34,6 +34,7 @@ from utilities.utilities import add_postfix
from utilities.utilities import is_platform_pg
from utilities.utilities import is_psql_char_type
from utilities.utilities import is_valid_psql_type
+from utilities.utilities import is_var_valid
from utilities.utilities import BOOLEAN, NUMERIC, ONLY_ARRAY, TEXT
from utilities.utilities import py_list_to_sql_string
from utilities.utilities import split_quoted_delimited_str
@@ -51,7 +52,7 @@ NUM_CLASSES_COLNAME = "num_classes"
class InputDataPreprocessorDL(object):
def __init__(self, schema_madlib, source_table, output_table,
dependent_varname, independent_varname, buffer_size,
- normalizing_const, num_classes, module_name):
+ normalizing_const, num_classes, distribution_rules, module_name):
self.schema_madlib = schema_madlib
self.source_table = source_table
self.output_table = output_table
@@ -60,10 +61,12 @@ class InputDataPreprocessorDL(object):
self.buffer_size = buffer_size
self.normalizing_const = normalizing_const if normalizing_const is not None else DEFAULT_NORMALIZING_CONST
self.num_classes = num_classes
+ self.distribution_rules = distribution_rules if distribution_rules else DEFAULT_GPU_CONFIG
self.module_name = module_name
self.output_summary_table = None
self.dependent_vartype = None
self.independent_vartype = None
+ self.gpu_config = '$__madlib__$all_segments$__madlib__$'
if self.output_table:
self.output_summary_table = add_postfix(self.output_table, "_summary")
@@ -99,6 +102,34 @@ class InputDataPreprocessorDL(object):
"in table ({1}).".format(
self.module_name, len(self.dependent_levels)))
+ def _validate_distribution_table(self):
+
+ input_tbl_valid(self.distribution_rules, self.module_name,
+ error_suffix_str="""
+ segments_to_use table ({self.distribution_rules}) doesn't exist.
+ """.format(self=self))
+ _assert(is_var_valid(self.distribution_rules, 'dbid'),
+ "{self.module_name}: distribution rules table must contain dbib column".format(
+ self=self))
+ dbids = plpy.execute("""
+ SELECT array_agg(dbid) AS dbids FROM gp_segment_configuration
+ WHERE content >= 0 AND role = 'p'
+ """)[0]['dbids']
+ dist_result = plpy.execute("""
+ SELECT array_agg(dbid) AS dbids,
+ count(dbid) AS c1,
+ count(DISTINCT dbid) AS c2
+ FROM {0} """.format(self.distribution_rules))
+
+ _assert(dist_result[0]['c1'] == dist_result[0]['c2'],
+ '{self.module_name}: distribution rules table contains duplicate dbids'.format(
+ self=self))
+
+ for i in dist_result[0]['dbids']:
+ _assert(i in dbids,
+ '{self.module_name}: invalid dbid:{i} in the distribution rules table'.format(
+ self=self, i=i))
+
def get_one_hot_encoded_dep_var_expr(self):
"""
:param dependent_varname: Name of the dependent variable
@@ -204,42 +235,96 @@ class InputDataPreprocessorDL(object):
dep_shape = self._get_dependent_var_shape()
dep_shape = ','.join([str(i) for i in dep_shape])
- # Create the mini-batched output table
if is_platform_pg():
+ self.distribution_rules = '$__madlib__$all_segments$__madlib__$'
distributed_by_clause = ''
dist_key_clause = ''
join_clause = ''
- select_clause = 'b.*'
dist_key_comma = ''
-
else:
-
dist_key = DISTRIBUTION_KEY_COLNAME
# Create a large temp table such that there is at least 1 row on each segment
# Using 999999 would distribute data (at least 1 row on each segment) for
# a cluster with as many as 20000 segments
query = """
- CREATE TEMP TABLE {series_tbl}
- AS
- SELECT generate_series(0, 999999) {dist_key}
- DISTRIBUTED BY ({dist_key})
- """.format(**locals())
+ CREATE TEMP TABLE {series_tbl}
+ AS
+ SELECT generate_series(0, 999999) {dist_key}
+ DISTRIBUTED BY ({dist_key})
+ """.format(**locals())
plpy.execute(query)
-
- # Create temp table to get unique distribution key values for each segment
- query = """
+ distributed_by_clause = ' DISTRIBUTED BY ({dist_key}) '.format(**locals())
+ dist_key_comma = dist_key + ' ,'
+ gpu_join_clause = """JOIN {dist_key_tbl} ON
+ ({self.gpu_config})[b.buffer_id%{num_segments}+1] = {dist_key_tbl}.id
+ """
+
+ if self.distribution_rules == 'gpu_segments':
+ gpu_info_table = unique_string(desp='gpu_info')
+ plpy.execute("""
+ SELECT {self.schema_madlib}.gpu_configuration('{gpu_info_table}')
+ """.format(**locals()))
+ gpu_query = """
+ SELECT array_agg(DISTINCT(hostname)) as gpu_config
+ FROM {gpu_info_table}
+ """.format(**locals())
+ gpu_query_result = plpy.execute(gpu_query)[0]['gpu_config']
+ if not gpu_query_result:
+ plpy.error("{self.module_name}: No GPUs configured on hosts.".format(self=self))
+
+ gpu_config_hostnames = "ARRAY{0}".format(gpu_query_result)
+ # find segments on hosts with GPUs attached
+ get_segment_query = """
+ SELECT array_agg(content) as segment_ids,
+ array_agg(dbid) as dbid,
+ count(*) as count
+ FROM gp_segment_configuration
+ WHERE content != -1 AND role = 'p'
+ AND hostname=ANY({gpu_config_hostnames})
+ """.format(**locals())
+ segment_ids_result = plpy.execute(get_segment_query)[0]
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(gpu_info_table))
+
+ self.gpu_config = "ARRAY{0}".format(sorted(segment_ids_result['segment_ids']))
+ self.distribution_rules = "ARRAY{0}".format(sorted(segment_ids_result['dbid']))
+
+ num_segments = segment_ids_result['count']
+ where_clause = "WHERE gp_segment_id=ANY({self.gpu_config})".format(**locals())
+ join_clause = gpu_join_clause.format(**locals())
+
+ elif self.distribution_rules == 'all_segments':
+
+ self.distribution_rules = '$__madlib__$all_segments$__madlib__$'
+ where_clause = ''
+ num_segments = get_seg_number()
+ join_clause = 'JOIN {dist_key_tbl} ON (b.buffer_id%{num_segments})= {dist_key_tbl}.id'.format(**locals())
+
+ else: # Read from a table with dbids to distribute the data
+
+ self._validate_distribution_table()
+ gpu_query = """
+ SELECT array_agg(content) as gpu_config,
+ array_agg(gp_segment_configuration.dbid) as dbid
+ FROM {self.distribution_rules} JOIN gp_segment_configuration
+ ON {self.distribution_rules}.dbid = gp_segment_configuration.dbid
+ """.format(**locals())
+ gpu_query_result = plpy.execute(gpu_query)[0]
+ self.gpu_config = "ARRAY{0}".format(sorted(gpu_query_result['gpu_config']))
+ where_clause = "WHERE gp_segment_id=ANY({self.gpu_config})".format(**locals())
+ num_segments = plpy.execute("SELECT count(*) as count FROM {self.distribution_rules}".format(**locals()))[0]['count']
+ join_clause = gpu_join_clause.format(**locals())
+ self.distribution_rules = "ARRAY{0}".format(sorted(gpu_query_result['dbid']))
+
+ dist_key_query = """
CREATE TEMP TABLE {dist_key_tbl} AS
SELECT gp_segment_id AS id, min({dist_key}) AS {dist_key}
FROM {series_tbl}
+ {where_clause}
GROUP BY gp_segment_id
- """.format(**locals())
- plpy.execute(query)
-
- num_segments = get_seg_number()
- join_clause = 'JOIN {dist_key_tbl} ON (b.buffer_id%{num_segments})= {dist_key_tbl}.id'.format(**locals())
- distributed_by_clause= ' DISTRIBUTED BY ({dist_key}) '.format(**locals())
- dist_key_comma = dist_key + ' ,'
+ """
+ plpy.execute(dist_key_query.format(**locals()))
+ # Create the mini-batched output table
sql = """
CREATE TABLE {self.output_table} AS
SELECT {dist_key_comma}
@@ -297,7 +382,9 @@ class InputDataPreprocessorDL(object):
{class_level_str} AS {class_values_colname},
{self.buffer_size} AS buffer_size,
{self.normalizing_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
- {self.num_classes} AS {num_classes_colname}
+ {self.num_classes} AS {num_classes_colname},
+ {self.distribution_rules} AS distribution_rules,
+ {self.gpu_config} AS __internal_gpu_config__
""".format(self=self, class_level_str=class_level_str,
dependent_varname_colname=DEPENDENT_VARNAME_COLNAME,
independent_varname_colname=INDEPENDENT_VARNAME_COLNAME,
@@ -365,11 +452,11 @@ class InputDataPreprocessorDL(object):
self.buffer_size, num_rows_in_tbl, indepdent_var_dim)
return ceil((1.0 * num_rows_in_tbl) / self.buffer_size)
-
class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
def __init__(self, schema_madlib, source_table, output_table,
dependent_varname, independent_varname,
- training_preprocessor_table, buffer_size, **kwargs):
+ training_preprocessor_table, buffer_size, distribution_rules,
+ **kwargs):
"""
This prepares the variables that are required by
InputDataPreprocessorDL.
@@ -382,7 +469,7 @@ class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
self, schema_madlib, source_table, output_table,
dependent_varname, independent_varname, buffer_size,
summary_table[NORMALIZING_CONST_COLNAME], num_classes,
- self.module_name)
+ distribution_rules, self.module_name)
# Update value of dependent_levels from training batch summary table.
self.dependent_levels = self._get_dependent_levels(
summary_table[CLASS_VALUES_COLNAME],
@@ -474,15 +561,18 @@ class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
class TrainingDataPreprocessorDL(InputDataPreprocessorDL):
def __init__(self, schema_madlib, source_table, output_table,
dependent_varname, independent_varname, buffer_size,
- normalizing_const, num_classes, **kwargs):
+ normalizing_const, num_classes, distribution_rules,
+ **kwargs):
"""
This prepares the variables that are required by
InputDataPreprocessorDL.
"""
+ self.module_name = "training_preprocessor_dl"
InputDataPreprocessorDL.__init__(
self, schema_madlib, source_table, output_table,
dependent_varname, independent_varname, buffer_size,
- normalizing_const, num_classes, "training_preprocessor_dl")
+ normalizing_const, num_classes, distribution_rules,
+ self.module_name)
# Update default value of dependent_levels in superclass
self.dependent_levels = self._get_dependent_levels()
@@ -549,6 +639,12 @@ class InputDataPreprocessorDocumentation:
training_preprocessor_table, -- TEXT. packed training data table.
buffer_size -- INTEGER. Default computed automatically.
Number of source input rows to pack into a buffer.
+ distribution_rules -- TEXT. Default: 'all_segments'. Specifies how to
+ distribute the 'output_table'. This is important
+ for how the fit function will use resources on the
+ cluster. The default 'all_segments' means the
+ 'output_table' will be distributed to all segments
+ in the database cluster.
);
@@ -585,6 +681,10 @@ class InputDataPreprocessorDocumentation:
arrays in independent_varname.
num_classes -- num_classes value passed by user while
generating training_preprocessor_table.
+ gpu_config -- List of segment IDs on which the data is
+ distributed, depending on the 'distribution_rules'
+ parameter specified as input. Set to 'all_segments' if
+ 'distribution_rules' is specified as 'all_segments'.
---------------------------------------------------------------------------
""".format(**locals())
@@ -646,6 +746,12 @@ class InputDataPreprocessorDocumentation:
the 1-hot encoded array length will be equal to
the number of distinct class values found in the
input table.
+ distribution_rules -- TEXT. Default: 'all_segments'. Specifies how to
+ distribute the 'output_table'. This is important
+ for how the fit function will use resources on the
+ cluster. The default 'all_segments' means the
+ 'output_table' will be distributed to all segments
+ in the database cluster.
);
@@ -681,6 +787,11 @@ class InputDataPreprocessorDocumentation:
normalizing_const -- Normalizing constant used for standardizing
arrays in independent_varname.
num_classes -- num_classes input param passed to function.
+ gpu_config -- List of segment IDs on which the data is
+ distributed, depending on the 'distribution_rules' param
+ specified as input. Set to 'all_segments' if
+ 'distribution_rules' is specified as 'all_segments'.
+
---------------------------------------------------------------------------
""".format(**locals())
diff --git a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in
index 29e4b4c..6d5f0ad 100644
--- a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in
+++ b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in
@@ -40,6 +40,7 @@ Interface and implementation are subject to change. </em>
<li class="level1"><a href="#validation_preprocessor_dl">Preprocessor for Validation Image Data</a></li>
<li class="level1"><a href="#output">Output Tables</a></li>
<li class="level1"><a href="#example">Examples</a></li>
+<li class="level1"><a href="#references">References</a></li>
<li class="level1"><a href="#related">Related Topics</a></li>
</ul></div>
@@ -71,7 +72,8 @@ training_preprocessor_dl(source_table,
independent_varname,
buffer_size,
normalizing_const,
- num_classes
+ num_classes,
+ distribution_rules
)
</pre>
@@ -136,6 +138,25 @@ training_preprocessor_dl(source_table,
length will be equal to the number
of distinct class values found in the input table.
</dd>
+
+ <dt>distribution_rules (optional)</dt>
+ <dd>TEXT, default: 'all_segments'. Specifies how to distribute the
+ 'output_table'. This is important for how the fit function will use
+ resources on the cluster. The default 'all_segments' means the
+ 'output_table' will be distributed to all segments in the database
+ cluster.
+
+ If you specify 'gpu_segments' then the 'output_table' will be distributed
+ to all segments on hosts that have GPUs attached. This will make
+ maximum use of GPU resources when training a deep learning model.
+
+ You can also specify the name of a resources table containing the segments
+ to be used for training. This table must contain a column called 'dbid'
+ matching the 'dbid' column of the 'gp_segment_configuration' table [1].
+ Refer to the utility function <a href="group__grp__gpu__configuration.html">GPU Configuration</a>
+ for more information on how to
+ identify segments on GPU-enabled hosts.
+ </dd>
</dl>
@anchor validation_preprocessor_dl
@@ -146,7 +167,8 @@ validation_preprocessor_dl(source_table,
dependent_varname,
independent_varname,
training_preprocessor_table,
- buffer_size
+ buffer_size,
+ distribution_rules
)
</pre>
@@ -204,6 +226,24 @@ validation_preprocessor_dl(source_table,
rows specified by the 'buffer_size' parameter.
</dd>
+ <dt>distribution_rules (optional)</dt>
+ <dd>TEXT, default: 'all_segments'. Specifies how to distribute the
+ 'output_table'. This is important for how the fit function will use
+ resources on the cluster. The default 'all_segments' means the
+ 'output_table' will be distributed to all segments in the database
+ cluster.
+
+ If you specify 'gpu_segments' then the 'output_table' will be distributed
+ to all segments on hosts that have GPUs attached. This will make
+ maximum use of GPU resources when training a deep learning model.
+
+ You can also specify the name of a resources table containing the segments
+ to be used for training. This table must contain a column called 'dbid'
+ matching the 'dbid' column of the 'gp_segment_configuration' table [1].
+ Refer to the utility function <a href="group__grp__gpu__configuration.html">GPU Configuration</a>
+ for more information on how to
+ identify segments on GPU-enabled hosts.
+ </dd>
</dl>
@@ -298,6 +338,20 @@ both validation_preprocessor_dl() and training_preprocessor_dl() ):
levels found in the input data is less than the 'num_classes' parameter
specified in training_preprocessor_dl().</td>
</tr>
+ <tr>
+ <th>distribution_rules</th>
+ <td>The list of segment IDs, in the form of 'dbid' values,
+ describing how the 'output_table' is distributed,
+ as per the 'distribution_rules' input parameter.
+ If the 'distribution_rules' parameter is set to 'all_segments', then
+ this will also be set to 'all_segments'.</td>
+ </tr>
+ <tr>
+ <th>__internal_gpu_config__</th>
+ <td>For internal use. (Note: this is the list of segment IDs
+ on which the data is distributed, in the form of 'content' IDs,
+ which are different from 'dbid' values [1].)</td>
+ </tr>
</table>
@anchor example
@@ -420,16 +474,18 @@ Review the output summary table:
SELECT * FROM image_data_packed_summary;
</pre>
<pre class="result">
--[ RECORD 1 ]-------+------------------
-source_table | image_data
-output_table | image_data_packed
-dependent_varname | species
-independent_varname | rgb
-dependent_vartype | text
-class_values | {bird,cat,dog}
-buffer_size | 26
-normalizing_const | 255
-num_classes | 3
+-[ RECORD 1 ]-----------+------------------
+source_table | image_data
+output_table | image_data_packed
+dependent_varname | species
+independent_varname | rgb
+dependent_vartype | text
+class_values | {bird,cat,dog}
+buffer_size | 26
+normalizing_const | 255
+num_classes | 3
+distribution_rules | all_segments
+__internal_gpu_config__ | all_segments
</pre>
-# Run the preprocessor for the validation dataset.
@@ -447,8 +503,9 @@ SELECT madlib.validation_preprocessor_dl(
NULL -- Buffer size
);
</pre>
-We can choose to use a new buffer size compared to the
-training_preprocessor_dl run. Other parameters such as num_classes and
+We could choose to use a different buffer size compared to the
+training_preprocessor_dl run (but generally don't need to).
+Other parameters such as num_classes and
normalizing_const that were passed to training_preprocessor_dl are
automatically inferred using the image_data_packed param that is passed.
Here is the packed output table of validation data for our simple example:
@@ -468,16 +525,18 @@ Review the output summary table:
SELECT * FROM val_image_data_packed_summary;
</pre>
<pre class="result">
--[ RECORD 1 ]-------+----------------------
-source_table | image_data
-output_table | val_image_data_packed
-dependent_varname | species
-independent_varname | rgb
-dependent_vartype | text
-class_values | {bird,cat,dog}
-buffer_size | 26
-normalizing_const | 255
-num_classes | 3
+-[ RECORD 1 ]-----------+----------------------
+source_table | image_data
+output_table | val_image_data_packed
+dependent_varname | species
+independent_varname | rgb
+dependent_vartype | text
+class_values | {bird,cat,dog}
+buffer_size | 26
+normalizing_const | 255
+num_classes | 3
+distribution_rules | all_segments
+__internal_gpu_config__ | all_segments
</pre>
-# Load data in another format. Create an artificial 2x2 resolution color image
@@ -641,16 +700,18 @@ Review the output summary table:
SELECT * FROM image_data_packed_summary;
</pre>
<pre class="result">
--[ RECORD 1 ]-------+------------------
-source_table | image_data
-output_table | image_data_packed
-dependent_varname | species
-independent_varname | rgb
-dependent_vartype | text
-class_values | {bird,cat,dog}
-buffer_size | 10
-normalizing_const | 255
-num_classes | 3
+-[ RECORD 1 ]-----------+------------------
+source_table | image_data
+output_table | image_data_packed
+dependent_varname | species
+independent_varname | rgb
+dependent_vartype | text
+class_values | {bird,cat,dog}
+buffer_size | 10
+normalizing_const | 255
+num_classes | 3
+distribution_rules | all_segments
+__internal_gpu_config__ | all_segments
</pre>
-# Run the preprocessor for image data with num_classes greater than 3 (distinct class values found in table):
@@ -682,23 +743,104 @@ Review the output summary table:
SELECT * FROM image_data_packed_summary;
</pre>
<pre class="result">
--[ RECORD 1 ]-------+-------------------------
-source_table | image_data
-output_table | image_data_packed
-dependent_varname | species
-independent_varname | rgb
-dependent_vartype | text
-class_values | {bird,cat,dog,NULL,NULL}
-buffer_size | 26
-normalizing_const | 255
-num_classes | 5
+-[ RECORD 1 ]-----------+-------------------------
+source_table | image_data
+output_table | image_data_packed
+dependent_varname | species
+independent_varname | rgb
+dependent_vartype | text
+class_values | {bird,cat,dog,NULL,NULL}
+buffer_size | 26
+normalizing_const | 255
+num_classes | 5
+distribution_rules | all_segments
+__internal_gpu_config__ | all_segments
</pre>
+-# Use the 'distribution_rules' parameter to specify how to distribute
+the 'output_table'. This is important for how the fit function
+will use resources on the cluster. To distribute to all segments
+on hosts with GPUs attached:
+<pre class="example">
+DROP TABLE IF EXISTS image_data_packed, image_data_packed_summary;
+SELECT madlib.training_preprocessor_dl('image_data', -- Source table
+ 'image_data_packed', -- Output table
+ 'species', -- Dependent variable
+ 'rgb', -- Independent variable
+ NULL, -- Buffer size
+ 255, -- Normalizing constant
+ NULL, -- Number of classes
+ 'gpu_segments' -- Distribution rules
+ );
+\\x on
+SELECT * FROM image_data_packed_summary;
+</pre>
+<pre class="result">
+-[ RECORD 1 ]-----------+------------------
+source_table | image_data
+output_table | image_data_packed
+dependent_varname | species
+independent_varname | rgb
+dependent_vartype | text
+class_values | {bird,cat,dog}
+buffer_size | 26
+normalizing_const | 255
+num_classes | 3
+distribution_rules | {2,3,4,5}
+__internal_gpu_config__ | {0,1,2,3}
+</pre>
+To distribute to only specified segments, create a
+distribution table with a column called 'dbid' that
+lists the segments you want:
+<pre class="example">
+DROP TABLE IF EXISTS segments_to_use;
+CREATE TABLE segments_to_use(
+ dbid INTEGER,
+ hostname TEXT
+);
+INSERT INTO segments_to_use VALUES
+(2, 'hostname-01'),
+(3, 'hostname-01');
+DROP TABLE IF EXISTS image_data_packed, image_data_packed_summary;
+SELECT madlib.training_preprocessor_dl('image_data', -- Source table
+ 'image_data_packed', -- Output table
+ 'species', -- Dependent variable
+ 'rgb', -- Independent variable
+ NULL, -- Buffer size
+ 255, -- Normalizing constant
+ NULL, -- Number of classes
+ 'segments_to_use' -- Distribution rules
+ );
+\\x on
+SELECT * FROM image_data_packed_summary;
+</pre>
+<pre class="result">
+-[ RECORD 1 ]-----------+------------------
+source_table | image_data
+output_table | image_data_packed
+dependent_varname | species
+independent_varname | rgb
+dependent_vartype | text
+class_values | {bird,cat,dog}
+buffer_size | 26
+normalizing_const | 255
+num_classes | 3
+distribution_rules | {2,3}
+__internal_gpu_config__ | {0,1}
+</pre>
+
+@anchor references
+@par References
+
+[1] Greenplum 'gp_segment_configuration' table https://gpdb.docs.pivotal.io/latest/ref_guide/system_catalogs/gp_segment_configuration.html
+
@anchor related
@par Related Topics
minibatch_preprocessing.sql_in
+gpu_configuration()
+
*/
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
@@ -707,7 +849,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
dependent_varname VARCHAR,
independent_varname VARCHAR,
training_preprocessor_table VARCHAR,
- buffer_size INTEGER
+ buffer_size INTEGER,
+ distribution_rules TEXT
) RETURNS VOID AS $$
PythonFunctionBodyOnly(deep_learning, input_data_preprocessor)
from utilities.control import MinWarning
@@ -723,9 +866,21 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
output_table VARCHAR,
dependent_varname VARCHAR,
independent_varname VARCHAR,
+ training_preprocessor_table VARCHAR,
+ buffer_size INTEGER
+) RETURNS VOID AS $$
+ SELECT MADLIB_SCHEMA.validation_preprocessor_dl($1, $2, $3, $4, $5, $6, NULL);
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
+ source_table VARCHAR,
+ output_table VARCHAR,
+ dependent_varname VARCHAR,
+ independent_varname VARCHAR,
training_preprocessor_table VARCHAR
) RETURNS VOID AS $$
- SELECT MADLIB_SCHEMA.validation_preprocessor_dl($1, $2, $3, $4, $5, NULL);
+ SELECT MADLIB_SCHEMA.validation_preprocessor_dl($1, $2, $3, $4, $5, NULL, NULL);
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -754,7 +909,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
independent_varname VARCHAR,
buffer_size INTEGER,
normalizing_const REAL,
- num_classes INTEGER
+ num_classes INTEGER,
+ distribution_rules TEXT
) RETURNS VOID AS $$
PythonFunctionBodyOnly(deep_learning, input_data_preprocessor)
from utilities.control import MinWarning
@@ -771,9 +927,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
dependent_varname VARCHAR,
independent_varname VARCHAR,
buffer_size INTEGER,
+ normalizing_const REAL,
+ num_classes INTEGER
+) RETURNS VOID AS $$
+ SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, $6, $7, NULL);
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
+ source_table VARCHAR,
+ output_table VARCHAR,
+ dependent_varname VARCHAR,
+ independent_varname VARCHAR,
+ buffer_size INTEGER,
normalizing_const REAL
) RETURNS VOID AS $$
- SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, $6, NULL);
+ SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, $6, NULL, NULL);
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -784,7 +953,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
independent_varname VARCHAR,
buffer_size INTEGER
) RETURNS VOID AS $$
- SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, 1.0, NULL);
+ SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, 1.0, NULL, NULL);
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -794,7 +963,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
dependent_varname VARCHAR,
independent_varname VARCHAR
) RETURNS VOID AS $$
- SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, NULL, 1.0, NULL);
+ SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, NULL, 1.0, NULL, NULL);
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
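To build a custom distribution table like the segments_to_use example above,
one approach (a sketch; gpu_info is an arbitrary scratch table name) is to
combine the gpu_configuration() utility with the Greenplum catalog:

    -- Collect GPU info per host, then keep only primary segments on GPU hosts
    SELECT madlib.gpu_configuration('gpu_info');
    DROP TABLE IF EXISTS segments_to_use;
    CREATE TABLE segments_to_use AS
        SELECT dbid, hostname
        FROM gp_segment_configuration
        WHERE role = 'p' AND content >= 0
          AND hostname IN (SELECT DISTINCT hostname FROM gpu_info);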
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
index 6e3389c..4ce21dd 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
@@ -167,8 +167,8 @@ class FitMultipleModel():
self.run_training()
if mst_idx == (total_msts - 1):
end_iteration = time.time()
- self.info_str = "\tTime for training in iteration {0}: {1} sec\n".format(iter,
- end_iteration - start_iteration)
+ self.info_str = "\tTime for training in iteration {0}: {1} sec\n".format(
+ iter, end_iteration - start_iteration)
self.info_str += "\tTraining set after iteration {0}:".format(iter)
self.evaluate_model(iter, self.source_table, True)
if self.validation_table:
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
index 2621670..70761d5 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
@@ -50,6 +50,7 @@ FLOAT32_SQL_TYPE = 'REAL'
SMALLINT_SQL_TYPE = 'SMALLINT'
DEFAULT_NORMALIZING_CONST = 1.0
+DEFAULT_GPU_CONFIG = 'all_segments'
#####################################################################
diff --git a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
index 4a0ede6..93b4c52 100644
--- a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
@@ -112,6 +112,61 @@ SELECT training_preprocessor_dl(
SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from gp_segment_configuration WHERE role = 'p' and content != -1), 'Even distribution of buffers failed.')
FROM data_preprocessor_input_batch
WHERE gp_segment_id = 0;
+SELECT assert(__internal_gpu_config__ = 'all_segments', 'Missing column in summary table')
+FROM data_preprocessor_input_batch_summary;
+
+-- Test validation data is evenly distributed across all segments (GPDB only)
+DROP TABLE IF EXISTS validation_out, validation_out_summary;
+SELECT validation_preprocessor_dl(
+ 'data_preprocessor_input',
+ 'validation_out',
+ 'id',
+ 'x',
+ 'data_preprocessor_input_batch',
+ 1);
+SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from gp_segment_configuration WHERE role = 'p' and content != -1), 'Even distribution of validation buffers failed.')
+FROM validation_out
+WHERE gp_segment_id = 0;
+SELECT assert(__internal_gpu_config__ = 'all_segments', 'Missing column in validation summary table')
+FROM validation_out_summary;
+
+-- Test data distributed on specified segments
+DROP TABLE IF EXISTS segments_to_use;
+CREATE TABLE segments_to_use (dbid INTEGER, notes TEXT);
+INSERT INTO segments_to_use VALUES (2, 'GPU segment');
+DROP TABLE IF EXISTS data_preprocessor_input_batch, data_preprocessor_input_batch_summary;
+SELECT training_preprocessor_dl(
+ 'data_preprocessor_input',
+ 'data_preprocessor_input_batch',
+ 'id',
+ 'x',
+ 1,
+ NULL,
+ NULL,
+ 'segments_to_use');
+SELECT assert(count(DISTINCT(gp_segment_id)) = 1, 'Failed to distribute data on segment 0')
+FROM data_preprocessor_input_batch;
+SELECT assert(count(*) = 17, 'Failed to distribute all data on segment 0')
+FROM data_preprocessor_input_batch;
+SELECT assert(__internal_gpu_config__ = ARRAY[0], 'Invalid column value in summary table')
+FROM data_preprocessor_input_batch_summary;
+
+-- Test data distributed on specified segments for validation_preprocessor_dl
+DROP TABLE IF EXISTS validation_out, validation_out_summary;
+SELECT validation_preprocessor_dl(
+ 'data_preprocessor_input',
+ 'validation_out',
+ 'id',
+ 'x',
+ 'data_preprocessor_input_batch',
+ 1,
+ 'segments_to_use');
+SELECT assert(count(DISTINCT(gp_segment_id)) = 1, 'Failed to distribute validation data on segment 0')
+FROM validation_out;
+SELECT assert(count(*) = 17, 'Failed to distribute all validation data on segment 0')
+FROM validation_out;
+SELECT assert(__internal_gpu_config__ = ARRAY[0], 'Invalid column value in validation summary table')
+FROM validation_out_summary;
!>)
DROP TABLE IF EXISTS data_preprocessor_input;
@@ -168,7 +223,8 @@ SELECT assert
buffer_size = 4 AND -- we sort the class values in python
normalizing_const = 5 AND
pg_typeof(normalizing_const) = 'real'::regtype AND
- num_classes = 16,
+ num_classes = 16 AND
+ distribution_rules = 'all_segments',
'Summary Validation failed. Actual:' || __to_char(summary)
) from (select * from data_preprocessor_input_batch_summary) summary;
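When debugging distribution behavior like these tests do, a quick sanity check
(a sketch; the table name is taken from the tests above) is to count buffers
per segment:

    -- Expect rows only on the segments named in the distribution rules
    SELECT gp_segment_id, count(*) AS buffers
    FROM data_preprocessor_input_batch
    GROUP BY gp_segment_id
    ORDER BY gp_segment_id;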
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
index 8358697..f21176c 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
@@ -54,6 +54,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size = 5
self.default_normalizing_const = 1.0
self.default_num_classes = None
+ self.default_distribution_rules = "all_segments"
self.default_module_name = "dummy"
import deep_learning.input_data_preprocessor
@@ -78,6 +79,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
preprocessor_obj.input_preprocessor_dl()
@@ -92,6 +94,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
None,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
self.util_module.MiniBatchBufferSizeCalculator.calculate_default_buffer_size = Mock(return_value = 5)
preprocessor_obj.input_preprocessor_dl()
@@ -108,6 +111,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_multiple_indep_var_raises_exception(self):
@@ -122,6 +126,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_buffer_size_zero_fails(self):
@@ -136,6 +141,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
0,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_negative_buffer_size_fails(self):
@@ -149,6 +155,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
-1,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_invalid_indep_vartype_raises_exception(self):
@@ -162,6 +169,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_invalid_dep_vartype_raises_exception(self):
@@ -175,6 +183,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_normalizing_const_zero_fails(self):
@@ -188,6 +197,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
0,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_input_preprocessor_negative_normalizing_const_fails(self):
@@ -201,6 +211,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
-1,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
def test_get_one_hot_encoded_dep_var_expr_null_val(self):
@@ -215,6 +226,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
obj.dependent_levels = ["NULL", "'a'"]
dep_var_array_expr = obj.get_one_hot_encoded_dep_var_expr()
@@ -234,6 +246,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
self.default_num_classes,
+ self.default_distribution_rules,
self.default_module_name)
dep_var_array_expr = obj.get_one_hot_encoded_dep_var_expr()
self.assertEqual("{0}::smallint[]".
@@ -251,6 +264,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_buffer_size,
self.default_normalizing_const,
None,
+ self.default_distribution_rules,
self.default_module_name)
obj.dependent_levels = ["dummy"]
self.assertEqual(0, obj.padding_size)
@@ -266,7 +280,8 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_ind_var,
self.default_buffer_size,
self.default_normalizing_const,
- 5)
+ 5,
+ self.default_distribution_rules)
obj._set_one_hot_encoding_variables()
self.assertEqual(2, obj.padding_size)
@@ -281,10 +296,79 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.default_ind_var,
self.default_buffer_size,
self.default_normalizing_const,
- 2)
+ 2,
+ self.default_distribution_rules)
with self.assertRaises(plpy.PLPYException):
obj._set_one_hot_encoding_variables()
+ def test_validate_distribution_table(self):
+ self.module.get_expr_type = Mock(side_effect = ['integer[]', 'integer[]'])
+
+ obj = self.module.InputDataPreprocessorDL(self.default_schema_madlib,
+ self.default_source_table,
+ self.default_output_table,
+ self.default_dep_var,
+ self.default_ind_var,
+ self.default_buffer_size,
+ self.default_normalizing_const,
+ self.default_num_classes,
+ self.default_distribution_rules,
+ self.default_module_name)
+ self.module.input_tbl_valid = Mock()
+ self.module.is_var_valid = Mock()
+ self.plpy_mock_execute.side_effect = [
+ [{'dbids': [2,3,4]}],
+ [{'dbids': [3,4], 'c1': 2, 'c2': 2}]
+ ]
+
+ obj._validate_distribution_table()
+
+ def test_validate_distribution_table_dup(self):
+ self.module.get_expr_type = Mock(side_effect = ['integer[]', 'integer[]'])
+
+ obj = self.module.InputDataPreprocessorDL(self.default_schema_madlib,
+ self.default_source_table,
+ self.default_output_table,
+ self.default_dep_var,
+ self.default_ind_var,
+ self.default_buffer_size,
+ self.default_normalizing_const,
+ self.default_num_classes,
+ self.default_distribution_rules,
+ self.default_module_name)
+ self.module.input_tbl_valid = Mock()
+ self.module.is_var_valid = Mock()
+ self.plpy_mock_execute.side_effect = [
+ [{'dbids': [2,3,4]}],
+ [{'dbids': [3,3], 'c1': 2, 'c2': 1}]
+ ]
+ with self.assertRaises(plpy.PLPYException) as error:
+ obj._validate_distribution_table()
+ self.assertIn('duplicate', str(error.exception).lower())
+
+ def test_validate_distribution_table_invalid(self):
+ self.module.get_expr_type = Mock(side_effect = ['integer[]', 'integer[]'])
+ obj = self.module.InputDataPreprocessorDL(self.default_schema_madlib,
+ self.default_source_table,
+ self.default_output_table,
+ self.default_dep_var,
+ self.default_ind_var,
+ self.default_buffer_size,
+ self.default_normalizing_const,
+ self.default_num_classes,
+ self.default_distribution_rules,
+ self.default_module_name)
+ self.module.input_tbl_valid = Mock()
+ self.module.is_var_valid = Mock()
+ self.plpy_mock_execute.side_effect = [
+ [{'dbids': [2,3,4]}],
+ [{'dbids': [3,30], 'c1': 2, 'c2': 2}]
+ ]
+ with self.assertRaises(plpy.PLPYException) as error:
+ obj._validate_distribution_table()
+ self.assertIn('invalid', str(error.exception).lower())
+
+
if __name__ == '__main__':
unittest.main()