You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/28 17:32:26 UTC
[1/2] incubator-usergrid git commit: Add helper script for migrating
entity data to a new version.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev b7fd30051 -> eb8a18105
Add helper script for migrating entity data to a new version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a0002599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a0002599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a0002599
Branch: refs/heads/two-dot-o-dev
Commit: a00025992c04fbf7a33e8dd9a354284d7ae831c5
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Jul 17 16:28:49 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Jul 17 16:32:58 2015 -0700
----------------------------------------------------------------------
stack/scripts/migrate_entity_data.py | 299 ++++++++++++++++++++++++++++++
1 file changed, 299 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0002599/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py
new file mode 100644
index 0000000..060de83
--- /dev/null
+++ b/stack/scripts/migrate_entity_data.py
@@ -0,0 +1,299 @@
+
+# Usage from a machine running Usergrid:
+#
+# python migrate_entity_data.py -u adminuser:adminpass (standard data migration and reindex)
+# python migrate_entity_data.py -u adminuser:adminpass -f (force a re-migration)
+# python migrate_entity_data.py -u adminuser:adminpass -d <timestamp> (re-index only from the timestamp specified)
+#
+#
+
+
+import sys
+import logging
+from logging.handlers import RotatingFileHandler
+import argparse
+import time
+import requests
+import json
+
+
+# Version expected in status response post-migration for entity and app-info data
+TARGET_VERSION = 2
+
+# Set an interval (in seconds) for checking if re-index and/or migration has finished
+STATUS_INTERVAL_SECONDS = 2
+
+
def parse_args():
    """Parse command-line options for the migration tool.

    Returns a dict with keys 'date', 'endpoint', 'user', 'pass', 'force'.
    The -u value ("user:pass") is split on the FIRST colon only, so a
    password may itself contain colons. Exits the process (status 1) when
    the credentials are not in user:pass form.
    """
    parser = argparse.ArgumentParser(description='Usergrid Migration Tool')

    parser.add_argument('-d', '--date',
                        help='A date from which to start the migration',
                        type=str)

    parser.add_argument('-e', '--endpoint',
                        help='The endpoint to use for making API requests.',
                        type=str,
                        default='http://localhost:8080')

    parser.add_argument('-u', '--user',
                        help='System Admin Credentials used to authenticate with Usergrid <user:pass>',
                        type=str,
                        required=True)

    parser.add_argument('-f', '--force',
                        help='Force the entity data migration to run. Used for delta data migrations.',
                        action='store_true',
                        default=False)

    my_args = parser.parse_args(sys.argv[1:])

    arg_vars = vars(my_args)
    # Split on the first ':' only so passwords containing colons stay intact
    # (a plain split(':') would wrongly reject such credentials).
    creds = arg_vars['user'].split(':', 1)
    if len(creds) != 2:
        print('Credentials not properly specified. Must be "-u <user:pass>". Exiting...')
        exit_on_error()
    else:
        arg_vars['user'] = creds[0]
        arg_vars['pass'] = creds[1]

    return arg_vars
+
+
+
class Migrate:
    """Orchestrates a Usergrid entity-data migration via the system REST API.

    Workflow (see run()): verify the endpoint is reachable, optionally reset
    the stored migration versions when --force is given, trigger a full
    re-index and poll it to completion, then — unless a --date was given,
    which means re-index only — trigger the data migration and poll until
    both migration plugins report TARGET_VERSION. Timing metrics for both
    phases are logged at the end.
    """

    def __init__(self):
        # Command-line options (see parse_args) drive the whole run.
        self.args = parse_args()
        self.start_date = self.args['date']
        self.endpoint = self.args['endpoint']
        # Epoch-millisecond strings captured at phase boundaries;
        # reported by log_metrics() when the run ends or is interrupted.
        self.metrics = {'reindex_start': '',
                        'reindex_end': '',
                        'data_migration_start': '',
                        'data_migration_end': ''}
        self.logger = init_logging(self.__class__.__name__)
        self.admin_user = self.args['user']
        self.admin_pass = self.args['pass']
        self.force_migration = self.args['force']

    def run(self):
        """Execute the full migration workflow end to end.

        Exits the process if the endpoint is unavailable or if data is
        already migrated and --force was not given. A KeyboardInterrupt
        only logs metrics and a warning — the server-side jobs keep running.
        """
        self.logger.info('Initializing...')
        reindex_only = False

        if not self.is_endpoint_available():
            exit_on_error('Endpoint is not available, aborting')

        if self.start_date is not None:
            # A date means: skip data migration entirely and re-index only
            # entities updated since that timestamp.
            self.logger.info("Date Provided. Only-Re-index will run from date=[%s]", self.start_date)
            reindex_only = True
        else:
            if self.is_data_migrated():
                if self.force_migration:
                    # Roll the stored migration versions back one step so the
                    # server will accept another migration run.
                    self.logger.info('Force option provided.')
                    self.reset_data_migration()
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    # Re-check (and log) the now-reset status before proceeding;
                    # the return value is intentionally ignored here.
                    self.is_data_migrated()
                else:
                    self.logger.error('Entity Data has already been migrated. To re-run data migration provide the'
                                      ' force parameter: python migrate.py -u <user:pass> -f')
                    exit_on_error('Entity Data has already been migrated')

        try:
            # Phase 1: re-index. Poll every STATUS_INTERVAL_SECONDS until the
            # job reports COMPLETE.
            job = self.start_reindex()
            self.metrics['reindex_start'] = get_current_time()
            self.logger.info('Started Re-index. Job=[%s]', job)
            is_running = True
            while is_running:
                time.sleep(STATUS_INTERVAL_SECONDS)
                is_running = self.is_reindex_running(job)
                if not is_running:
                    break

            self.logger.info("Finished Re-index. Job=[%s]", job)
            self.metrics['reindex_end'] = get_current_time()

            if not reindex_only:
                # Phase 2: data migration. Poll until both plugins report
                # the target version.
                self.start_data_migration()
                self.metrics['data_migration_start'] = get_current_time()
                self.logger.info("Entity Data Migration Started")
                is_migrated = False
                while not is_migrated:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    is_migrated = self.is_data_migrated()
                    if is_migrated:
                        break

                self.metrics['data_migration_end'] = get_current_time()
                self.logger.info("Entity Data Migration completed")

            self.log_metrics()
            self.logger.info("Finished...")

        except KeyboardInterrupt:
            self.log_metrics()
            self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')

    def get_migration_url(self):
        # Endpoint that starts the data migration (PUT).
        url = self.endpoint + '/system/migrate/run'
        return url

    def get_reset_migration_url(self):
        # Endpoint that sets/rolls back stored migration versions (PUT).
        url = self.endpoint + '/system/migrate/set'
        return url

    def get_migration_status_url(self):
        # Endpoint that reports per-plugin migration versions (GET).
        url = self.endpoint + '/system/migrate/status'
        return url

    def get_reindex_url(self):
        # Endpoint that starts a rebuild; '/<jobId>' is appended for status.
        url = self.endpoint + '/system/index/rebuild'
        return url

    def start_data_migration(self):
        """Kick off the data migration; return the parsed JSON response.

        Exits the process if the HTTP request itself fails.
        """
        try:
            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def reset_data_migration(self):
        """Roll both migration plugins back to TARGET_VERSION - 1.

        This is what allows a forced re-migration; returns the parsed JSON
        response, or exits the process on a request error.
        """
        version = TARGET_VERSION - 1
        body = json.dumps({'collections-entity-data': version, 'appinfo-migration': version})
        try:
            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            self.logger.info('Resetting data migration versions to collections-entity-data=[v%s] '
                             'and appinfo-migration=[v%s]', version, version)
            return response
        except requests.exceptions.RequestException as e:
            # NOTE(review): message says "start" but this is the reset call —
            # looks copy-pasted from start_data_migration.
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def is_data_migrated(self):
        """Return True when both plugins report TARGET_VERSION.

        Returns False while migration is pending/in progress, and implicitly
        None (also falsy) when the status request failed entirely.
        """
        status = self.check_data_migration_status()
        if status is not None:
            entity_version = status['data']['collections-entity-data']
            appinfo_version = status['data']['appinfo-migration']

            if entity_version == TARGET_VERSION and appinfo_version == TARGET_VERSION:
                self.logger.info('Data Migration status=[COMPLETE], collections-entity-data=[v%s], '
                                 'appinfo-migration=[v%s]',
                                 entity_version,
                                 appinfo_version)
                return True
            else:
                self.logger.info('Data Migration status=[NOTSTARTED/INPROGRESS]')
                return False

    def check_data_migration_status(self):
        """GET the migration status; return the parsed JSON, or None on
        a non-200 response or request error (error is logged, not raised)."""
        try:
            r = requests.get(url=self.get_migration_status_url(), auth=(self.admin_user, self.admin_pass))
            if r.status_code == 200:
                response = r.json()
                return response
            else:
                self.logger.error('Failed to check migration status, %s', r)
                return
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to check migration status, %s', e)
            # exit_on_error()

    def get_reindex_status(self, job):
        """Return the status string for re-index job *job*, or None on a
        request error (the poll loop then just keeps waiting)."""
        status_url = self.get_reindex_url()+'/' + job

        try:
            r = requests.get(url=status_url, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response['status']
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to get reindex status, %s', e)
            # exit_on_error()

    def start_reindex(self):
        """POST a re-index request and return the server-assigned jobId.

        When a start date was supplied, it is sent as {'updated': date} so
        the server limits the rebuild. Exits the process on a non-200
        response or a request error.
        """
        body = ""
        if self.start_date is not None:
            body = json.dumps({'updated': self.start_date})

        try:
            r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.admin_user, self.admin_pass))

            if r.status_code == 200:
                response = r.json()
                return response['jobId']
            else:
                self.logger.error('Failed to start reindex, %s', r)
                exit_on_error(str(r))
        except requests.exceptions.RequestException as e:
            self.logger.error('Unable to make API request for reindex, %s', e)
            exit_on_error(str(e))

    def is_reindex_running(self, job):
        """Return True while the job is not COMPLETE.

        Note: a failed status poll yields status None, which also counts as
        "still running" — the caller keeps polling.
        """
        status = self.get_reindex_status(job)
        self.logger.info('Re-index status=[%s]', status)
        if status != "COMPLETE":
            return True
        else:
            return False

    def is_endpoint_available(self):
        """Return True when GET <endpoint>/status answers 200.

        Returns False on a request error; implicitly None (falsy) on a
        non-200 response without an exception.
        """
        try:
            r = requests.get(url=self.endpoint+'/status')
            if r.status_code == 200:
                return True
        except requests.exceptions.RequestException as e:
            self.logger.error('Endpoint is unavailable, %s', str(e))
        return False

    def log_metrics(self):
        """Log the start/end timestamps (epoch millis) of both phases."""
        self.logger.info(
            'Re-index start=[%s], ' +
            'Re-index end =[%s], ' +
            'Entity Migration start=[%s], ' +
            'Entity Migration end=[%s] ',
            self.metrics['reindex_start'],
            self.metrics['reindex_end'],
            self.metrics['data_migration_start'],
            self.metrics['data_migration_end']

        )
+
+
def get_current_time():
    """Return the current wall-clock time as epoch milliseconds, as a string."""
    millis = time.time() * 1000
    return str(int(millis))
+
+
def exit_on_error(e=""):
    """Report a fatal error and terminate the script with exit status 1.

    e: optional error description (any type; it is str()-converted).
    """
    # Diagnostics belong on stderr so they are not mixed into stdout output
    # (the original printed to stdout).
    sys.stderr.write('Exiting migration script due to error: ' + str(e) + '\n')
    sys.exit(1)
+
+
def init_logging(name):
    """Build a logger that writes INFO+ messages both to a rotating
    ./migration.log file (100 MB per file, 10 backups kept) and to stdout.

    name: logger name (the caller passes its class name).
    Returns the configured logging.Logger.
    """
    logger = logging.getLogger(name)
    log_file_name = './migration.log'
    log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | %(levelname)s | %(message)s',
                                      datefmt='%m/%d/%Y %I:%M:%S %p')

    # Use the RotatingFileHandler name imported at the top of the file
    # directly, instead of re-spelling the logging.handlers path.
    rotating_file = RotatingFileHandler(filename=log_file_name,
                                        mode='a',
                                        maxBytes=104857600,
                                        backupCount=10)
    rotating_file.setFormatter(log_formatter)
    rotating_file.setLevel(logging.INFO)
    logger.addHandler(rotating_file)
    logger.setLevel(logging.INFO)

    stdout_logger = logging.StreamHandler(sys.stdout)
    stdout_logger.setFormatter(log_formatter)
    stdout_logger.setLevel(logging.INFO)
    logger.addHandler(stdout_logger)

    return logger
+
if __name__ == '__main__':
    # Script entry point: build the migrator and run the full workflow.
    Migrate().run()
[2/2] incubator-usergrid git commit: Merge branch 'pr/311' into
two-dot-o-dev
Posted by sf...@apache.org.
Merge branch 'pr/311' into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/eb8a1810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/eb8a1810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/eb8a1810
Branch: refs/heads/two-dot-o-dev
Commit: eb8a18105a7059b8243b170f99351c49d973496c
Parents: b7fd300 a000259
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Jul 28 09:32:18 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Jul 28 09:32:18 2015 -0600
----------------------------------------------------------------------
stack/scripts/migrate_entity_data.py | 299 ++++++++++++++++++++++++++++++
1 file changed, 299 insertions(+)
----------------------------------------------------------------------