You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by nj...@apache.org on 2019/04/29 19:39:47 UTC
[madlib] branch master updated (27ddd27 -> 56efedf)
This is an automated email from the ASF dual-hosted git repository.
njayaram pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git.
from 27ddd27 DL: Handle NULL value for optional pred_type param in predict
new 918ce80 DL: Update madlib_keras_fit code to weight by images instead of buffers.
new 94e5a6c DL: Add new unit tests and update existing ones
new 56efedf DL: Refactor computation of images per segment
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../modules/deep_learning/madlib_keras.py_in | 156 +++++++----
.../modules/deep_learning/madlib_keras.sql_in | 2 +-
.../deep_learning/madlib_keras_serializer.py_in | 27 +-
.../modules/deep_learning/test/madlib_keras.sql_in | 7 +
.../test/unit_tests/test_madlib_keras.py_in | 287 +++++++++++++++------
5 files changed, 335 insertions(+), 144 deletions(-)
[madlib] 03/03: DL: Refactor computation of images per segment
Posted by nj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 56efedfe91f239d436c48c3d35fd327d1f5d999e
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Fri Apr 26 16:39:35 2019 -0700
DL: Refactor computation of images per segment
JIRA: MADLIB-1310
Closes #378
---
.../modules/deep_learning/madlib_keras.py_in | 53 +++++++++++++++-------
1 file changed, 37 insertions(+), 16 deletions(-)
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index 5bf215b..82d1069 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -89,25 +89,15 @@ def fit(schema_madlib, source_table, model, dependent_varname,
# about making the fit function easier to read and maintain.
if is_platform_pg():
set_keras_session(use_gpu)
- # Compute total images in dataset
- total_images_per_seg = plpy.execute(
- """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
- FROM {1}
- """.format(dependent_varname, source_table))
- seg_ids_train = "[]::integer[]"
- gp_segment_id_col = -1
else:
- # Compute total images on each segment
- total_images_per_seg = plpy.execute(
- """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
- FROM {1}
- GROUP BY gp_segment_id
- """.format(dependent_varname, source_table))
- seg_ids_train = [int(each_segment["gp_segment_id"])
- for each_segment in total_images_per_seg]
- gp_segment_id_col = 'gp_segment_id'
# Disable GPU on master for gpdb
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
+
+ # Compute total images on each segment
+ gp_segment_id_col,\
+ seg_ids_train,\
+ total_images_per_seg = get_images_per_seg(source_table, dependent_varname)
+
if validation_table:
seg_ids_val, rows_per_seg_val = get_rows_per_seg_from_db(validation_table)
@@ -296,6 +286,37 @@ def fit(schema_madlib, source_table, model, dependent_varname,
if is_platform_pg():
clear_keras_session()
+def get_images_per_seg(source_table, dependent_varname):
+ """
+ Compute total images in each segment, by querying source_table. For
+ postgres, this is just the total number of images in the db.
+ :param source_table:
+ :param dependent_var:
+ :return: Returns a string and two arrays
+ 1. The appropriate string to use for querying segment number
+ ("gp_segment_id" for gpdb or "-1" for postgres).
+ 1. An array containing all the segment numbers in ascending order
+ 1. An array containing the total images on each of the segments in the
+ segment array.
+ """
+ if is_platform_pg():
+ total_images_per_seg = plpy.execute(
+ """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+ FROM {1}
+ """.format(dependent_varname, source_table))
+ seg_ids_train = "[]::integer[]"
+ gp_segment_id_col = -1
+ else:
+ total_images_per_seg = plpy.execute(
+ """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+ FROM {1}
+ GROUP BY gp_segment_id
+ """.format(dependent_varname, source_table))
+ seg_ids_train = [int(each_segment["gp_segment_id"])
+ for each_segment in total_images_per_seg]
+ gp_segment_id_col = 'gp_segment_id'
+ return gp_segment_id_col, seg_ids_train, total_images_per_seg
+
def get_rows_per_seg_from_db(table_name):
"""
This function queries the given table and returns the total rows per segment.
[madlib] 02/03: DL: Add new unit tests and update existing ones
Posted by nj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 94e5a6ca8e7145d81111bb7b96ff2e6f241cc00a
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Thu Apr 25 18:14:28 2019 -0700
DL: Add new unit tests and update existing ones
JIRA: MADLIB-1310
This commit also adds a commented-out SQL statement for creating validation
data using minibatch_preprocessor_dl with a batch size of 1. Once we fix
MADLIB-1326, this ensures we have 2 rows; with only 1 row, the fit_merge()
function was never being called. This will be a better end-to-end test.
Closes #378
Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
.../modules/deep_learning/test/madlib_keras.sql_in | 7 +
.../test/unit_tests/test_madlib_keras.py_in | 287 +++++++++++++++------
2 files changed, 214 insertions(+), 80 deletions(-)
diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
index 81a088e..527d6e8 100644
--- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
@@ -50,6 +50,13 @@ copy cifar_10_sample_batched from stdin delimiter '|';
0|{{0,1},{1,0}}|{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216 [...]
\.
+-- In order to test fit_merge, we need at least 2 rows in the batched table (1 on each segment).
+-- As part of supporting Postgres, an issue was reported JIRA MADLIB-1326.
+-- If we don't fix the bug, we should regenerate the batched table with this command
+-- (and paste it into the file). (If we do fix the bug, we can just uncomment this line,
+-- and remove the mocked output tables above.)
+-- SELECT minibatch_preprocessor_dl('cifar_10_sample','cifar_10_sample_batched','y','x', 1, 255);
+
DROP TABLE IF EXISTS cifar_10_sample_batched_summary;
CREATE TABLE cifar_10_sample_batched_summary(
source_table text,
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 0c4072b..3d27e1b 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -33,6 +33,10 @@ import plpy_mock as plpy
m4_changequote(`<!', `!>')
+# helper for multiplying array by int
+def mult(k,arr):
+ return [ k*a for a in arr ]
+
class MadlibKerasFitTestCase(unittest.TestCase):
def setUp(self):
self.plpy_mock = Mock(spec='error')
@@ -60,10 +64,15 @@ class MadlibKerasFitTestCase(unittest.TestCase):
for a in self.model.get_weights():
self.model_shapes.append(a.shape)
- self.loss = 1.3
- self.accuracy = 0.34
+ self.loss = 13.0
+ self.accuracy = 3.4
self.all_seg_ids = [0,1,2]
- self.total_buffers_per_seg = [3,3,3]
+
+ self.independent_var = [[[[0.5]]]] * 10
+ self.dependent_var = [[0,1]] * 10
+ # We test on segment 0, which has 3 buffers filled with 10 identical
+ # images each, or 30 images total
+ self.total_images_per_seg = [3*len(self.dependent_var),20,40]
def tearDown(self):
self.module_patcher.stop()
@@ -76,23 +85,28 @@ class MadlibKerasFitTestCase(unittest.TestCase):
self.subject.K.set_session = Mock()
self.subject.clear_keras_session = Mock()
self.subject.is_platform_pg = Mock(return_value = True)
- buffer_count = 0
- previous_state = [self.loss, self.accuracy, buffer_count]
+ starting_image_count = 0
+ ending_image_count = len(self.dependent_var)
+ previous_state = [self.loss, self.accuracy, starting_image_count]
previous_state.extend(self.model_weights)
previous_state = np.array(previous_state, dtype=np.float32)
- k = {'SD': {'buffer_count': buffer_count}}
+ k = {'SD' : {}}
+
new_model_state = self.subject.fit_transition(
- None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+ None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
self.model.to_json(), self.compile_params, self.fit_params, False,
previous_state.tostring(), **k)
- buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
- self.assertEqual(1, buffer_count)
- # set_session must get called ONLY once, when its the first buffer
+ state = np.fromstring(new_model_state, dtype=np.float32)
+ image_count = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+ self.assertEqual(ending_image_count, image_count)
+ # weights should not be modified yet
+ self.assertTrue((self.model_weights == weights).all())
+ # set_session must be not be called in transition func for PG
self.assertEqual(0, self.subject.K.set_session.call_count)
# Clear session and sess.close must not get called for the first buffer
self.assertEqual(0, self.subject.clear_keras_session.call_count)
- self.assertEqual(1, k['SD']['buffer_count'])
self.assertTrue(k['SD']['segment_model'])
self.assertTrue(k['SD']['model_shapes'])
@@ -104,115 +118,254 @@ class MadlibKerasFitTestCase(unittest.TestCase):
self.subject.K.set_session = Mock()
self.subject.clear_keras_session = Mock()
self.subject.is_platform_pg = Mock(return_value = False)
- buffer_count = 0
- previous_state = [self.loss, self.accuracy, buffer_count]
+ starting_image_count = 0
+ ending_image_count = len(self.dependent_var)
+ previous_state = [self.loss, self.accuracy, starting_image_count]
previous_state.extend(self.model_weights)
previous_state = np.array(previous_state, dtype=np.float32)
- k = {'SD': {'buffer_count': buffer_count}}
+ k = {'SD' : {}}
+
new_model_state = self.subject.fit_transition(
- None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+ None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
self.model.to_json(), self.compile_params, self.fit_params, False,
previous_state.tostring(), **k)
- buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
- self.assertEqual(1, buffer_count)
+ state = np.fromstring(new_model_state, dtype=np.float32)
+ image_count = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+ self.assertEqual(ending_image_count, image_count)
+ # weights should not be modified yet
+ self.assertTrue((self.model_weights == weights).all())
# set_session must get called ONLY once, when its the first buffer
self.assertEqual(1, self.subject.K.set_session.call_count)
# Clear session and sess.close must not get called for the first buffer
self.assertEqual(0, self.subject.clear_keras_session.call_count)
- self.assertEqual(1, k['SD']['buffer_count'])
self.assertTrue(k['SD']['segment_model'])
self.assertTrue(k['SD']['model_shapes'])
-
- def test_fit_transition_last_buffer_pass_pg(self):
+ def test_fit_transition_middle_buffer_pass(self):
#TODO should we mock tensorflow's close_session and keras'
# clear_session instead of mocking the function `clear_keras_session`
self.subject.K.set_session = Mock()
self.subject.clear_keras_session = Mock()
- self.subject.is_platform_pg = Mock(return_value = True)
+ self.subject.is_platform_pg = Mock(return_value = False)
- buffer_count = 2
+ starting_image_count = len(self.dependent_var)
+ ending_image_count = starting_image_count + len(self.dependent_var)
- state = [self.loss, self.accuracy, buffer_count]
+ state = [self.loss, self.accuracy, starting_image_count]
state.extend(self.model_weights)
state = np.array(state, dtype=np.float32)
self.subject.compile_and_set_weights(self.model, self.compile_params,
'/cpu:0', state.tostring(), self.model_shapes)
- k = {'SD': {'buffer_count': buffer_count,
- 'model_shapes': self.model_shapes}}
+ k = {'SD': {'model_shapes': self.model_shapes}}
k['SD']['segment_model'] = self.model
new_model_state = self.subject.fit_transition(
- state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+ state.tostring(), self.independent_var, self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
- buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
- self.assertEqual(3, buffer_count)
+ state = np.fromstring(new_model_state, dtype=np.float32)
+ image_count = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+ self.assertEqual(ending_image_count, image_count)
+ # weights should not be modified yet
+ self.assertTrue((self.model_weights == weights).all())
# set_session must get called ONLY once, when its the first buffer
self.assertEqual(0, self.subject.K.set_session.call_count)
- # Clear session and sess.close must not get called for the first buffer
+ # Clear session and sess.close must not get called for the middle buffer
self.assertEqual(0, self.subject.clear_keras_session.call_count)
- self.assertEqual(3, k['SD']['buffer_count'])
- def test_fit_transition_last_buffer_pass_gpdb(self):
+ def test_fit_transition_last_buffer_pass_pg(self):
#TODO should we mock tensorflow's close_session and keras'
# clear_session instead of mocking the function `clear_keras_session`
self.subject.K.set_session = Mock()
self.subject.clear_keras_session = Mock()
- self.subject.is_platform_pg = Mock(return_value = False)
+ self.subject.is_platform_pg = Mock(return_value = True)
- buffer_count = 2
+ starting_image_count = 2*len(self.dependent_var)
+ ending_image_count = starting_image_count + len(self.dependent_var)
- state = [self.loss, self.accuracy, buffer_count]
+ state = [self.loss, self.accuracy, starting_image_count]
state.extend(self.model_weights)
state = np.array(state, dtype=np.float32)
+ multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights]
+ multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+
self.subject.compile_and_set_weights(self.model, self.compile_params,
'/cpu:0', state.tostring(), self.model_shapes)
- k = {'SD': {'buffer_count': buffer_count,
- 'model_shapes': self.model_shapes}}
+ k = {'SD': { 'model_shapes': self.model_shapes}}
k['SD']['segment_model'] = self.model
new_model_state = self.subject.fit_transition(
- state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+ state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
- buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
- self.assertEqual(3, buffer_count)
- # set_session must get called ONLY once, when its the first buffer
+ state = np.fromstring(new_model_state, dtype=np.float32)
+ image_count = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+ self.assertEqual(ending_image_count, image_count)
+ # weights should be multiplied by final image count
+ self.assertTrue((multiplied_weights == weights).all())
+ # set_session must be not be called in transition func for PG
self.assertEqual(0, self.subject.K.set_session.call_count)
- # Clear session and sess.close must not get called for the first buffer
- self.assertEqual(1, self.subject.clear_keras_session.call_count)
- self.assertEqual(3, k['SD']['buffer_count'])
+ # Clear session and sess.close must get called for the last buffer in gpdb,
+ # but not in postgres
+ self.assertEqual(0, self.subject.clear_keras_session.call_count)
- def test_fit_transition_middle_buffer_pass(self):
+ def test_fit_transition_last_buffer_pass_gpdb(self):
#TODO should we mock tensorflow's close_session and keras'
# clear_session instead of mocking the function `clear_keras_session`
self.subject.K.set_session = Mock()
self.subject.clear_keras_session = Mock()
+ self.subject.is_platform_pg = Mock(return_value = False)
- buffer_count = 1
+ starting_image_count = 2*len(self.dependent_var)
+ ending_image_count = starting_image_count + len(self.dependent_var)
- state = [self.loss, self.accuracy, buffer_count]
+ state = [self.loss, self.accuracy, starting_image_count]
state.extend(self.model_weights)
state = np.array(state, dtype=np.float32)
+ multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights]
+ multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+
self.subject.compile_and_set_weights(self.model, self.compile_params,
'/cpu:0', state.tostring(), self.model_shapes)
- k = {'SD': {'buffer_count': buffer_count,
- 'model_shapes': self.model_shapes}}
+ k = {'SD': { 'model_shapes': self.model_shapes}}
k['SD']['segment_model'] = self.model
new_model_state = self.subject.fit_transition(
- state.tostring(), [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg,
+ state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg,
self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k)
- buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
- self.assertEqual(2, buffer_count)
+ state = np.fromstring(new_model_state, dtype=np.float32)
+ image_count = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+ self.assertEqual(ending_image_count, image_count)
+ # weights should be multiplied by final image count
+ self.assertTrue((multiplied_weights == weights).all())
# set_session must get called ONLY once, when its the first buffer
self.assertEqual(0, self.subject.K.set_session.call_count)
- # Clear session and sess.close must not get called for the first buffer
- self.assertEqual(0, self.subject.clear_keras_session.call_count)
- self.assertEqual(2, k['SD']['buffer_count'])
+ # Clear session and sess.close must get called for the last buffer in gpdb,
+ # but not in postgres
+ self.assertEqual(1, self.subject.clear_keras_session.call_count)
+
+ def test_fit_transition_ending_image_count_zero(self):
+ self.subject.K.set_session = Mock()
+ self.subject.clear_keras_session = Mock()
+ starting_image_count = 0
+ previous_state = [self.loss, self.accuracy, starting_image_count]
+ previous_state.extend(self.model_weights)
+ previous_state = np.array(previous_state, dtype=np.float32)
+
+ k = {'SD' : {}}
+
+ total_images_per_seg = [0,1,1]
+
+ with self.assertRaises(plpy.PLPYException):
+ new_model_state = self.subject.fit_transition(
+ None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg,
+ self.model.to_json(), self.compile_params, self.fit_params, False,
+ previous_state.tostring(), **k)
+
+ def test_fit_transition_too_many_images(self):
+ self.subject.K.set_session = Mock()
+ self.subject.clear_keras_session = Mock()
+ starting_image_count = 0
+ previous_state = [self.loss, self.accuracy, starting_image_count]
+ previous_state.extend(self.model_weights)
+ previous_state = np.array(previous_state, dtype=np.float32)
+
+ k = {'SD' : {}}
+
+ total_images_per_seg = [1,1,1]
+
+ with self.assertRaises(plpy.PLPYException):
+ new_model_state = self.subject.fit_transition(
+ None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg,
+ self.model.to_json(), self.compile_params, self.fit_params, False,
+ previous_state.tostring(), **k)
+
+
+ def test_fit_merge(self):
+ image_count = self.total_images_per_seg[0]
+ state1 = [3.0*self.loss, 3.0*self.accuracy, image_count]
+ state1.extend(mult(3,self.model_weights))
+ state1 = np.array(state1, dtype=np.float32)
+ state2 = [2.0*self.loss, 2.0*self.accuracy, image_count+30]
+ state2.extend(mult(2,self.model_weights))
+ state2 = np.array(state2, dtype=np.float32)
+ merged_state = self.subject.fit_merge(state1.tostring(),state2.tostring())
+ state = np.fromstring(merged_state, dtype=np.float32)
+ agg_loss = state[0]
+ agg_accuracy = state[1]
+ image_count_total = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+
+ self.assertEqual( 2*image_count+30 , image_count_total )
+ self.assertAlmostEqual( 5.0*self.loss, agg_loss, 2)
+ self.assertAlmostEqual( 5.0*self.accuracy, agg_accuracy, 2)
+ self.assertTrue( (mult(5,self.model_weights) == weights).all())
+
+ def test_fit_merge_none_first(self):
+ image_count = self.total_images_per_seg[0]
+ input_state = [self.loss, self.accuracy, image_count]
+ input_state.extend(self.model_weights)
+ input_state = np.array(input_state, dtype=np.float32)
+ merged_state = self.subject.fit_merge(None, input_state.tostring())
+ state = np.fromstring(merged_state, dtype=np.float32)
+ agg_loss = state[0]
+ agg_accuracy = state[1]
+ image_count_total = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+
+ self.assertEqual(image_count, image_count_total)
+ self.assertAlmostEqual(self.loss, agg_loss, 2)
+ self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)
+ self.assertTrue((self.model_weights == weights).all())
+
+ def test_fit_merge_none_second(self):
+ image_count = self.total_images_per_seg[0]
+ input_state = [self.loss, self.accuracy, image_count]
+ input_state.extend(self.model_weights)
+ input_state = np.array(input_state, dtype=np.float32)
+ merged_state = self.subject.fit_merge(input_state.tostring(), None)
+ state = np.fromstring(merged_state, dtype=np.float32)
+ agg_loss = state[0]
+ agg_accuracy = state[1]
+ image_count_total = state[2]
+ weights = np.rint(state[3:]).astype(np.int)
+
+ self.assertEqual(image_count, image_count_total)
+ self.assertAlmostEqual(self.loss, agg_loss, 2)
+ self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)
+ self.assertTrue((self.model_weights == weights).all())
+
+ def test_fit_merge_both_none(self):
+ result = self.subject.fit_merge(None,None)
+ self.assertEqual(None, result)
+
+ def test_fit_final(self):
+ image_count = self.total_images_per_seg[0]
+ input_state = [image_count*self.loss, image_count*self.accuracy, image_count]
+ input_state.extend(mult(image_count,self.model_weights))
+ input_state = np.array(input_state, dtype=np.float32)
+
+ output_state = self.subject.fit_final(input_state.tostring())
+ output_state = np.fromstring(output_state, dtype=np.float32)
+ agg_loss = output_state[0]
+ agg_accuracy = output_state[1]
+ image_count_output = output_state[2]
+ weights = np.rint(output_state[3:]).astype(np.int)
+
+ self.assertEqual(image_count, image_count_output)
+ self.assertAlmostEqual(self.loss, agg_loss,2)
+ self.assertAlmostEqual(self.accuracy, agg_accuracy,2)
+ self.assertTrue((self.model_weights == weights).all())
+
+ def fit_final_none(self):
+ result = self.subject.fit_final(None)
+ self.assertEqual(result, None)
def test_get_device_name_and_set_cuda_env(self):
import os
@@ -305,19 +458,6 @@ class MadlibKerasValidatorTestCase(unittest.TestCase):
import madlib_keras_validator
self.subject = madlib_keras_validator
- self.model = Sequential()
- self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
- input_shape=(1,1,1,), padding='same'))
- self.model.add(Flatten())
-
- self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']"
- self.fit_params = "'batch_size'=1, 'epochs'=1"
- self.model_weights = [3,4,5,6]
- self.loss = 1.3
- self.accuracy = 0.34
- self.all_seg_ids = [0,1,2]
- self.total_buffers_per_seg = [3,3,3]
-
def tearDown(self):
self.module_patcher.stop()
@@ -359,19 +499,6 @@ class MadlibSerializerTestCase(unittest.TestCase):
import madlib_keras_serializer
self.subject = madlib_keras_serializer
- self.model = Sequential()
- self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
- input_shape=(1,1,1,), padding='same'))
- self.model.add(Flatten())
-
- self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']"
- self.fit_params = "'batch_size'=1, 'epochs'=1"
- self.model_weights = [3,4,5,6]
- self.loss = 1.3
- self.accuracy = 0.34
- self.all_seg_ids = [0,1,2]
- self.total_buffers_per_seg = [3,3,3]
-
def tearDown(self):
self.module_patcher.stop()
[madlib] 01/03: DL: Update madlib_keras_fit code to weight by
images instead of buffers.
Posted by nj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 918ce80c0a4f10f23ab8c5e2acfc622b5ba71a2b
Author: Domino Valdano <dv...@pivotal.io>
AuthorDate: Thu Apr 25 18:14:20 2019 -0700
DL: Update madlib_keras_fit code to weight by images instead of buffers.
JIRA: MADLIB-1310
Prior to this commit, while merging the weights, accuracy and loss, we
assumed that all the buffers were of the same size. This may not always be
the case. Ideally we should use the total number of images per segment to
average the weights, accuracy and loss.
This commit updates the transition, merge and final functions for
`madlib_keras_fit()` to use the number of images on a segment instead of
the buffer count for averaging weights, accuracy and loss.
Additionally, when deserializing loss and accuracy from the state, the
function `deserialize_weights()` type-cast them to an int, which resulted
in computing lower loss and accuracy while training. This commit also
addresses this issue.
Closes #378
Co-authored-by: Ekta Khanna <ek...@pivotal.io>
---
.../modules/deep_learning/madlib_keras.py_in | 131 +++++++++++++--------
.../modules/deep_learning/madlib_keras.sql_in | 2 +-
.../deep_learning/madlib_keras_serializer.py_in | 27 ++---
3 files changed, 98 insertions(+), 62 deletions(-)
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index bd12f55..5bf215b 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -49,11 +49,18 @@ from utilities.model_arch_info import get_num_classes
from utilities.utilities import is_platform_pg
from utilities.utilities import madlib_version
from utilities.validate_args import get_col_value_and_type
+from utilities.validate_args import quote_ident
def fit(schema_madlib, source_table, model, dependent_varname,
independent_varname, model_arch_table, model_arch_id, compile_params,
fit_params, num_iterations, use_gpu = True,
validation_table=None, name="", description="", **kwargs):
+
+ source_table = quote_ident(source_table)
+ dependent_varname = quote_ident(dependent_varname)
+ independent_varname = quote_ident(independent_varname)
+ model_arch_table = quote_ident(model_arch_table)
+
fit_validator = FitInputValidator(
source_table, validation_table, model, model_arch_table,
dependent_varname, independent_varname, num_iterations)
@@ -82,12 +89,25 @@ def fit(schema_madlib, source_table, model, dependent_varname,
# about making the fit function easier to read and maintain.
if is_platform_pg():
set_keras_session(use_gpu)
+ # Compute total images in dataset
+ total_images_per_seg = plpy.execute(
+ """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+ FROM {1}
+ """.format(dependent_varname, source_table))
+ seg_ids_train = "[]::integer[]"
gp_segment_id_col = -1
else:
+ # Compute total images on each segment
+ total_images_per_seg = plpy.execute(
+ """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+ FROM {1}
+ GROUP BY gp_segment_id
+ """.format(dependent_varname, source_table))
+ seg_ids_train = [int(each_segment["gp_segment_id"])
+ for each_segment in total_images_per_seg]
gp_segment_id_col = 'gp_segment_id'
# Disable GPU on master for gpdb
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
- seg_ids_train, rows_per_seg_train = get_rows_per_seg_from_db(source_table)
if validation_table:
seg_ids_val, rows_per_seg_val = get_rows_per_seg_from_db(validation_table)
@@ -110,6 +130,9 @@ def fit(schema_madlib, source_table, model, dependent_varname,
validation_set_provided = bool(validation_table)
validation_aggregate_accuracy = []; validation_aggregate_loss = []
+ total_images_per_seg = [int(each_segment["total_images_per_seg"])
+ for each_segment in total_images_per_seg]
+
# Prepare the SQL for running distributed training via UDA
compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
@@ -120,7 +143,7 @@ def fit(schema_madlib, source_table, model, dependent_varname,
{gp_segment_id_col},
{num_classes}::INTEGER,
ARRAY{seg_ids_train},
- ARRAY{rows_per_seg_train},
+ ARRAY{total_images_per_seg},
$MAD${model_arch}$MAD$::TEXT,
{compile_params_to_pass}::TEXT,
{fit_params_to_pass}::TEXT,
@@ -305,7 +328,7 @@ def get_rows_per_seg_from_db(table_name):
def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
- all_seg_ids, total_buffers_per_seg, architecture,
+ all_seg_ids, total_images_per_seg, architecture,
compile_params, fit_params, use_gpu, previous_state,
**kwargs):
@@ -317,7 +340,7 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
:param current_seg_id:
:param num_classes:
:param all_seg_ids:
- :param total_buffers_per_seg:
+ :param total_images_per_seg:
:param architecture:
:param compile_params:
:param fit_params:
@@ -344,13 +367,14 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
compile_and_set_weights(segment_model, compile_params, device_name,
previous_state, SD['model_shapes'])
SD['segment_model'] = segment_model
- SD['buffer_count'] = 0
+ image_count = 0
agg_loss = 0
agg_accuracy = 0
+ agg_image_count = 0
else:
segment_model = SD['segment_model']
#TODO we don't need to deserialize the weights here.
- agg_loss, agg_accuracy, _, _ = madlib_keras_serializer.deserialize_weights(
+ agg_loss, agg_accuracy, agg_image_count, _ = madlib_keras_serializer.deserialize_weights(
state, SD['model_shapes'])
# Prepare the data
@@ -367,79 +391,92 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
accuracy = history.history['acc'][0]
end_fit = time.time()
- # Re-serialize the weights
- # Update buffer count, check if we are done
- SD['buffer_count'] += 1
- agg_loss += loss
- agg_accuracy += accuracy
+ image_count = len(x_train)
+ # Aggregating number of images, loss and accuracy
+ agg_image_count += image_count
+ agg_loss += (image_count * loss)
+ agg_accuracy += (image_count * accuracy)
with K.tf.device(device_name):
updated_weights = segment_model.get_weights()
if is_platform_pg():
- total_buffers_per_seg = total_buffers_per_seg[0]
+ total_images = total_images_per_seg[0]
else:
- total_buffers_per_seg = total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
- if total_buffers_per_seg == 0:
+ total_images = total_images_per_seg[all_seg_ids.index(current_seg_id)]
+ if total_images == 0:
plpy.error('Got 0 rows. Expected at least 1.')
- if SD['buffer_count'] == total_buffers_per_seg:
- agg_loss /= total_buffers_per_seg
- agg_accuracy /= total_buffers_per_seg
+ # Re-serialize the weights
+ # Update image count, check if we are done
+ if agg_image_count == total_images:
+ if total_images == 0:
+ plpy.error('Total images is 0')
+ # Once done with all images on a segment, we update weights
+ # with the total number of images here instead of the merge function.
+ # The merge function only deals with aggregating them.
+ updated_weights = [ total_images * w for w in updated_weights ]
if not is_platform_pg():
# In GPDB, each segment would have a keras session, so clear
# them after the last buffer is processed.
clear_keras_session()
+ elif agg_image_count > total_images:
+ plpy.error('Processed {0} images, but there were supposed to be only {1}!'
+ .format(agg_image_count,total_images))
new_model_state = madlib_keras_serializer.serialize_weights(
- agg_loss, agg_accuracy, SD['buffer_count'], updated_weights)
+ agg_loss, agg_accuracy, agg_image_count, updated_weights)
+
del x_train
del y_train
end_transition = time.time()
- plpy.info("Processed buffer {0}: Fit took {1} sec, Total was {2} sec".format(
- SD['buffer_count'], end_fit - start_fit, end_transition - start_transition))
+ plpy.info("Processed {0} images: Fit took {1} sec, Total was {2} sec".format(
+ image_count, end_fit - start_fit, end_transition - start_transition))
return new_model_state
def fit_merge(state1, state2, **kwargs):
+
# Return if called early
if not state1 or not state2:
return state1 or state2
# Deserialize states
- loss1, accuracy1, buffer_count1, weights1 = madlib_keras_serializer.deserialize_weights_merge(state1)
- loss2, accuracy2, buffer_count2, weights2 = madlib_keras_serializer.deserialize_weights_merge(state2)
- # plpy.info('merge buffer loss1 {}, accuracy1 {}, buffer count1 {}'.format(loss1, accuracy1, buffer_count1))
- # plpy.info('merge buffer loss2 {}, accuracy2 {}, buffer count2 {}'.format(loss2, accuracy2, buffer_count2))
-
- # Compute total buffer counts
- # buffer_count1, buffer_count2 = state1[2], state2[2]
- total_buffers = (buffer_count1 + buffer_count2) * 1.0
- if total_buffers == 0:
- plpy.error('total buffers in merge is 0')
- merge_weight1 = buffer_count1 / total_buffers
- merge_weight2 = buffer_count2 / total_buffers
-
- # Average the losses
- # loss1, loss2 = state1[0], state2[0]
- avg_loss = merge_weight1*loss1 + merge_weight2*loss2
-
- # Average the accuracies
- # accuracy1, accuracy2 = state1[1], state2[1]
- avg_accuracy = merge_weight1*accuracy1 + merge_weight2*accuracy2
-
- # Average the weights
- # weights1, weights2 = state1[3:], state2[3:]
- avg_weights = merge_weight1*weights1 + merge_weight2*weights2
- # avg_weights = [(merge_weight1 * e1) + (merge_weight2 * e2) for e1, e2 in zip(weights1, weights2)]
+ loss1, accuracy1, image_count1, weights1 = madlib_keras_serializer.deserialize_weights_merge(state1)
+ loss2, accuracy2, image_count2, weights2 = madlib_keras_serializer.deserialize_weights_merge(state2)
+
+ # Compute total image counts
+ image_count = (image_count1 + image_count2) * 1.0
+ plpy.info("FIT_MERGE: Mergeing {0} + {1} = {2} images".format(image_count1,image_count2,image_count))
+ if image_count == 0:
+ plpy.error('total images in merge is 0')
+
+ # Aggregate the losses
+ total_loss = loss1 + loss2
+
+ # Aggregate the accuracies
+ total_accuracy = accuracy1 + accuracy2
+
+ # Aggregate the weights
+ total_weights = weights1 + weights2
# Return the merged state
return madlib_keras_serializer.serialize_weights_merge(
- avg_loss, avg_accuracy, total_buffers, avg_weights)
+ total_loss, total_accuracy, image_count, total_weights)
def fit_final(state, **kwargs):
- return state
+ # Return if called early
+ if not state:
+ return state
+
+ loss, accuracy, image_count, weights = madlib_keras_serializer.deserialize_weights_merge(state)
+ # Averaging the accuracy, loss and weights
+ loss /= image_count
+ accuracy /= image_count
+ weights /= image_count
+ return madlib_keras_serializer.serialize_weights_merge(
+ loss, accuracy, image_count, weights)
def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table,
model_arch_id, dependent_varname, independent_varname,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 543bbed..3c44205 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -105,7 +105,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.fit_transition(
current_seg_id INTEGER,
num_classes INTEGER,
all_seg_ids INTEGER[],
- total_buffers_per_seg INTEGER[],
+ total_images_per_seg INTEGER[],
architecture TEXT,
compile_params TEXT,
fit_params TEXT,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
index d52ed16..09f3eb8 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_serializer.py_in
@@ -31,7 +31,7 @@ def get_model_shapes(model):
# np.array(np.array(loss, acc, buff_count).concatenate(weights_np_array)).tostring()
# Proposed logic
-# loss , accuracy and buffer_count can be comma separated values
+# loss , accuracy and image_count can be comma separated values
# weights -> np.array.tostring()
# combine these 2 into one string by a random splitter
# serialized string -> loss_splitter_acc_splitter_buffer_splitter_weights
@@ -40,11 +40,11 @@ def deserialize_weights(model_state, model_shapes):
"""
Parameters:
model_state: a stringified (serialized) state containing loss,
- accuracy, buffer_count, and model_weights, passed from postgres
+ accuracy, image_count, and model_weights, passed from postgres
model_shapes: a list of tuples containing the shapes of each element
in keras.get_weights()
Returns:
- buffer_count: the buffer count from state
+ image_count: the image count from state
model_weights: a list of numpy arrays that can be inputted into keras.set_weights()
"""
if not model_state or not model_shapes:
@@ -57,13 +57,13 @@ def deserialize_weights(model_state, model_shapes):
weight_arr_portion = model_weights_serialized[i:next_pointer]
model_weights.append(weight_arr_portion.reshape(model_shapes[j]))
i, j = next_pointer, j + 1
- return int(float(state[0])), int(float(state[1])), int(float(state[2])), model_weights
+ return float(state[0]), float(state[1]), int(float(state[2])), model_weights
-def serialize_weights(loss, accuracy, buffer_count, model_weights):
+def serialize_weights(loss, accuracy, image_count, model_weights):
"""
Parameters:
- loss, accuracy, buffer_count: float values
+ loss, accuracy, image_count: float values
model_weights: a list of numpy arrays, what you get from
keras.get_weights()
Returns:
@@ -74,12 +74,11 @@ def serialize_weights(loss, accuracy, buffer_count, model_weights):
return None
flattened_weights = [w.flatten() for w in model_weights]
model_weights_serialized = np.concatenate(flattened_weights)
- new_model_string = np.array([loss, accuracy, buffer_count])
+ new_model_string = np.array([loss, accuracy, image_count])
new_model_string = np.concatenate((new_model_string, model_weights_serialized))
new_model_string = np.float32(new_model_string)
return new_model_string.tostring()
-
def deserialize_iteration_state(iteration_result):
"""
Parameters:
@@ -90,7 +89,7 @@ def deserialize_iteration_state(iteration_result):
new_model_state: the stringified (serialized) state to pass in to next
iteration of step function training, represents the averaged weights
from the last iteration of training; zeros out loss, accuracy,
- buffer_count in this state because the new iteration must start with
+ image_count in this state because the new iteration must start with
fresh values
"""
if not iteration_result:
@@ -105,12 +104,12 @@ def deserialize_iteration_state(iteration_result):
def deserialize_weights_merge(state):
"""
Parameters:
- state: the stringified (serialized) state containing loss, accuracy, buffer_count, and
+ state: the stringified (serialized) state containing loss, accuracy, image_count, and
model_weights, passed from postgres to merge function
Returns:
loss: the averaged loss from that iteration of training
accuracy: the averaged accuracy from that iteration of training
- buffer_count: total buffer counts processed
+ image_count: total number of images processed
model_weights: a single flattened numpy array containing all of the
weights, flattened because all we have to do is average them (so don't
have to reshape)
@@ -121,10 +120,10 @@ def deserialize_weights_merge(state):
return float(state[0]), float(state[1]), int(float(state[2])), state[3:]
-def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
+def serialize_weights_merge(loss, accuracy, image_count, model_weights):
"""
Parameters:
- loss, accuracy, buffer_count: float values
+ loss, accuracy, image_count: float values
model_weights: a single flattened numpy array containing all of the
weights, averaged in merge function over the 2 states
Returns:
@@ -133,7 +132,7 @@ def serialize_weights_merge(loss, accuracy, buffer_count, model_weights):
"""
if model_weights is None:
return None
- new_model_string = np.array([loss, accuracy, buffer_count])
+ new_model_string = np.array([loss, accuracy, image_count])
new_model_string = np.concatenate((new_model_string, model_weights))
new_model_string = np.float32(new_model_string)
return new_model_string.tostring()