You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by kh...@apache.org on 2020/10/27 20:18:10 UTC
[madlib] 05/08: DL: [AutoML] Split automl methods to their own files
This is an automated email from the ASF dual-hosted git repository.
khannaekta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
commit c849dd08a16ee03114dbe75ee65781dd7264c925
Author: Ekta Khanna <ek...@vmware.com>
AuthorDate: Thu Oct 15 17:42:11 2020 -0700
DL: [AutoML] Split automl methods to their own files
JIRA: MADLIB-1453
This commit also sets plan_cache_mode when calling the fit multiple model
function from the automl methods.
Co-authored-by: Nikhil Kak <nk...@vmware.com>
---
.../deep_learning/madlib_keras_automl.py_in | 831 +--------------------
.../deep_learning/madlib_keras_automl.sql_in | 20 +-
.../madlib_keras_automl_hyperband.py_in | 419 +++++++++++
.../madlib_keras_automl_hyperopt.py_in | 458 ++++++++++++
.../test/unit_tests/test_madlib_keras_automl.py_in | 16 +-
5 files changed, 907 insertions(+), 837 deletions(-)
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in
index dc8c837..c795ee1 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in
@@ -17,24 +17,13 @@
# specific language governing permissions and limitations
# under the License.
-from ast import literal_eval
-from datetime import datetime
-from hyperopt import hp, rand, tpe, atpe, Trials, STATUS_OK, STATUS_RUNNING
-from hyperopt.base import Domain
-import math
-import numpy as np
import plpy
-import time
from madlib_keras_validator import MstLoaderInputValidator
-# from utilities.admin import cleanup_madlib_temp_tables
from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \
- unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table
-from utilities.control import SetGUC
-from madlib_keras_fit_multiple_model import FitMultipleModel
-from madlib_keras_helper import generate_row_string
-from madlib_keras_helper import DISTRIBUTION_RULES_COLNAME
-from madlib_keras_model_selection import MstSearch, ModelSelectionSchema
+ unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table, \
+ is_platform_pg
+from madlib_keras_model_selection import ModelSelectionSchema
from keras_model_arch_table import ModelArchSchema
from utilities.validate_args import table_exists, drop_tables, input_tbl_valid
from utilities.validate_args import quote_ident
@@ -49,6 +38,7 @@ class AutoMLConstants:
R = 'R'
ETA = 'eta'
SKIP_LAST = 'skip_last'
+ HYPERBAND_PARAMS = [R, ETA, SKIP_LAST]
LOSS_METRIC = 'training_loss_final'
TEMP_MST_TABLE = unique_string('temp_mst_table')
TEMP_MST_SUMMARY_TABLE = add_postfix(TEMP_MST_TABLE, '_summary')
@@ -57,119 +47,11 @@ class AutoMLConstants:
NUM_CONFIGS = 'num_configs'
NUM_ITERS = 'num_iterations'
ALGORITHM = 'algorithm'
+ HYPEROPT_PARAMS = [NUM_CONFIGS, NUM_ITERS, ALGORITHM]
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
INT_MAX = 2 ** 31 - 1
TARGET_SCHEMA = 'public'
-class HyperbandSchedule():
- """The utility class for loading a hyperband schedule table with algorithm inputs.
-
- Attributes:
- schedule_table (string): Name of output table containing hyperband schedule.
- R (int): Maximum number of resources (iterations) that can be allocated
- to a single configuration.
- eta (int): Controls the proportion of configurations discarded in
- each round of successive halving.
- skip_last (int): The number of last rounds to skip.
- """
- def __init__(self, schedule_table, R, eta=3, skip_last=0):
- self.schedule_table = schedule_table # table name to store hyperband schedule
- self.R = R # maximum iterations/epochs allocated to a configuration
- self.eta = eta # defines downsampling rate
- self.skip_last = skip_last
- self.module_name = 'hyperband_schedule'
- self.validate_inputs()
-
- # number of unique executions of Successive Halving (minus one)
- self.s_max = int(math.floor(math.log(self.R, self.eta)))
- self.validate_s_max()
-
- self.schedule_vals = []
-
- self.calculate_schedule()
-
- def load(self):
- """
- The entry point for loading the hyperband schedule table.
- """
- self.create_schedule_table()
- self.insert_into_schedule_table()
-
- def validate_inputs(self):
- """
- Validates user input values
- """
- _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name))
- _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name))
-
- def validate_s_max(self):
- _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " +
- "non-negative and less than {1}".format(self.module_name,self.s_max))
-
- def calculate_schedule(self):
- """
- Calculates the hyperband schedule (number of configs and allocated resources)
- in each round of each bracket and skips the number of last rounds specified in 'skip_last'
- """
- for s in reversed(range(self.s_max+1)):
- n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations
- r = self.R * math.pow(self.eta, -s)
-
- for i in range((s+1) - int(self.skip_last)):
- # Computing each of the
- n_i = n*math.pow(self.eta, -i)
- r_i = r*math.pow(self.eta, i)
-
- self.schedule_vals.append({AutoMLConstants.BRACKET: s,
- AutoMLConstants.ROUND: i,
- AutoMLConstants.CONFIGURATIONS: int(n_i),
- AutoMLConstants.RESOURCES: int(round(r_i))})
-
- def create_schedule_table(self):
- """Initializes the output schedule table"""
- create_query = """
- CREATE TABLE {self.schedule_table} (
- {s} INTEGER,
- {i} INTEGER,
- {n_i} INTEGER,
- {r_i} INTEGER,
- unique ({s}, {i})
- );
- """.format(self=self,
- s=AutoMLConstants.BRACKET,
- i=AutoMLConstants.ROUND,
- n_i=AutoMLConstants.CONFIGURATIONS,
- r_i=AutoMLConstants.RESOURCES)
- plpy.execute(create_query)
-
- def insert_into_schedule_table(self):
- """Insert everything in self.schedule_vals into the output schedule table."""
- for sd in self.schedule_vals:
- sd_s = sd[AutoMLConstants.BRACKET]
- sd_i = sd[AutoMLConstants.ROUND]
- sd_n_i = sd[AutoMLConstants.CONFIGURATIONS]
- sd_r_i = sd[AutoMLConstants.RESOURCES]
- insert_query = """
- INSERT INTO
- {self.schedule_table}(
- {s_col},
- {i_col},
- {n_i_col},
- {r_i_col}
- )
- VALUES (
- {sd_s},
- {sd_i},
- {sd_n_i},
- {sd_r_i}
- )
- """.format(s_col=AutoMLConstants.BRACKET,
- i_col=AutoMLConstants.ROUND,
- n_i_col=AutoMLConstants.CONFIGURATIONS,
- r_i_col=AutoMLConstants.RESOURCES,
- **locals())
- plpy.execute(insert_query)
-
class KerasAutoML(object):
"""
The core AutoML class for running AutoML algorithms such as Hyperband and Hyperopt.
@@ -179,6 +61,9 @@ class KerasAutoML(object):
automl_params=None, random_state=None, object_table=None,
use_gpus=False, validation_table=None, metrics_compute_frequency=None,
name=None, description=None, **kwargs):
+ if is_platform_pg():
+ plpy.error(
+ "DL: AutoML is not supported on PostgreSQL.")
self.schema_madlib = schema_madlib
self.source_table = source_table
self.model_output_table = model_output_table
@@ -381,704 +266,8 @@ class KerasAutoML(object):
Remove all intermediate tables created for AutoML runs/updates.
:param model_training: Fit Multiple function call object.
"""
+ if not model_training:
+ return
drop_tables([model_training.original_model_output_table, model_training.model_info_table,
model_training.model_summary_table, AutoMLConstants.TEMP_MST_TABLE,
AutoMLConstants.TEMP_MST_SUMMARY_TABLE])
-
-class AutoMLHyperband(KerasAutoML):
- """
- This class implements Hyperband, an infinite-arm bandit based algorithm that speeds up random search
- through adaptive resource allocation, successive halving (SHA), and early stopping.
-
- This class showcases a novel hyperband implementation by executing the hyperband rounds 'diagonally'
- to evaluate multiple configurations together and leverage the compute power of MPP databases such as Greenplum.
-
- This automl method inherits qualities from the automl class.
- """
- def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
- model_id_list, compile_params_grid, fit_params_grid, automl_method,
- automl_params, random_state=None, object_table=None,
- use_gpus=False, validation_table=None, metrics_compute_frequency=None,
- name=None, description=None, **kwargs):
- automl_method = automl_method if automl_method else AutoMLConstants.HYPERBAND
- automl_params = automl_params if automl_params else 'R=6, eta=3, skip_last=0'
- KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table,
- model_selection_table, model_id_list, compile_params_grid, fit_params_grid,
- automl_method, automl_params, random_state, object_table, use_gpus,
- validation_table, metrics_compute_frequency, name, description, **kwargs)
- self.validate_and_define_inputs()
- self.create_model_output_table()
- self.create_model_output_info_table()
- self.find_hyperband_config()
-
- def validate_and_define_inputs(self):
- automl_params_dict = extract_keyvalue_params(self.automl_params,
- lower_case_names=False)
- # casting dict values to int
- for i in automl_params_dict:
- automl_params_dict[i] = int(automl_params_dict[i])
- _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3,
- "{0}: Only R, eta, and skip_last may be specified".format(self.module_name))
- for i in automl_params_dict:
- if i == AutoMLConstants.R:
- self.R = automl_params_dict[AutoMLConstants.R]
- elif i == AutoMLConstants.ETA:
- self.eta = automl_params_dict[AutoMLConstants.ETA]
- elif i == AutoMLConstants.SKIP_LAST:
- self.skip_last = automl_params_dict[AutoMLConstants.SKIP_LAST]
- else:
- plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i))
- _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name))
- _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name))
- self.s_max = int(math.floor(math.log(self.R, self.eta)))
- _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " \
- "non-negative and less than {1}".format(self.module_name, self.s_max))
-
- def find_hyperband_config(self):
- """
- Executes the diagonal hyperband algorithm.
- """
- initial_vals = {}
-
- # get hyper parameter configs for each s
- for s in reversed(range(self.s_max+1)):
- n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations
- r = self.R * math.pow(self.eta, -s) # initial number of iterations to run configurations for
- initial_vals[s] = (n, int(round(r)))
- self.start_training_time = self.get_current_timestamp()
- self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
- random_search = MstSearch(self.schema_madlib,
- self.model_arch_table,
- self.model_selection_table,
- self.model_id_list,
- self.compile_params_grid,
- self.fit_params_grid,
- 'random',
- sum([initial_vals[k][0] for k in initial_vals][self.skip_last:]),
- self.random_state,
- self.object_table)
- random_search.load() # for populating mst tables
-
- # for creating the summary table for usage in fit multiple
- plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_SUMMARY_TABLE} AS " \
- "SELECT * FROM {random_search.model_selection_summary_table}".format(AutoMLSchema=AutoMLConstants,
- random_search=random_search))
- ranges_dict = self.mst_key_ranges_dict(initial_vals)
- # to store the bracket and round numbers
- s_dict, i_dict = {}, {}
- for key, val in ranges_dict.items():
- for mst_key in range(val[0], val[1]+1):
- s_dict[mst_key] = key
- i_dict[mst_key] = -1
-
- # outer loop on diagonal
- for i in range((self.s_max+1) - int(self.skip_last)):
- # inner loop on s desc
- temp_lst = []
- configs_prune_lookup = {}
- for s in range(self.s_max, self.s_max-i-1, -1):
- n = initial_vals[s][0]
- n_i = n * math.pow(self.eta, -i+self.s_max-s)
- configs_prune_lookup[s] = int(round(n_i))
- temp_lst.append("{0} configs under bracket={1} & round={2}".format(int(n_i), s, s-self.s_max+i))
- num_iterations = int(initial_vals[self.s_max-i][1])
- plpy.info('*** Diagonally evaluating ' + ', '.join(temp_lst) + ' with {0} iterations ***'.format(
- num_iterations))
-
- self.reconstruct_temp_mst_table(i, ranges_dict, configs_prune_lookup) # has keys to evaluate
- active_keys = plpy.execute("SELECT {ModelSelectionSchema.MST_KEY} " \
- "FROM {AutoMLSchema.TEMP_MST_TABLE}".format(AutoMLSchema=AutoMLConstants,
- ModelSelectionSchema=ModelSelectionSchema))
- for k in active_keys:
- i_dict[k[ModelSelectionSchema.MST_KEY]] += 1
- self.warm_start = int(i != 0)
- mcf = self.metrics_compute_frequency if self._is_valid_metrics_compute_frequency(num_iterations) else None
- with SetGUC("plan_cache_mode", "force_generic_plan"):
- model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLSchema.TEMP_OUTPUT_TABLE,
- AutoMLSchema.TEMP_MST_TABLE, num_iterations, self.use_gpus,
- self.validation_table, mcf, self.warm_start, self.name, self.description)
- self.update_model_output_table(model_training)
- self.update_model_output_info_table(i, model_training, initial_vals)
-
- self.print_best_mst_so_far()
-
- self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
- self.add_additional_info_cols(s_dict, i_dict)
- self.update_model_selection_table()
- self.generate_model_output_summary_table(model_training)
- self.remove_temp_tables(model_training)
- # cleanup_madlib_temp_tables(self.schema_madlib, AutoMLSchema.TARGET_SCHEMA)
-
- def mst_key_ranges_dict(self, initial_vals):
- """
- Extracts the ranges of model configs (using mst_keys) belonging to / sampled as part of
- executing a particular SHA bracket.
- """
- d = {}
- for s_val in sorted(initial_vals.keys(), reverse=True): # going from s_max to 0
- if s_val == self.s_max:
- d[s_val] = (1, initial_vals[s_val][0])
- else:
- d[s_val] = (d[s_val+1][1]+1, d[s_val+1][1]+initial_vals[s_val][0])
- return d
-
- def reconstruct_temp_mst_table(self, i, ranges_dict, configs_prune_lookup):
- """
- Drops and Reconstructs a temp mst table for evaluation along particular diagonals of hyperband.
- :param i: outer diagonal loop iteration.
- :param ranges_dict: model config ranges to group by bracket number.
- :param configs_prune_lookup: Lookup dictionary for configs to evaluate for a diagonal.
- :return:
- """
- if i == 0:
- _assert_equal(len(configs_prune_lookup), 1, "invalid args")
- lower_bound, upper_bound = ranges_dict[self.s_max]
- plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} AS SELECT * FROM {self.model_selection_table} "
- "WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \
- "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound}".format(self=self,
- AutoMLSchema=AutoMLConstants,
- lower_bound=lower_bound,
- upper_bound=upper_bound,
- ModelSelectionSchema=ModelSelectionSchema))
- return
- # dropping and repopulating temp_mst_table
- drop_tables([AutoMLConstants.TEMP_MST_TABLE])
-
- # {mst_key} changed from SERIAL to INTEGER for safe insertions and preservation of mst_key values
- create_query = """
- CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} (
- {mst_key} INTEGER,
- {model_id} INTEGER,
- {compile_params} VARCHAR,
- {fit_params} VARCHAR,
- unique ({model_id}, {compile_params}, {fit_params})
- );
- """.format(AutoMLSchema=AutoMLConstants,
- mst_key=ModelSelectionSchema.MST_KEY,
- model_id=ModelSelectionSchema.MODEL_ID,
- compile_params=ModelSelectionSchema.COMPILE_PARAMS,
- fit_params=ModelSelectionSchema.FIT_PARAMS)
- plpy.execute(create_query)
-
- query = ""
- new_configs = True
- for s_val in configs_prune_lookup:
- lower_bound, upper_bound = ranges_dict[s_val]
- if new_configs:
- query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \
- "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \
- "{ModelSelectionSchema.FIT_PARAMS} FROM {self.model_selection_table} WHERE " \
- "{ModelSelectionSchema.MST_KEY} >= {lower_bound} AND {ModelSelectionSchema.MST_KEY} <= " \
- "{upper_bound};".format(self=self, AutoMLSchema=AutoMLConstants,
- ModelSelectionSchema=ModelSelectionSchema,
- lower_bound=lower_bound, upper_bound=upper_bound)
- new_configs = False
- else:
- query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \
- "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \
- "{ModelSelectionSchema.FIT_PARAMS} " \
- "FROM {self.model_info_table} WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \
- "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound} ORDER BY {AutoMLSchema.LOSS_METRIC} " \
- "LIMIT {configs_prune_lookup_val};".format(self=self, AutoMLSchema=AutoMLConstants,
- ModelSelectionSchema=ModelSelectionSchema,
- lower_bound=lower_bound, upper_bound=upper_bound,
- configs_prune_lookup_val=configs_prune_lookup[s_val])
- plpy.execute(query)
-
- def update_model_output_table(self, model_training):
- """
- Updates gathered information of a hyperband diagonal run to the overall model output table.
- :param model_training: Fit Multiple function call object.
- """
- # updates model weights for any previously trained configs
- plpy.execute("UPDATE {self.model_output_table} a SET model_weights=" \
- "t.model_weights FROM {model_training.original_model_output_table} t " \
- "WHERE a.mst_key=t.mst_key".format(self=self, model_training=model_training))
-
- # truncate and re-creates table to avoid memory blow-ups
- with SetGUC("dev_opt_unsafe_truncate_in_subtransaction", "on"):
- temp_model_table = unique_string('updated_model')
- plpy.execute("CREATE TABLE {temp_model_table} AS SELECT * FROM {self.model_output_table};" \
- "TRUNCATE {self.model_output_table}; " \
- "DROP TABLE {self.model_output_table};".format(temp_model_table=temp_model_table, self=self))
- rename_table(self.schema_madlib, temp_model_table, self.model_output_table)
-
- # inserts any newly trained configs
- plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM {model_training.original_model_output_table} " \
- "WHERE {model_training.original_model_output_table}.mst_key NOT IN " \
- "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_output_table})".format(self=self,
- model_training=model_training,
- ModelSelectionSchema=ModelSelectionSchema))
-
- def update_model_output_info_table(self, i, model_training, initial_vals):
- """
- Updates gathered information of a hyperband diagonal run to the overall model output info table.
- :param i: outer diagonal loop iteration.
- :param model_training: Fit Multiple function call object.
- :param initial_vals: Dictionary of initial configurations and resources as part of the initial hyperband
- schedule.
- """
- # normalizing factor for metrics_iters due to warm start
- epochs_factor = sum([n[1] for n in initial_vals.values()][::-1][:i]) # i & initial_vals args needed
- iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \
- "FROM {model_training.model_summary_table}".format(AutoMLSchema=AutoMLConstants,
- model_training=model_training))
- metrics_iters_val = [epochs_factor+mi for mi in iters[0]['metrics_iters']] # global iteration counter
-
- validation_update_q = "validation_metrics_final=t.validation_metrics_final, " \
- "validation_loss_final=t.validation_loss_final, " \
- "validation_metrics=a.validation_metrics || t.validation_metrics, " \
- "validation_loss=a.validation_loss || t.validation_loss, " \
- if self.validation_table else ""
-
- # updates train/val info for any previously trained configs
- plpy.execute("UPDATE {self.model_info_table} a SET " \
- "metrics_elapsed_time=a.metrics_elapsed_time || t.metrics_elapsed_time, " \
- "training_metrics_final=t.training_metrics_final, " \
- "training_loss_final=t.training_loss_final, " \
- "training_metrics=a.training_metrics || t.training_metrics, " \
- "training_loss=a.training_loss || t.training_loss, ".format(self=self) + validation_update_q +
- "{AutoMLSchema.METRICS_ITERS}=a.metrics_iters || ARRAY{metrics_iters_val}::INTEGER[] " \
- "FROM {model_training.model_info_table} t " \
- "WHERE a.mst_key=t.mst_key".format(model_training=model_training, AutoMLSchema=AutoMLConstants,
- metrics_iters_val=metrics_iters_val))
-
- # inserts info about metrics and validation for newly trained model configs
- plpy.execute("INSERT INTO {self.model_info_table} SELECT t.*, ARRAY{metrics_iters_val}::INTEGER[] AS metrics_iters " \
- "FROM {model_training.model_info_table} t WHERE t.mst_key NOT IN " \
- "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_info_table})".format(self=self,
- model_training=model_training,
- metrics_iters_val=metrics_iters_val,
- ModelSelectionSchema=ModelSelectionSchema))
-
- def add_additional_info_cols(self, s_dict, i_dict):
- """Adds s and i columns to the info table"""
-
- plpy.execute("ALTER TABLE {self.model_info_table} ADD COLUMN s int, ADD COLUMN i int;".format(self=self))
-
- l = [(k, s_dict[k], i_dict[k]) for k in s_dict]
- query = "UPDATE {self.model_info_table} t SET s=b.s_val, i=b.i_val FROM unnest(ARRAY{l}) " \
- "b (key integer, s_val integer, i_val integer) WHERE t.mst_key=b.key".format(self=self, l=l)
- plpy.execute(query)
-
-class AutoMLHyperopt(KerasAutoML):
- """
- This class implements Hyperopt, another automl method that explores awkward search spaces using
- Random Search, Tree-structured Parzen Estimator (TPE), or Adaptive TPE.
-
- This function executes hyperopt on top of our multiple model training infrastructure powered with
- Model hOpper Parallelism (MOP), a hybrid of data and task parallelism.
-
- This automl method inherits qualities from the automl class.
- """
- def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
- model_id_list, compile_params_grid, fit_params_grid, automl_method,
- automl_params, random_state=None, object_table=None,
- use_gpus=False, validation_table=None, metrics_compute_frequency=None,
- name=None, description=None, **kwargs):
- automl_method = automl_method if automl_method else AutoMLConstants.HYPEROPT
- automl_params = automl_params if automl_params else 'num_configs=20, num_iterations=5, algorithm=tpe'
- KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table,
- model_selection_table, model_id_list, compile_params_grid, fit_params_grid,
- automl_method, automl_params, random_state, object_table, use_gpus,
- validation_table, metrics_compute_frequency, name, description, **kwargs)
- self.compile_params_grid = self.compile_params_grid.replace('\n', '').replace(' ', '')
- self.fit_params_grid = self.fit_params_grid.replace('\n', '').replace(' ', '')
- try:
- self.compile_params_grid = literal_eval(self.compile_params_grid)
-
- except:
- plpy.error("Invalid syntax in 'compile_params_dict'")
- try:
- self.fit_params_grid = literal_eval(self.fit_params_grid)
- except:
- plpy.error("Invalid syntax in 'fit_params_dict'")
- self.validate_and_define_inputs()
- self.num_segments = self.get_num_segments()
-
- self.create_model_output_table()
- self.create_model_output_info_table()
- self.find_hyperopt_config()
-
- def get_num_segments(self):
- """
- # query dist rules from summary table to get the total no of segments
- :return:
- """
- source_summary_table = add_postfix(self.source_table, '_summary')
- dist_rules = plpy.execute("SELECT {0} from {1}".format(DISTRIBUTION_RULES_COLNAME, source_summary_table))[0][DISTRIBUTION_RULES_COLNAME]
- #TODO create constant for all_segments
- if dist_rules == "all_segments":
- return get_seg_number()
-
- return len(dist_rules)
-
- def validate_and_define_inputs(self):
- automl_params_dict = extract_keyvalue_params(self.automl_params,
- lower_case_names=True)
- # casting relevant values to int
- for i in automl_params_dict:
- try:
- automl_params_dict[i] = int(automl_params_dict[i])
- except ValueError:
- pass
- _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3,
- "{0}: Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name))
- for i in automl_params_dict:
- if i == AutoMLConstants.NUM_CONFIGS:
- self.num_configs = automl_params_dict[AutoMLConstants.NUM_CONFIGS]
- elif i == AutoMLConstants.NUM_ITERS:
- self.num_iters = automl_params_dict[AutoMLConstants.NUM_ITERS]
- elif i == AutoMLConstants.ALGORITHM:
- if automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'rand':
- self.algorithm = rand
- elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'tpe':
- self.algorithm = tpe
- # TODO: Add support for atpe uncomment the below lines after atpe works
- # elif automl_params_dict[AutoMLSchema.ALGORITHM].lower() == 'atpe':
- # self.algorithm = atpe
- else:
- plpy.error("{0}: valid algorithm 'automl_params' for hyperopt: 'rand', 'tpe'".format(self.module_name)) # , or 'atpe'
- else:
- plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i))
- _assert(self.num_configs > 0 and self.num_iters > 0, "{0}: num_configs and num_iterations in 'automl_params' "
- "must be > 0".format(self.module_name))
- _assert(self._is_valid_metrics_compute_frequency(self.num_iters), "{0}: 'metrics_compute_frequency' "
- "out of iteration range".format(self.module_name))
-
- def find_hyperopt_config(self):
- """
- Executes hyperopt on top of MOP.
- """
- make_mst_summary = True
- trials = Trials()
- domain = Domain(None, self.get_search_space())
- rand_state = np.random.RandomState(self.random_state)
- configs_lst = self.get_configs_list(self.num_configs, self.num_segments)
-
- self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
- fit_multiple_runtime = 0
- for low, high in configs_lst:
- i, n = low, high - low + 1
-
- # Using HyperOpt TPE/ATPE to generate parameters
- hyperopt_params = []
- sampled_params = []
- for j in range(i, i + n):
- new_param = self.algorithm.suggest([j], domain, trials, rand_state.randint(0, AutoMLConstants.INT_MAX))
- new_param[0]['status'] = STATUS_RUNNING
-
- trials.insert_trial_docs(new_param)
- trials.refresh()
- hyperopt_params.append(new_param[0])
- sampled_params.append(new_param[0]['misc']['vals'])
-
- model_id_list, compile_params, fit_params = self.extract_param_vals(sampled_params)
- msts_list = self.generate_msts(model_id_list, compile_params, fit_params)
- # cleanup_madlib_temp_tables(self.schema_madlib, AutoMLSchema.TARGET_SCHEMA)
- try:
- self.remove_temp_tables(model_training)
- except:
- pass
- self.populate_temp_mst_tables(i, msts_list)
-
- plpy.info("***Evaluating {n} newly suggested model configurations***".format(n=n))
- fit_multiple_start_time = time.time()
- model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLConstants.TEMP_OUTPUT_TABLE,
- AutoMLConstants.TEMP_MST_TABLE, self.num_iters, self.use_gpus, self.validation_table,
- self.metrics_compute_frequency, False, self.name, self.description, fit_multiple_runtime)
- fit_multiple_runtime += time.time() - fit_multiple_start_time
- if make_mst_summary:
- self.generate_mst_summary_table(self.model_selection_summary_table)
- make_mst_summary = False
-
- # HyperOpt TPE update
- for k, hyperopt_param in enumerate(hyperopt_params, i):
- loss_val = plpy.execute("SELECT {AutoMLSchema.LOSS_METRIC} FROM {model_training.model_info_table} " \
- "WHERE {ModelSelectionSchema.MST_KEY}={k}".format(AutoMLSchema=AutoMLConstants,
- ModelSelectionSchema=ModelSelectionSchema,
- **locals()))[0][AutoMLConstants.LOSS_METRIC]
-
- # avoid removing the two lines below (part of Hyperopt updates)
- hyperopt_param['status'] = STATUS_OK
- hyperopt_param['result'] = {'loss': loss_val, 'status': STATUS_OK}
- trials.refresh()
-
- # stacks info of all model configs together
- self.update_model_output_and_info_tables(model_training)
-
- self.print_best_mst_so_far()
-
- self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
- self.update_model_selection_table()
- self.generate_model_output_summary_table(model_training)
- # cleanup_madlib_temp_tables(self.schema_madlib, AutoMLSchema.TARGET_SCHEMA)
- self.remove_temp_tables(model_training)
-
- def get_configs_list(self, num_configs, num_segments):
- """
- Gets schedule to evaluate model configs
- :return: Model configs evaluation schedule
- """
- num_buckets = int(round(float(num_configs) / num_segments))
- configs_list = []
- start_idx = 1
- models_populated = 0
- for _ in range(num_buckets - 1):
- end_idx = start_idx + num_segments
- models_populated += num_segments
- configs_list.append((start_idx, end_idx - 1))
- start_idx = end_idx
-
- remaining_models = num_configs - models_populated
- configs_list.append((start_idx, start_idx + remaining_models-1))
-
- return configs_list
-
- def get_search_space(self):
- """
- Converts user inputs to hyperopt search space.
- :return: Hyperopt search space
- """
-
- # initial params (outside 'optimizer_params_list')
- hyperopt_search_dict = {}
- hyperopt_search_dict['model_id'] = self.get_hyperopt_exps('model_id', self.model_id_list)
-
-
- for j in self.fit_params_grid:
- hyperopt_search_dict[j] = self.get_hyperopt_exps(j, self.fit_params_grid[j])
-
- for i in self.compile_params_grid:
- if i != ModelSelectionSchema.OPTIMIZER_PARAMS_LIST:
- hyperopt_search_dict[i] = self.get_hyperopt_exps(i, self.compile_params_grid[i])
-
- hyperopt_search_space_lst = []
-
- counter = 1 # for unique names to allow multiple distribution options for optimizer params
- for optimizer_dict in self.compile_params_grid[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST]:
- for o_param in optimizer_dict:
- name = o_param + '_' + str(counter)
- hyperopt_search_dict[name] = self.get_hyperopt_exps(name, optimizer_dict[o_param])
- # appending deep copy
- hyperopt_search_space_lst.append({k:v for k, v in hyperopt_search_dict.items()})
- for o_param in optimizer_dict:
- name = o_param + '_' + str(counter)
- del hyperopt_search_dict[name]
- counter += 1
-
- return hp.choice('space', hyperopt_search_space_lst)
-
- def get_hyperopt_exps(self, cp, param_value_list):
- """
- Samples a value from a given list of values, either randomly from a list of discrete elements,
- or from a specified distribution.
- :param cp: compile param
- :param param_value_list: list of values (or specified distribution) for a param
- :return: sampled value
- """
- # check if need to sample from a distribution
- if type(param_value_list[-1]) == str and all([type(i) != str and not callable(i) for i in param_value_list[:-1]]) \
- and len(param_value_list) > 1:
- _assert_equal(len(param_value_list), 3,
- "{0}: '{1}' should have exactly 3 elements if picking from a distribution".format(self.module_name, cp))
- _assert(param_value_list[1] > param_value_list[0],
- "{0}: '{1}' should be of the format [lower_bound, upper_bound, distribution_type]".format(self.module_name, cp))
- if param_value_list[-1] == 'linear':
- return hp.uniform(cp, param_value_list[0], param_value_list[1])
- elif param_value_list[-1] == 'log':
- return hp.loguniform(cp, np.log(param_value_list[0]), np.log(param_value_list[1]))
- else:
- plpy.error("{0}: Please choose a valid distribution type for '{1}': {2}".format(
- self.module_name,
- self.original_param_details(cp)[0],
- ['linear', 'log']))
- else:
- # random sampling
- return hp.choice(cp, param_value_list)
-
- def extract_param_vals(self, sampled_params):
- """
- Extract parameter values from hyperopt search space.
- :param sampled_params: params suggested by hyperopt.
- :return: lists of model ids, compile and fit params.
- """
- model_id_list, compile_params, fit_params = [], [], []
- for params_dict in sampled_params:
- compile_dict, fit_dict, optimizer_params_dict = {}, {}, {}
- for p in params_dict:
- if len(params_dict[p]) == 0 or p == 'space':
- continue
- val = params_dict[p][0]
- if p == 'model_id':
- model_id_list.append(self.model_id_list[val])
- continue
- elif p in self.fit_params_grid:
- try:
- # check if params_dict[p] is an index
- fit_dict[p] = self.fit_params_grid[p][val]
- except TypeError:
- fit_dict[p] = params_dict[p]
- elif p in self.compile_params_grid:
- try:
- # check if params_dict[p] is an index
- compile_dict[p] = self.compile_params_grid[p][val]
- except TypeError:
- compile_dict[p] = val
- else:
- o_param, idx = self.original_param_details(p) # extracting unique attribute
- try:
- # check if params_dict[p] is an index (i.e. optimizer, for example)
- optimizer_params_dict[o_param] = self.compile_params_grid[
- ModelSelectionSchema.OPTIMIZER_PARAMS_LIST][idx][o_param][val]
- except TypeError:
- optimizer_params_dict[o_param] = val
- compile_dict[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST] = optimizer_params_dict
-
- compile_params.append(compile_dict)
- fit_params.append(fit_dict)
-
- return model_id_list, compile_params, fit_params
-
- def original_param_details(self, name):
- """
- Returns the original param name and book-keeping detail.
- :param name: name of the param (example - lr_1, epsilon_12)
- :return: original param name and book-keeping position.
- """
- parts = name.split('_')
- return '_'.join(parts[:-1]), int(parts[-1]) - 1
-
-
- def generate_msts(self, model_id_list, compile_params, fit_params):
- """
- Generates msts to insert in the mst table.
- :param model_id_list: list of model ids
- :param compile_params: list compile params
- :param fit_params:list of fit params
- :return: List of msts to insert in the mst table.
- """
- assert len(model_id_list) == len(compile_params) == len(fit_params)
- msts = []
-
- for i in range(len(compile_params)):
- combination = {}
- combination[ModelSelectionSchema.MODEL_ID] = model_id_list[i]
- combination[ModelSelectionSchema.COMPILE_PARAMS] = generate_row_string(compile_params[i])
- combination[ModelSelectionSchema.FIT_PARAMS] = generate_row_string(fit_params[i])
- msts.append(combination)
-
- return msts
-
- def populate_temp_mst_tables(self, i, msts_list):
- """
- Creates and populates temp mst and summary tables with newly suggested model configs for evaluation.
- :param i: mst key number
- :param msts_list: list of generated msts.
- """
- # extra sanity check
- if table_exists(AutoMLConstants.TEMP_MST_TABLE):
- drop_tables([AutoMLConstants.TEMP_MST_TABLE])
-
- create_query = """
- CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} (
- {mst_key} INTEGER,
- {model_id} INTEGER,
- {compile_params} VARCHAR,
- {fit_params} VARCHAR,
- unique ({model_id}, {compile_params}, {fit_params})
- );
- """.format(AutoMLSchema=AutoMLConstants,
- mst_key=ModelSelectionSchema.MST_KEY,
- model_id=ModelSelectionSchema.MODEL_ID,
- compile_params=ModelSelectionSchema.COMPILE_PARAMS,
- fit_params=ModelSelectionSchema.FIT_PARAMS)
- plpy.execute(create_query)
- mst_key_val = i
- for mst in msts_list:
- model_id = mst[ModelSelectionSchema.MODEL_ID]
- compile_params = mst[ModelSelectionSchema.COMPILE_PARAMS]
- fit_params = mst[ModelSelectionSchema.FIT_PARAMS]
- insert_query = """
- INSERT INTO
- {AutoMLSchema.TEMP_MST_TABLE}(
- {mst_key_col},
- {model_id_col},
- {compile_params_col},
- {fit_params_col}
- )
- VALUES (
- {mst_key_val},
- {model_id},
- $${compile_params}$$,
- $${fit_params}$$
- )
- """.format(mst_key_col=ModelSelectionSchema.MST_KEY,
- model_id_col=ModelSelectionSchema.MODEL_ID,
- compile_params_col=ModelSelectionSchema.COMPILE_PARAMS,
- fit_params_col=ModelSelectionSchema.FIT_PARAMS,
- AutoMLSchema=AutoMLConstants,
- **locals())
- mst_key_val += 1
- plpy.execute(insert_query)
-
- self.generate_mst_summary_table(AutoMLConstants.TEMP_MST_SUMMARY_TABLE)
-
- def generate_mst_summary_table(self, tbl_name):
- """
- generates mst summary table with the given name
- :param tbl_name: name of summary table
- """
- _assert(tbl_name.endswith('_summary'), 'invalid summary table name')
-
- # extra sanity check
- if table_exists(tbl_name):
- drop_tables([tbl_name])
-
- create_query = """
- CREATE TABLE {tbl_name} (
- {model_arch_table} VARCHAR,
- {object_table} VARCHAR
- );
- """.format(tbl_name=tbl_name,
- model_arch_table=ModelSelectionSchema.MODEL_ARCH_TABLE,
- object_table=ModelSelectionSchema.OBJECT_TABLE)
- plpy.execute(create_query)
-
- if self.object_table is None:
- object_table = 'NULL::VARCHAR'
- else:
- object_table = '$${0}$$'.format(self.object_table)
- insert_summary_query = """
- INSERT INTO
- {tbl_name}(
- {model_arch_table_name},
- {object_table_name}
- )
- VALUES (
- $${self.model_arch_table}$$,
- {object_table}
- )
- """.format(model_arch_table_name=ModelSelectionSchema.MODEL_ARCH_TABLE,
- object_table_name=ModelSelectionSchema.OBJECT_TABLE,
- **locals())
- plpy.execute(insert_summary_query)
-
- def update_model_output_and_info_tables(self, model_training):
- """
- Updates model output and info tables by stacking rows after each evaluation round.
- :param model_training: Fit Multiple class object
- """
- metrics_iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \
- "FROM {model_training.original_model_output_table}_summary".format(self=self,
- model_training=model_training,
- AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS]
- if metrics_iters:
- metrics_iters = "ARRAY{0}".format(metrics_iters)
- # stacking new rows from training
- plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM " \
- "{model_training.original_model_output_table}".format(self=self, model_training=model_training))
- plpy.execute("INSERT INTO {self.model_info_table} SELECT *, {metrics_iters} FROM " \
- "{model_training.model_info_table}".format(self=self,
- model_training=model_training,
- metrics_iters=metrics_iters))
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in
index a5c7507..113ec16 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in
@@ -625,9 +625,9 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.hyperband_schedule(
eta INTEGER DEFAULT 3,
skip_last INTEGER DEFAULT 0
) RETURNS VOID AS $$
- PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl')
+ PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl_hyperband')
with AOControl(False) and MinWarning('warning'):
- schedule_loader = madlib_keras_automl.HyperbandSchedule(schedule_table, r, eta, skip_last)
+ schedule_loader = madlib_keras_automl_hyperband.HyperbandSchedule(schedule_table, r, eta, skip_last)
schedule_loader.load()
$$ LANGUAGE plpythonu VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -650,13 +650,15 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.madlib_keras_automl(
name VARCHAR DEFAULT NULL,
description VARCHAR DEFAULT NULL
) RETURNS VOID AS $$
- PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl')
+if automl_method is None or automl_method.lower() == 'hyperband':
+ PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl_hyperband')
with AOControl(False) and MinWarning('warning'):
- if automl_method is None or automl_method.lower() == 'hyperband':
- schedule_loader = madlib_keras_automl.AutoMLHyperband(**globals())
- elif automl_method.lower() == 'hyperopt':
- schedule_loader = madlib_keras_automl.AutoMLHyperopt(**globals())
- else:
- plpy.error("madlib_keras_automl: The chosen automl method must be 'hyperband' or 'hyperopt'")
+ schedule_loader = madlib_keras_automl_hyperband.AutoMLHyperband(**globals())
+elif automl_method.lower() == 'hyperopt':
+ PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl_hyperopt')
+ with AOControl(False) and MinWarning('warning'):
+ schedule_loader = madlib_keras_automl_hyperopt.AutoMLHyperopt(**globals())
+else:
+ plpy.error("madlib_keras_automl: The chosen automl method must be 'hyperband' or 'hyperopt'")
$$ LANGUAGE plpythonu VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperband.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperband.py_in
new file mode 100644
index 0000000..2d10f8c
--- /dev/null
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperband.py_in
@@ -0,0 +1,419 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import math
+import plpy
+
+from madlib_keras_automl import KerasAutoML, AutoMLConstants
+from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \
+ unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table, \
+ is_platform_pg
+from utilities.control import SetGUC
+from madlib_keras_fit_multiple_model import FitMultipleModel
+from madlib_keras_model_selection import MstSearch, ModelSelectionSchema
+from utilities.validate_args import table_exists, drop_tables, input_tbl_valid
+
+class HyperbandSchedule():
+ """The utility class for loading a hyperband schedule table with algorithm inputs.
+
+ Attributes:
+ schedule_table (string): Name of output table containing hyperband schedule.
+ R (int): Maximum number of resources (iterations) that can be allocated
+ to a single configuration.
+ eta (int): Controls the proportion of configurations discarded in
+ each round of successive halving.
+ skip_last (int): The number of last rounds to skip.
+ """
+ def __init__(self, schedule_table, R, eta=3, skip_last=0):
+ if is_platform_pg():
+ plpy.error(
+ "DL: Hyperband schedule is not supported on PostgreSQL.")
+ self.schedule_table = schedule_table # table name to store hyperband schedule
+ self.R = R # maximum iterations/epochs allocated to a configuration
+ self.eta = eta # defines downsampling rate
+ self.skip_last = skip_last
+ self.module_name = 'hyperband_schedule'
+ self.validate_inputs()
+
+ # number of unique executions of Successive Halving (minus one)
+ self.s_max = int(math.floor(math.log(self.R, self.eta)))
+ self.validate_s_max()
+
+ self.schedule_vals = []
+
+ self.calculate_schedule()
+
+ def load(self):
+ """
+ The entry point for loading the hyperband schedule table.
+ """
+ self.create_schedule_table()
+ self.insert_into_schedule_table()
+
+ def validate_inputs(self):
+ """
+ Validates user input values
+ """
+ _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name))
+ _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name))
+
+ def validate_s_max(self):
+ _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " +
+ "non-negative and less than {1}".format(self.module_name,self.s_max))
+
+ def calculate_schedule(self):
+ """
+ Calculates the hyperband schedule (number of configs and allocated resources)
+ in each round of each bracket and skips the number of last rounds specified in 'skip_last'
+ """
+ for s in reversed(range(self.s_max+1)):
+ n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations
+ r = self.R * math.pow(self.eta, -s)
+
+ for i in range((s+1) - int(self.skip_last)):
+ # Computing each of the
+ n_i = n*math.pow(self.eta, -i)
+ r_i = r*math.pow(self.eta, i)
+
+ self.schedule_vals.append({AutoMLConstants.BRACKET: s,
+ AutoMLConstants.ROUND: i,
+ AutoMLConstants.CONFIGURATIONS: int(n_i),
+ AutoMLConstants.RESOURCES: int(round(r_i))})
+
+ def create_schedule_table(self):
+ """Initializes the output schedule table"""
+ create_query = """
+ CREATE TABLE {self.schedule_table} (
+ {s} INTEGER,
+ {i} INTEGER,
+ {n_i} INTEGER,
+ {r_i} INTEGER,
+ unique ({s}, {i})
+ );
+ """.format(self=self,
+ s=AutoMLConstants.BRACKET,
+ i=AutoMLConstants.ROUND,
+ n_i=AutoMLConstants.CONFIGURATIONS,
+ r_i=AutoMLConstants.RESOURCES)
+ plpy.execute(create_query)
+
+ def insert_into_schedule_table(self):
+ """Insert everything in self.schedule_vals into the output schedule table."""
+ for sd in self.schedule_vals:
+ sd_s = sd[AutoMLConstants.BRACKET]
+ sd_i = sd[AutoMLConstants.ROUND]
+ sd_n_i = sd[AutoMLConstants.CONFIGURATIONS]
+ sd_r_i = sd[AutoMLConstants.RESOURCES]
+ insert_query = """
+ INSERT INTO
+ {self.schedule_table}(
+ {s_col},
+ {i_col},
+ {n_i_col},
+ {r_i_col}
+ )
+ VALUES (
+ {sd_s},
+ {sd_i},
+ {sd_n_i},
+ {sd_r_i}
+ )
+ """.format(s_col=AutoMLConstants.BRACKET,
+ i_col=AutoMLConstants.ROUND,
+ n_i_col=AutoMLConstants.CONFIGURATIONS,
+ r_i_col=AutoMLConstants.RESOURCES,
+ **locals())
+ plpy.execute(insert_query)
+
+class AutoMLHyperband(KerasAutoML):
+ """
+ This class implements Hyperband, an infinite-arm bandit based algorithm that speeds up random search
+ through adaptive resource allocation, successive halving (SHA), and early stopping.
+
+ This class showcases a novel hyperband implementation by executing the hyperband rounds 'diagonally'
+ to evaluate multiple configurations together and leverage the compute power of MPP databases such as Greenplum.
+
+ This automl method inherits qualities from the automl class.
+ """
+ def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
+ model_id_list, compile_params_grid, fit_params_grid, automl_method,
+ automl_params, random_state=None, object_table=None,
+ use_gpus=False, validation_table=None, metrics_compute_frequency=None,
+ name=None, description=None, **kwargs):
+ automl_method = automl_method if automl_method else AutoMLConstants.HYPERBAND
+ automl_params = automl_params if automl_params else 'R=6, eta=3, skip_last=0'
+ KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table,
+ model_selection_table, model_id_list, compile_params_grid, fit_params_grid,
+ automl_method, automl_params, random_state, object_table, use_gpus,
+ validation_table, metrics_compute_frequency, name, description, **kwargs)
+ self.validate_and_define_inputs()
+ self.create_model_output_table()
+ self.create_model_output_info_table()
+ self.find_hyperband_config()
+
+ def validate_and_define_inputs(self):
+ automl_params_dict = extract_keyvalue_params(self.automl_params,
+ lower_case_names=False)
+ # casting dict values to int
+ for i in automl_params_dict:
+ _assert(i in AutoMLConstants.HYPERBAND_PARAMS,
+ "{0}: Invalid param(s) passed in for hyperband. "\
+ "Only R, eta, and skip_last may be specified".format(self.module_name))
+ automl_params_dict[i] = int(automl_params_dict[i])
+ _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3,
+ "{0}: Only R, eta, and skip_last may be specified".format(self.module_name))
+ for i in automl_params_dict:
+ if i == AutoMLConstants.R:
+ self.R = automl_params_dict[AutoMLConstants.R]
+ elif i == AutoMLConstants.ETA:
+ self.eta = automl_params_dict[AutoMLConstants.ETA]
+ elif i == AutoMLConstants.SKIP_LAST:
+ self.skip_last = automl_params_dict[AutoMLConstants.SKIP_LAST]
+ else:
+ plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i))
+ _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name))
+ _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name))
+ self.s_max = int(math.floor(math.log(self.R, self.eta)))
+ _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " \
+ "non-negative and less than {1}".format(self.module_name, self.s_max))
+
+ def find_hyperband_config(self):
+ """
+ Executes the diagonal hyperband algorithm.
+ """
+ initial_vals = {}
+
+ # get hyper parameter configs for each s
+ for s in reversed(range(self.s_max+1)):
+ n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations
+ r = self.R * math.pow(self.eta, -s) # initial number of iterations to run configurations for
+ initial_vals[s] = (n, int(round(r)))
+ self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
+ random_search = MstSearch(self.schema_madlib,
+ self.model_arch_table,
+ self.model_selection_table,
+ self.model_id_list,
+ self.compile_params_grid,
+ self.fit_params_grid,
+ 'random',
+ sum([initial_vals[k][0] for k in initial_vals][self.skip_last:]),
+ self.random_state,
+ self.object_table)
+ random_search.load() # for populating mst tables
+
+ # for creating the summary table for usage in fit multiple
+ plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_SUMMARY_TABLE} AS " \
+ "SELECT * FROM {random_search.model_selection_summary_table}".format(AutoMLSchema=AutoMLConstants,
+ random_search=random_search))
+ ranges_dict = self.mst_key_ranges_dict(initial_vals)
+ # to store the bracket and round numbers
+ s_dict, i_dict = {}, {}
+ for key, val in ranges_dict.items():
+ for mst_key in range(val[0], val[1]+1):
+ s_dict[mst_key] = key
+ i_dict[mst_key] = -1
+
+ # outer loop on diagonal
+ for i in range((self.s_max+1) - int(self.skip_last)):
+ # inner loop on s desc
+ temp_lst = []
+ configs_prune_lookup = {}
+ for s in range(self.s_max, self.s_max-i-1, -1):
+ n = initial_vals[s][0]
+ n_i = n * math.pow(self.eta, -i+self.s_max-s)
+ configs_prune_lookup[s] = int(round(n_i))
+ temp_lst.append("{0} configs under bracket={1} & round={2}".format(int(n_i), s, s-self.s_max+i))
+ num_iterations = int(initial_vals[self.s_max-i][1])
+ plpy.info('*** Diagonally evaluating ' + ', '.join(temp_lst) + ' with {0} iterations ***'.format(
+ num_iterations))
+
+ self.reconstruct_temp_mst_table(i, ranges_dict, configs_prune_lookup) # has keys to evaluate
+ active_keys = plpy.execute("SELECT {ModelSelectionSchema.MST_KEY} " \
+ "FROM {AutoMLSchema.TEMP_MST_TABLE}".format(AutoMLSchema=AutoMLConstants,
+ ModelSelectionSchema=ModelSelectionSchema))
+ for k in active_keys:
+ i_dict[k[ModelSelectionSchema.MST_KEY]] += 1
+ self.warm_start = int(i != 0)
+ mcf = self.metrics_compute_frequency if self._is_valid_metrics_compute_frequency(num_iterations) else None
+ with SetGUC("plan_cache_mode", "force_generic_plan"):
+ model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLConstants.TEMP_OUTPUT_TABLE,
+ AutoMLConstants.TEMP_MST_TABLE, num_iterations, self.use_gpus,
+ self.validation_table, mcf, self.warm_start, self.name, self.description)
+ self.update_model_output_table(model_training)
+ self.update_model_output_info_table(i, model_training, initial_vals)
+
+ self.print_best_mst_so_far()
+
+ self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
+ self.add_additional_info_cols(s_dict, i_dict)
+ self.update_model_selection_table()
+ self.generate_model_output_summary_table(model_training)
+ self.remove_temp_tables(model_training)
+
+ def mst_key_ranges_dict(self, initial_vals):
+ """
+ Extracts the ranges of model configs (using mst_keys) belonging to / sampled as part of
+ executing a particular SHA bracket.
+ """
+ d = {}
+ for s_val in sorted(initial_vals.keys(), reverse=True): # going from s_max to 0
+ if s_val == self.s_max:
+ d[s_val] = (1, initial_vals[s_val][0])
+ else:
+ d[s_val] = (d[s_val+1][1]+1, d[s_val+1][1]+initial_vals[s_val][0])
+ return d
+
+ def reconstruct_temp_mst_table(self, i, ranges_dict, configs_prune_lookup):
+ """
+ Drops and Reconstructs a temp mst table for evaluation along particular diagonals of hyperband.
+ :param i: outer diagonal loop iteration.
+ :param ranges_dict: model config ranges to group by bracket number.
+ :param configs_prune_lookup: Lookup dictionary for configs to evaluate for a diagonal.
+ :return:
+ """
+ if i == 0:
+ _assert_equal(len(configs_prune_lookup), 1, "invalid args")
+ lower_bound, upper_bound = ranges_dict[self.s_max]
+ plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} AS SELECT * FROM {self.model_selection_table} "
+ "WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \
+ "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound}".format(self=self,
+ AutoMLSchema=AutoMLConstants,
+ lower_bound=lower_bound,
+ upper_bound=upper_bound,
+ ModelSelectionSchema=ModelSelectionSchema))
+ return
+ # dropping and repopulating temp_mst_table
+ drop_tables([AutoMLConstants.TEMP_MST_TABLE])
+
+ # {mst_key} changed from SERIAL to INTEGER for safe insertions and preservation of mst_key values
+ create_query = """
+ CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} (
+ {mst_key} INTEGER,
+ {model_id} INTEGER,
+ {compile_params} VARCHAR,
+ {fit_params} VARCHAR,
+ unique ({model_id}, {compile_params}, {fit_params})
+ );
+ """.format(AutoMLSchema=AutoMLConstants,
+ mst_key=ModelSelectionSchema.MST_KEY,
+ model_id=ModelSelectionSchema.MODEL_ID,
+ compile_params=ModelSelectionSchema.COMPILE_PARAMS,
+ fit_params=ModelSelectionSchema.FIT_PARAMS)
+ plpy.execute(create_query)
+
+ query = ""
+ new_configs = True
+ for s_val in configs_prune_lookup:
+ lower_bound, upper_bound = ranges_dict[s_val]
+ if new_configs:
+ query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \
+ "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \
+ "{ModelSelectionSchema.FIT_PARAMS} FROM {self.model_selection_table} WHERE " \
+ "{ModelSelectionSchema.MST_KEY} >= {lower_bound} AND {ModelSelectionSchema.MST_KEY} <= " \
+ "{upper_bound};".format(self=self, AutoMLSchema=AutoMLConstants,
+ ModelSelectionSchema=ModelSelectionSchema,
+ lower_bound=lower_bound, upper_bound=upper_bound)
+ new_configs = False
+ else:
+ query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \
+ "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \
+ "{ModelSelectionSchema.FIT_PARAMS} " \
+ "FROM {self.model_info_table} WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \
+ "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound} ORDER BY {AutoMLSchema.LOSS_METRIC} " \
+ "LIMIT {configs_prune_lookup_val};".format(self=self, AutoMLSchema=AutoMLConstants,
+ ModelSelectionSchema=ModelSelectionSchema,
+ lower_bound=lower_bound, upper_bound=upper_bound,
+ configs_prune_lookup_val=configs_prune_lookup[s_val])
+ plpy.execute(query)
+
+ def update_model_output_table(self, model_training):
+ """
+ Updates gathered information of a hyperband diagonal run to the overall model output table.
+ :param model_training: Fit Multiple function call object.
+ """
+ # updates model weights for any previously trained configs
+ plpy.execute("UPDATE {self.model_output_table} a SET model_weights=" \
+ "t.model_weights FROM {model_training.original_model_output_table} t " \
+ "WHERE a.mst_key=t.mst_key".format(self=self, model_training=model_training))
+
+ # truncate and re-creates table to avoid memory blow-ups
+ with SetGUC("dev_opt_unsafe_truncate_in_subtransaction", "on"):
+ temp_model_table = unique_string('updated_model')
+ plpy.execute("CREATE TABLE {temp_model_table} AS SELECT * FROM {self.model_output_table};" \
+ "TRUNCATE {self.model_output_table}; " \
+ "DROP TABLE {self.model_output_table};".format(temp_model_table=temp_model_table, self=self))
+ rename_table(self.schema_madlib, temp_model_table, self.model_output_table)
+
+ # inserts any newly trained configs
+ plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM {model_training.original_model_output_table} " \
+ "WHERE {model_training.original_model_output_table}.mst_key NOT IN " \
+ "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_output_table})".format(self=self,
+ model_training=model_training,
+ ModelSelectionSchema=ModelSelectionSchema))
+
+ def update_model_output_info_table(self, i, model_training, initial_vals):
+ """
+ Updates gathered information of a hyperband diagonal run to the overall model output info table.
+ :param i: outer diagonal loop iteration.
+ :param model_training: Fit Multiple function call object.
+ :param initial_vals: Dictionary of initial configurations and resources as part of the initial hyperband
+ schedule.
+ """
+ # normalizing factor for metrics_iters due to warm start
+ epochs_factor = sum([n[1] for n in initial_vals.values()][::-1][:i]) # i & initial_vals args needed
+ iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \
+ "FROM {model_training.model_summary_table}".format(AutoMLSchema=AutoMLConstants,
+ model_training=model_training))
+ metrics_iters_val = [epochs_factor+mi for mi in iters[0]['metrics_iters']] # global iteration counter
+
+ validation_update_q = "validation_metrics_final=t.validation_metrics_final, " \
+ "validation_loss_final=t.validation_loss_final, " \
+ "validation_metrics=a.validation_metrics || t.validation_metrics, " \
+ "validation_loss=a.validation_loss || t.validation_loss, " \
+ if self.validation_table else ""
+
+ # updates train/val info for any previously trained configs
+ plpy.execute("UPDATE {self.model_info_table} a SET " \
+ "metrics_elapsed_time=a.metrics_elapsed_time || t.metrics_elapsed_time, " \
+ "training_metrics_final=t.training_metrics_final, " \
+ "training_loss_final=t.training_loss_final, " \
+ "training_metrics=a.training_metrics || t.training_metrics, " \
+ "training_loss=a.training_loss || t.training_loss, ".format(self=self) + validation_update_q +
+ "{AutoMLSchema.METRICS_ITERS}=a.metrics_iters || ARRAY{metrics_iters_val}::INTEGER[] " \
+ "FROM {model_training.model_info_table} t " \
+ "WHERE a.mst_key=t.mst_key".format(model_training=model_training, AutoMLSchema=AutoMLConstants,
+ metrics_iters_val=metrics_iters_val))
+
+ # inserts info about metrics and validation for newly trained model configs
+ plpy.execute("INSERT INTO {self.model_info_table} SELECT t.*, ARRAY{metrics_iters_val}::INTEGER[] AS metrics_iters " \
+ "FROM {model_training.model_info_table} t WHERE t.mst_key NOT IN " \
+ "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_info_table})".format(self=self,
+ model_training=model_training,
+ metrics_iters_val=metrics_iters_val,
+ ModelSelectionSchema=ModelSelectionSchema))
+
+ def add_additional_info_cols(self, s_dict, i_dict):
+ """Adds s and i columns to the info table"""
+
+ plpy.execute("ALTER TABLE {self.model_info_table} ADD COLUMN s int, ADD COLUMN i int;".format(self=self))
+
+ l = [(k, s_dict[k], i_dict[k]) for k in s_dict]
+ query = "UPDATE {self.model_info_table} t SET s=b.s_val, i=b.i_val FROM unnest(ARRAY{l}) " \
+ "b (key integer, s_val integer, i_val integer) WHERE t.mst_key=b.key".format(self=self, l=l)
+ plpy.execute(query)
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperopt.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperopt.py_in
new file mode 100644
index 0000000..34d2e97
--- /dev/null
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperopt.py_in
@@ -0,0 +1,458 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from ast import literal_eval
+from hyperopt import hp, rand, tpe, atpe, Trials, STATUS_OK, STATUS_RUNNING
+from hyperopt.base import Domain
+import numpy as np
+import plpy
+import time
+
+from madlib_keras_automl import KerasAutoML, AutoMLConstants
+from input_data_preprocessor import DistributionRulesOptions
+from madlib_keras_fit_multiple_model import FitMultipleModel
+from madlib_keras_helper import generate_row_string
+from madlib_keras_helper import DISTRIBUTION_RULES_COLNAME
+from madlib_keras_model_selection import ModelSelectionSchema
+from utilities.control import SetGUC
+from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \
+ unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table
+from utilities.validate_args import table_exists, drop_tables, input_tbl_valid
+
+class AutoMLHyperopt(KerasAutoML):
+ """
+ This class implements Hyperopt, another automl method that explores awkward search spaces using
+ Random Search, Tree-structured Parzen Estimator (TPE), or Adaptive TPE.
+
+ This function executes hyperopt on top of our multiple model training infrastructure powered with
+ Model hOpper Parallelism (MOP), a hybrid of data and task parallelism.
+
+ This automl method inherits qualities from the automl class.
+ """
+ def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table,
+ model_id_list, compile_params_grid, fit_params_grid, automl_method,
+ automl_params, random_state=None, object_table=None,
+ use_gpus=False, validation_table=None, metrics_compute_frequency=None,
+ name=None, description=None, **kwargs):
+ """Set up and immediately run Hyperopt-driven AutoML.
+
+ NOTE: the constructor does all the work -- it validates inputs,
+ creates the output tables, and kicks off find_hyperopt_config().
+ """
+ # Fall back to the documented defaults when the caller passes NULL/empty.
+ automl_method = automl_method if automl_method else AutoMLConstants.HYPEROPT
+ automl_params = automl_params if automl_params else 'num_configs=20, num_iterations=5, algorithm=tpe'
+ KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table,
+ model_selection_table, model_id_list, compile_params_grid, fit_params_grid,
+ automl_method, automl_params, random_state, object_table, use_gpus,
+ validation_table, metrics_compute_frequency, name, description, **kwargs)
+ # The grids arrive as SQL text; strip whitespace/newlines before parsing
+ # them into Python dicts with literal_eval.
+ self.compile_params_grid = self.compile_params_grid.replace('\n', '').replace(' ', '')
+ self.fit_params_grid = self.fit_params_grid.replace('\n', '').replace(' ', '')
+ try:
+ self.compile_params_grid = literal_eval(self.compile_params_grid)
+
+ # NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit;
+ # narrowing to (ValueError, SyntaxError) would be safer here and below.
+ except:
+ plpy.error("Invalid syntax in 'compile_params_dict'")
+ try:
+ self.fit_params_grid = literal_eval(self.fit_params_grid)
+ except:
+ plpy.error("Invalid syntax in 'fit_params_dict'")
+ self.validate_and_define_inputs()
+ self.num_segments = self.get_num_segments()
+
+ self.create_model_output_table()
+ self.create_model_output_info_table()
+ self.find_hyperopt_config()
+
+ def get_num_segments(self):
+ """
+ Query the distribution rules from the source summary table to find the
+ total number of segments the preprocessed data is spread across.
+ :return: number of segments used for training
+ """
+ source_summary_table = add_postfix(self.source_table, '_summary')
+ dist_rules = plpy.execute("SELECT {0} from {1}".format(DISTRIBUTION_RULES_COLNAME, source_summary_table))[0][DISTRIBUTION_RULES_COLNAME]
+ # 'all_segments' means data was distributed to every segment in the
+ # cluster; otherwise dist_rules is the explicit list of segments used.
+ if dist_rules == DistributionRulesOptions.ALL_SEGMENTS:
+ return get_seg_number()
+
+ return len(dist_rules)
+
+ def validate_and_define_inputs(self):
+ """Parse and validate 'automl_params', setting self.num_configs,
+ self.num_iters and self.algorithm as a side effect.
+ """
+ automl_params_dict = extract_keyvalue_params(self.automl_params,
+ lower_case_names=True)
+ # casting relevant values to int
+ for i in automl_params_dict:
+ _assert(i in AutoMLConstants.HYPEROPT_PARAMS,
+ "{0}: Invalid param(s) passed in for hyperopt. "\
+ "Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name))
+ try:
+ # 'algorithm' is a string; int() fails for it and we keep it as-is.
+ automl_params_dict[i] = int(automl_params_dict[i])
+ except ValueError:
+ pass
+ _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3,
+ "{0}: Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name))
+ for i in automl_params_dict:
+ if i == AutoMLConstants.NUM_CONFIGS:
+ self.num_configs = automl_params_dict[AutoMLConstants.NUM_CONFIGS]
+ elif i == AutoMLConstants.NUM_ITERS:
+ self.num_iters = automl_params_dict[AutoMLConstants.NUM_ITERS]
+ elif i == AutoMLConstants.ALGORITHM:
+ if automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'rand':
+ self.algorithm = rand
+ elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'tpe':
+ self.algorithm = tpe
+ # TODO: Add support for atpe uncomment the below lines after atpe works
+ # elif automl_params_dict[AutoMLSchema.ALGORITHM].lower() == 'atpe':
+ # self.algorithm = atpe
+ else:
+ plpy.error("{0}: valid algorithm 'automl_params' for hyperopt: 'rand', 'tpe'".format(self.module_name)) # , or 'atpe'
+ else:
+ # NOTE(review): likely unreachable -- membership in HYPEROPT_PARAMS
+ # was already asserted above; kept as a defensive fallback.
+ plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i))
+ # Assumes defaults for num_configs/num_iters exist when not supplied
+ # (presumably set by the base class) -- TODO confirm.
+ _assert(self.num_configs > 0 and self.num_iters > 0, "{0}: num_configs and num_iterations in 'automl_params' "
+ "must be > 0".format(self.module_name))
+ _assert(self._is_valid_metrics_compute_frequency(self.num_iters), "{0}: 'metrics_compute_frequency' "
+ "out of iteration range".format(self.module_name))
+
+ def find_hyperopt_config(self):
+ """
+ Executes hyperopt on top of MOP.
+
+ Loops over the evaluation schedule: asks the configured hyperopt
+ algorithm for a batch of suggestions, trains them with
+ FitMultipleModel, feeds each config's loss back into the Trials
+ object, and stacks the results into the output/info tables.
+ """
+ make_mst_summary = True
+ trials = Trials()
+ domain = Domain(None, self.get_search_space())
+ rand_state = np.random.RandomState(self.random_state)
+ configs_lst = self.get_configs_list(self.num_configs, self.num_segments)
+
+ self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
+ metrics_elapsed_time_offset = 0
+ model_training = None
+ for low, high in configs_lst:
+ # i: first mst_key of this batch; n: batch size (bounds inclusive)
+ i, n = low, high - low + 1
+
+ # Using HyperOpt TPE/ATPE to generate parameters
+ hyperopt_params = []
+ sampled_params = []
+ for j in range(i, i + n):
+ new_param = self.algorithm.suggest([j], domain, trials, rand_state.randint(0, AutoMLConstants.INT_MAX))
+ new_param[0]['status'] = STATUS_RUNNING
+
+ trials.insert_trial_docs(new_param)
+ trials.refresh()
+ hyperopt_params.append(new_param[0])
+ sampled_params.append(new_param[0]['misc']['vals'])
+
+ model_id_list, compile_params, fit_params = self.extract_param_vals(sampled_params)
+ msts_list = self.generate_msts(model_id_list, compile_params, fit_params)
+ self.remove_temp_tables(model_training)
+ self.populate_temp_mst_tables(i, msts_list)
+
+ plpy.info("***Evaluating {n} newly suggested model configurations***".format(n=n))
+ start_time = time.time()
+ # plan_cache_mode avoids re-planning the repeated fit queries;
+ # presumably ignored on servers without this GUC -- TODO confirm
+ # SetGUC's behavior there.
+ with SetGUC("plan_cache_mode", "force_generic_plan"):
+ model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLConstants.TEMP_OUTPUT_TABLE,
+ AutoMLConstants.TEMP_MST_TABLE, self.num_iters, self.use_gpus, self.validation_table,
+ self.metrics_compute_frequency, False, self.name, self.description,
+ metrics_elapsed_time_offset=metrics_elapsed_time_offset)
+ metrics_elapsed_time_offset += time.time() - start_time
+ if make_mst_summary:
+ self.generate_mst_summary_table(self.model_selection_summary_table)
+ make_mst_summary = False
+
+ # HyperOpt TPE update
+ for k, hyperopt_param in enumerate(hyperopt_params, i):
+ loss_val = plpy.execute("SELECT {AutoMLSchema.LOSS_METRIC} FROM {model_training.model_info_table} " \
+ "WHERE {ModelSelectionSchema.MST_KEY}={k}".format(AutoMLSchema=AutoMLConstants,
+ ModelSelectionSchema=ModelSelectionSchema,
+ **locals()))[0][AutoMLConstants.LOSS_METRIC]
+
+ # avoid removing the two lines below (part of Hyperopt updates)
+ hyperopt_param['status'] = STATUS_OK
+ hyperopt_param['result'] = {'loss': loss_val, 'status': STATUS_OK}
+ trials.refresh()
+
+ # stacks info of all model configs together
+ self.update_model_output_and_info_tables(model_training)
+
+ self.print_best_mst_so_far()
+
+ self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT)
+ self.update_model_selection_table()
+ self.generate_model_output_summary_table(model_training)
+ self.remove_temp_tables(model_training)
+
def get_configs_list(self, num_configs, num_segments):
    """Build the evaluation schedule for model configs.

    Splits the 1-based config indices [1..num_configs] into
    round(num_configs / num_segments) buckets: every bucket but the last
    holds exactly num_segments configs, and the final bucket absorbs the
    remainder (so it may be larger or smaller than num_segments).

    :param num_configs: total number of model configurations to evaluate
    :param num_segments: number of segments (configs trained per round)
    :return: list of (start_idx, end_idx) inclusive index pairs
    """
    bucket_count = int(round(float(num_configs) / num_segments))
    schedule = []
    first = 1
    # All full-size buckets except the last.
    for _ in range(bucket_count - 1):
        schedule.append((first, first + num_segments - 1))
        first += num_segments
    # The final bucket always ends exactly at num_configs.
    schedule.append((first, num_configs))
    return schedule
+
+ def get_search_space(self):
+ """
+ Converts user inputs to hyperopt search space.
+
+ Optimizer params get a '_<counter>' suffix so that the same param name
+ appearing in multiple optimizer dicts stays unique within hyperopt;
+ original_param_details() reverses that suffix later.
+ :return: Hyperopt search space
+ """
+
+ # initial params (outside 'optimizer_params_list')
+ hyperopt_search_dict = {}
+ hyperopt_search_dict['model_id'] = self.get_hyperopt_exps('model_id', self.model_id_list)
+
+
+ for j in self.fit_params_grid:
+ hyperopt_search_dict[j] = self.get_hyperopt_exps(j, self.fit_params_grid[j])
+
+ for i in self.compile_params_grid:
+ if i != ModelSelectionSchema.OPTIMIZER_PARAMS_LIST:
+ hyperopt_search_dict[i] = self.get_hyperopt_exps(i, self.compile_params_grid[i])
+
+ hyperopt_search_space_lst = []
+
+ counter = 1 # for unique names to allow multiple distribution options for optimizer params
+ for optimizer_dict in self.compile_params_grid[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST]:
+ for o_param in optimizer_dict:
+ name = o_param + '_' + str(counter)
+ hyperopt_search_dict[name] = self.get_hyperopt_exps(name, optimizer_dict[o_param])
+ # append a shallow copy so the del calls below do not mutate the
+ # entry already stored in the list
+ hyperopt_search_space_lst.append({k:v for k, v in hyperopt_search_dict.items()})
+ for o_param in optimizer_dict:
+ name = o_param + '_' + str(counter)
+ del hyperopt_search_dict[name]
+ counter += 1
+
+ # One branch per optimizer dict; hyperopt picks a branch, then samples
+ # each param expression within it.
+ return hp.choice('space', hyperopt_search_space_lst)
+
+ def get_hyperopt_exps(self, cp, param_value_list):
+ """
+ Samples a value from a given list of values, either randomly from a list of discrete elements,
+ or from a specified distribution.
+ :param cp: compile param
+ :param param_value_list: list of values (or specified distribution) for a param
+ :return: sampled value
+ """
+ # Heuristic: a list whose last element is a string and whose remaining
+ # elements are all non-string, non-callable is treated as
+ # [lower_bound, upper_bound, distribution_type]; anything else is a
+ # discrete choice list.
+ if type(param_value_list[-1]) == str and all([type(i) != str and not callable(i) for i in param_value_list[:-1]]) \
+ and len(param_value_list) > 1:
+ _assert_equal(len(param_value_list), 3,
+ "{0}: '{1}' should have exactly 3 elements if picking from a distribution".format(self.module_name, cp))
+ _assert(param_value_list[1] > param_value_list[0],
+ "{0}: '{1}' should be of the format [lower_bound, upper_bound, distribution_type]".format(self.module_name, cp))
+ if param_value_list[-1] == 'linear':
+ return hp.uniform(cp, param_value_list[0], param_value_list[1])
+ elif param_value_list[-1] == 'log':
+ # hp.loguniform samples exp(uniform(low, high)), hence the np.log
+ # on the user-supplied bounds.
+ return hp.loguniform(cp, np.log(param_value_list[0]), np.log(param_value_list[1]))
+ else:
+ plpy.error("{0}: Please choose a valid distribution type for '{1}': {2}".format(
+ self.module_name,
+ self.original_param_details(cp)[0],
+ ['linear', 'log']))
+ else:
+ # random sampling
+ return hp.choice(cp, param_value_list)
+
+ def extract_param_vals(self, sampled_params):
+ """
+ Extract parameter values from hyperopt search space.
+
+ hp.choice expressions report an *index* into the original value list,
+ while hp.uniform/loguniform report the sampled *value* itself. The
+ try/except TypeError blocks below distinguish the two cases: indexing
+ succeeds for choice lists and raises TypeError otherwise.
+ :param sampled_params: params suggested by hyperopt.
+ :return: lists of model ids, compile and fit params.
+ """
+ model_id_list, compile_params, fit_params = [], [], []
+ for params_dict in sampled_params:
+ compile_dict, fit_dict, optimizer_params_dict = {}, {}, {}
+ for p in params_dict:
+ # skip params not sampled in this branch, and the hp.choice
+ # branch selector itself
+ if len(params_dict[p]) == 0 or p == 'space':
+ continue
+ val = params_dict[p][0]
+ if p == 'model_id':
+ model_id_list.append(self.model_id_list[val])
+ continue
+ elif p in self.fit_params_grid:
+ try:
+ # check if params_dict[p] is an index
+ fit_dict[p] = self.fit_params_grid[p][val]
+ except TypeError:
+ # NOTE(review): assigns the raw list params_dict[p] here,
+ # whereas the compile branch assigns val -- looks
+ # inconsistent; verify intended behavior.
+ fit_dict[p] = params_dict[p]
+ elif p in self.compile_params_grid:
+ try:
+ # check if params_dict[p] is an index
+ compile_dict[p] = self.compile_params_grid[p][val]
+ except TypeError:
+ compile_dict[p] = val
+ else:
+ o_param, idx = self.original_param_details(p) # extracting unique attribute
+ try:
+ # check if params_dict[p] is an index (i.e. optimizer, for example)
+ optimizer_params_dict[o_param] = self.compile_params_grid[
+ ModelSelectionSchema.OPTIMIZER_PARAMS_LIST][idx][o_param][val]
+ except TypeError:
+ optimizer_params_dict[o_param] = val
+ compile_dict[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST] = optimizer_params_dict
+
+ compile_params.append(compile_dict)
+ fit_params.append(fit_dict)
+
+ return model_id_list, compile_params, fit_params
+
def original_param_details(self, name):
    """Split a uniquified optimizer-param name back into its parts.

    Optimizer params are suffixed with a 1-based counter to keep hyperopt
    labels unique (e.g. 'lr_1', 'epsilon_12'); this strips that suffix.

    :param name: uniquified param name, e.g. 'beta_1_2'
    :return: (original param name, 0-based book-keeping index)
    """
    prefix, _, suffix = name.rpartition('_')
    return prefix, int(suffix) - 1
+
+
def generate_msts(self, model_id_list, compile_params, fit_params):
    """Assemble model-selection-tuple (mst) rows for the mst table.

    :param model_id_list: list of model ids, parallel to the param lists
    :param compile_params: list of compile-param dicts
    :param fit_params: list of fit-param dicts
    :return: list of mst dicts ready for insertion into the mst table
    """
    assert len(model_id_list) == len(compile_params) == len(fit_params)
    # One mst per position; params are serialized to their row-string form.
    return [
        {
            ModelSelectionSchema.MODEL_ID: model_id,
            ModelSelectionSchema.COMPILE_PARAMS: generate_row_string(c_params),
            ModelSelectionSchema.FIT_PARAMS: generate_row_string(f_params),
        }
        for model_id, c_params, f_params in zip(model_id_list, compile_params, fit_params)
    ]
+
+ def populate_temp_mst_tables(self, i, msts_list):
+ """
+ Creates and populates temp mst and summary tables with newly suggested model configs for evaluation.
+ :param i: mst key number (first key of this batch; subsequent rows
+ get consecutive keys)
+ :param msts_list: list of generated msts.
+ """
+ # extra sanity check
+ if table_exists(AutoMLConstants.TEMP_MST_TABLE):
+ drop_tables([AutoMLConstants.TEMP_MST_TABLE])
+
+ # Unique constraint guards against hyperopt suggesting duplicate
+ # configs within one batch.
+ create_query = """
+ CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} (
+ {mst_key} INTEGER,
+ {model_id} INTEGER,
+ {compile_params} VARCHAR,
+ {fit_params} VARCHAR,
+ unique ({model_id}, {compile_params}, {fit_params})
+ );
+ """.format(AutoMLSchema=AutoMLConstants,
+ mst_key=ModelSelectionSchema.MST_KEY,
+ model_id=ModelSelectionSchema.MODEL_ID,
+ compile_params=ModelSelectionSchema.COMPILE_PARAMS,
+ fit_params=ModelSelectionSchema.FIT_PARAMS)
+ plpy.execute(create_query)
+ mst_key_val = i
+ for mst in msts_list:
+ model_id = mst[ModelSelectionSchema.MODEL_ID]
+ compile_params = mst[ModelSelectionSchema.COMPILE_PARAMS]
+ fit_params = mst[ModelSelectionSchema.FIT_PARAMS]
+ # $$-quoting avoids escaping issues in the serialized param strings
+ insert_query = """
+ INSERT INTO
+ {AutoMLSchema.TEMP_MST_TABLE}(
+ {mst_key_col},
+ {model_id_col},
+ {compile_params_col},
+ {fit_params_col}
+ )
+ VALUES (
+ {mst_key_val},
+ {model_id},
+ $${compile_params}$$,
+ $${fit_params}$$
+ )
+ """.format(mst_key_col=ModelSelectionSchema.MST_KEY,
+ model_id_col=ModelSelectionSchema.MODEL_ID,
+ compile_params_col=ModelSelectionSchema.COMPILE_PARAMS,
+ fit_params_col=ModelSelectionSchema.FIT_PARAMS,
+ AutoMLSchema=AutoMLConstants,
+ **locals())
+ mst_key_val += 1
+ plpy.execute(insert_query)
+
+ self.generate_mst_summary_table(AutoMLConstants.TEMP_MST_SUMMARY_TABLE)
+
+ def generate_mst_summary_table(self, tbl_name):
+ """
+ generates mst summary table with the given name, recording the model
+ arch table and (optional) custom-object table used for this run
+ :param tbl_name: name of summary table (must end with '_summary')
+ """
+ _assert(tbl_name.endswith('_summary'), 'invalid summary table name')
+
+ # extra sanity check
+ if table_exists(tbl_name):
+ drop_tables([tbl_name])
+
+ create_query = """
+ CREATE TABLE {tbl_name} (
+ {model_arch_table} VARCHAR,
+ {object_table} VARCHAR
+ );
+ """.format(tbl_name=tbl_name,
+ model_arch_table=ModelSelectionSchema.MODEL_ARCH_TABLE,
+ object_table=ModelSelectionSchema.OBJECT_TABLE)
+ plpy.execute(create_query)
+
+ # No object table -> store SQL NULL rather than the string 'None'
+ if self.object_table is None:
+ object_table = 'NULL::VARCHAR'
+ else:
+ object_table = '$${0}$$'.format(self.object_table)
+ insert_summary_query = """
+ INSERT INTO
+ {tbl_name}(
+ {model_arch_table_name},
+ {object_table_name}
+ )
+ VALUES (
+ $${self.model_arch_table}$$,
+ {object_table}
+ )
+ """.format(model_arch_table_name=ModelSelectionSchema.MODEL_ARCH_TABLE,
+ object_table_name=ModelSelectionSchema.OBJECT_TABLE,
+ **locals())
+ plpy.execute(insert_summary_query)
+
+ def update_model_output_and_info_tables(self, model_training):
+ """
+ Updates model output and info tables by stacking rows after each evaluation round.
+ :param model_training: Fit Multiple class object
+ """
+ metrics_iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \
+ "FROM {model_training.original_model_output_table}_summary".format(self=self,
+ model_training=model_training,
+ AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS]
+ # Wrap the Python list as a SQL ARRAY literal for the INSERT below.
+ # NOTE(review): if metrics_iters is falsy (e.g. NULL/empty), the raw
+ # Python value is interpolated into the SQL unchanged -- looks like it
+ # would produce invalid SQL; confirm the summary column is always set.
+ if metrics_iters:
+ metrics_iters = "ARRAY{0}".format(metrics_iters)
+ # stacking new rows from training
+ plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM " \
+ "{model_training.original_model_output_table}".format(self=self, model_training=model_training))
+ plpy.execute("INSERT INTO {self.model_info_table} SELECT *, {metrics_iters} FROM " \
+ "{model_training.model_info_table}".format(self=self,
+ model_training=model_training,
+ metrics_iters=metrics_iters))
diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in
index 946dde3..9db4ea1 100644
--- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in
+++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in
@@ -18,11 +18,11 @@
# under the License.
m4_changequote(`<!', `!>')
+m4_ifdef(<!__POSTGRESQL__!>, <!print 'skipping automl for postgres'!>, <!
import sys
from os import path
import math
-# Add convex module to the pythonpath. # TODO: ?
sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))))
sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))
@@ -46,8 +46,8 @@ class HyperbandScheduleTestCase(unittest.TestCase):
self.module_patcher = patch.dict('sys.modules', patches)
self.module_patcher.start()
- import deep_learning.madlib_keras_automl
- self.module = deep_learning.madlib_keras_automl
+ import deep_learning.madlib_keras_automl_hyperband
+ self.module = deep_learning.madlib_keras_automl_hyperband
# self.module.MstLoaderInputValidator._validate_input_args = \
# MagicMock()
@@ -222,13 +222,13 @@ class AutoMLHyperoptTestCase(unittest.TestCase):
self.module_patcher = patch.dict('sys.modules', patches)
self.module_patcher.start()
- import deep_learning.madlib_keras_automl
- self.module = deep_learning.madlib_keras_automl
+ import deep_learning.madlib_keras_automl_hyperopt
+ self.module = deep_learning.madlib_keras_automl_hyperopt
- from deep_learning.madlib_keras_automl import AutoMLHyperopt
+ # from deep_learning.madlib_keras_automl_hyperopt import AutoMLHyperopt
self.seg_num_mock = Mock()
- class FakeAutoMLHyperopt(AutoMLHyperopt):
+ class FakeAutoMLHyperopt(self.module.AutoMLHyperopt):
def __init__(self, *args):
pass
self.module.get_seg_number = self.seg_num_mock
@@ -282,3 +282,5 @@ class AutoMLHyperoptTestCase(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
+
+!>)