Posted to commits@madlib.apache.org by nj...@apache.org on 2019/04/29 19:39:47 UTC

[madlib] branch master updated (27ddd27 -> 56efedf)

This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git.


    from 27ddd27  DL: Handle NULL value for optional pred_type param in predict
     new 918ce80  DL: Update madlib_keras_fit code to weight by images instead of buffers.
     new 94e5a6c  DL: Add new unit tests and update existing ones
     new 56efedf  DL: Refactor computation of images per segment

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../modules/deep_learning/madlib_keras.py_in       | 156 +++++++----
 .../modules/deep_learning/madlib_keras.sql_in      |   2 +-
 .../deep_learning/madlib_keras_serializer.py_in    |  27 +-
 .../modules/deep_learning/test/madlib_keras.sql_in |   7 +
 .../test/unit_tests/test_madlib_keras.py_in        | 287 +++++++++++++++------
 5 files changed, 335 insertions(+), 144 deletions(-)


[madlib] 03/03: DL: Refactor computation of images per segment

Posted by nj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 56efedfe91f239d436c48c3d35fd327d1f5d999e
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Fri Apr 26 16:39:35 2019 -0700

    DL: Refactor computation of images per segment
    
    JIRA: MADLIB-1310
    Closes #378
---
 .../modules/deep_learning/madlib_keras.py_in       | 53 +++++++++++++++-------
 1 file changed, 37 insertions(+), 16 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index 5bf215b..82d1069 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -89,25 +89,15 @@ def fit(schema_madlib, source_table, model, dependent_varname,
     # about making the fit function easier to read and maintain.
     if is_platform_pg():
         set_keras_session(use_gpu)
-        # Compute total images in dataset
-        total_images_per_seg = plpy.execute(
-            """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
-                FROM {1}
-            """.format(dependent_varname, source_table))
-        seg_ids_train = "[]::integer[]"
-        gp_segment_id_col =  -1
     else:
-        # Compute total images on each segment
-        total_images_per_seg = plpy.execute(
-            """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
-                FROM {1}
-                GROUP BY gp_segment_id
-            """.format(dependent_varname, source_table))
-        seg_ids_train = [int(each_segment["gp_segment_id"])
-                   for each_segment in total_images_per_seg]
-        gp_segment_id_col = 'gp_segment_id'
         # Disable GPU on master for gpdb
         os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
+
+    # Compute total images on each segment
+    gp_segment_id_col,\
+    seg_ids_train,\
+    total_images_per_seg = get_images_per_seg(source_table, dependent_varname)
+
     if validation_table:
         seg_ids_val, rows_per_seg_val = get_rows_per_seg_from_db(validation_table)
 
@@ -296,6 +286,37 @@ def fit(schema_madlib, source_table, model, dependent_varname,
     if is_platform_pg():
         clear_keras_session()
 
+def get_images_per_seg(source_table, dependent_varname):
+    """
+    Compute total images in each segment by querying source_table.  For
+    postgres, this is just the total number of images in the db.
+    :param source_table:
+    :param dependent_varname:
+    :return: Returns a string and two arrays
+    1. The appropriate string to use for querying the segment number
+    ("gp_segment_id" for gpdb, or "-1" for postgres).
+    2. An array containing all the segment numbers in ascending order
+    3. An array containing the total images on each of the segments in the
+    segment array.
+    """
+    if is_platform_pg():
+        total_images_per_seg = plpy.execute(
+            """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+                FROM {1}
+            """.format(dependent_varname, source_table))
+        seg_ids_train = "[]::integer[]"
+        gp_segment_id_col =  -1
+    else:
+        total_images_per_seg = plpy.execute(
+            """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+                FROM {1}
+                GROUP BY gp_segment_id
+            """.format(dependent_varname, source_table))
+        seg_ids_train = [int(each_segment["gp_segment_id"])
+                   for each_segment in total_images_per_seg]
+        gp_segment_id_col = 'gp_segment_id'
+    return gp_segment_id_col, seg_ids_train, total_images_per_seg
+ 
 def get_rows_per_seg_from_db(table_name):
     """
     This function queries the given table and returns the total rows per segment.

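The refactor leaves fit() with a single call site for the per-segment image
counts. A minimal standalone sketch of the new flow follows; plpy and
is_platform_pg exist only inside a PL/Python session, so both are stubbed
here, and the stubbed query results are invented for illustration:

    # Sketch only: plpy and is_platform_pg are stubs standing in for the
    # PL/Python environment; the canned result mimics three Greenplum
    # segments reporting their image counts.
    def is_platform_pg():
        return False

    class plpy(object):
        @staticmethod
        def execute(sql):
            return [{"gp_segment_id": 0, "total_images_per_seg": 30},
                    {"gp_segment_id": 1, "total_images_per_seg": 20},
                    {"gp_segment_id": 2, "total_images_per_seg": 40}]

    def get_images_per_seg(source_table, dependent_varname):
        # Same shape as the committed function, with the SQL elided.
        if is_platform_pg():
            rows = plpy.execute("SELECT SUM(ARRAY_LENGTH(...)) FROM ...")
            return -1, "[]::integer[]", rows
        rows = plpy.execute("SELECT gp_segment_id, SUM(...) GROUP BY ...")
        seg_ids = [int(r["gp_segment_id"]) for r in rows]
        return 'gp_segment_id', seg_ids, rows

    gp_segment_id_col, seg_ids_train, total_images_per_seg = \
        get_images_per_seg('cifar_10_sample_batched', 'y')
    # fit() later flattens the result rows into plain integers:
    total_images_per_seg = [int(r["total_images_per_seg"])
                            for r in total_images_per_seg]
    print(seg_ids_train, total_images_per_seg)  # [0, 1, 2] [30, 20, 40]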

[madlib] 02/03: DL: Add new unit tests and update existing ones

Posted by nj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 94e5a6ca8e7145d81111bb7b96ff2e6f241cc00a
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Thu Apr 25 18:14:28 2019 -0700

    DL: Add new unit tests and update existing ones
    
    JIRA: MADLIB-1310
    This commit also adds a commented-out SQL command for creating validation
    data using minibatch_preprocessor_dl with a batch size of 1, so that once
    we fix MADLIB-1326 we have 2 rows; with only 1 row, the fit_merge()
    function was never being called.  This will be a better end-to-end test.
    
    Closes #378
    Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
 .../modules/deep_learning/test/madlib_keras.sql_in |   7 +
 .../test/unit_tests/test_madlib_keras.py_in        | 287 +++++++++++++++------
 2 files changed, 214 insertions(+), 80 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
index 81a088e..527d6e8 100644
--- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
@@ -50,6 +50,13 @@ copy cifar_10_sample_batched from stdin delimiter '|';
 0|{{0,1},{1,0}}|{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216 [...]
 \.
 
+-- In order to test fit_merge, we need at least 2 rows in the batched table (1 on each segment).
+-- As part of supporting Postgres, an issue was reported (JIRA MADLIB-1326).
+-- If we don't fix the bug, we should regenerate the batched table with this command
+-- (and paste it into the file).  (If we do fix the bug, we can just uncomment this line,
+-- and remove the mocked output tables above.)
+-- SELECT minibatch_preprocessor_dl('cifar_10_sample','cifar_10_sample_batched','y','x', 1, 255);
+
 DROP TABLE IF EXISTS cifar_10_sample_batched_summary;
 CREATE TABLE cifar_10_sample_batched_summary(
     source_table text,
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 0c4072b..3d27e1b 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -33,6 +33,10 @@ import plpy_mock as plpy
 
 m4_changequote(`<!', `!>')
 
+# helper for multiplying array by int
+def mult(k,arr):
+    return [ k*a for a in arr ]
+
 class MadlibKerasFitTestCase(unittest.TestCase):
     def setUp(self):
         self.plpy_mock = Mock(spec='error')
@@ -60,10 +64,15 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         for a in self.model.get_weights():
             self.model_shapes.append(a.shape)
 
-        self.loss = 1.3
-        self.accuracy = 0.34
+        self.loss = 13.0
+        self.accuracy = 3.4
         self.all_seg_ids = [0,1,2]
-        self.total_buffers_per_seg = [3,3,3]
+
+        self.independent_var = [[[[0.5]]]] * 10
+        self.dependent_var = [[0,1]] * 10
+        # We test on segment 0, which has 3 buffers filled with 10 identical
+        #  images each, or 30 images total
+        self.total_images_per_seg = [3*len(self.dependent_var),20,40]
 
     def tearDown(self):
         self.module_patcher.stop()
@@ -76,23 +85,28 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
         self.subject.is_platform_pg = Mock(return_value = True)
-        buffer_count = 0
-        previous_state = [self.loss, self.accuracy, buffer_count]
+        starting_image_count = 0
+        ending_image_count = len(self.dependent_var)
+        previous_state = [self.loss, self.accuracy, starting_image_count]
         previous_state.extend(self.model_weights)
         previous_state = np.array(previous_state, dtype=np.float32)
 
-        k = {'SD': {'buffer_count': buffer_count}}
+        k = {'SD' : {}}
+
         new_model_state = self.subject.fit_transition(
-            None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), self.compile_params, self.fit_params, False,
             previous_state.tostring(), **k)
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(1, buffer_count)
-        # set_session must get called ONLY once, when its the first buffer
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should not be modified yet
+        self.assertTrue((self.model_weights == weights).all())
+        # set_session must not be called in the transition func for PG
         self.assertEqual(0, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the first buffer
         self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(1, k['SD']['buffer_count'])
         self.assertTrue(k['SD']['segment_model'])
         self.assertTrue(k['SD']['model_shapes'])
 
@@ -104,115 +118,254 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
         self.subject.is_platform_pg = Mock(return_value = False)
-        buffer_count = 0
-        previous_state = [self.loss, self.accuracy, buffer_count]
+        starting_image_count = 0
+        ending_image_count = len(self.dependent_var)
+        previous_state = [self.loss, self.accuracy, starting_image_count]
         previous_state.extend(self.model_weights)
         previous_state = np.array(previous_state, dtype=np.float32)
 
-        k = {'SD': {'buffer_count': buffer_count}}
+        k = {'SD' : {}}
+
         new_model_state = self.subject.fit_transition(
-            None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), self.compile_params, self.fit_params, False,
             previous_state.tostring(), **k)
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(1, buffer_count)
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should not be modified yet
+        self.assertTrue((self.model_weights == weights).all())
         # set_session must get called ONLY once, when its the first buffer
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the first buffer
         self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(1, k['SD']['buffer_count'])
         self.assertTrue(k['SD']['segment_model'])
         self.assertTrue(k['SD']['model_shapes'])
 
-
-    def test_fit_transition_last_buffer_pass_pg(self):
+    def test_fit_transition_middle_buffer_pass(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
-        self.subject.is_platform_pg = Mock(return_value = True)
+        self.subject.is_platform_pg = Mock(return_value = False)
 
-        buffer_count = 2
+        starting_image_count = len(self.dependent_var)
+        ending_image_count = starting_image_count + len(self.dependent_var)
 
-        state = [self.loss, self.accuracy, buffer_count]
+        state = [self.loss, self.accuracy, starting_image_count]
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), self.model_shapes)
-        k = {'SD': {'buffer_count': buffer_count,
-                   'model_shapes': self.model_shapes}}
+        k = {'SD': {'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
         new_model_state = self.subject.fit_transition(
-            state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            state.tostring(), self.independent_var, self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
 
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(3, buffer_count)
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should not be modified yet
+        self.assertTrue((self.model_weights == weights).all())
         # set_session must get called ONLY once, when its the first buffer
         self.assertEqual(0, self.subject.K.set_session.call_count)
-        # Clear session and sess.close must not get called for the first buffer
+        # Clear session and sess.close must not get called for the middle buffer
         self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(3, k['SD']['buffer_count'])
 
-    def test_fit_transition_last_buffer_pass_gpdb(self):
+    def test_fit_transition_last_buffer_pass_pg(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
-        self.subject.is_platform_pg = Mock(return_value = False)
+        self.subject.is_platform_pg = Mock(return_value = True)
 
-        buffer_count = 2
+        starting_image_count = 2*len(self.dependent_var)
+        ending_image_count = starting_image_count + len(self.dependent_var)
 
-        state = [self.loss, self.accuracy, buffer_count]
+        state = [self.loss, self.accuracy, starting_image_count]
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
+        multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights]
+        multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), self.model_shapes)
-        k = {'SD': {'buffer_count': buffer_count,
-                   'model_shapes': self.model_shapes}}
+        k = {'SD': { 'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
         new_model_state = self.subject.fit_transition(
-            state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
 
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(3, buffer_count)
-        # set_session must get called ONLY once, when its the first buffer
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should be multiplied by final image count
+        self.assertTrue((multiplied_weights == weights).all())
+        # set_session must not be called in the transition func for PG
         self.assertEqual(0, self.subject.K.set_session.call_count)
-        # Clear session and sess.close must not get called for the first buffer
-        self.assertEqual(1, self.subject.clear_keras_session.call_count)
-        self.assertEqual(3, k['SD']['buffer_count'])
+        # Clear session and sess.close must get called for the last buffer in gpdb,
+        #  but not in postgres
+        self.assertEqual(0, self.subject.clear_keras_session.call_count)
 
-    def test_fit_transition_middle_buffer_pass(self):
+    def test_fit_transition_last_buffer_pass_gpdb(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
+        self.subject.is_platform_pg = Mock(return_value = False)
 
-        buffer_count = 1
+        starting_image_count = 2*len(self.dependent_var)
+        ending_image_count = starting_image_count + len(self.dependent_var)
 
-        state = [self.loss, self.accuracy, buffer_count]
+        state = [self.loss, self.accuracy, starting_image_count]
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
+        multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights]
+        multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), self.model_shapes)
-        k = {'SD': {'buffer_count': buffer_count,
-                   'model_shapes': self.model_shapes}}
+        k = {'SD': { 'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
         new_model_state = self.subject.fit_transition(
-            state.tostring(), [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
 
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(2, buffer_count)
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should be multiplied by final image count
+        self.assertTrue((multiplied_weights == weights).all())
         # set_session must get called ONLY once, when its the first buffer
         self.assertEqual(0, self.subject.K.set_session.call_count)
-        # Clear session and sess.close must not get called for the first buffer
-        self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(2, k['SD']['buffer_count'])
+        # Clear session and sess.close must get called for the last buffer in gpdb,
+        #  but not in postgres
+        self.assertEqual(1, self.subject.clear_keras_session.call_count)
+
+    def test_fit_transition_ending_image_count_zero(self):
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+        starting_image_count = 0
+        previous_state = [self.loss, self.accuracy, starting_image_count]
+        previous_state.extend(self.model_weights)
+        previous_state = np.array(previous_state, dtype=np.float32)
+
+        k = {'SD' : {}}
+
+        total_images_per_seg = [0,1,1]
+
+        with self.assertRaises(plpy.PLPYException):
+            new_model_state = self.subject.fit_transition(
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg,
+            self.model.to_json(), self.compile_params, self.fit_params, False,
+            previous_state.tostring(), **k)
+
+    def test_fit_transition_too_many_images(self):
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+        starting_image_count = 0
+        previous_state = [self.loss, self.accuracy, starting_image_count]
+        previous_state.extend(self.model_weights)
+        previous_state = np.array(previous_state, dtype=np.float32)
+
+        k = {'SD' : {}}
+
+        total_images_per_seg = [1,1,1]
+
+        with self.assertRaises(plpy.PLPYException):
+            new_model_state = self.subject.fit_transition(
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg,
+            self.model.to_json(), self.compile_params, self.fit_params, False,
+            previous_state.tostring(), **k)
+ 
+
+    def test_fit_merge(self):
+        image_count = self.total_images_per_seg[0]
+        state1 = [3.0*self.loss, 3.0*self.accuracy, image_count]
+        state1.extend(mult(3,self.model_weights))
+        state1 = np.array(state1, dtype=np.float32)
+        state2 = [2.0*self.loss, 2.0*self.accuracy, image_count+30]
+        state2.extend(mult(2,self.model_weights))
+        state2 = np.array(state2, dtype=np.float32)
+        merged_state = self.subject.fit_merge(state1.tostring(),state2.tostring())
+        state = np.fromstring(merged_state, dtype=np.float32)
+        agg_loss = state[0]
+        agg_accuracy = state[1]
+        image_count_total = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+       
+        self.assertEqual( 2*image_count+30 , image_count_total )
+        self.assertAlmostEqual( 5.0*self.loss, agg_loss, 2)
+        self.assertAlmostEqual( 5.0*self.accuracy, agg_accuracy, 2)
+        self.assertTrue( (mult(5,self.model_weights) == weights).all())
+
+    def test_fit_merge_none_first(self):
+        image_count = self.total_images_per_seg[0]
+        input_state = [self.loss, self.accuracy, image_count]
+        input_state.extend(self.model_weights)
+        input_state = np.array(input_state, dtype=np.float32)
+        merged_state = self.subject.fit_merge(None, input_state.tostring())
+        state = np.fromstring(merged_state, dtype=np.float32)
+        agg_loss = state[0]
+        agg_accuracy = state[1]
+        image_count_total = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+       
+        self.assertEqual(image_count, image_count_total)
+        self.assertAlmostEqual(self.loss, agg_loss, 2)
+        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)
+        self.assertTrue((self.model_weights == weights).all())
+
+    def test_fit_merge_none_second(self):
+        image_count = self.total_images_per_seg[0]
+        input_state = [self.loss, self.accuracy, image_count]
+        input_state.extend(self.model_weights)
+        input_state = np.array(input_state, dtype=np.float32)
+        merged_state = self.subject.fit_merge(input_state.tostring(), None)
+        state = np.fromstring(merged_state, dtype=np.float32)
+        agg_loss = state[0]
+        agg_accuracy = state[1]
+        image_count_total = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+       
+        self.assertEqual(image_count, image_count_total)
+        self.assertAlmostEqual(self.loss, agg_loss, 2)
+        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)
+        self.assertTrue((self.model_weights == weights).all())
+
+    def test_fit_merge_both_none(self):
+        result = self.subject.fit_merge(None,None)
+        self.assertEqual(None, result)
+
+    def test_fit_final(self):
+        image_count = self.total_images_per_seg[0]
+        input_state = [image_count*self.loss, image_count*self.accuracy, image_count]
+        input_state.extend(mult(image_count,self.model_weights))
+        input_state = np.array(input_state, dtype=np.float32)
+        
+        output_state = self.subject.fit_final(input_state.tostring())
+        output_state = np.fromstring(output_state, dtype=np.float32)
+        agg_loss = output_state[0]
+        agg_accuracy = output_state[1]
+        image_count_output = output_state[2]
+        weights = np.rint(output_state[3:]).astype(np.int)
+
+        self.assertEqual(image_count, image_count_output)
+        self.assertAlmostEqual(self.loss, agg_loss,2)
+        self.assertAlmostEqual(self.accuracy, agg_accuracy,2)
+        self.assertTrue((self.model_weights == weights).all())
+
+    def test_fit_final_none(self):
+        result = self.subject.fit_final(None)
+        self.assertEqual(result, None)
 
     def test_get_device_name_and_set_cuda_env(self):
         import os
@@ -305,19 +458,6 @@ class MadlibKerasValidatorTestCase(unittest.TestCase):
         import madlib_keras_validator
         self.subject = madlib_keras_validator
 
-        self.model = Sequential()
-        self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
-                              input_shape=(1,1,1,), padding='same'))
-        self.model.add(Flatten())
-
-        self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']"
-        self.fit_params = "'batch_size'=1, 'epochs'=1"
-        self.model_weights = [3,4,5,6]
-        self.loss = 1.3
-        self.accuracy = 0.34
-        self.all_seg_ids = [0,1,2]
-        self.total_buffers_per_seg = [3,3,3]
-
     def tearDown(self):
         self.module_patcher.stop()
 
@@ -359,19 +499,6 @@ class MadlibSerializerTestCase(unittest.TestCase):
         import madlib_keras_serializer
         self.subject = madlib_keras_serializer
 
-        self.model = Sequential()
-        self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
-                              input_shape=(1,1,1,), padding='same'))
-        self.model.add(Flatten())
-
-        self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']"
-        self.fit_params = "'batch_size'=1, 'epochs'=1"
-        self.model_weights = [3,4,5,6]
-        self.loss = 1.3
-        self.accuracy = 0.34
-        self.all_seg_ids = [0,1,2]
-        self.total_buffers_per_seg = [3,3,3]
-
     def tearDown(self):
         self.module_patcher.stop()
 

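The new merge tests above pin down a simple invariant: because the
transition function now carries image-weighted running sums, merging two
states is plain component-wise addition. A quick sketch of that arithmetic
in the same [loss, accuracy, image_count, *weights] float32 layout the
tests use (the numbers mirror the fixtures; this is an illustration, not
the MADlib implementation):

    import numpy as np

    loss, accuracy = 13.0, 3.4
    weights = [3, 4, 5, 6]
    image_count = 30                    # images on segment 0 in the tests

    # State layout used throughout the tests:
    # [loss, accuracy, image_count, *weights], serialized as float32.
    state1 = np.array([3.0 * loss, 3.0 * accuracy, image_count]
                      + [3 * w for w in weights], dtype=np.float32)
    state2 = np.array([2.0 * loss, 2.0 * accuracy, image_count + 30]
                      + [2 * w for w in weights], dtype=np.float32)

    # fit_merge only sums running totals, so the merged state is just
    # the component-wise sum of the two input states:
    merged = state1 + state2
    assert merged[2] == 2 * image_count + 30          # 90 images
    assert np.isclose(merged[0], 5.0 * loss)          # summed loss
    assert np.isclose(merged[1], 5.0 * accuracy)      # summed accuracy
    assert (merged[3:] == [5 * w for w in weights]).all()

fit_final, exercised just above, is then the only place any division
happens.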

[madlib] 01/03: DL: Update madlib_keras_fit code to weight by images instead of buffers.

Posted by nj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 918ce80c0a4f10f23ab8c5e2acfc622b5ba71a2b
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Thu Apr 25 18:14:20 2019 -0700

    DL: Update madlib_keras_fit code to weight by images instead of buffers.
    
    JIRA: MADLIB-1310
    Prior to this commit, while merging the weights, accuracy and loss, we
    assumed that all the buffers were of the same size. This may not always
    be the case.  Ideally we should use the total number of images per
    segment to average the weights, accuracy and loss.
    This commit updates the transition, merge and final functions for
    `madlib_keras_fit()` to use the number of images on a segment instead of
    the buffer count for averaging weights, accuracy and loss.
    
    Additionally, when deserializing loss and accuracy from the state, the
    function `deserialize_weights()` type-cast them to int, which resulted
    in computing lower loss and accuracy while training. This commit also
    addresses this issue.
    
    Closes #378
    Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
 .../modules/deep_learning/madlib_keras.py_in       | 131 +++++++++++++--------
 .../modules/deep_learning/madlib_keras.sql_in      |   2 +-
 .../deep_learning/madlib_keras_serializer.py_in    |  27 ++---
 3 files changed, 98 insertions(+), 62 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index bd12f55..5bf215b 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -49,11 +49,18 @@ from utilities.model_arch_info import get_num_classes
 from utilities.utilities import is_platform_pg
 from utilities.utilities import madlib_version
 from utilities.validate_args import get_col_value_and_type
+from utilities.validate_args import quote_ident
 
 def fit(schema_madlib, source_table, model, dependent_varname,
         independent_varname, model_arch_table, model_arch_id, compile_params,
         fit_params, num_iterations, use_gpu = True,
         validation_table=None, name="", description="", **kwargs):
+
+    source_table = quote_ident(source_table)
+    dependent_varname = quote_ident(dependent_varname)
+    independent_varname = quote_ident(independent_varname)
+    model_arch_table = quote_ident(model_arch_table)
+
     fit_validator = FitInputValidator(
         source_table, validation_table, model, model_arch_table,
         dependent_varname, independent_varname, num_iterations)
@@ -82,12 +89,25 @@ def fit(schema_madlib, source_table, model, dependent_varname,
     # about making the fit function easier to read and maintain.
     if is_platform_pg():
         set_keras_session(use_gpu)
+        # Compute total images in dataset
+        total_images_per_seg = plpy.execute(
+            """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+                FROM {1}
+            """.format(dependent_varname, source_table))
+        seg_ids_train = "[]::integer[]"
         gp_segment_id_col =  -1
     else:
+        # Compute total images on each segment
+        total_images_per_seg = plpy.execute(
+            """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+                FROM {1}
+                GROUP BY gp_segment_id
+            """.format(dependent_varname, source_table))
+        seg_ids_train = [int(each_segment["gp_segment_id"])
+                   for each_segment in total_images_per_seg]
         gp_segment_id_col = 'gp_segment_id'
         # Disable GPU on master for gpdb
         os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
-    seg_ids_train, rows_per_seg_train = get_rows_per_seg_from_db(source_table)
     if validation_table:
         seg_ids_val, rows_per_seg_val = get_rows_per_seg_from_db(validation_table)
 
@@ -110,6 +130,9 @@ def fit(schema_madlib, source_table, model, dependent_varname,
     validation_set_provided = bool(validation_table)
     validation_aggregate_accuracy = []; validation_aggregate_loss = []
 
+    total_images_per_seg = [int(each_segment["total_images_per_seg"])
+        for each_segment in total_images_per_seg]
+
     # Prepare the SQL for running distributed training via UDA
     compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
     fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
@@ -120,7 +143,7 @@ def fit(schema_madlib, source_table, model, dependent_varname,
             {gp_segment_id_col},
             {num_classes}::INTEGER,
             ARRAY{seg_ids_train},
-            ARRAY{rows_per_seg_train},
+            ARRAY{total_images_per_seg},
             $MAD${model_arch}$MAD$::TEXT,
             {compile_params_to_pass}::TEXT,
             {fit_params_to_pass}::TEXT,
@@ -305,7 +328,7 @@ def get_rows_per_seg_from_db(table_name):
 
 
 def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
-                   all_seg_ids, total_buffers_per_seg, architecture,
+                   all_seg_ids, total_images_per_seg, architecture,
                    compile_params, fit_params, use_gpu, previous_state,
                    **kwargs):
 
@@ -317,7 +340,7 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
     :param current_seg_id:
     :param num_classes:
     :param all_seg_ids:
-    :param total_buffers_per_seg:
+    :param total_images_per_seg:
     :param architecture:
     :param compile_params:
     :param fit_params:
@@ -344,13 +367,14 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
         compile_and_set_weights(segment_model, compile_params, device_name,
                                 previous_state, SD['model_shapes'])
         SD['segment_model'] = segment_model
-        SD['buffer_count'] = 0
+        image_count = 0
         agg_loss = 0
         agg_accuracy = 0
+        agg_image_count = 0
     else:
         segment_model = SD['segment_model']
         #TODO we don't need to deserialize the weights here.
-        agg_loss, agg_accuracy, _, _ = madlib_keras_serializer.deserialize_weights(
+        agg_loss, agg_accuracy, agg_image_count, _ = madlib_keras_serializer.deserialize_weights(
             state, SD['model_shapes'])
 
     # Prepare the data
@@ -367,79 +391,92 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
         accuracy = history.history['acc'][0]
     end_fit = time.time()
 
-    # Re-serialize the weights
-    # Update buffer count, check if we are done
-    SD['buffer_count'] += 1
-    agg_loss += loss
-    agg_accuracy += accuracy
+    image_count = len(x_train)
+    # Aggregating number of images, loss and accuracy
+    agg_image_count += image_count
+    agg_loss += (image_count * loss)
+    agg_accuracy += (image_count * accuracy)
 
     with K.tf.device(device_name):
         updated_weights = segment_model.get_weights()
     if is_platform_pg():
-        total_buffers_per_seg = total_buffers_per_seg[0]
+        total_images = total_images_per_seg[0]
     else:
-        total_buffers_per_seg = total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
-    if total_buffers_per_seg == 0:
+        total_images = total_images_per_seg[all_seg_ids.index(current_seg_id)]
+    if total_images == 0:
         plpy.error('Got 0 rows. Expected at least 1.')
 
-    if SD['buffer_count'] == total_buffers_per_seg:
-        agg_loss /= total_buffers_per_seg
-        agg_accuracy /= total_buffers_per_seg
+    # Re-serialize the weights
+    # Update image count, check if we are done
+    if agg_image_count == total_images:
+        if total_images == 0:
+            plpy.error('Total images is 0')
+        # Once done with all images on a segment, we update weights
+        # with the total number of images here instead of the merge function.
+        # The merge function only deals with aggregating them.
+        updated_weights = [ total_images * w for w in updated_weights ]
         if not is_platform_pg():
             # In GPDB, each segment would have a keras session, so clear
             # them after the last buffer is processed.
             clear_keras_session()
+    elif agg_image_count > total_images:
+        plpy.error('Processed {0} images, but there were supposed to be only {1}!'
+            .format(agg_image_count,total_images))
 
     new_model_state = madlib_keras_serializer.serialize_weights(
-        agg_loss, agg_accuracy, SD['buffer_count'], updated_weights)
+            agg_loss, agg_accuracy, agg_image_count, updated_weights)
+
 
     del x_train
     del y_train
 
     end_transition = time.time()
-    plpy.info("Processed buffer {0}: Fit took {1} sec, Total was {2} sec".format(
-        SD['buffer_count'], end_fit - start_fit, end_transition - start_transition))
+    plpy.info("Processed {0} images: Fit took {1} sec, Total was {2} sec".format(
+        image_count, end_fit - start_fit, end_transition - start_transition))
 
     return new_model_state
 
 def fit_merge(state1, state2, **kwargs):
+
     # Return if called early
     if not state1 or not state2:
         return state1 or state2
 
     # Deserialize states
-    loss1, accuracy1, buffer_count1, weights1 = madlib_keras_serializer.deserialize_weights_merge(state1)
-    loss2, accuracy2, buffer_count2, weights2 = madlib_keras_serializer.deserialize_weights_merge(state2)
-        # plpy.info('merge buffer loss1 {}, accuracy1 {}, buffer count1 {}'.format(loss1, accuracy1, buffer_count1))
-    # plpy.info('merge buffer loss2 {}, accuracy2 {}, buffer count2 {}'.format(loss2, accuracy2, buffer_count2))
-
-    # Compute total buffer counts
-    # buffer_count1, buffer_count2 = state1[2], state2[2]
-    total_buffers = (buffer_count1 + buffer_count2) * 1.0
-    if total_buffers == 0:
-        plpy.error('total buffers in merge is 0')
-    merge_weight1 = buffer_count1 / total_buffers
-    merge_weight2 = buffer_count2 / total_buffers
-
-    # Average the losses
-    # loss1, loss2 = state1[0], state2[0]
-    avg_loss = merge_weight1*loss1 + merge_weight2*loss2
-
-    # Average the accuracies
-    # accuracy1, accuracy2 = state1[1], state2[1]
-    avg_accuracy = merge_weight1*accuracy1 + merge_weight2*accuracy2
-
-    # Average the weights
-    # weights1, weights2 = state1[3:], state2[3:]
-    avg_weights = merge_weight1*weights1 + merge_weight2*weights2
-    # avg_weights = [(merge_weight1 * e1) + (merge_weight2 * e2) for e1, e2 in zip(weights1, weights2)]
+    loss1, accuracy1, image_count1, weights1 = madlib_keras_serializer.deserialize_weights_merge(state1)
+    loss2, accuracy2, image_count2, weights2 = madlib_keras_serializer.deserialize_weights_merge(state2)
+
+    # Compute total image counts
+    image_count = (image_count1 + image_count2) * 1.0
+    plpy.info("FIT_MERGE: Merging {0} + {1} = {2} images".format(image_count1,image_count2,image_count))
+    if image_count == 0:
+        plpy.error('total images in merge is 0')
+
+    # Aggregate the losses
+    total_loss = loss1 + loss2
+
+    # Aggregate the accuracies
+    total_accuracy = accuracy1 + accuracy2
+
+    # Aggregate the weights
+    total_weights = weights1 + weights2
 
     # Return the merged state
     return madlib_keras_serializer.serialize_weights_merge(
-        avg_loss, avg_accuracy, total_buffers, avg_weights)
+        total_loss, total_accuracy, image_count, total_weights)
 
 def fit_final(state, **kwargs):
-    return state
+    # Return if called early
+    if not state:
+        return state
+
+    loss, accuracy, image_count, weights = madlib_keras_serializer.deserialize_weights_merge(state)
+    # Averaging the accuracy, loss and weights
+    loss /= image_count
+    accuracy /= image_count
+    weights /= image_count
+    return madlib_keras_serializer.serialize_weights_merge(
+        loss, accuracy, image_count, weights)
 
 def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table,
             model_arch_id, dependent_varname, independent_varname,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 543bbed..3c44205 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -105,7 +105,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.fit_transition(
     current_seg_id             INTEGER,
     num_classes                INTEGER,
     all_seg_ids                INTEGER[],
-    total_buffers_per_seg      INTEGER[],
+    total_images_per_seg       INTEGER[],
     architecture               TEXT,
     compile_params             TEXT,
     fit_params                 TEXT,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
index d52ed16..09f3eb8 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
@@ -31,7 +31,7 @@ def get_model_shapes(model):
 # np.array(np.array(loss, acc, buff_count).concatenate(weights_np_array)).tostring()
 
 # Proposed logic
-# loss , accuracy and buffer_count can be comma separated values
+# loss, accuracy and image_count can be comma separated values
 # weights -> np.array.tostring()
 # combine these 2 into one string by a random splitter
 # serialized string -> loss_splitter_acc_splitter_buffer_splitter_weights
@@ -40,11 +40,11 @@ def deserialize_weights(model_state, model_shapes):
     """
     Parameters:
         model_state: a stringified (serialized) state containing loss,
-        accuracy, buffer_count, and model_weights, passed from postgres
+        accuracy, image_count, and model_weights, passed from postgres
         model_shapes: a list of tuples containing the shapes of each element
         in keras.get_weights()
     Returns:
-        buffer_count: the buffer count from state
+        image_count: the image count from state
         model_weights: a list of numpy arrays that can be inputted into keras.set_weights()
     """
     if not model_state or not model_shapes:
@@ -57,13 +57,13 @@ def deserialize_weights(model_state, model_shapes):
         weight_arr_portion = model_weights_serialized[i:next_pointer]
         model_weights.append(weight_arr_portion.reshape(model_shapes[j]))
         i, j = next_pointer, j + 1
-    return int(float(state[0])), int(float(state[1])), int(float(state[2])), model_weights
+    return float(state[0]), float(state[1]), int(float(state[2])), model_weights
 
 
-def serialize_weights(loss, accuracy, buffer_count, model_weights):
+def serialize_weights(loss, accuracy, image_count, model_weights):
     """
     Parameters:
-        loss, accuracy, buffer_count: float values
+        loss, accuracy, image_count: float values
         model_weights: a list of numpy arrays, what you get from
         keras.get_weights()
     Returns:
@@ -74,12 +74,11 @@ def serialize_weights(loss, accuracy, buffer_count, model_weights):
         return None
     flattened_weights = [w.flatten() for w in model_weights]
     model_weights_serialized = np.concatenate(flattened_weights)
-    new_model_string = np.array([loss, accuracy, buffer_count])
+    new_model_string = np.array([loss, accuracy, image_count])
     new_model_string = np.concatenate((new_model_string, model_weights_serialized))
     new_model_string = np.float32(new_model_string)
     return new_model_string.tostring()
 
-
 def deserialize_iteration_state(iteration_result):
     """
     Parameters:
@@ -90,7 +89,7 @@ def deserialize_iteration_state(iteration_result):
         new_model_state: the stringified (serialized) state to pass in to next
         iteration of step function training, represents the averaged weights
         from the last iteration of training; zeros out loss, accuracy,
-        buffer_count in this state because the new iteration must start with
+        image_count in this state because the new iteration must start with
         fresh values
     """
     if not iteration_result:
@@ -105,12 +104,12 @@ def deserialize_iteration_state(iteration_result):
 def deserialize_weights_merge(state):
     """
     Parameters:
-        state: the stringified (serialized) state containing loss, accuracy, buffer_count, and
+        state: the stringified (serialized) state containing loss, accuracy, image_count, and
             model_weights, passed from postgres to merge function
     Returns:
         loss: the averaged loss from that iteration of training
         accuracy: the averaged accuracy from that iteration of training
-        buffer_count: total buffer counts processed
+        image_count: total buffer counts processed
         model_weights: a single flattened numpy array containing all of the
         weights, flattened because all we have to do is average them (so don't
         have to reshape)
@@ -121,10 +120,10 @@ def deserialize_weights_merge(state):
     return float(state[0]), float(state[1]), int(float(state[2])), state[3:]
 
 
-def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
+def serialize_weights_merge(loss, accuracy, image_count, model_weights):
     """
     Parameters:
-        loss, accuracy, buffer_count: float values
+        loss, accuracy, image_count: float values
         model_weights: a single flattened numpy array containing all of the
         weights, averaged in merge function over the 2 states
     Returns:
@@ -133,7 +132,7 @@ def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
     """
     if model_weights is None:
         return None
-    new_model_string = np.array([loss, accuracy, buffer_count])
+    new_model_string = np.array([loss, accuracy, image_count])
     new_model_string = np.concatenate((new_model_string, model_weights))
     new_model_string = np.float32(new_model_string)
     return new_model_string.tostring()
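
Taken together, the changes above replace a buffer-weighted average with an
image-weighted one: the transition function accumulates image_count * loss
(and scales the weights by the segment's total image count once its last
buffer is done), fit_merge only sums, and fit_final divides by the total
number of images. A worked sketch with two unequal segments (all numbers
invented; accuracy behaves exactly like loss and is omitted):

    import numpy as np

    # Two segments with different image counts -- the case the old
    # buffer-count averaging handled incorrectly.
    seg_a = {"images": 30, "loss": 0.9, "weights": np.array([1.0, 2.0])}
    seg_b = {"images": 10, "loss": 0.5, "weights": np.array([3.0, 4.0])}

    # Transition (after a segment's last buffer): the state carries
    # image-weighted sums -- [loss_sum, image_count, *scaled_weights].
    states = [np.concatenate(([s["images"] * s["loss"], s["images"]],
                              s["images"] * s["weights"]))
              for s in (seg_a, seg_b)]

    # Merge: component-wise sums only.
    merged = states[0] + states[1]

    # Final: divide everything by the total image count.
    total_images = merged[1]                   # 40.0
    avg_loss = merged[0] / total_images        # (30*0.9 + 10*0.5)/40 = 0.8
    avg_weights = merged[2:] / total_images    # [1.5, 2.5]
    print(avg_loss, avg_weights)

Under the old buffer-count scheme the two segments would have counted
equally, giving a loss of 0.7 instead of the correct image-weighted 0.8.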