You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nj...@apache.org on 2019/04/29 19:39:48 UTC
[madlib] 01/03: DL: Update madlib_keras_fit code to weight by
images instead of buffers.
This is an automated email from the ASF dual-hosted git repository.
njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 918ce80c0a4f10f23ab8c5e2acfc622b5ba71a2b
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Thu Apr 25 18:14:20 2019 -0700
DL: Update madlib_keras_fit code to weight by images instead of buffers.
JIRA: MADLIB-1310
Prior to this commit, while merging the weights, accuracy and loss, we
assumed that all the buffers were of the same size. This may not always
be the case. Ideally we should use the total number of images per segment
to average the weights, accuracy and loss.
This commit updates the transition, merge and final functions for
`madlib_keras_fit()` to use the number of images on a segment instead of
the buffer count for averaging weights, accuracy and loss.
Additionally, the function `deserialize_weights()`, when deserializing
loss and accuracy from the state, type-cast them to int, which resulted
in computing lower loss and accuracy values while training. This commit
also addresses that issue.
Closes #378
Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
.../modules/deep_learning/madlib_keras.py_in | 131 +++++++++++++--------
.../modules/deep_learning/madlib_keras.sql_in | 2 +-
.../deep_learning/madlib_keras_serializer.py_in | 27 ++---
3 files changed, 98 insertions(+), 62 deletions(-)
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index bd12f55..5bf215b 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -49,11 +49,18 @@ from utilities.model_arch_info import get_num_classes
from utilities.utilities import is_platform_pg
from utilities.utilities import madlib_version
from utilities.validate_args import get_col_value_and_type
+from utilities.validate_args import quote_ident
def fit(schema_madlib, source_table, model, dependent_varname,
independent_varname, model_arch_table, model_arch_id, compile_params,
fit_params, num_iterations, use_gpu = True,
validation_table=None, name="", description="", **kwargs):
+
+ source_table = quote_ident(source_table)
+ dependent_varname = quote_ident(dependent_varname)
+ independent_varname = quote_ident(independent_varname)
+ model_arch_table = quote_ident(model_arch_table)
+
fit_validator = FitInputValidator(
source_table, validation_table, model, model_arch_table,
dependent_varname, independent_varname, num_iterations)
@@ -82,12 +89,25 @@ def fit(schema_madlib, source_table, model, dependent_varname,
# about making the fit function easier to read and maintain.
if is_platform_pg():
set_keras_session(use_gpu)
+ # Compute total images in dataset
+ total_images_per_seg = plpy.execute(
+ """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+ FROM {1}
+ """.format(dependent_varname, source_table))
+ seg_ids_train = "[]::integer[]"
gp_segment_id_col = -1
else:
+ # Compute total images on each segment
+ total_images_per_seg = plpy.execute(
+ """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+ FROM {1}
+ GROUP BY gp_segment_id
+ """.format(dependent_varname, source_table))
+ seg_ids_train = [int(each_segment["gp_segment_id"])
+ for each_segment in total_images_per_seg]
gp_segment_id_col = 'gp_segment_id'
# Disable GPU on master for gpdb
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
- seg_ids_train, rows_per_seg_train = get_rows_per_seg_from_db(source_table)
if validation_table:
seg_ids_val, rows_per_seg_val = get_rows_per_seg_from_db(validation_table)
@@ -110,6 +130,9 @@ def fit(schema_madlib, source_table, model, dependent_varname,
validation_set_provided = bool(validation_table)
validation_aggregate_accuracy = []; validation_aggregate_loss = []
+ total_images_per_seg = [int(each_segment["total_images_per_seg"])
+ for each_segment in total_images_per_seg]
+
# Prepare the SQL for running distributed training via UDA
compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
@@ -120,7 +143,7 @@ def fit(schema_madlib, source_table, model, dependent_varname,
{gp_segment_id_col},
{num_classes}::INTEGER,
ARRAY{seg_ids_train},
- ARRAY{rows_per_seg_train},
+ ARRAY{total_images_per_seg},
$MAD${model_arch}$MAD$::TEXT,
{compile_params_to_pass}::TEXT,
{fit_params_to_pass}::TEXT,
@@ -305,7 +328,7 @@ def get_rows_per_seg_from_db(table_name):
def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
- all_seg_ids, total_buffers_per_seg, architecture,
+ all_seg_ids, total_images_per_seg, architecture,
compile_params, fit_params, use_gpu, previous_state,
**kwargs):
@@ -317,7 +340,7 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
:param current_seg_id:
:param num_classes:
:param all_seg_ids:
- :param total_buffers_per_seg:
+ :param total_images_per_seg:
:param architecture:
:param compile_params:
:param fit_params:
@@ -344,13 +367,14 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
compile_and_set_weights(segment_model, compile_params, device_name,
previous_state, SD['model_shapes'])
SD['segment_model'] = segment_model
- SD['buffer_count'] = 0
+ image_count = 0
agg_loss = 0
agg_accuracy = 0
+ agg_image_count = 0
else:
segment_model = SD['segment_model']
#TODO we don't need to deserialize the weights here.
- agg_loss, agg_accuracy, _, _ = madlib_keras_serializer.deserialize_weights(
+ agg_loss, agg_accuracy, agg_image_count, _ = madlib_keras_serializer.deserialize_weights(
state, SD['model_shapes'])
# Prepare the data
@@ -367,79 +391,92 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
accuracy = history.history['acc'][0]
end_fit = time.time()
- # Re-serialize the weights
- # Update buffer count, check if we are done
- SD['buffer_count'] += 1
- agg_loss += loss
- agg_accuracy += accuracy
+ image_count = len(x_train)
+ # Aggregating number of images, loss and accuracy
+ agg_image_count += image_count
+ agg_loss += (image_count * loss)
+ agg_accuracy += (image_count * accuracy)
with K.tf.device(device_name):
updated_weights = segment_model.get_weights()
if is_platform_pg():
- total_buffers_per_seg = total_buffers_per_seg[0]
+ total_images = total_images_per_seg[0]
else:
- total_buffers_per_seg = total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
- if total_buffers_per_seg == 0:
+ total_images = total_images_per_seg[all_seg_ids.index(current_seg_id)]
+ if total_images == 0:
plpy.error('Got 0 rows. Expected at least 1.')
- if SD['buffer_count'] == total_buffers_per_seg:
- agg_loss /= total_buffers_per_seg
- agg_accuracy /= total_buffers_per_seg
+ # Re-serialize the weights
+ # Update image count, check if we are done
+ if agg_image_count == total_images:
+ if total_images == 0:
+ plpy.error('Total images is 0')
+ # Once done with all images on a segment, we update weights
+ # with the total number of images here instead of the merge function.
+ # The merge function only deals with aggregating them.
+ updated_weights = [ total_images * w for w in updated_weights ]
if not is_platform_pg():
# In GPDB, each segment would have a keras session, so clear
# them after the last buffer is processed.
clear_keras_session()
+ elif agg_image_count > total_images:
+ plpy.error('Processed {0} images, but there were supposed to be only {1}!'
+ .format(agg_image_count,total_images))
new_model_state = madlib_keras_serializer.serialize_weights(
- agg_loss, agg_accuracy, SD['buffer_count'], updated_weights)
+ agg_loss, agg_accuracy, agg_image_count, updated_weights)
+
del x_train
del y_train
end_transition = time.time()
- plpy.info("Processed buffer {0}: Fit took {1} sec, Total was {2} sec".format(
- SD['buffer_count'], end_fit - start_fit, end_transition - start_transition))
+ plpy.info("Processed {0} images: Fit took {1} sec, Total was {2} sec".format(
+ image_count, end_fit - start_fit, end_transition - start_transition))
return new_model_state
def fit_merge(state1, state2, **kwargs):
+
# Return if called early
if not state1 or not state2:
return state1 or state2
# Deserialize states
- loss1, accuracy1, buffer_count1, weights1 = madlib_keras_serializer.deserialize_weights_merge(state1)
- loss2, accuracy2, buffer_count2, weights2 = madlib_keras_serializer.deserialize_weights_merge(state2)
- # plpy.info('merge buffer loss1 {}, accuracy1 {}, buffer count1 {}'.format(loss1, accuracy1, buffer_count1))
- # plpy.info('merge buffer loss2 {}, accuracy2 {}, buffer count2 {}'.format(loss2, accuracy2, buffer_count2))
-
- # Compute total buffer counts
- # buffer_count1, buffer_count2 = state1[2], state2[2]
- total_buffers = (buffer_count1 + buffer_count2) * 1.0
- if total_buffers == 0:
- plpy.error('total buffers in merge is 0')
- merge_weight1 = buffer_count1 / total_buffers
- merge_weight2 = buffer_count2 / total_buffers
-
- # Average the losses
- # loss1, loss2 = state1[0], state2[0]
- avg_loss = merge_weight1*loss1 + merge_weight2*loss2
-
- # Average the accuracies
- # accuracy1, accuracy2 = state1[1], state2[1]
- avg_accuracy = merge_weight1*accuracy1 + merge_weight2*accuracy2
-
- # Average the weights
- # weights1, weights2 = state1[3:], state2[3:]
- avg_weights = merge_weight1*weights1 + merge_weight2*weights2
- # avg_weights = [(merge_weight1 * e1) + (merge_weight2 * e2) for e1, e2 in zip(weights1, weights2)]
+ loss1, accuracy1, image_count1, weights1 = madlib_keras_serializer.deserialize_weights_merge(state1)
+ loss2, accuracy2, image_count2, weights2 = madlib_keras_serializer.deserialize_weights_merge(state2)
+
+ # Compute total image counts
+ image_count = (image_count1 + image_count2) * 1.0
+ plpy.info("FIT_MERGE: Mergeing {0} + {1} = {2} images".format(image_count1,image_count2,image_count))
+ if image_count == 0:
+ plpy.error('total images in merge is 0')
+
+ # Aggregate the losses
+ total_loss = loss1 + loss2
+
+ # Aggregate the accuracies
+ total_accuracy = accuracy1 + accuracy2
+
+ # Aggregate the weights
+ total_weights = weights1 + weights2
# Return the merged state
return madlib_keras_serializer.serialize_weights_merge(
- avg_loss, avg_accuracy, total_buffers, avg_weights)
+ total_loss, total_accuracy, image_count, total_weights)
def fit_final(state, **kwargs):
- return state
+ # Return if called early
+ if not state:
+ return state
+
+ loss, accuracy, image_count, weights = madlib_keras_serializer.deserialize_weights_merge(state)
+ # Averaging the accuracy, loss and weights
+ loss /= image_count
+ accuracy /= image_count
+ weights /= image_count
+ return madlib_keras_serializer.serialize_weights_merge(
+ loss, accuracy, image_count, weights)
def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table,
model_arch_id, dependent_varname, independent_varname,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 543bbed..3c44205 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -105,7 +105,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.fit_transition(
current_seg_id INTEGER,
num_classes INTEGER,
all_seg_ids INTEGER[],
- total_buffers_per_seg INTEGER[],
+ total_images_per_seg INTEGER[],
architecture TEXT,
compile_params TEXT,
fit_params TEXT,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
index d52ed16..09f3eb8 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
@@ -31,7 +31,7 @@ def get_model_shapes(model):
# np.array(np.array(loss, acc, buff_count).concatenate(weights_np_array)).tostring()
# Proposed logic
-# loss , accuracy and buffer_count can be comma separated values
+# loss , accuracy and image_count can be comma separated values
# weights -> np.array.tostring()
# combine these 2 into one string by a random splitter
# serialized string -> loss_splitter_acc_splitter_buffer_splitter_weights
@@ -40,11 +40,11 @@ def deserialize_weights(model_state, model_shapes):
"""
Parameters:
model_state: a stringified (serialized) state containing loss,
- accuracy, buffer_count, and model_weights, passed from postgres
+ accuracy, image_count, and model_weights, passed from postgres
model_shapes: a list of tuples containing the shapes of each element
in keras.get_weights()
Returns:
- buffer_count: the buffer count from state
+ image_count: the buffer count from state
model_weights: a list of numpy arrays that can be inputted into keras.set_weights()
"""
if not model_state or not model_shapes:
@@ -57,13 +57,13 @@ def deserialize_weights(model_state, model_shapes):
weight_arr_portion = model_weights_serialized[i:next_pointer]
model_weights.append(weight_arr_portion.reshape(model_shapes[j]))
i, j = next_pointer, j + 1
- return int(float(state[0])), int(float(state[1])), int(float(state[2])), model_weights
+ return float(state[0]), float(state[1]), int(float(state[2])), model_weights
-def serialize_weights(loss, accuracy, buffer_count, model_weights):
+def serialize_weights(loss, accuracy, image_count, model_weights):
"""
Parameters:
- loss, accuracy, buffer_count: float values
+ loss, accuracy, image_count: float values
model_weights: a list of numpy arrays, what you get from
keras.get_weights()
Returns:
@@ -74,12 +74,11 @@ def serialize_weights(loss, accuracy, buffer_count, model_weights):
return None
flattened_weights = [w.flatten() for w in model_weights]
model_weights_serialized = np.concatenate(flattened_weights)
- new_model_string = np.array([loss, accuracy, buffer_count])
+ new_model_string = np.array([loss, accuracy, image_count])
new_model_string = np.concatenate((new_model_string, model_weights_serialized))
new_model_string = np.float32(new_model_string)
return new_model_string.tostring()
-
def deserialize_iteration_state(iteration_result):
"""
Parameters:
@@ -90,7 +89,7 @@ def deserialize_iteration_state(iteration_result):
new_model_state: the stringified (serialized) state to pass in to next
iteration of step function training, represents the averaged weights
from the last iteration of training; zeros out loss, accuracy,
- buffer_count in this state because the new iteration must start with
+ image_count in this state because the new iteration must start with
fresh values
"""
if not iteration_result:
@@ -105,12 +104,12 @@ def deserialize_iteration_state(iteration_result):
def deserialize_weights_merge(state):
"""
Parameters:
- state: the stringified (serialized) state containing loss, accuracy, buffer_count, and
+ state: the stringified (serialized) state containing loss, accuracy, image_count, and
model_weights, passed from postgres to merge function
Returns:
loss: the averaged loss from that iteration of training
accuracy: the averaged accuracy from that iteration of training
- buffer_count: total buffer counts processed
+ image_count: total buffer counts processed
model_weights: a single flattened numpy array containing all of the
weights, flattened because all we have to do is average them (so don't
have to reshape)
@@ -121,10 +120,10 @@ def deserialize_weights_merge(state):
return float(state[0]), float(state[1]), int(float(state[2])), state[3:]
-def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
+def serialize_weights_merge(loss, accuracy, image_count, model_weights):
"""
Parameters:
- loss, accuracy, buffer_count: float values
+ loss, accuracy, image_count: float values
model_weights: a single flattened numpy array containing all of the
weights, averaged in merge function over the 2 states
Returns:
@@ -133,7 +132,7 @@ def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
"""
if model_weights is None:
return None
- new_model_string = np.array([loss, accuracy, buffer_count])
+ new_model_string = np.array([loss, accuracy, image_count])
new_model_string = np.concatenate((new_model_string, model_weights))
new_model_string = np.float32(new_model_string)
return new_model_string.tostring()