You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nj...@apache.org on 2019/04/29 19:39:49 UTC

[madlib] 02/03: DL: Add new unit tests and update existing ones

This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 94e5a6ca8e7145d81111bb7b96ff2e6f241cc00a
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Thu Apr 25 18:14:28 2019 -0700

    DL: Add new unit tests and update existing ones
    
    JIRA: MADLIB-1310
    This commit also adds a commented-out SQL command for creating validation
    data using minibatch_preprocessor_dl with a batch size of 1. This ensures
    that once MADLIB-1326 is fixed, the batched table will have 2 rows; with
    only 1 row, the fit_merge() function was never being called. This will be
    a better end-to-end test.
    
    Closes #378
    Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
 .../modules/deep_learning/test/madlib_keras.sql_in |   7 +
 .../test/unit_tests/test_madlib_keras.py_in        | 287 +++++++++++++++------
 2 files changed, 214 insertions(+), 80 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
index 81a088e..527d6e8 100644
--- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
@@ -50,6 +50,13 @@ copy cifar_10_sample_batched from stdin delimiter '|';
 0|{{0,1},{1,0}}|{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216 [...]
 \.
 
+-- In order to test fit_merge, we need at least 2 rows in the batched table (1 on each segment).
+-- As part of supporting Postgres, an issue was reported JIRA MADLIB-1326.
+-- If we don't fix the bug, we should regenerate the batched table with this command
+-- (and paste it into the file).  (If we do fix the bug, we can just uncomment this line,
+-- and remove the mocked output tables above.)
+-- SELECT minibatch_preprocessor_dl('cifar_10_sample','cifar_10_sample_batched','y','x', 1, 255);
+
 DROP TABLE IF EXISTS cifar_10_sample_batched_summary;
 CREATE TABLE cifar_10_sample_batched_summary(
     source_table text,
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 0c4072b..3d27e1b 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -33,6 +33,10 @@ import plpy_mock as plpy
 
 m4_changequote(`<!', `!>')
 
+# helper for multiplying array by int
+def mult(k,arr):
+    return [ k*a for a in arr ]
+
 class MadlibKerasFitTestCase(unittest.TestCase):
     def setUp(self):
         self.plpy_mock = Mock(spec='error')
@@ -60,10 +64,15 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         for a in self.model.get_weights():
             self.model_shapes.append(a.shape)
 
-        self.loss = 1.3
-        self.accuracy = 0.34
+        self.loss = 13.0
+        self.accuracy = 3.4
         self.all_seg_ids = [0,1,2]
-        self.total_buffers_per_seg = [3,3,3]
+
+        self.independent_var = [[[[0.5]]]] * 10
+        self.dependent_var = [[0,1]] * 10
+        # We test on segment 0, which has 3 buffers filled with 10 identical
+        #  images each, or 30 images total
+        self.total_images_per_seg = [3*len(self.dependent_var),20,40]
 
     def tearDown(self):
         self.module_patcher.stop()
@@ -76,23 +85,28 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
         self.subject.is_platform_pg = Mock(return_value = True)
-        buffer_count = 0
-        previous_state = [self.loss, self.accuracy, buffer_count]
+        starting_image_count = 0
+        ending_image_count = len(self.dependent_var)
+        previous_state = [self.loss, self.accuracy, starting_image_count]
         previous_state.extend(self.model_weights)
         previous_state = np.array(previous_state, dtype=np.float32)
 
-        k = {'SD': {'buffer_count': buffer_count}}
+        k = {'SD' : {}}
+
         new_model_state = self.subject.fit_transition(
-            None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), self.compile_params, self.fit_params, False,
             previous_state.tostring(), **k)
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(1, buffer_count)
-        # set_session must get called ONLY once, when its the first buffer
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should not be modified yet
+        self.assertTrue((self.model_weights == weights).all())
+        # set_session must not be called in transition func for PG
         self.assertEqual(0, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the first buffer
         self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(1, k['SD']['buffer_count'])
         self.assertTrue(k['SD']['segment_model'])
         self.assertTrue(k['SD']['model_shapes'])
 
@@ -104,115 +118,254 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
         self.subject.is_platform_pg = Mock(return_value = False)
-        buffer_count = 0
-        previous_state = [self.loss, self.accuracy, buffer_count]
+        starting_image_count = 0
+        ending_image_count = len(self.dependent_var)
+        previous_state = [self.loss, self.accuracy, starting_image_count]
         previous_state.extend(self.model_weights)
         previous_state = np.array(previous_state, dtype=np.float32)
 
-        k = {'SD': {'buffer_count': buffer_count}}
+        k = {'SD' : {}}
+
         new_model_state = self.subject.fit_transition(
-            None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), self.compile_params, self.fit_params, False,
             previous_state.tostring(), **k)
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(1, buffer_count)
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should not be modified yet
+        self.assertTrue((self.model_weights == weights).all())
         # set_session must get called ONLY once, when its the first buffer
         self.assertEqual(1, self.subject.K.set_session.call_count)
         # Clear session and sess.close must not get called for the first buffer
         self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(1, k['SD']['buffer_count'])
         self.assertTrue(k['SD']['segment_model'])
         self.assertTrue(k['SD']['model_shapes'])
 
-
-    def test_fit_transition_last_buffer_pass_pg(self):
+    def test_fit_transition_middle_buffer_pass(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
-        self.subject.is_platform_pg = Mock(return_value = True)
+        self.subject.is_platform_pg = Mock(return_value = False)
 
-        buffer_count = 2
+        starting_image_count = len(self.dependent_var)
+        ending_image_count = starting_image_count + len(self.dependent_var)
 
-        state = [self.loss, self.accuracy, buffer_count]
+        state = [self.loss, self.accuracy, starting_image_count]
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), self.model_shapes)
-        k = {'SD': {'buffer_count': buffer_count,
-                   'model_shapes': self.model_shapes}}
+        k = {'SD': {'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
         new_model_state = self.subject.fit_transition(
-            state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            state.tostring(), self.independent_var, self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
 
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(3, buffer_count)
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should not be modified yet
+        self.assertTrue((self.model_weights == weights).all())
         # set_session must get called ONLY once, when its the first buffer
         self.assertEqual(0, self.subject.K.set_session.call_count)
-        # Clear session and sess.close must not get called for the first buffer
+        # Clear session and sess.close must not get called for the middle buffer
         self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(3, k['SD']['buffer_count'])
 
-    def test_fit_transition_last_buffer_pass_gpdb(self):
+    def test_fit_transition_last_buffer_pass_pg(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
-        self.subject.is_platform_pg = Mock(return_value = False)
+        self.subject.is_platform_pg = Mock(return_value = True)
 
-        buffer_count = 2
+        starting_image_count = 2*len(self.dependent_var)
+        ending_image_count = starting_image_count + len(self.dependent_var)
 
-        state = [self.loss, self.accuracy, buffer_count]
+        state = [self.loss, self.accuracy, starting_image_count]
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
+        multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights]
+        multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), self.model_shapes)
-        k = {'SD': {'buffer_count': buffer_count,
-                   'model_shapes': self.model_shapes}}
+        k = {'SD': { 'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
         new_model_state = self.subject.fit_transition(
-            state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
 
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(3, buffer_count)
-        # set_session must get called ONLY once, when its the first buffer
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should be multiplied by final image count
+        self.assertTrue((multiplied_weights == weights).all())
+        # set_session must not be called in transition func for PG
         self.assertEqual(0, self.subject.K.set_session.call_count)
-        # Clear session and sess.close must not get called for the first buffer
-        self.assertEqual(1, self.subject.clear_keras_session.call_count)
-        self.assertEqual(3, k['SD']['buffer_count'])
+        # Clear session and sess.close must get called for the last buffer in gpdb,
+        #  but not in postgres
+        self.assertEqual(0, self.subject.clear_keras_session.call_count)
 
-    def test_fit_transition_middle_buffer_pass(self):
+    def test_fit_transition_last_buffer_pass_gpdb(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
+        self.subject.is_platform_pg = Mock(return_value = False)
 
-        buffer_count = 1
+        starting_image_count = 2*len(self.dependent_var)
+        ending_image_count = starting_image_count + len(self.dependent_var)
 
-        state = [self.loss, self.accuracy, buffer_count]
+        state = [self.loss, self.accuracy, starting_image_count]
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
+        multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights]
+        multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), self.model_shapes)
-        k = {'SD': {'buffer_count': buffer_count,
-                   'model_shapes': self.model_shapes}}
+        k = {'SD': { 'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
         new_model_state = self.subject.fit_transition(
-            state.tostring(), [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+            state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
 
-        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
-        self.assertEqual(2, buffer_count)
+        state = np.fromstring(new_model_state, dtype=np.float32)
+        image_count = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+        self.assertEqual(ending_image_count, image_count)
+        # weights should be multiplied by final image count
+        self.assertTrue((multiplied_weights == weights).all())
         # set_session must get called ONLY once, when its the first buffer
         self.assertEqual(0, self.subject.K.set_session.call_count)
-        # Clear session and sess.close must not get called for the first buffer
-        self.assertEqual(0, self.subject.clear_keras_session.call_count)
-        self.assertEqual(2, k['SD']['buffer_count'])
+        # Clear session and sess.close must get called for the last buffer in gpdb,
+        #  but not in postgres
+        self.assertEqual(1, self.subject.clear_keras_session.call_count)
+
+    def test_fit_transition_ending_image_count_zero(self):
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+        starting_image_count = 0
+        previous_state = [self.loss, self.accuracy, starting_image_count]
+        previous_state.extend(self.model_weights)
+        previous_state = np.array(previous_state, dtype=np.float32)
+
+        k = {'SD' : {}}
+
+        total_images_per_seg = [0,1,1]
+
+        with self.assertRaises(plpy.PLPYException):
+            new_model_state = self.subject.fit_transition(
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg,
+            self.model.to_json(), self.compile_params, self.fit_params, False,
+            previous_state.tostring(), **k)
+
+    def test_fit_transition_too_many_images(self):
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+        starting_image_count = 0
+        previous_state = [self.loss, self.accuracy, starting_image_count]
+        previous_state.extend(self.model_weights)
+        previous_state = np.array(previous_state, dtype=np.float32)
+
+        k = {'SD' : {}}
+
+        total_images_per_seg = [1,1,1]
+
+        with self.assertRaises(plpy.PLPYException):
+            new_model_state = self.subject.fit_transition(
+            None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg,
+            self.model.to_json(), self.compile_params, self.fit_params, False,
+            previous_state.tostring(), **k)
+ 
+
+    def test_fit_merge(self):
+        image_count = self.total_images_per_seg[0]
+        state1 = [3.0*self.loss, 3.0*self.accuracy, image_count]
+        state1.extend(mult(3,self.model_weights))
+        state1 = np.array(state1, dtype=np.float32)
+        state2 = [2.0*self.loss, 2.0*self.accuracy, image_count+30]
+        state2.extend(mult(2,self.model_weights))
+        state2 = np.array(state2, dtype=np.float32)
+        merged_state = self.subject.fit_merge(state1.tostring(),state2.tostring())
+        state = np.fromstring(merged_state, dtype=np.float32)
+        agg_loss = state[0]
+        agg_accuracy = state[1]
+        image_count_total = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+       
+        self.assertEqual( 2*image_count+30 , image_count_total )
+        self.assertAlmostEqual( 5.0*self.loss, agg_loss, 2)
+        self.assertAlmostEqual( 5.0*self.accuracy, agg_accuracy, 2)
+        self.assertTrue( (mult(5,self.model_weights) == weights).all())
+
+    def test_fit_merge_none_first(self):
+        image_count = self.total_images_per_seg[0]
+        input_state = [self.loss, self.accuracy, image_count]
+        input_state.extend(self.model_weights)
+        input_state = np.array(input_state, dtype=np.float32)
+        merged_state = self.subject.fit_merge(None, input_state.tostring())
+        state = np.fromstring(merged_state, dtype=np.float32)
+        agg_loss = state[0]
+        agg_accuracy = state[1]
+        image_count_total = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+       
+        self.assertEqual(image_count, image_count_total)
+        self.assertAlmostEqual(self.loss, agg_loss, 2)
+        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)
+        self.assertTrue((self.model_weights == weights).all())
+
+    def test_fit_merge_none_second(self):
+        image_count = self.total_images_per_seg[0]
+        input_state = [self.loss, self.accuracy, image_count]
+        input_state.extend(self.model_weights)
+        input_state = np.array(input_state, dtype=np.float32)
+        merged_state = self.subject.fit_merge(input_state.tostring(), None)
+        state = np.fromstring(merged_state, dtype=np.float32)
+        agg_loss = state[0]
+        agg_accuracy = state[1]
+        image_count_total = state[2]
+        weights = np.rint(state[3:]).astype(np.int)
+       
+        self.assertEqual(image_count, image_count_total)
+        self.assertAlmostEqual(self.loss, agg_loss, 2)
+        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)
+        self.assertTrue((self.model_weights == weights).all())
+
+    def test_fit_merge_both_none(self):
+        result = self.subject.fit_merge(None,None)
+        self.assertEqual(None, result)
+
+    def test_fit_final(self):
+        image_count = self.total_images_per_seg[0]
+        input_state = [image_count*self.loss, image_count*self.accuracy, image_count]
+        input_state.extend(mult(image_count,self.model_weights))
+        input_state = np.array(input_state, dtype=np.float32)
+        
+        output_state = self.subject.fit_final(input_state.tostring())
+        output_state = np.fromstring(output_state, dtype=np.float32)
+        agg_loss = output_state[0]
+        agg_accuracy = output_state[1]
+        image_count_output = output_state[2]
+        weights = np.rint(output_state[3:]).astype(np.int)
+
+        self.assertEqual(image_count, image_count_output)
+        self.assertAlmostEqual(self.loss, agg_loss,2)
+        self.assertAlmostEqual(self.accuracy, agg_accuracy,2)
+        self.assertTrue((self.model_weights == weights).all())
+
+    def fit_final_none(self):
+        result = self.subject.fit_final(None)
+        self.assertEqual(result, None)
 
     def test_get_device_name_and_set_cuda_env(self):
         import os
@@ -305,19 +458,6 @@ class MadlibKerasValidatorTestCase(unittest.TestCase):
         import madlib_keras_validator
         self.subject = madlib_keras_validator
 
-        self.model = Sequential()
-        self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
-                              input_shape=(1,1,1,), padding='same'))
-        self.model.add(Flatten())
-
-        self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']"
-        self.fit_params = "'batch_size'=1, 'epochs'=1"
-        self.model_weights = [3,4,5,6]
-        self.loss = 1.3
-        self.accuracy = 0.34
-        self.all_seg_ids = [0,1,2]
-        self.total_buffers_per_seg = [3,3,3]
-
     def tearDown(self):
         self.module_patcher.stop()
 
@@ -359,19 +499,6 @@ class MadlibSerializerTestCase(unittest.TestCase):
         import madlib_keras_serializer
         self.subject = madlib_keras_serializer
 
-        self.model = Sequential()
-        self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
-                              input_shape=(1,1,1,), padding='same'))
-        self.model.add(Flatten())
-
-        self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']"
-        self.fit_params = "'batch_size'=1, 'epochs'=1"
-        self.model_weights = [3,4,5,6]
-        self.loss = 1.3
-        self.accuracy = 0.34
-        self.all_seg_ids = [0,1,2]
-        self.total_buffers_per_seg = [3,3,3]
-
     def tearDown(self):
         self.module_patcher.stop()