Posted to commits@madlib.apache.org by nj...@apache.org on 2019/04/22 22:50:26 UTC

[madlib] 02/02: DL: Add Postgres support

This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 5a2cea7aaf0253d466f13e720f32b9533a2682ad
Author: Nandish Jayaram <nj...@apache.org>
AuthorDate: Wed Apr 17 11:11:06 2019 -0700

    DL: Add Postgres support
    
    JIRA: MADLIB-1311
    This commit decouples GPDB-specific code to enable Postgres support
    for the Deep Learning modules as well.
    
    There is a weird failure happening when `import keras` and
    `array_scalar_mult` are called one after another. We have opened
    MADLIB-1326 to track this issue.
    
    Closes #371
    Co-authored-by: Orhan Kislal <ok...@apache.org>
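
    For context: the diff below keys every platform-specific branch off a
    single runtime check, is_platform_pg(), imported from
    utilities.utilities. A minimal sketch of that pattern, assuming the
    helper probes version() for the Greenplum banner (the real
    implementation may differ; plpy is provided by the PL/Python runtime):

        import plpy

        def is_platform_pg():
            # Assumed sketch: Greenplum announces itself in version(),
            # so its absence implies plain Postgres.
            version_str = plpy.execute("SELECT version()")[0]["version"]
            return "Greenplum" not in version_str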
---
 .../modules/deep_learning/madlib_keras.py_in       | 90 +++++++++++++---------
 .../modules/deep_learning/madlib_keras.sql_in      |  2 +-
 .../deep_learning/madlib_keras_wrapper.py_in       |  6 +-
 .../modules/deep_learning/test/madlib_keras.sql_in | 39 ++++++++--
 4 files changed, 92 insertions(+), 45 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index bbbcfb4..8d4e384 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -46,6 +46,7 @@ from keras_model_arch_table import Format
 
 from utilities.model_arch_info import get_input_shape
 from utilities.model_arch_info import get_num_classes
+from utilities.utilities import is_platform_pg
 from utilities.utilities import madlib_version
 from utilities.validate_args import get_col_value_and_type
 
@@ -102,39 +103,46 @@ def fit(schema_madlib, source_table, model, dependent_varname,
     validation_set_provided = bool(validation_table)
     validation_aggregate_accuracy = []; validation_aggregate_loss = []
 
-    # Compute total buffers on each segment
-    total_buffers_per_seg = plpy.execute(
-        """ SELECT gp_segment_id, count(*) AS total_buffers_per_seg
-            FROM {0}
-            GROUP BY gp_segment_id
-        """.format(source_table))
-    seg_nums = [int(each_buffer["gp_segment_id"])
-        for each_buffer in total_buffers_per_seg]
+    if is_platform_pg():
+        total_buffers_per_seg = plpy.execute(
+            """ SELECT count(*) AS total_buffers_per_seg
+                FROM {0}
+            """.format(source_table))
+        seg_nums = "[]::integer[]"
+        gp_segment_id_col = -1
+    else:
+        # Compute total buffers on each segment
+        total_buffers_per_seg = plpy.execute(
+            """ SELECT gp_segment_id, count(*) AS total_buffers_per_seg
+                FROM {0}
+                GROUP BY gp_segment_id
+            """.format(source_table))
+        seg_nums = [int(each_buffer["gp_segment_id"])
+            for each_buffer in total_buffers_per_seg]
+        # gp_segment_id is an implicit column in GPDB tables.
+        gp_segment_id_col = "gp_segment_id"
+
     total_buffers_per_seg = [int(each_buffer["total_buffers_per_seg"])
         for each_buffer in total_buffers_per_seg]
-
     # Prepare the SQL for running distributed training via UDA
     compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
     fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
     run_training_iteration = plpy.prepare("""
-        SELECT {0}.fit_step(
-            {1}::REAL[],
-            {2}::SMALLINT[],
-            gp_segment_id,
-            {3}::INTEGER,
-            ARRAY{4},
-            ARRAY{5},
-            $MAD${6}$MAD$::TEXT,
-            {7}::TEXT,
-            {8}::TEXT,
-            {9},
+        SELECT {schema_madlib}.fit_step(
+            {independent_varname}::REAL[],
+            {dependent_varname}::SMALLINT[],
+            {gp_segment_id_col},
+            {num_classes}::INTEGER,
+            ARRAY{seg_nums},
+            ARRAY{total_buffers_per_seg},
+            $MAD${model_arch}$MAD$::TEXT,
+            {compile_params_to_pass}::TEXT,
+            {fit_params_to_pass}::TEXT,
+            {use_gpu},
             $1
         ) AS iteration_result
-        FROM {10}
-        """.format(schema_madlib, independent_varname, dependent_varname,
-                   num_classes, seg_nums, total_buffers_per_seg, model_arch,
-                   compile_params_to_pass, fit_params_to_pass,
-                   use_gpu, source_table), ["bytea"])
+        FROM {source_table}
+        """.format(**locals()), ["bytea"])
 
     # Define the state for the model and loss/accuracy storage lists
     model_state = madlib_keras_serializer.serialize_weights(
@@ -167,13 +175,10 @@ def fit(schema_madlib, source_table, model, dependent_varname,
             _, _, _, updated_weights = madlib_keras_serializer.deserialize_weights(
                 model_state, model_shapes)
             master_model.set_weights(updated_weights)
-            evaluate_result = get_loss_acc_from_keras_eval(schema_madlib,
-                                                           validation_table,
-                                                           dependent_varname,
-                                                           independent_varname,
-                                                           compile_params_to_pass,
-                                                           model_arch, model_state,
-                                                           use_gpu)
+            evaluate_result = get_loss_acc_from_keras_eval(
+                schema_madlib, validation_table, dependent_varname,
+                independent_varname, compile_params_to_pass, model_arch,
+                model_state, use_gpu, gp_segment_id_col)
             if len(evaluate_result) < 2:
                 plpy.error('Calling evaluate on validation data returned < 2 '
                            'metrics. Expected metrics are loss and accuracy')
@@ -279,7 +284,7 @@ def fit(schema_madlib, source_table, model, dependent_varname,
 
 def get_loss_acc_from_keras_eval(schema_madlib, table, dependent_varname,
                                  independent_varname, compile_params, model_arch,
-                                 model_data, use_gpu):
+                                 model_data, use_gpu, gp_segment_id_col):
     """
     This function will call the internal keras evaluate function to get the loss
     and accuracy of each tuple which then gets averaged to get the final result.
@@ -299,7 +304,7 @@ def get_loss_acc_from_keras_eval(schema_madlib, table, dependent_varname,
                                             {independent_varname},
                                             $MAD${model_arch}$MAD$,
                                             $1, {compile_params},
-                                            {use_gpu}, gp_segment_id)) as loss_acc
+                                            {use_gpu}, {gp_segment_id_col})) as loss_acc
         from {table}
     ) q""".format(**locals()), ["bytea"])
     res = plpy.execute(evaluate_query, [model_data])
@@ -333,7 +338,16 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
 
     start_transition = time.time()
     SD = kwargs['SD']
-
+    is_pg = False
+    if current_seg_id == -1:
+        is_pg = True
+    if is_pg:
+        # This is Postgres
+        total_buffers = total_buffers_per_seg[0]
+    else:
+        # This is GPDB
+        total_buffers = total_buffers_per_seg[all_seg_ids.index(
+            current_seg_id)]
     # Configure GPUs/CPUs
     device_name = get_device_name_and_set_cuda_env(use_gpu, current_seg_id)
 
@@ -378,14 +392,16 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
     with K.tf.device(device_name):
         updated_weights = segment_model.get_weights()
 
-    total_buffers = total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
     if SD['buffer_count'] == total_buffers:
         if total_buffers == 0:
             plpy.error('total buffers is 0')
 
         agg_loss /= total_buffers
         agg_accuracy /= total_buffers
-        clear_keras_session()
+        if not is_pg:
+            # In GPDB, each segment would have a keras session, so clear
+            # them after the last buffer is processed.
+            clear_keras_session()
 
     new_model_state = madlib_keras_serializer.serialize_weights(
         agg_loss, agg_accuracy, SD['buffer_count'], updated_weights)
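
One subtlety in the hunks above: on Postgres, seg_nums is set to the string
"[]::integer[]" rather than a Python list, so the single ARRAY{seg_nums}
template renders valid SQL on both platforms. A standalone sketch of the
substitution (illustrative segment ids only, not taken from a real cluster):

    template = "ARRAY{seg_nums}"

    # GPDB path: a Python list renders as a populated SQL array literal.
    print(template.format(seg_nums=[0, 1, 2]))        # ARRAY[0, 1, 2]

    # Postgres path: the string renders as an empty, typed SQL array.
    print(template.format(seg_nums="[]::integer[]"))  # ARRAY[]::integer[]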
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 34bf2c2..295276b 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -163,7 +163,7 @@ CREATE AGGREGATE MADLIB_SCHEMA.fit_step(
 )(
     STYPE=BYTEA,
     SFUNC=MADLIB_SCHEMA.fit_transition,
-    PREFUNC=MADLIB_SCHEMA.fit_merge,
+    m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.fit_merge,')
     FINALFUNC=MADLIB_SCHEMA.fit_final
 );
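
The m4_ifdef above makes the aggregate DDL differ per platform: Greenplum
gets a PREFUNC to merge transition states computed in parallel on the
segments, while single-node Postgres omits it. A hedged Python sketch of
the two expansions (DDL text paraphrased from the hunk above):

    def render_fit_step(is_postgres):
        # PREFUNC (Greenplum's merge function) only makes sense when
        # transition states are built in parallel across segments.
        prefunc = "" if is_postgres else "    PREFUNC=MADLIB_SCHEMA.fit_merge,\n"
        return ("CREATE AGGREGATE MADLIB_SCHEMA.fit_step(...)(\n"
                "    STYPE=BYTEA,\n"
                "    SFUNC=MADLIB_SCHEMA.fit_transition,\n"
                + prefunc +
                "    FINALFUNC=MADLIB_SCHEMA.fit_final\n"
                ");")

    print(render_fit_step(is_postgres=True))   # no PREFUNC line
    print(render_fit_step(is_postgres=False))  # PREFUNC included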
 
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_wrapper.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_wrapper.py_in
index 4dd29d5..2e2250e 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_wrapper.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_wrapper.py_in
@@ -40,7 +40,11 @@ def get_device_name_and_set_cuda_env(use_gpu, seg):
     gpus_per_host = 4
     if use_gpu:
         device_name = '/gpu:0'
-        os.environ["CUDA_VISIBLE_DEVICES"] = str(seg % gpus_per_host)
+        if seg == -1:
+            cuda_visible_dev = ','.join([str(i) for i in range(gpus_per_host)])
+        else:
+            cuda_visible_dev = str(seg % gpus_per_host)
+        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_dev
     else: # cpu only
         device_name = '/cpu:0'
         os.environ["CUDA_VISIBLE_DEVICES"] = '-1'
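
In the wrapper change above, seg == -1 marks the Postgres code path: with
no segments, the single backend process may see every GPU on the host,
whereas each Greenplum segment is pinned to one GPU by modulo. A standalone
sketch of that device-selection logic (gpus_per_host is hard-coded to 4 in
the hunk above):

    import os

    def set_cuda_visible_devices(seg, gpus_per_host=4):
        if seg == -1:
            # Postgres: one process, expose all GPUs on the host.
            os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
                str(i) for i in range(gpus_per_host))
        else:
            # Greenplum: each segment gets a single GPU, round-robin.
            os.environ["CUDA_VISIBLE_DEVICES"] = str(seg % gpus_per_host)

    set_cuda_visible_devices(-1)  # -> "0,1,2,3"
    set_cuda_visible_devices(6)   # -> "2"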
diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
index 115af9d..8b68aa9 100644
--- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
@@ -18,7 +18,6 @@
  * under the License.
  *
  *//* ---------------------------------------------------------------------*/
-
 drop table if exists cifar_10_sample;
 create table cifar_10_sample(id INTEGER, y SMALLINT, imgpath TEXT, x  REAL[]);
 copy cifar_10_sample from stdin delimiter '|';
@@ -36,11 +35,40 @@ copy cifar_10_sample_val from stdin delimiter '|';
 -- TODO Calling this function makes keras.fit fail with the exception (investigate later)
 -- NOTICE:  Releasing segworker groups to finish aborting the transaction.
 -- ERROR:  could not connect to segment: initialization of segworker group failed (cdbgang.c:237)
---update cifar_10_sample_val SET independent_var = array_scalar_mult(independent_var::real[], (1/255.0)::real);
+-- update cifar_10_sample_val SET independent_var = array_scalar_mult(independent_var::real[], (1/255.0)::real);
+
+-- Prepare the minibatched data manually instead of calling
+-- minibatch_preprocessor_dl since it internally calls array_scalar_mult.
+-- Please refer to MADLIB-1326 for more details on the issue.
 
 DROP TABLE IF EXISTS cifar_10_sample_batched;
+CREATE TABLE cifar_10_sample_batched(
+    buffer_id smallint,
+    dependent_var integer[],
+    independent_var real[]);
+copy cifar_10_sample_batched from stdin delimiter '|';
+0|{{0,1},{1,0}}|{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216 [...]
+\.
+
 DROP TABLE IF EXISTS cifar_10_sample_batched_summary;
-SELECT minibatch_preprocessor_dl('cifar_10_sample','cifar_10_sample_batched','y','x', 2, 255);
+CREATE TABLE cifar_10_sample_batched_summary(
+    source_table text,
+    output_table text,
+    dependent_varname text,
+    independent_varname text,
+    dependent_vartype text,
+    class_values smallint[],
+    buffer_size integer,
+    normalizing_const numeric);
+INSERT INTO cifar_10_sample_batched_summary values (
+    'cifar_10_sample',
+    'cifar_10_sample_batched',
+    'y',
+    'x',
+    'smallint',
+    ARRAY[0,1],
+    2,
+    255.0);
 
 DROP TABLE IF EXISTS model_arch;
 SELECT load_keras_model('model_arch',
@@ -64,9 +92,8 @@ SELECT load_keras_model('model_arch',
 	{"class_name": "Zeros", "config": {}}, "units": 2, "use_bias": true, "activity_regularizer": null}
 	}], "backend": "tensorflow"}$$);
 
--- Please do not break up the compile_params string
--- It might break the assertion
-
+-- -- Please do not break up the compile_params string
+-- -- It might break the assertion
 DROP TABLE IF EXISTS keras_saved_out, keras_saved_out_summary;
 SELECT madlib_keras_fit(
     'cifar_10_sample_batched',