You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by kh...@apache.org on 2020/02/11 19:49:36 UTC

[madlib] branch master updated: DL: Avoid constant folding of weights in GPDB6 plan

This is an automated email from the ASF dual-hosted git repository.

khannaekta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new fc81374  DL: Avoid constant folding of weights in GPDB6 plan
fc81374 is described below

commit fc81374d4d280ab4454150cb126137775e0f7ae6
Author: Ekta Khanna <ek...@pivotal.io>
AuthorDate: Thu Jan 30 17:49:29 2020 -0800

    DL: Avoid constant folding of weights in GPDB6 plan
    
    JIRA: MADLIB-1405
    
    For versions >=GPDB6, previously, for queries called with the initial
    weights value passed in, the query plan for it would have create custom
    plans with weights embedded in the plan itself.  This meant that the
    query plan size would also include the size of these weights, bloating
    it up to hit the 1GB limit when dispatching the query plan to segments,
    leading to OOM for large weights.
    
    In GPDB, for PREPARE plans, there is a threshold of 5 attempts to create
    custom plans(constant folding the passed in params) for execution and
    then it uses a generic plan(not constant folding the passed in params)
    for all the subsequent executions.  Therefore, to avoid GPDB6 from
    creating custom plans when passing in weights, the queries(with weights)
    is executed with DUMMY weights for 5 time, prior to calling it with the
    actual weights.
    
    Co-authored-by: Nikhil Kak <nk...@pivotal.io>
---
 .../modules/deep_learning/madlib_keras.py_in       | 41 +++++++++++++++-------
 .../madlib_keras_fit_multiple_model.py_in          | 31 +++++++++-------
 .../deep_learning/madlib_keras_helper.py_in        | 25 +++++++++++++
 .../test/unit_tests/test_madlib_keras.py_in        | 23 +++++++++++-
 4 files changed, 94 insertions(+), 26 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index 7502a6a..01f9152 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -168,6 +168,8 @@ def fit(schema_madlib, source_table, model, model_arch_table,
         FROM {source_table}
         """.format(**locals()), ["bytea", "boolean"])
 
+    prepare_generic_plan(run_training_iteration, [DUMMY_WEIGHTS, False])
+
     # Define the state for the model and loss/metric storage lists
     training_loss, training_metrics, metrics_elapsed_time = [], [], []
     metrics_iters = []
@@ -313,11 +315,20 @@ def fit(schema_madlib, source_table, model, model_arch_table,
                  [compile_params, fit_params, name,
                   description, metrics_elapsed_time, class_values])
 
-    create_output_table = plpy.prepare("""
-        CREATE TABLE {0} AS SELECT
-        $1 as model_weights,
-        $2 as {1}""".format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
-    plpy.execute(create_output_table, [serialized_weights, model_arch])
+    plpy.execute("""
+        CREATE TABLE {0}
+        (model_weights bytea,
+        {1} json)""".format(model, ModelArchSchema.MODEL_ARCH))
+    insert_output_table = plpy.prepare("""
+        INSERT INTO {0} SELECT model_weights, {1}
+        FROM (VALUES($1, $2))t(model_weights, {1})
+        """.format(model, ModelArchSchema.MODEL_ARCH), ["bytea", "json"])
+    ## prepare generic plan for GPDB6 insert query
+    if is_platform_gp6():
+        for i in range(1, 6):
+            plpy.execute(insert_output_table, [DUMMY_WEIGHTS, DUMMY_JSON])
+            plpy.execute("TRUNCATE TABLE {0}".format(model))
+    plpy.execute(insert_output_table, [serialized_weights, model_arch])
 
     #TODO add a unit test for this in a future PR
     reset_cuda_env(original_cuda_env)
@@ -475,7 +486,7 @@ def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
         b. keras session is cleared at the end of the final iteration,
         i.e, last row of last iteration.
     """
-    if not independent_var or not dependent_var:
+    if not independent_var or not dependent_var or prev_serialized_weights==DUMMY_WEIGHTS:
         return state
     SD = kwargs['SD']
     device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)
@@ -664,11 +675,8 @@ def get_loss_metric_from_keras_eval(schema_madlib, table, compile_params,
         MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape")
     ind_shape_col = add_postfix(
         MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape")
-    """
-    This function will call the internal keras evaluate function to get the loss
-    and accuracy of each tuple which then gets averaged to get the final result.
-    """
     use_gpus = use_gpus if use_gpus else False
+
     evaluate_query = plpy.prepare("""
         select ({schema_madlib}.internal_keras_evaluate(
                                             {mb_dep_var_col},
@@ -685,11 +693,12 @@ def get_loss_metric_from_keras_eval(schema_madlib, table, compile_params,
                                             ARRAY{images_per_seg},
                                             {use_gpus}::BOOLEAN,
                                             ARRAY{accessible_gpus_for_seg},
-                                            {is_final_iteration}
+                                            $2
                                             )) as loss_metric
         from {table}
-        """.format(**locals()), ["bytea"])
-    res = plpy.execute(evaluate_query, [serialized_weights])
+        """.format(**locals()),["bytea", "boolean"])
+    prepare_generic_plan(evaluate_query, [DUMMY_WEIGHTS, False])
+    res = plpy.execute(evaluate_query, [serialized_weights, is_final_iteration])
     loss_metric = res[0]['loss_metric']
     return loss_metric
 
@@ -701,6 +710,8 @@ def internal_keras_eval_transition(state, dependent_var, independent_var,
                                    segments_per_host, images_per_seg,
                                    use_gpus, accessible_gpus_for_seg,
                                    is_final_iteration, **kwargs):
+    if serialized_weights == DUMMY_WEIGHTS:
+        return None
     SD = kwargs['SD']
     device_name = get_device_name_and_set_cuda_env(accessible_gpus_for_seg[current_seg_id], current_seg_id)
     agg_loss, agg_metric, agg_image_count = state
@@ -780,6 +791,10 @@ def internal_keras_eval_merge(state1, state2, **kwargs):
     return merged_state
 
 def internal_keras_eval_final(state, **kwargs):
+    # Return if called early
+    if not state or state == [0,0,0]:
+        return state
+
     loss, metric, image_count = state
 
     if image_count == 0:
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
index ae577e8..97c3c60 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_fit_multiple_model.py_in
@@ -221,8 +221,7 @@ class FitMultipleModel():
                 self.mst_key_col, mst[self.mst_key_col])
             model_arch, _ = get_model_arch_weights(self.model_arch_table, mst[self.model_id_col])
             _, metric, loss = compute_loss_and_metrics(
-                self.schema_madlib, table, "$madlib${0}$madlib$".format(
-                    mst[self.compile_params_col]),
+                self.schema_madlib, table, "$madlib${0}$madlib$".format(mst[self.compile_params_col]),
                 model_arch,
                 weights,
                 self.use_gpus,
@@ -294,6 +293,23 @@ class FitMultipleModel():
                                          {self.model_arch_col} JSON)
                                         """.format(self=self)
             plpy.execute(output_table_create_query)
+        output_table_insert_query = """
+                            INSERT INTO {self.model_output_table}(
+                                 {self.mst_key_col}, {self.model_weights_col},
+                                 {self.model_arch_col})
+                              SELECT v1,v2,v3 from (VALUES ($1, $2, $3))t(v1,v2,v3)
+                                """.format(self=self)
+        output_table_insert_query_prepared = plpy.prepare(
+             output_table_insert_query, ["int", "bytea", "json"])
+
+        ## prepare generic plan for GPDB6 insert query
+        if is_platform_gp6():
+            for i in range(1, 6):
+                plpy.execute(output_table_insert_query_prepared, [0, DUMMY_WEIGHTS, DUMMY_JSON])
+                plpy.execute("""
+                                DELETE FROM {self.model_output_table}
+                                WHERE {self.mst_key_col}=0
+                             """.format(self=self))
 
         info_table_create_query = """
                                   CREATE TABLE {self.model_info_table}
@@ -361,17 +377,8 @@ class FitMultipleModel():
             plpy.execute(info_table_insert_query)
 
             if not mst['mst_key'] in warm_start_msts:
-                output_table_insert_query = """
-                                    INSERT INTO {self.model_output_table}(
-                                        {self.mst_key_col}, {self.model_weights_col},
-                                        {self.model_arch_col})
-                                    VALUES ({mst_key}, $1, $2)
-                                       """.format(self=self,
-                                                  mst_key=mst[self.mst_key_col])
-                output_table_insert_query_prepared = plpy.prepare(
-                    output_table_insert_query, ["bytea", "json"])
                 plpy.execute(output_table_insert_query_prepared, [
-                             serialized_weights, model_arch])
+                    mst[self.mst_key_col], serialized_weights, model_arch])
 
     def create_model_summary_table(self):
         if self.warm_start:
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
index 5be078b..27736ac 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
@@ -19,6 +19,7 @@
 
 import numpy as np
 from model_arch_info import ModelArchSchema
+from utilities.utilities import __mad_version
 from utilities.utilities import add_postfix
 from utilities.utilities import unique_string
 from utilities.utilities import is_platform_pg
@@ -53,6 +54,8 @@ SMALLINT_SQL_TYPE = 'SMALLINT'
 DEFAULT_NORMALIZING_CONST = 1.0
 GP_SEGMENT_ID_COLNAME = "gp_segment_id"
 INTERNAL_GPU_CONFIG = '__internal_gpu_config__'
+DUMMY_WEIGHTS = 'DUMMY'
+DUMMY_JSON = '{"a": "DUMMY"}'
 
 #####################################################################
 
@@ -314,3 +317,25 @@ def get_accessible_gpus_for_seg(schema_madlib, segments_per_host, module_name):
                     'recommended configuration is to have 1 GPU available per segment.')
                 warning_flag = False
         return accessible_gpus_for_seg
+
+def is_platform_gp6():
+    version_wrapper = __mad_version()
+    return not is_platform_pg() and not version_wrapper.is_gp_version_less_than('6.0')
+
+def prepare_generic_plan(query_plan, query_params):
+    # For >=GPDB6, previously, when the queries called with the
+    # initial weights value passed in, the query plan for it would
+    # create custom plans with weights embedded in the plan itself.
+    # This meant that the query plan size would also include the size
+    # of these weights, bloating it up to hit the 1GB limit when dispatching
+    # the query plan to segments, leading to OOM for large weights.
+    # In GPDB, for PREPARE plans, there is a threshold of 5 attempts to create
+    # custom plans(constant folding the passed in params) for execution and then
+    # it uses a generic plan(not constant folding the passed in params) for all
+    # the subsequent executions.
+    # Therefore, to avoid GPDB6 from creating custom plans when passing in
+    # weights, the query is executed passing in DUMMY weights for 5
+    # time, prior to calling it with the actual weights.
+    if is_platform_gp6():
+        for i in range(1, 6):
+            plpy.execute(query_plan, query_params)
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 3714de5..a9cf865 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -1371,6 +1371,27 @@ class MadlibKerasHelperTestCase(unittest.TestCase):
             self.subject.get_accessible_gpus_for_seg('schema_madlib', 2, 'foo')
         self.assertIn('no gpus configured on hosts', str(error.exception).lower())
 
+    def test_is_platform_gp6_input_gpdb6(self):
+
+        self.subject.is_platform_pg = Mock(return_value = False)
+
+        self.plpy_mock_execute.side_effect = [[{ 'version':'PostgreSQL 9.4.24 (Greenplum Database 6.3.0 build commit:aabd)'}]]
+        self.assertTrue(self.subject.is_platform_gp6())
+
+    def test_is_platform_gp6_input_gpdb5(self):
+
+        self.subject.is_platform_pg = Mock(return_value = False)
+
+        self.plpy_mock_execute.side_effect = [[{ 'version':'PostgreSQL 8.3.23 (Greenplum Database 5.24.0 build commit:bdca)'}]]
+        self.assertFalse(self.subject.is_platform_gp6())
+
+    def test_is_platform_gp6_input_pg(self):
+
+        self.subject.is_platform_pg = Mock(return_value = True)
+
+        self.plpy_mock_execute.side_effect = [[{ 'version':'PostgreSQL 10.7'}]]
+        self.assertFalse(self.subject.is_platform_gp6())
+
 class MadlibKerasEvaluationTestCase(unittest.TestCase):
     def setUp(self):
         self.plpy_mock = Mock(spec='error')
@@ -1700,7 +1721,7 @@ class MadlibKerasEvaluationTestCase(unittest.TestCase):
         self.assertEqual(result, None)
 
     def test_internal_keras_eval_final_image_count_zero(self):
-        input_state = [0, 0, 0]
+        input_state = [1, 1, 0]
 
         with self.assertRaises(plpy.PLPYException):
             result = self.subject.internal_keras_eval_final(input_state)