You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@madlib.apache.org by "Domino Valdano (JIRA)" <ji...@apache.org> on 2019/06/21 20:02:00 UTC

[jira] [Comment Edited] (MADLIB-1345) DL: Performance improvement in DL functions

    [ https://issues.apache.org/jira/browse/MADLIB-1345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869840#comment-16869840 ] 

Domino Valdano edited comment on MADLIB-1345 at 6/21/19 8:01 PM:
-----------------------------------------------------------------

The description of this story seems pretty specific to GD, but another idea I think would be worth exploring is using a UDF instead of a UDA so that we don't have to worry about passing state around, and so that keras can handle all of the batching (ie, we just have 1 image per row for our input to fit, rather than making our own buffers.)

Eliminating the need for buffers would avoid there being partial batches when keras's minibatch size does not go evenly into the buffer size, making a more even workload.

Here is the strategy that I think should work:
 # master {{fit()}} function runs a sql query similar to the current one, {{SELECT keras_step(a.independent_var, a.dependent_var, initial_weights) FROM input_table as a;}}
 # {{keras_step()}} is defined as a UDF that has {{RETURNS SETOF REAL[]}} in its definition. (There is no state, merge, or final function.)
 # {{keras_step()}} runs on each segment on each row of that segment. But instead of running {{keras_fit()}} every time it's called, it just stores all of the independent and dependent vars in a huge buffer, that can grow to much larger than 1GB. This buffer can be saved in SD the first time it runs, and then appended to for each additional row. When it reaches the last row (which we already detect), it runs {{keras_fit()}} on the entire buffer of all images on the segment, and keras will handle the minibatching.
 # At the end of the {{keras_step()}} function, it returns one of two things:

{{if (last_row):}}

{{    return [weights]}}

else:

{{    return []}}
 # The result of the plpy.execute() function run on master fit() will have N rows, where N is the number of segments... because only the last row of each segment returns an actual value. These N rows will contain the final weights after training on each segment.
 # {{fit()}} averages these N weights together, weighted by how many images there are on each segment, and then uses the resulting weights to kick off the next iteration, with another call to {{keras_step()}}

This method is simpler in some ways than the UDA, and probably more efficient as no state needs to be passed around by Greenplum, and {{keras_fit()}} is only called once on each segment so it can handle all batching itself.

This performance improvement is complementary to the GD improvement, so we could combine the two.  By using GD, we would avoid the need for the weights to be passed the first time. And by using a UDF, we would avoid passing them around in the state between each call to {{fit_transition().  Instead, they just get set once at the beginning, and returned once after the whole iteration is done.}}


was (Author: dvaldano):
The description of this story seems pretty specific to GD, but another idea I think would be worth exploring is using a UDF instead of a UDA so that we don't have to worry about passing state around, and so that keras can handle all of the batching (ie, we just have 1 image per row for our input to fit, rather than making our own buffers.)

Eliminating the need for buffers would avoid there being partial batches when keras's minibatch size does not go evenly into the buffer size, making a more even workload.

Here is the strategy that I think should work:
 # master {{fit()}} function runs a sql query similar to the current one, {{SELECT keras_step(a.independent_var, a.dependent_var, initial_weights) FROM input_table as a;}}
 # {{keras_step()}} is defined as a UDF that has {{RETURNS SETOF REAL[]}} in its definition. (There is no state, merge, or final function.)
 # {{keras_step()}} runs on each segment on each row of that segment. But instead of running {{keras_fit()}} every time it's called, it just stores all of the independent and dependent vars in a huge buffer, that can grow to much larger than 1GB. This buffer can be saved in SD the first time it runs, and then appended to for each additional row. When it reaches the last row (which we already detect), it runs {{keras_fit()}} on the entire buffer of all images on the segment, and keras will handle the minibatching.
 # At the end of the {{keras_step()}} function, it returns one of two things:

{{if (last_row):}}

{{    return [weights]}}

{{ else: }}

{{    return []}}
 # The result of the plpy.execute() function run on master fit() will have N rows, where N is the number of segments... because only the last row of each segment returns an actual value. These N rows will contain the final weights after training on each segment.
 # {{fit()}} averages these N weights together, weighted by how many images there are on each segment, and then uses the resulting weights to kick off the next iteration, with another call to {{keras_step()}}

This method is simpler in some ways than the UDA, and probably more efficient as no state needs to be passed around by Greenplum, and {{keras_fit()}} is only called once on each segment so it can handle all batching itself.

This performance improvement is complementary to the GD improvement, so we could combine the two.  By using GD, we would avoid the need for the weights to be passed the first time. And by using a UDF, we would avoid passing them around in the state between each call to {{fit_transition().  Instead, they just get set once at the beginning, and returned once after the whole iteration is done.}}

> DL: Performance improvement in DL functions
> -------------------------------------------
>
>                 Key: MADLIB-1345
>                 URL: https://issues.apache.org/jira/browse/MADLIB-1345
>             Project: Apache MADlib
>          Issue Type: Improvement
>          Components: Deep Learning
>            Reporter: Ekta Khanna
>            Priority: Major
>             Fix For: v2.0
>
>
> Currently, we pass around model_data, model_arch, etc.. for each buffer/image for fit(), predict() and evaluate(). This causes a lot of overhead and slows down the query considerable.
> We tried to set model_data and model_arch using GD for predict. Following were the runtimes:
> with GD
> ~707 sec(with CPU) - 50K places10_20seg
> without GD
> ~1650 sec(with CPU) - 50K places10_20seg
> Below is the patch for GD changes:
> {code}
> def set_predict_GD(model_architecture, model_data,
>                            is_response, normalizing_const, seg_ids,
>                            images_per_seg, gpus_per_host, segments_per_host,
>                            **kwargs):
>     GD = kwargs['GD']
>     GD['model_architecture'] = model_architecture
>     GD['model_data'] = model_data
>     GD['is_response'] = is_response
>     GD['normalizing_const'] = normalizing_const
>     #GD['current_seg_id'] = current_seg_id
>     GD['seg_ids'] = seg_ids
>     GD['images_per_seg'] = images_per_seg
>     GD['gpus_per_host'] = gpus_per_host
>     GD['segments_per_host'] = segments_per_host
> def predict()
> ....
> set_gd_query=plpy.prepare("""
>            SELECT set_predict_GD
>             ($MAD${model_arch}$MAD$,
>             $1,
>             {is_response},
>             {normalizing_const},
>             -- gp_segment_id,
>             ARRAY{seg_ids_test},
>             ARRAY{images_per_seg_test},
>             {gpus_per_host},
>             {segments_per_host}
>             ) from gp_dist_random('gp_id')
>             """.format(**locals()), ["bytea"]) #Using gp_dist_random('gp_id')  in the query makes the UDF run on each segment
> plpy.execute(set_gd_query, [model_data])
> predict_query = plpy.execute("""
>     CREATE TABLE {output_table} AS
>     SELECT {id_col}, {prediction_select_clause}
>     FROM (
>         SELECT {test_table}.{id_col},
>                ({schema_madlib}.internal_keras_predict
>                    ({independent_varname}, {gp_segment_id_col})
>                ) AS {intermediate_col}
>     FROM {test_table}
>     ) q distributed by ({id_col})
>     """.format(**locals()))
> def internal_keras_predict(independent_var, current_seg_id, **kwargs):
>     start = time.time()
>     SD = kwargs['SD']
>     GD = kwargs['GD']
>     is_response = GD['is_response']
>     normalizing_const = GD['normalizing_const']
>     #current_seg_id = GD['current_seg_id']
>     seg_ids = GD['seg_ids']
>     images_per_seg = GD['images_per_seg']
>     gpus_per_host = GD['gpus_per_host']
>     segments_per_host = GD['segments_per_host']
>     device_name = get_device_name_and_set_cuda_env(gpus_per_host,
>                                                    current_seg_id)
>     ...
> {code}
> With the above changes , we found out that GD is not reliable for GPDB because of the following reasons:
> Consider a single node gpdb cluster with 3 segments
> Calling set_gd using gp_dist_random(), creates 1 process per seg and sets GD on each of these processes.
> seg1 - pid 100 - gd is set here for seg1
> seg2 - pid 200 - gd is set here for seg2
> seg3 - pid300- gd is set here for seg3
> Now, CREATE TABLE.. in predict(), spins up 2 processes per seg, (the old processes where GD was set) + 1 new process per seg.
> seg1 - pid 100 - gd is set here for seg1 (reused from before)
> seg1 - pid 101 - gd is read here for seg1
> seg2 - pid 200 - gd is set here for seg2 (reused from before)
> seg1 - pid 201 - gd is read here for seg2
> seg3 - pid300 - gd is set here for seg3 (reused from before)
> seg1 - pid 301- gd is read here for seg3
> This causes problems because , the processes where GD is read from is not same as the process where it was set.
> Couple of ways to avoid this problem
> # Change predict code to run two plpy execute queries, the first one being the internal predict query and the second one being the create table query.
> # Distribute the source table by the id column and while creating the predict output table use that id column as the distribution key.
> We are not sure if this is good enough for all use cases like what if the source table has an index which might do the same thing as the create table command. Our goal is to avoid the query from creating multiple processes.
> # Explore the GD option
> # Explore alternatives so that we don't have to pass the model data for every row/buffer/image in the transition function/udf



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)