Posted to commits@madlib.apache.org by ok...@apache.org on 2019/12/02 19:02:15 UTC

[madlib] branch master updated: DL: Add support for asymmetric segment distribution to preprocessor

This is an automated email from the ASF dual-hosted git repository.

okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new ef11e2b  DL: Add support for asymmetric segment distribution to preprocessor
ef11e2b is described below

commit ef11e2b523423872311991b2162dccbdbdb9e37c
Author: Orhan Kislal <ok...@apache.org>
AuthorDate: Mon Dec 2 14:01:05 2019 -0500

    DL: Add support for asymmetric segment distribution to preprocessor
    
    JIRA: MADLIB-1392
    
    This commit adds a new parameter, `distribution_rules`, to the functions
    `training_preprocessor_dl()` and `validation_preprocessor_dl()`. This
    parameter specifies how to distribute the `output_table` on the cluster,
    with `all_segments` as the default value. It also adds a new column,
    `gpu_config`, to the output summary table, containing values based on
    the input `distribution_rules`.
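    
    A rough sketch of the new interface (table and column names are taken
    from the examples included in this commit, so they are illustrative
    rather than required):
    
        SELECT madlib.training_preprocessor_dl(
            'image_data',          -- source table
            'image_data_packed',   -- output table
            'species',             -- dependent variable
            'rgb',                 -- independent variable
            NULL,                  -- buffer size (computed automatically)
            255,                   -- normalizing constant
            NULL,                  -- number of classes
            'gpu_segments'         -- distribution rules: 'all_segments'
                                   -- (default), 'gpu_segments', or the name
                                   -- of a table with a 'dbid' column
        );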
    
    Closes #459
    
    Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
 .../deep_learning/input_data_preprocessor.py_in    | 163 +++++++++++--
 .../deep_learning/input_data_preprocessor.sql_in   | 269 +++++++++++++++++----
 .../madlib_keras_fit_multiple_model.py_in          |   4 +-
 .../deep_learning/madlib_keras_helper.py_in        |   1 +
 .../test/input_data_preprocessor.sql_in            |  58 ++++-
 .../unit_tests/test_input_data_preprocessor.py_in  |  88 ++++++-
 6 files changed, 502 insertions(+), 81 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
index 1f833eb..1acd136 100644
--- a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
+++ b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
@@ -34,6 +34,7 @@ from utilities.utilities import add_postfix
 from utilities.utilities import is_platform_pg
 from utilities.utilities import is_psql_char_type
 from utilities.utilities import is_valid_psql_type
+from utilities.utilities import is_var_valid
 from utilities.utilities import BOOLEAN, NUMERIC, ONLY_ARRAY, TEXT
 from utilities.utilities import py_list_to_sql_string
 from utilities.utilities import split_quoted_delimited_str
@@ -51,7 +52,7 @@ NUM_CLASSES_COLNAME = "num_classes"
 class InputDataPreprocessorDL(object):
     def __init__(self, schema_madlib, source_table, output_table,
                  dependent_varname, independent_varname, buffer_size,
-                 normalizing_const, num_classes, module_name):
+                 normalizing_const, num_classes, distribution_rules, module_name):
         self.schema_madlib = schema_madlib
         self.source_table = source_table
         self.output_table = output_table
@@ -60,10 +61,12 @@ class InputDataPreprocessorDL(object):
         self.buffer_size = buffer_size
         self.normalizing_const = normalizing_const if normalizing_const is not None else DEFAULT_NORMALIZING_CONST
         self.num_classes = num_classes
+        self.distribution_rules = distribution_rules if distribution_rules else DEFAULT_GPU_CONFIG
         self.module_name = module_name
         self.output_summary_table = None
         self.dependent_vartype = None
         self.independent_vartype = None
+        self.gpu_config = '$__madlib__$all_segments$__madlib__$'
         if self.output_table:
             self.output_summary_table = add_postfix(self.output_table, "_summary")
 
@@ -99,6 +102,34 @@ class InputDataPreprocessorDL(object):
                 "in table ({1}).".format(
                     self.module_name, len(self.dependent_levels)))
 
+    def _validate_distribution_table(self):
+
+        input_tbl_valid(self.distribution_rules, self.module_name,
+                        error_suffix_str="""
+                        segments_to_use table ({self.distribution_rules}) doesn't exist.
+                        """.format(self=self))
+        _assert(is_var_valid(self.distribution_rules, 'dbid'),
+                "{self.module_name}: distribution rules table must contain dbib column".format(
+                    self=self))
+        dbids = plpy.execute("""
+            SELECT array_agg(dbid) AS dbids FROM gp_segment_configuration
+            WHERE content >= 0 AND role = 'p'
+            """)[0]['dbids']
+        dist_result = plpy.execute("""
+            SELECT array_agg(dbid) AS dbids,
+                   count(dbid) AS c1,
+                   count(DISTINCT dbid) AS c2
+            FROM {0} """.format(self.distribution_rules))
+
+        _assert(dist_result[0]['c1'] == dist_result[0]['c2'],
+            '{self.module_name}: distribution rules table contains duplicate dbids'.format(
+                self=self))
+
+        for i in dist_result[0]['dbids']:
+            _assert(i in dbids,
+                '{self.module_name}: invalid dbid:{i} in the distribution rules table'.format(
+                    self=self, i=i))
+
     def get_one_hot_encoded_dep_var_expr(self):
         """
         :param dependent_varname: Name of the dependent variable
@@ -204,42 +235,96 @@ class InputDataPreprocessorDL(object):
         dep_shape = self._get_dependent_var_shape()
         dep_shape = ','.join([str(i) for i in dep_shape])
 
-        # Create the mini-batched output table
         if is_platform_pg():
+            self.distribution_rules = '$__madlib__$all_segments$__madlib__$'
             distributed_by_clause = ''
             dist_key_clause = ''
             join_clause = ''
-            select_clause = 'b.*'
             dist_key_comma = ''
-
         else:
-
             dist_key = DISTRIBUTION_KEY_COLNAME
             # Create a large temp table such that there is at least 1 row on each segment.
             # Using 999999 would distribute data (at least 1 row on each segment) for
             # a cluster as large as 20000 segments.
             query = """
-                CREATE TEMP TABLE {series_tbl}
-                AS
-                SELECT generate_series(0, 999999) {dist_key}
-                DISTRIBUTED BY ({dist_key})
-            """.format(**locals())
+                    CREATE TEMP TABLE {series_tbl}
+                    AS
+                    SELECT generate_series(0, 999999) {dist_key}
+                    DISTRIBUTED BY ({dist_key})
+                """.format(**locals())
             plpy.execute(query)
-
-            # Create temp table to get unique distribution key values for each segment
-            query = """
+            distributed_by_clause = ' DISTRIBUTED BY ({dist_key}) '.format(**locals())
+            dist_key_comma = dist_key + ' ,'
+            gpu_join_clause = """JOIN {dist_key_tbl} ON
+                ({self.gpu_config})[b.buffer_id%{num_segments}+1] = {dist_key_tbl}.id
+                """
+
+            if self.distribution_rules == 'gpu_segments':
+                gpu_info_table = unique_string(desp='gpu_info')
+                plpy.execute("""
+                    SELECT {self.schema_madlib}.gpu_configuration('{gpu_info_table}')
+                """.format(**locals()))
+                gpu_query = """
+                    SELECT array_agg(DISTINCT(hostname)) as gpu_config
+                    FROM {gpu_info_table}
+                """.format(**locals())
+                gpu_query_result = plpy.execute(gpu_query)[0]['gpu_config']
+                if not gpu_query_result:
+                    plpy.error("{self.module_name}: No GPUs configured on hosts.".format(self=self))
+
+                gpu_config_hostnames = "ARRAY{0}".format(gpu_query_result)
+                # find hosts with gpus
+                get_segment_query = """
+                    SELECT array_agg(content) as segment_ids,
+                           array_agg(dbid) as dbid,
+                           count(*) as count
+                    FROM gp_segment_configuration
+                    WHERE content != -1 AND role = 'p'
+                    AND hostname=ANY({gpu_config_hostnames})
+                """.format(**locals())
+                segment_ids_result = plpy.execute(get_segment_query)[0]
+                plpy.execute("DROP TABLE IF EXISTS {0}".format(gpu_info_table))
+
+                self.gpu_config = "ARRAY{0}".format(sorted(segment_ids_result['segment_ids']))
+                self.distribution_rules = "ARRAY{0}".format(sorted(segment_ids_result['dbid']))
+
+                num_segments = segment_ids_result['count']
+                where_clause = "WHERE gp_segment_id=ANY({self.gpu_config})".format(**locals())
+                join_clause = gpu_join_clause.format(**locals())
+
+            elif self.distribution_rules == 'all_segments':
+
+                self.distribution_rules = '$__madlib__$all_segments$__madlib__$'
+                where_clause = ''
+                num_segments = get_seg_number()
+                join_clause = 'JOIN {dist_key_tbl} ON (b.buffer_id%{num_segments})= {dist_key_tbl}.id'.format(**locals())
+
+            else:  # Read from a table with dbids to distribute the data
+
+                self._validate_distribution_table()
+                gpu_query = """
+                    SELECT array_agg(content) as gpu_config,
+                           array_agg(gp_segment_configuration.dbid) as dbid
+                    FROM {self.distribution_rules} JOIN gp_segment_configuration
+                    ON {self.distribution_rules}.dbid = gp_segment_configuration.dbid
+                """.format(**locals())
+                gpu_query_result = plpy.execute(gpu_query)[0]
+                self.gpu_config = "ARRAY{0}".format(sorted(gpu_query_result['gpu_config']))
+                where_clause = "WHERE gp_segment_id=ANY({self.gpu_config})".format(**locals())
+                num_segments = plpy.execute("SELECT count(*) as count FROM {self.distribution_rules}".format(**locals()))[0]['count']
+                join_clause = gpu_join_clause.format(**locals())
+                self.distribution_rules = "ARRAY{0}".format(sorted(gpu_query_result['dbid']))
+
+            dist_key_query = """
                     CREATE TEMP TABLE {dist_key_tbl} AS
                     SELECT gp_segment_id AS id, min({dist_key}) AS {dist_key}
                     FROM {series_tbl}
+                    {where_clause}
                     GROUP BY gp_segment_id
-            """.format(**locals())
-            plpy.execute(query)
-
-            num_segments = get_seg_number()
-            join_clause = 'JOIN {dist_key_tbl} ON (b.buffer_id%{num_segments})= {dist_key_tbl}.id'.format(**locals())
-            distributed_by_clause= ' DISTRIBUTED BY ({dist_key}) '.format(**locals())
-            dist_key_comma = dist_key + ' ,'
+            """
+            plpy.execute(dist_key_query.format(**locals()))
 
+        # Create the mini-batched output table
         sql = """
             CREATE TABLE {self.output_table} AS
             SELECT {dist_key_comma}
@@ -297,7 +382,9 @@ class InputDataPreprocessorDL(object):
                 {class_level_str} AS {class_values_colname},
                 {self.buffer_size} AS buffer_size,
                 {self.normalizing_const}::{FLOAT32_SQL_TYPE} AS {normalizing_const_colname},
-                {self.num_classes} AS {num_classes_colname}
+                {self.num_classes} AS {num_classes_colname},
+                {self.distribution_rules} AS distribution_rules,
+                {self.gpu_config} AS __internal_gpu_config__
             """.format(self=self, class_level_str=class_level_str,
                        dependent_varname_colname=DEPENDENT_VARNAME_COLNAME,
                        independent_varname_colname=INDEPENDENT_VARNAME_COLNAME,
@@ -365,11 +452,11 @@ class InputDataPreprocessorDL(object):
             self.buffer_size, num_rows_in_tbl, indepdent_var_dim)
         return ceil((1.0 * num_rows_in_tbl) / self.buffer_size)
 
-
 class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
     def __init__(self, schema_madlib, source_table, output_table,
                  dependent_varname, independent_varname,
-                 training_preprocessor_table, buffer_size, **kwargs):
+                 training_preprocessor_table, buffer_size, distribution_rules,
+                 **kwargs):
         """
             This prepares the variables that are required by
             InputDataPreprocessorDL.
@@ -382,7 +469,7 @@ class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
             self, schema_madlib, source_table, output_table,
             dependent_varname, independent_varname, buffer_size,
             summary_table[NORMALIZING_CONST_COLNAME], num_classes,
-            self.module_name)
+            distribution_rules, self.module_name)
         # Update value of dependent_levels from training batch summary table.
         self.dependent_levels = self._get_dependent_levels(
             summary_table[CLASS_VALUES_COLNAME],
@@ -474,15 +561,18 @@ class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
 class TrainingDataPreprocessorDL(InputDataPreprocessorDL):
     def __init__(self, schema_madlib, source_table, output_table,
                  dependent_varname, independent_varname, buffer_size,
-                 normalizing_const, num_classes, **kwargs):
+                 normalizing_const, num_classes, distribution_rules,
+                **kwargs):
         """
             This prepares the variables that are required by
             InputDataPreprocessorDL.
         """
+        self.module_name = "training_preprocessor_dl"
         InputDataPreprocessorDL.__init__(
             self, schema_madlib, source_table, output_table,
             dependent_varname, independent_varname, buffer_size,
-            normalizing_const, num_classes, "training_preprocessor_dl")
+            normalizing_const, num_classes, distribution_rules,
+            self.module_name)
         # Update default value of dependent_levels in superclass
         self.dependent_levels = self._get_dependent_levels()
 
@@ -549,6 +639,12 @@ class InputDataPreprocessorDocumentation:
             training_preprocessor_table, -- TEXT. packed training data table.
             buffer_size            -- INTEGER. Default computed automatically.
                                       Number of source input rows to pack into a buffer.
+            distribution_rules     -- TEXT. Default: 'all_segments'. Specifies how to
+                                      distribute the 'output_table'. This is important
+                                      for how the fit function will use resources on the
+                                      cluster.  The default 'all_segments' means the
+                                      'output_table' will be distributed to all segments
+                                      in the database cluster.
         );
 
 
@@ -585,6 +681,10 @@ class InputDataPreprocessorDocumentation:
                                      arrays in independent_varname.
         num_classes               -- num_classes value passed by user while
                                      generating training_preprocessor_table.
+        gpu_config                -- List of segment ids the data is distributed
+                                     on, depending on the 'distribution_rules'
+                                     parameter specified as input. Set to 'all_segments'
+                                     if 'distribution_rules' is specified as 'all_segments'.
 
         ---------------------------------------------------------------------------
         """.format(**locals())
@@ -646,6 +746,12 @@ class InputDataPreprocessorDocumentation:
                                       the 1-hot encoded array length will be equal to
                                       the number of distinct class values found in the
                                       input table.
+            distribution_rules     -- TEXT. Default: 'all_segments'. Specifies how to
+                                      distribute the 'output_table'. This is important
+                                      for how the fit function will use resources on the
+                                      cluster.  The default 'all_segments' means the
+                                      'output_table' will be distributed to all segments
+                                      in the database cluster.
         );
 
 
@@ -681,6 +787,11 @@ class InputDataPreprocessorDocumentation:
         normalizing_const         -- Normalizing constant used for standardizing
                                      arrays in independent_varname.
         num_classes               -- num_classes input param passed to function.
+        gpu_config                -- List of segment ids the data is distributed
+                                     on, depending on the 'distribution_rules'
+                                     param specified as input. Set to 'all_segments'
+                                     if 'distribution_rules' is specified as 'all_segments'.
+
 
         ---------------------------------------------------------------------------
         """.format(**locals())
diff --git a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in
index 29e4b4c..6d5f0ad 100644
--- a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in
+++ b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.sql_in
@@ -40,6 +40,7 @@ Interface and implementation are subject to change. </em>
 <li class="level1"><a href="#validation_preprocessor_dl">Preprocessor for Validation Image Data</a></li>
 <li class="level1"><a href="#output">Output Tables</a></li>
 <li class="level1"><a href="#example">Examples</a></li>
+<li class="level1"><a href="#references">References</a></li>
 <li class="level1"><a href="#related">Related Topics</a></li>
 </ul></div>
 
@@ -71,7 +72,8 @@ training_preprocessor_dl(source_table,
                          independent_varname,
                          buffer_size,
                          normalizing_const,
-                         num_classes
+                         num_classes,
+                         distribution_rules
                         )
 </pre>
 
@@ -136,6 +138,25 @@ training_preprocessor_dl(source_table,
   length will be equal to the number
   of distinct class values found in the input table.
   </dd>
+
+  <dt>distribution_rules (optional)</dt>
+  <dd>TEXT, default: 'all_segments'. Specifies how to distribute the
+  'output_table'.  This is important for how the fit function will use
+  resources on the cluster.  The default 'all_segments' means the
+  'output_table' will be distributed to all segments in the database
+  cluster.
+
+  If you specify 'gpu_segments' then the 'output_table' will be distributed
+  to all segments that are on hosts that have GPUs attached. This will make
+  maximum use of GPU resources when training a deep learning model.
+
+  You can also specify the name of a resources table containing the segments
+  to be used for training.  This table must contain a column called 'dbid' whose
+  values match the 'dbid' column of the 'gp_segment_configuration' system table [1].
+  Refer to the utility function <a href="group__grp__gpu__configuration.html">GPU Configuration</a>
+  for more information on how to identify
+  segments attached to hosts that are GPU enabled.
+  </dd>
 </dl>
 
 @anchor validation_preprocessor_dl
@@ -146,7 +167,8 @@ validation_preprocessor_dl(source_table,
                            dependent_varname,
                            independent_varname,
                            training_preprocessor_table,
-                           buffer_size
+                           buffer_size,
+                           distribution_rules
                           )
 </pre>
 
@@ -204,6 +226,24 @@ validation_preprocessor_dl(source_table,
   rows specified by the 'buffer_size' parameter.
   </dd>
 
+  <dt>distribution_rules (optional)</dt>
+  <dd>TEXT, default: 'all_segments'. Specifies how to distribute the
+  'output_table'.  This is important for how the fit function will use
+  resources on the cluster.  The default 'all_segments' means the
+  'output_table' will be distributed to all segments in the database
+  cluster.
+
+  If you specify 'gpu_segments' then the 'output_table' will be distributed
+  to all segments that are on hosts that have GPUs attached. This will make
+  maximum use of GPU resources when training a deep learning model.
+
+  You can also specify the name of a resources table containing the segments
+  to be used for training.  This table must contain a column called 'dbid' whose
+  values match the 'dbid' column of the 'gp_segment_configuration' system table [1].
+  Refer to the utility function <a href="group__grp__gpu__configuration.html">GPU Configuration</a>
+  for more information on how to identify
+  segments attached to hosts that are GPU enabled.
+  </dd>
 
 </dl>
 
@@ -298,6 +338,20 @@ both validation_preprocessor_dl() and training_preprocessor_dl() ):
         levels found in the input data is less than the 'num_classes' parameter
         specified in training_preprocessor_dl().</td>
     </tr>
+    <tr>
+        <th>distribution_rules</th>
+        <td>This is the list of segment ids, in the form of 'dbid' values,
+        describing how the 'output_table' is distributed,
+        as per the 'distribution_rules' input parameter.
+        If the 'distribution_rules' parameter is set to 'all_segments', then
+        this will also be set to 'all_segments'.</td>
+    </tr>
+    <tr>
+        <th>__internal_gpu_config__</th>
+        <td>For internal use.  (Note: this is the list of segment ids
+        where data is distributed, in the form of 'content' ids, which
+        are different from 'dbid' values [1].)</td>
+    </tr>
    </table>
 
 @anchor example
@@ -420,16 +474,18 @@ Review the output summary table:
 SELECT * FROM image_data_packed_summary;
 </pre>
 <pre class="result">
--[ RECORD 1 ]-------+------------------
-source_table        | image_data
-output_table        | image_data_packed
-dependent_varname   | species
-independent_varname | rgb
-dependent_vartype   | text
-class_values        | {bird,cat,dog}
-buffer_size         | 26
-normalizing_const   | 255
-num_classes         | 3
+-[ RECORD 1 ]-----------+------------------
+source_table            | image_data
+output_table            | image_data_packed
+dependent_varname       | species
+independent_varname     | rgb
+dependent_vartype       | text
+class_values            | {bird,cat,dog}
+buffer_size             | 26
+normalizing_const       | 255
+num_classes             | 3
+distribution_rules      | all_segments
+__internal_gpu_config__ | all_segments
 </pre>
 
 -#  Run the preprocessor for the validation dataset.
@@ -447,8 +503,9 @@ SELECT madlib.validation_preprocessor_dl(
       NULL                      -- Buffer size
       );
 </pre>
-We can choose to use a new buffer size compared to the
-training_preprocessor_dl run. Other parameters such as num_classes and
+We could choose to use a different buffer size compared to the
+training_preprocessor_dl run (but generally don't need to).
+Other parameters such as num_classes and
 normalizing_const that were passed to training_preprocessor_dl are
 automatically inferred using the image_data_packed param that is passed.
 Here is the packed output table of validation data for our simple example:
@@ -468,16 +525,18 @@ Review the output summary table:
 SELECT * FROM val_image_data_packed_summary;
 </pre>
 <pre class="result">
--[ RECORD 1 ]-------+----------------------
-source_table        | image_data
-output_table        | val_image_data_packed
-dependent_varname   | species
-independent_varname | rgb
-dependent_vartype   | text
-class_values        | {bird,cat,dog}
-buffer_size         | 26
-normalizing_const   | 255
-num_classes         | 3
+-[ RECORD 1 ]-----------+----------------------
+source_table            | image_data
+output_table            | val_image_data_packed
+dependent_varname       | species
+independent_varname     | rgb
+dependent_vartype       | text
+class_values            | {bird,cat,dog}
+buffer_size             | 26
+normalizing_const       | 255
+num_classes             | 3
+distribution_rules      | all_segments
+__internal_gpu_config__ | all_segments
 </pre>
 
 -#  Load data in another format.  Create an artificial 2x2 resolution color image
@@ -641,16 +700,18 @@ Review the output summary table:
 SELECT * FROM image_data_packed_summary;
 </pre>
 <pre class="result">
--[ RECORD 1 ]-------+------------------
-source_table        | image_data
-output_table        | image_data_packed
-dependent_varname   | species
-independent_varname | rgb
-dependent_vartype   | text
-class_values        | {bird,cat,dog}
-buffer_size         | 10
-normalizing_const   | 255
-num_classes         | 3
+-[ RECORD 1 ]-----------+------------------
+source_table            | image_data
+output_table            | image_data_packed
+dependent_varname       | species
+independent_varname     | rgb
+dependent_vartype       | text
+class_values            | {bird,cat,dog}
+buffer_size             | 10
+normalizing_const       | 255
+num_classes             | 3
+distribution_rules      | all_segments
+__internal_gpu_config__ | all_segments
 </pre>
 
 -#  Run the preprocessor for image data with num_classes greater than 3 (distinct class values found in table):
@@ -682,23 +743,104 @@ Review the output summary table:
 SELECT * FROM image_data_packed_summary;
 </pre>
 <pre class="result">
--[ RECORD 1 ]-------+-------------------------
-source_table        | image_data
-output_table        | image_data_packed
-dependent_varname   | species
-independent_varname | rgb
-dependent_vartype   | text
-class_values        | {bird,cat,dog,NULL,NULL}
-buffer_size         | 26
-normalizing_const   | 255
-num_classes         | 5
+-[ RECORD 1 ]-----------+-------------------------
+source_table            | image_data
+output_table            | image_data_packed
+dependent_varname       | species
+independent_varname     | rgb
+dependent_vartype       | text
+class_values            | {bird,cat,dog,NULL,NULL}
+buffer_size             | 26
+normalizing_const       | 255
+num_classes             | 5
+distribution_rules      | all_segments
+__internal_gpu_config__ | all_segments
 </pre>
 
+-#  Use distribution rules to specify how to distribute
+the 'output_table'. This is important for how the fit function
+will use resources on the cluster. To distribute to all segments
+on hosts with GPUs attached:
+<pre class="example">
+DROP TABLE IF EXISTS image_data_packed, image_data_packed_summary;
+SELECT madlib.training_preprocessor_dl('image_data',          -- Source table
+                                        'image_data_packed',  -- Output table
+                                        'species',            -- Dependent variable
+                                        'rgb',                -- Independent variable
+                                        NULL,                 -- Buffer size
+                                        255,                  -- Normalizing constant
+                                        NULL,                 -- Number of classes
+                                        'gpu_segments'        -- Distribution rules
+                                        );
+\\x on
+SELECT * FROM image_data_packed_summary;
+</pre>
+<pre class="result">
+-[ RECORD 1 ]-----------+------------------
+source_table            | image_data
+output_table            | image_data_packed
+dependent_varname       | species
+independent_varname     | rgb
+dependent_vartype       | text
+class_values            | {bird,cat,dog}
+buffer_size             | 26
+normalizing_const       | 255
+num_classes             | 3
+distribution_rules      | {2,3,4,5}
+__internal_gpu_config__ | {0,1,2,3}
+</pre>
+To distribute to only specified segments, create a
+distribution table with a column called 'dbid' that
+lists the segments you want:
+<pre class="example">
+DROP TABLE IF EXISTS segments_to_use;
+CREATE TABLE segments_to_use(
+    dbid INTEGER,
+    hostname TEXT
+);
+INSERT INTO segments_to_use VALUES
+(2, 'hostname-01'),
+(3, 'hostname-01');
+DROP TABLE IF EXISTS image_data_packed, image_data_packed_summary;
+SELECT madlib.training_preprocessor_dl('image_data',          -- Source table
+                                        'image_data_packed',  -- Output table
+                                        'species',            -- Dependent variable
+                                        'rgb',                -- Independent variable
+                                        NULL,                 -- Buffer size
+                                        255,                  -- Normalizing constant
+                                        NULL,                 -- Number of classes
+                                        'segments_to_use'     -- Distribution rules
+                                        );
+\\x on
+SELECT * FROM image_data_packed_summary;
+</pre>
+<pre class="result">
+-[ RECORD 1 ]-----------+------------------
+source_table            | image_data
+output_table            | image_data_packed
+dependent_varname       | species
+independent_varname     | rgb
+dependent_vartype       | text
+class_values            | {bird,cat,dog}
+buffer_size             | 26
+normalizing_const       | 255
+num_classes             | 3
+distribution_rules      | {2,3}
+__internal_gpu_config__ | {0,1}
+</pre>
+
+@anchor references
+@par References
+
+[1] Greenplum 'gp_segment_configuration' table https://gpdb.docs.pivotal.io/latest/ref_guide/system_catalogs/gp_segment_configuration.html
+
 @anchor related
 @par Related Topics
 
 minibatch_preprocessing.sql_in
 
+gpu_configuration()
+
 */
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
@@ -707,7 +849,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
     dependent_varname           VARCHAR,
     independent_varname         VARCHAR,
     training_preprocessor_table VARCHAR,
-    buffer_size                 INTEGER
+    buffer_size                 INTEGER,
+    distribution_rules          TEXT
 ) RETURNS VOID AS $$
     PythonFunctionBodyOnly(deep_learning, input_data_preprocessor)
     from utilities.control import MinWarning
@@ -723,9 +866,21 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
     output_table                VARCHAR,
     dependent_varname           VARCHAR,
     independent_varname         VARCHAR,
+    training_preprocessor_table VARCHAR,
+    buffer_size                 INTEGER
+) RETURNS VOID AS $$
+  SELECT MADLIB_SCHEMA.validation_preprocessor_dl($1, $2, $3, $4, $5, $6, NULL);
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.validation_preprocessor_dl(
+    source_table                VARCHAR,
+    output_table                VARCHAR,
+    dependent_varname           VARCHAR,
+    independent_varname         VARCHAR,
     training_preprocessor_table VARCHAR
 ) RETURNS VOID AS $$
-  SELECT MADLIB_SCHEMA.validation_preprocessor_dl($1, $2, $3, $4, $5, NULL);
+  SELECT MADLIB_SCHEMA.validation_preprocessor_dl($1, $2, $3, $4, $5, NULL, NULL);
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -754,7 +909,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
     independent_varname         VARCHAR,
     buffer_size                 INTEGER,
     normalizing_const           REAL,
-    num_classes                 INTEGER
+    num_classes                 INTEGER,
+    distribution_rules          TEXT
 ) RETURNS VOID AS $$
     PythonFunctionBodyOnly(deep_learning, input_data_preprocessor)
     from utilities.control import MinWarning
@@ -771,9 +927,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
     dependent_varname       VARCHAR,
     independent_varname     VARCHAR,
     buffer_size             INTEGER,
+    normalizing_const       REAL,
+    num_classes             INTEGER
+) RETURNS VOID AS $$
+  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, $6, $7, NULL);
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
+    source_table            VARCHAR,
+    output_table            VARCHAR,
+    dependent_varname       VARCHAR,
+    independent_varname     VARCHAR,
+    buffer_size             INTEGER,
     normalizing_const       REAL
 ) RETURNS VOID AS $$
-  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, $6, NULL);
+  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, $6, NULL, NULL);
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -784,7 +953,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
     independent_varname     VARCHAR,
     buffer_size             INTEGER
 ) RETURNS VOID AS $$
-  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, 1.0, NULL);
+  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, $5, 1.0, NULL, NULL);
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -794,7 +963,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.training_preprocessor_dl(
     dependent_varname       VARCHAR,
     independent_varname     VARCHAR
 ) RETURNS VOID AS $$
-  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, NULL, 1.0, NULL);
+  SELECT MADLIB_SCHEMA.training_preprocessor_dl($1, $2, $3, $4, NULL, 1.0, NULL, NULL);
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
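
Note that the summary table reports dbids in 'distribution_rules' but content
ids in '__internal_gpu_config__'. The mapping between the two can be inspected
with a read-only catalog query on Greenplum (illustrative):

    SELECT dbid, content, hostname
    FROM gp_segment_configuration
    WHERE role = 'p' AND content >= 0
    ORDER BY content;

On a default demo cluster the primary segments typically get dbid = content + 2
(dbid 1 belongs to the master, whose content is -1), which is why the
gpu_segments example above shows distribution_rules {2,3,4,5} alongside
__internal_gpu_config__ {0,1,2,3}.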
 
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
index 6e3389c..4ce21dd 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
@@ -167,8 +167,8 @@ class FitMultipleModel():
                 self.run_training()
                 if mst_idx == (total_msts - 1):
                     end_iteration = time.time()
-                    self.info_str = "\tTime for training in iteration {0}: {1} sec\n".format(iter,
-                                                                                      end_iteration - start_iteration)
+                    self.info_str = "\tTime for training in iteration {0}: {1} sec\n".format(
+                        iter, end_iteration - start_iteration)
             self.info_str += "\tTraining set after iteration {0}:".format(iter)
             self.evaluate_model(iter, self.source_table, True)
             if self.validation_table:
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
index 2621670..70761d5 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
@@ -50,6 +50,7 @@ FLOAT32_SQL_TYPE = 'REAL'
 SMALLINT_SQL_TYPE = 'SMALLINT'
 
 DEFAULT_NORMALIZING_CONST = 1.0
+DEFAULT_GPU_CONFIG = 'all_segments'
 
 #####################################################################
 
diff --git a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
index 4a0ede6..93b4c52 100644
--- a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
@@ -112,6 +112,61 @@ SELECT training_preprocessor_dl(
 SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from gp_segment_configuration WHERE role = 'p' and content != -1), 'Even distribution of buffers failed.')
 FROM data_preprocessor_input_batch
 WHERE gp_segment_id = 0;
+SELECT assert(__internal_gpu_config__ = 'all_segments', 'Missing column in summary table')
+FROM data_preprocessor_input_batch_summary;
+
+-- Test validation data is evenly distributed across all segments (GPDB only)
+DROP TABLE IF EXISTS validation_out, validation_out_summary;
+SELECT validation_preprocessor_dl(
+  'data_preprocessor_input',
+  'validation_out',
+  'id',
+  'x',
+  'data_preprocessor_input_batch',
+  1);
+SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from gp_segment_configuration WHERE role = 'p' and content != -1), 'Even distribution of validation buffers failed.')
+FROM validation_out
+WHERE gp_segment_id = 0;
+SELECT assert(__internal_gpu_config__ = 'all_segments', 'Missing column in validation summary table')
+FROM validation_out_summary;
+
+-- Test data distributed on specified segments
+DROP TABLE IF EXISTS segments_to_use;
+CREATE TABLE segments_to_use (dbid INTEGER, notes TEXT);
+INSERT INTO segments_to_use VALUES (2, 'GPU segment');
+DROP TABLE IF EXISTS data_preprocessor_input_batch, data_preprocessor_input_batch_summary;
+SELECT training_preprocessor_dl(
+  'data_preprocessor_input',
+  'data_preprocessor_input_batch',
+  'id',
+  'x',
+  1,
+  NULL,
+  NULL,
+  'segments_to_use');
+SELECT assert(count(DISTINCT(gp_segment_id)) = 1, 'Failed to distribute data on segment0')
+FROM data_preprocessor_input_batch;
+SELECT assert(count(*) = 17, 'Failed to distribute all data on segment0')
+FROM data_preprocessor_input_batch;
+SELECT assert(__internal_gpu_config__ = ARRAY[0], 'Invalid column value in summary table')
+FROM data_preprocessor_input_batch_summary;
+
+-- Test data distributed on specified segments for validation_preprocessor_dl
+DROP TABLE IF EXISTS validation_out, validation_out_summary;
+SELECT validation_preprocessor_dl(
+  'data_preprocessor_input',
+  'validation_out',
+  'id',
+  'x',
+  'data_preprocessor_input_batch',
+  1,
+  'segments_to_use');
+SELECT assert(count(DISTINCT(gp_segment_id)) = 1, 'Failed to distribute validation data on segment0')
+FROM validation_out;
+SELECT assert(count(*) = 17, 'Failed to distribute all validation data on segment0')
+FROM validation_out;
+SELECT assert(__internal_gpu_config__ = ARRAY[0], 'Invalid column value in validation summary table')
+FROM validation_out_summary;
 !>)
 
 DROP TABLE IF EXISTS data_preprocessor_input;
@@ -168,7 +223,8 @@ SELECT assert
         buffer_size         = 4 AND  -- we sort the class values in python
         normalizing_const   = 5 AND
         pg_typeof(normalizing_const) = 'real'::regtype AND
-        num_classes         = 16,
+        num_classes         = 16 AND
+        distribution_rules  = 'all_segments',
         'Summary Validation failed. Actual:' || __to_char(summary)
         ) from (select * from data_preprocessor_input_batch_summary) summary;
 
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
index 8358697..f21176c 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
@@ -54,6 +54,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
         self.default_buffer_size = 5
         self.default_normalizing_const = 1.0
         self.default_num_classes = None
+        self.default_distribution_rules = "all_segments"
         self.default_module_name = "dummy"
 
         import deep_learning.input_data_preprocessor
@@ -78,6 +79,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             self.default_buffer_size,
             self.default_normalizing_const,
             self.default_num_classes,
+            self.default_distribution_rules,
             self.default_module_name)
         preprocessor_obj.input_preprocessor_dl()
 
@@ -92,6 +94,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             None,
             self.default_normalizing_const,
             self.default_num_classes,
+            self.default_distribution_rules,
             self.default_module_name)
         self.util_module.MiniBatchBufferSizeCalculator.calculate_default_buffer_size = Mock(return_value = 5)
         preprocessor_obj.input_preprocessor_dl()
@@ -108,6 +111,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                 self.default_buffer_size,
                 self.default_normalizing_const,
                 self.default_num_classes,
+                self.default_distribution_rules,
                 self.default_module_name)
 
     def test_input_preprocessor_multiple_indep_var_raises_exception(self):
@@ -122,6 +126,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                 self.default_buffer_size,
                 self.default_normalizing_const,
                 self.default_num_classes,
+                self.default_distribution_rules,
                 self.default_module_name)
 
     def test_input_preprocessor_buffer_size_zero_fails(self):
@@ -136,6 +141,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                 0,
                 self.default_normalizing_const,
                 self.default_num_classes,
+                self.default_distribution_rules,
                 self.default_module_name)
 
     def test_input_preprocessor_negative_buffer_size_fails(self):
@@ -149,6 +155,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                                               -1,
                                               self.default_normalizing_const,
                                               self.default_num_classes,
+                                              self.default_distribution_rules,
                                               self.default_module_name)
 
     def test_input_preprocessor_invalid_indep_vartype_raises_exception(self):
@@ -162,6 +169,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                                                 self.default_buffer_size,
                                                 self.default_normalizing_const,
                                                 self.default_num_classes,
+                                                self.default_distribution_rules,
                                                 self.default_module_name)
 
     def test_input_preprocessor_invalid_dep_vartype_raises_exception(self):
@@ -175,6 +183,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                                                 self.default_buffer_size,
                                                 self.default_normalizing_const,
                                                 self.default_num_classes,
+                                                self.default_distribution_rules,
                                                 self.default_module_name)
 
     def test_input_preprocessor_normalizing_const_zero_fails(self):
@@ -188,6 +197,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                                                 self.default_buffer_size,
                                                 0,
                                                 self.default_num_classes,
+                                                self.default_distribution_rules,
                                                 self.default_module_name)
 
     def test_input_preprocessor_negative_normalizing_const_fails(self):
@@ -201,6 +211,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
                                                 self.default_buffer_size,
                                                 -1,
                                                 self.default_num_classes,
+                                                self.default_distribution_rules,
                                                 self.default_module_name)
 
     def test_get_one_hot_encoded_dep_var_expr_null_val(self):
@@ -215,6 +226,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             self.default_buffer_size,
             self.default_normalizing_const,
             self.default_num_classes,
+            self.default_distribution_rules,
             self.default_module_name)
         obj.dependent_levels = ["NULL", "'a'"]
         dep_var_array_expr = obj.get_one_hot_encoded_dep_var_expr()
@@ -234,6 +246,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             self.default_buffer_size,
             self.default_normalizing_const,
             self.default_num_classes,
+            self.default_distribution_rules,
             self.default_module_name)
         dep_var_array_expr = obj.get_one_hot_encoded_dep_var_expr()
         self.assertEqual("{0}::smallint[]".
@@ -251,6 +264,7 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             self.default_buffer_size,
             self.default_normalizing_const,
             None,
+            self.default_distribution_rules,
             self.default_module_name)
         obj.dependent_levels = ["dummy"]
         self.assertEqual(0, obj.padding_size)
@@ -266,7 +280,8 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             self.default_ind_var,
             self.default_buffer_size,
             self.default_normalizing_const,
-            5)
+            5,
+            self.default_distribution_rules)
         obj._set_one_hot_encoding_variables()
         self.assertEqual(2, obj.padding_size)
 
@@ -281,10 +296,79 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
             self.default_ind_var,
             self.default_buffer_size,
             self.default_normalizing_const,
-            2)
+            2,
+            self.default_distribution_rules)
         with self.assertRaises(plpy.PLPYException):
             obj._set_one_hot_encoding_variables()
 
+    def test_validate_distribution_table(self):
+        self.module.get_expr_type = Mock(side_effect = ['integer[]', 'integer[]'])
+
+        obj = self.module.InputDataPreprocessorDL(self.default_schema_madlib,
+            self.default_source_table,
+            self.default_output_table,
+            self.default_dep_var,
+            self.default_ind_var,
+            self.default_buffer_size,
+            self.default_normalizing_const,
+            self.default_num_classes,
+            self.default_distribution_rules,
+            self.default_module_name)
+        self.module.input_tbl_valid = Mock()
+        self.module.is_var_valid = Mock()
+        self.plpy_mock_execute.side_effect = [
+            [{'dbids': [2,3,4]}],
+            [{'dbids': [3,4], 'c1': 2, 'c2': 2}]
+            ]
+
+        obj._validate_distribution_table()
+
+    def test_validate_distribution_table_dup(self):
+        self.module.get_expr_type = Mock(side_effect = ['integer[]', 'integer[]'])
+
+        obj = self.module.InputDataPreprocessorDL(self.default_schema_madlib,
+            self.default_source_table,
+            self.default_output_table,
+            self.default_dep_var,
+            self.default_ind_var,
+            self.default_buffer_size,
+            self.default_normalizing_const,
+            self.default_num_classes,
+            self.default_distribution_rules,
+            self.default_module_name)
+        self.module.input_tbl_valid = Mock()
+        self.module.is_var_valid = Mock()
+        self.plpy_mock_execute.side_effect = [
+            [{'dbids': [2,3,4]}],
+            [{'dbids': [3,3], 'c1': 2, 'c2': 1}]
+            ]
+        with self.assertRaises(plpy.PLPYException) as error:
+            obj._validate_distribution_table()
+        self.assertIn('duplicate', str(error.exception).lower())
+
+    def test_validate_distribution_table_invalid(self):
+        self.module.get_expr_type = Mock(side_effect = ['integer[]', 'integer[]'])
+        obj = self.module.InputDataPreprocessorDL(self.default_schema_madlib,
+            self.default_source_table,
+            self.default_output_table,
+            self.default_dep_var,
+            self.default_ind_var,
+            self.default_buffer_size,
+            self.default_normalizing_const,
+            self.default_num_classes,
+            self.default_distribution_rules,
+            self.default_module_name)
+        self.module.input_tbl_valid = Mock()
+        self.module.is_var_valid = Mock()
+        self.plpy_mock_execute.side_effect = [
+            [{'dbids': [2,3,4]}],
+            [{'dbids': [3,30], 'c1': 2, 'c2': 2}]
+            ]
+        with self.assertRaises(plpy.PLPYException) as error:
+            obj._validate_distribution_table()
+        self.assertIn('invalid', str(error.exception).lower())
+
+
 if __name__ == '__main__':
     unittest.main()