You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nk...@apache.org on 2020/03/18 18:10:22 UTC

[madlib] branch master updated: DL: Don't include weights as part of state except for the last row.

This is an automated email from the ASF dual-hosted git repository.

nkak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new 41ace0b  DL: Don't include weights as part of state except for the last row.
41ace0b is described below

commit 41ace0b4aaaf1aec53fd2ad9b3050786022a9120
Author: Nikhil Kak <nk...@pivotal.io>
AuthorDate: Fri Mar 13 17:56:33 2020 -0700

    DL: Don't include weights as part of state except for the last row.
    
    JIRA: MADLIB-1418
    
    Previously the model state would consist of the image count and the serialized
    model weights but there was no need to include the serialized model weights as
    part of the state except for the last row for that segment. We were only using
    that state to get the image count and ignored the model weights, so serializing
    the model weights for every row was a waste of time and resources.
    
    This commit changes that pattern so that for all rows except the last row, the
    state is just the image count.
---
 .../modules/deep_learning/madlib_keras.py_in       | 41 +++++-----
 .../deep_learning/madlib_keras_serializer.py_in    | 12 ---
 .../test/unit_tests/test_madlib_keras.py_in        | 91 +++++++---------------
 3 files changed, 49 insertions(+), 95 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index d6fb857..ee27554 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -488,9 +488,11 @@ def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                                                   accessible_gpus_for_seg[current_seg_id],
                                                   segments_per_host,
                                                   model_architecture, compile_params)
-    agg_image_count = madlib_keras_serializer.get_image_count_from_state(state)
     if not state:
+        agg_image_count = 0
         set_model_weights(segment_model, prev_serialized_weights)
+    else:
+        agg_image_count = float(state)
 
     # Prepare the data
     x_train = np_array_float32(independent_var, independent_var_shape)
@@ -500,42 +502,42 @@ def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
     #TODO consider not doing this every time
     fit_params = parse_and_validate_fit_params(fit_params)
     segment_model.fit(x_train, y_train, **fit_params)
-    updated_model_weights = segment_model.get_weights()
 
     # Aggregating number of images, loss and accuracy
     agg_image_count += len(x_train)
     total_images = get_image_count_per_seg_from_array(dist_key_mapping.index(dist_key),
                                                       images_per_seg)
     is_last_row = agg_image_count == total_images
+    return_state = get_state_to_return(segment_model, is_last_row, is_multiple_model,
+                                  agg_image_count, total_images)
     if is_last_row:
         if is_final_iteration or is_multiple_model:
             SD_STORE.clear_SD(SD)
             clear_keras_session(sess)
 
-    return get_state_to_return(is_last_row, is_multiple_model, agg_image_count,
-                               total_images, updated_model_weights)
+    return return_state
 
-def get_state_to_return(is_last_row, is_multiple_model, agg_image_count,
-                        total_images, updated_model_weights):
+def get_state_to_return(segment_model, is_last_row, is_multiple_model, agg_image_count,
+                        total_images):
     """
-    1. For model averaging fit_transition, the state always contains the image count
-    as well as the model weights
-    2. For fit multiple transition,
-        a. The state that gets passed from one row/buffer (within the same hop)
-        to the next needs to have the image_count and model weights. image_count
-        is needed to keep track of the last image for that hop.
-        b. Once we get to the last row, the state only needs the model
-        weights. This state is the output of the UDA for that hop. We don't need
-        the image_count here because unlike model averaging, model hopper does
-        not have a merge function and there is no need to average the weights
-        based on the image count.
+    1. For both model averaging fit_transition and fit multiple transition, the
+    state only needs to have the image count except for the last row.
+    2. For model averaging fit_transition, the last row state must always contain
+    the image count as well as the model weights
+    3. For fit multiple transition, the last row state only needs the model
+    weights. This state is the output of the UDA for that hop. We don't need
+    the image_count here because unlike model averaging, model hopper does
+    not have a merge/final function and there is no need to average the weights
+    based on the image count.
+    :param segment_model: cached model for that segment
     :param is_last_row: boolean to indicate if last row for that hop
     :param is_multiple_model: boolean
     :param agg_image_count: aggregated image count per hop
-    :param updated_model_weights: updated weights after learning (calling keras.fit)
+    :param total_images: total images per segment
     :return:
     """
     if is_last_row:
+        updated_model_weights = segment_model.get_weights()
         if is_multiple_model:
             new_state = madlib_keras_serializer.serialize_nd_weights(updated_model_weights)
         else:
@@ -543,8 +545,7 @@ def get_state_to_return(is_last_row, is_multiple_model, agg_image_count,
             new_state = madlib_keras_serializer.serialize_state_with_nd_weights(
                 agg_image_count, updated_model_weights)
     else:
-        new_state = madlib_keras_serializer.serialize_state_with_nd_weights(
-            agg_image_count, updated_model_weights)
+        new_state = float(agg_image_count)
 
     return new_state
 
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
index 5ab1f36..6fa210c 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
@@ -48,18 +48,6 @@ and the 1d state. same for fit final
 6. Return the final state from fit final to fit which will then be deserialized
 as 1d weights to be passed on to the evaluate function
 """
-def get_image_count_from_state(state):
-    """
-    :param state: bytestring serialized model state containing image count
-    and weights
-    :return: image count as float
-    """
-    if not state:
-        image_count = 0
-    else:
-        image_count , _  = deserialize_as_image_1d_weights(state)
-    return image_count
-
 def get_serialized_1d_weights_from_state(state):
     """
     Output of this function is used to deserialize the output of each iteration
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 6102341..ed3e0da 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -79,6 +79,8 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         #  images each, or 30 images total
         self.total_images_per_seg = [3*len(self.dependent_var_int),20,40]
 
+        self.dummy_prev_weights = 'dummy weights'
+
     def tearDown(self):
         self.module_patcher.stop()
 
@@ -103,12 +105,8 @@ class MadlibKerasFitTestCase(unittest.TestCase):
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
             self.accessible_gpus_for_seg, previous_state.tostring(), True, **k)
 
-        state = np.fromstring(new_state, dtype=np.float32)
-        image_count = state[0]
-        weights = np.rint(state[1:]).astype(np.int)
+        image_count = new_state
         self.assertEqual(ending_image_count, image_count)
-        # weights should not be modified yet
-        self.assertTrue((self.model_weights == weights).all())
         # set_session is always called
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the first buffer
@@ -129,12 +127,9 @@ class MadlibKerasFitTestCase(unittest.TestCase):
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
             self.accessible_gpus_for_seg, previous_state.tostring(), False, **k)
 
-        state = np.fromstring(new_state, dtype=np.float32)
-        image_count = state[0]
-        weights = np.rint(state[1:]).astype(np.int)
+        image_count = new_state
+
         self.assertEqual(ending_image_count, image_count)
-        # weights should not be modified yet
-        self.assertTrue((self.model_weights == weights).all())
         # set_session is always called
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the first buffer
@@ -160,12 +155,8 @@ class MadlibKerasFitTestCase(unittest.TestCase):
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
             self.accessible_gpus_for_seg, previous_weights.tostring(), True, True, **k)
 
-        state = np.fromstring(new_state, dtype=np.float32)
-        image_count = state[0]
-        weights = np.rint(state[1:]).astype(np.int)
+        image_count = new_state
         self.assertEqual(ending_image_count, image_count)
-        # weights should not be modified yet
-        self.assertTrue((self.model_weights == weights).all())
         # set_session is always called
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session must not be called for the first buffer
@@ -184,57 +175,45 @@ class MadlibKerasFitTestCase(unittest.TestCase):
 
         # last iteration Call
 
-        state = [starting_image_count]
-        state.extend(self.model_weights)
-        state = np.array(state, dtype=np.float32)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', self.serialized_weights)
         k = {'SD': {'segment_model': self.model, 'sess': Mock()}}
 
+        state = starting_image_count
         new_state = self.subject.fit_transition(
-            state.tostring(), self.dependent_var, self.independent_var,
+            state, self.dependent_var, self.independent_var,
             self.dependent_var_shape, self.independent_var_shape,
             self.model.to_json(), None, self.fit_params, 0,
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
-            self.accessible_gpus_for_seg, 'dummy_previous_state', True, **k)
+            self.accessible_gpus_for_seg, self.dummy_prev_weights, True, **k)
 
-        state = np.fromstring(new_state, dtype=np.float32)
-        image_count = state[0]
-        weights = np.rint(state[1:]).astype(np.int)
+        image_count = new_state
         self.assertEqual(ending_image_count, image_count)
-        # weights should not be modified yet
-        self.assertTrue((self.model_weights == weights).all())
+
         # set_session is always called
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the middle buffer
         self.assertEqual(0, self.subject.K.clear_session.call_count)
 
         # Non-last iteration Call
-
         self.subject.K.set_session.reset_mock()
         self.subject.K.clear_session.reset_mock()
-        state = [starting_image_count]
-        state.extend(self.model_weights)
-        state = np.array(state, dtype=np.float32)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', self.serialized_weights)
         k = {'SD': {'segment_model': self.model, 'sess': Mock()}}
 
+        state = starting_image_count
         new_state = self.subject.fit_transition(
-            state.tostring(), self.dependent_var, self.independent_var,
+            state, self.dependent_var, self.independent_var,
             self.dependent_var_shape, self.independent_var_shape,
             self.model.to_json(), None, self.fit_params, 0,
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
-            self.accessible_gpus_for_seg, 'dummy_previous_state', False, **k)
+            self.accessible_gpus_for_seg, self.dummy_prev_weights, False, **k)
 
-        state = np.fromstring(new_state, dtype=np.float32)
-        image_count = state[0]
-        weights = np.rint(state[1:]).astype(np.int)
+        image_count = new_state
         self.assertEqual(ending_image_count, image_count)
-        # weights should not be modified yet
-        self.assertTrue((self.model_weights == weights).all())
         # set_session is always called
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the middle buffer
@@ -251,27 +230,21 @@ class MadlibKerasFitTestCase(unittest.TestCase):
 
         # last iteration Call
 
-        state = [starting_image_count]
-        state.extend(self.model_weights)
-        state = np.array(state, dtype=np.float32)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', self.serialized_weights)
         k = {'SD': {'segment_model': self.model, 'sess': Mock()}}
 
+        state = starting_image_count
         new_state = self.subject.fit_transition(
-            state.tostring(), self.dependent_var, self.independent_var,
+            state, self.dependent_var, self.independent_var,
             self.dependent_var_shape, self.independent_var_shape,
             self.model.to_json(), None, self.fit_params, 0,
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
-            self.accessible_gpus_for_seg, 'dummy_previous_state', True, True, **k)
+            self.accessible_gpus_for_seg, self.dummy_prev_weights, True, True, **k)
 
-        state = np.fromstring(new_state, dtype=np.float32)
-        image_count = state[0]
-        weights = np.rint(state[1:]).astype(np.int)
+        image_count = new_state
         self.assertEqual(ending_image_count, image_count)
-        # weights should not be modified yet
-        self.assertTrue((self.model_weights == weights).all())
         # set_session is always called
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the middle buffer
@@ -288,9 +261,6 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         ending_image_count = starting_image_count + len(self.dependent_var_int)
 
         # last iteration Call
-        state = [starting_image_count]
-        state.extend(self.model_weights)
-        state = np.array(state, dtype=np.float32)
 
         multiplied_weights = mult(self.total_images_per_seg[0],self.model_weights)
 
@@ -298,12 +268,13 @@ class MadlibKerasFitTestCase(unittest.TestCase):
                                              '/cpu:0', self.serialized_weights)
         k = {'SD': {'segment_model' :self.model, 'sess': Mock()}}
 
+        state = starting_image_count
         new_state = self.subject.fit_transition(
-            state.tostring(), self.dependent_var, self.independent_var,
+            state, self.dependent_var, self.independent_var,
             self.dependent_var_shape, self.independent_var_shape,
             self.model.to_json(), None, self.fit_params, 0,
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
-            self.accessible_gpus_for_seg, 'dummy_previous_state', True, **k)
+            self.accessible_gpus_for_seg, self.dummy_prev_weights, True, **k)
 
         state = np.fromstring(new_state, dtype=np.float32)
         image_count = state[0]
@@ -320,9 +291,6 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         # Non-last iteration Call
         self.subject.K.set_session.reset_mock()
         self.subject.K.clear_session.reset_mock()
-        state = [starting_image_count]
-        state.extend(self.model_weights)
-        state = np.array(state, dtype=np.float32)
 
         multiplied_weights = mult(self.total_images_per_seg[0],self.model_weights)
 
@@ -330,12 +298,13 @@ class MadlibKerasFitTestCase(unittest.TestCase):
                                              '/cpu:0', self.serialized_weights)
         k = {'SD': {'segment_model' :self.model, 'sess': Mock()}}
 
+        state = starting_image_count
         new_state = self.subject.fit_transition(
-            state.tostring(), self.dependent_var, self.independent_var,
+            state, self.dependent_var, self.independent_var,
             self.dependent_var_shape, self.independent_var_shape,
             self.model.to_json(), None, self.fit_params, 0,
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
-            self.accessible_gpus_for_seg, 'dummy_previous_state', False, **k)
+            self.accessible_gpus_for_seg, self.dummy_prev_weights, False, **k)
 
         state = np.fromstring(new_state, dtype=np.float32)
         image_count = state[0]
@@ -359,21 +328,17 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         ending_image_count = starting_image_count + len(self.dependent_var_int)
 
         # last iteration Call
-        state = [starting_image_count]
-        state.extend(self.model_weights)
-        state = np.array(state, dtype=np.float32)
-
-
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', self.serialized_weights)
         k = {'SD': {'segment_model' :self.model, 'sess': Mock()}}
 
+        state = starting_image_count
         new_state = self.subject.fit_transition(
-            state.tostring(), self.dependent_var, self.independent_var,
+            state   , self.dependent_var, self.independent_var,
             self.dependent_var_shape, self.independent_var_shape,
             self.model.to_json(), None, self.fit_params, 0,
             self.dist_key_mapping, 0, 4, self.total_images_per_seg, False,
-            self.accessible_gpus_for_seg, 'dummy_previous_state', True, True, **k)
+            self.accessible_gpus_for_seg, self.dummy_prev_weights, True, True, **k)
 
         state = np.fromstring(new_state, dtype=np.float32)
         weights = np.rint(state[0:]).astype(np.int)