You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/11/06 21:38:25 UTC
[31/50] [abbrv] usergrid git commit: Rename new script to
multitenant-migrate and restore old script.
Rename new script to multitenant-migrate and restore old script.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8af152e6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8af152e6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8af152e6
Branch: refs/heads/asf-site
Commit: 8af152e6bef74aa6259aff5a69428523da35d0bf
Parents: 57a613b
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 29 18:00:54 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 29 18:00:54 2015 -0400
----------------------------------------------------------------------
stack/scripts/migrate_entity_data.py | 401 +++++------------
stack/scripts/multitenant_migrate.py | 703 ++++++++++++++++++++++++++++++
2 files changed, 816 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8af152e6/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py
index 0edd319..9c01270 100644
--- a/stack/scripts/migrate_entity_data.py
+++ b/stack/scripts/migrate_entity_data.py
@@ -16,64 +16,30 @@
# under the License.
#
#
-# To migrate multiple tenants within one cluster.
#
-# STEP 1 - SETUP TENANT ONE TOMCAT RUNNING 2.1 NOT IN SERVICE AND INIT MIGRATION
+# Usage from a machine running Usergrid with the new Usergrid version:
#
-# Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
+# ######################################################
+# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
#
-# python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --init
+# python migrate_entity_data.py --user adminuser:adminpass
#
-# This command will setup and bootstrap the database, setup the migration system and update index mappings:
-# - /system/database/setup
-# - /system/database/bootstrap
-# - /system/migrate/run/migration-system
-# - /system/migrate/run/index_mapping_migration
+# The above command performs an appinfo migration and system re-index only. This creates indices in Elasticsearch with
+# the updated indexing strategy in the new Usergrid version.
#
-# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
-# it will de-dup connections and re-index the app.
+# ######################################################
+# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
#
-# Write down the 'Re-index start' timestamp when this is finished.
+# python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp>
#
-# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
-#
-# On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
-#
-# python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
-#
-# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
-# it will de-dup connections and re-index the app with a start-date specified so only data modified since
-# STEP 1 will be re-indexed.
-#
-# STEP 3 - SETUP TENANT TWO TOMCAT RUNNING 2.1 NOT IN SERVICE
-#
-# Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
-#
-# python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass>
-#
-# This command will migrate appinfos, re-index the management app and then for each of the specified org's apps
-# it will de-dup connections and re-index the app.
-#
-# Write down the 'Re-index start' timestamp when this is finished.
+# The above command performs an appinfo migration, system re-index using a start date, and full data migration which
+# includes entity data. This step is necessary to ensure Usergrid starts reading and writing data from the latest
+# entity version, including delta indexing of any documents created during the time between STEP 1 and STEP 2. If
+# all data has already been migrated (running this a 2nd, 3rd, etc. time), then the appinfo migration will be skipped.
+
-# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
-#
-# On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
-#
-# python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
-#
-# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
-# it will de-dup connections and re-index the app with a start-date specified so only data modified since
-# STEP 1 will be re-indexed.
-#
-# STEP 5 - FULL DATA MIGRATION
-#
-# Login to any Tomcat instance in the cluster and run this command (admin user creds must be specificed but will be ignored):
-#
-# python migrate_entity_data.py --super <user>:<pass> --admin <user>:<pass> --full
-#
-# This command will run the full data migration.
-#
import sys
import logging
@@ -101,66 +67,40 @@ PLUGIN_ENTITYDATA = 'collections-entity-data'
PLUGIN_INDEX_MAPPING = 'index_mapping_migration'
PLUGIN_CORE_DATA = 'core-data'
-MANAGEMENT_APP_ID = 'b6768a08-b5d5-11e3-a495-11ddb1de66c8'
-
def parse_args():
parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
+ parser.add_argument('--date',
+ help='A date from which to start the migration',
+ type=str)
+
parser.add_argument('--endpoint',
help='The endpoint to use for making API requests.',
type=str,
default='http://localhost:8080')
- parser.add_argument('--super',
- help='Superuser username and creds <user:pass>',
+ parser.add_argument('--user',
+ help='System Admin Credentials used to authenticate with Usergrid <user:pass>',
type=str,
required=True)
- parser.add_argument('--admin',
- help='Organization admin creds <user:pass>',
- type=str,
- required=True)
-
- parser.add_argument('--init',
- help='Init system and start first migration.',
- action='store_true',
- default=False)
-
- parser.add_argument('--org',
- help='Name of organization on which to run migration.',
- type=str,
- required=False)
-
- parser.add_argument('--date',
- help='A date from which to start the migration',
- type=str)
-
- parser.add_argument('--full',
- help='Run full data migration (last step in cluster migration).',
+ parser.add_argument('--delta',
+ help='Run a delta migration.',
action='store_true',
default=False)
my_args = parser.parse_args(sys.argv[1:])
arg_vars = vars(my_args)
-
- creds = arg_vars['super'].split(':')
- if len(creds) != 2:
- print('Superuser credentials not properly specified. Must be "-u <user:pass>". Exiting...')
- exit_on_error()
- else:
- arg_vars['superuser'] = creds[0]
- arg_vars['superpass'] = creds[1]
-
- creds = arg_vars['super'].split(':')
+ creds = arg_vars['user'].split(':')
if len(creds) != 2:
- print('Org admin credentials not properly specified. Must be "-u <user:pass>". Exiting...')
+ print('Credentials not properly specified. Must be "-u <user:pass>". Exiting...')
exit_on_error()
else:
- arg_vars['adminuser'] = creds[0]
- arg_vars['adminpass'] = creds[1]
+ arg_vars['user'] = creds[0]
+ arg_vars['pass'] = creds[1]
return arg_vars
@@ -178,13 +118,9 @@ class Migrate:
'full_data_migration_start': '',
'full_data_migration_end': ''}
self.logger = init_logging(self.__class__.__name__)
- self.super_user = self.args['superuser']
- self.super_pass = self.args['superpass']
- self.admin_user = self.args['adminuser']
- self.admin_pass = self.args['adminpass']
- self.org = self.args['org']
- self.init = self.args['init']
- self.full = self.args['full']
+ self.admin_user = self.args['user']
+ self.admin_pass = self.args['pass']
+ self.delta_migration = self.args['delta']
def run(self):
self.logger.info('Initializing...')
@@ -197,83 +133,63 @@ class Migrate:
try:
- if self.full:
+ self.run_database_setup()
- # Do full data migration and exit
+ # We need to check and roll the migration system to 1 if not already
+ migration_system_updated = self.is_migration_system_updated()
- self.start_fulldata_migration()
-
- self.metrics['full_data_migration_start'] = get_current_time()
- self.logger.info("Full Data Migration Started")
- is_migrated = False
- while not is_migrated:
+ if not migration_system_updated:
+ self.logger.info('Migration system needs to be updated. Updating migration system..')
+ self.start_migration_system_update()
+ while not migration_system_updated:
time.sleep(STATUS_INTERVAL_SECONDS)
- is_migrated = self.is_data_migrated()
- if is_migrated:
+ migration_system_updated = self.is_migration_system_updated()
+ if migration_system_updated:
break
- self.metrics['full_data_migration_end'] = get_current_time()
- self.logger.info("Full Data Migration completed")
+ index_mapping_updated = self.is_index_mapping_updated()
- self.log_metrics()
- self.logger.info("Finished...")
-
- return
-
-
- if self.init:
-
- # Init the migration system as this is the first migration done on the cluster
-
- self.run_database_setup()
-
- migration_system_updated = self.is_migration_system_updated()
-
- if not migration_system_updated:
- self.logger.info('Migration system needs to be updated. Updating migration system..')
- self.start_migration_system_update()
- while not migration_system_updated:
- time.sleep(STATUS_INTERVAL_SECONDS)
- migration_system_updated = self.is_migration_system_updated()
- if migration_system_updated:
- break
+ if not index_mapping_updated:
+ self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
+ self.start_index_mapping_migration()
+ while not index_mapping_updated:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ index_mapping_updated = self.is_index_mapping_updated()
+ if index_mapping_updated:
+ break
- index_mapping_updated = self.is_index_mapping_updated()
+ # Run AppInfo migration only when both appinfos and collection entity data have not been migrated
+ if not self.is_data_migrated():
- if not index_mapping_updated:
- self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
- self.start_index_mapping_migration()
- while not index_mapping_updated:
- time.sleep(STATUS_INTERVAL_SECONDS)
- index_mapping_updated = self.is_index_mapping_updated()
- if index_mapping_updated:
- break
+ #Migrate app info
+ if self.is_appinfo_migrated():
+ self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+ self.reset_appinfo_migration()
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ self.start_appinfo_migration()
+ self.logger.info('AppInfo Migration Started.')
+ self.metrics['appinfo_migration_start'] = get_current_time()
- # Migrate app info
+ is_appinfo_migrated = False
+ while not is_appinfo_migrated:
+ is_appinfo_migrated = self.is_appinfo_migrated()
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ if is_appinfo_migrated:
+ self.metrics['appinfo_migration_end'] = get_current_time()
+ break
+ self.logger.info('AppInfo Migration Ended.')
- if self.is_appinfo_migrated():
- self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
- self.reset_appinfo_migration()
- time.sleep(STATUS_INTERVAL_SECONDS)
- self.start_appinfo_migration()
- self.logger.info('AppInfo Migration Started.')
- self.metrics['appinfo_migration_start'] = get_current_time()
+ else:
+ self.logger.info('Full Data Migration previously ran... skipping AppInfo migration.')
- is_appinfo_migrated = False
- while not is_appinfo_migrated:
- is_appinfo_migrated = self.is_appinfo_migrated()
- time.sleep(STATUS_INTERVAL_SECONDS)
- if is_appinfo_migrated:
- self.metrics['appinfo_migration_end'] = get_current_time()
- break
- self.logger.info('AppInfo Migration Ended.')
- # Reindex management app
+ # We need to check and roll index mapping version to 1 if not already there
- job = self.start_app_reindex(MANAGEMENT_APP_ID)
+ # Perform system re-index (it will grab date from input if provided)
+ job = self.start_reindex()
self.metrics['reindex_start'] = get_current_time()
self.logger.info('Started Re-index. Job=[%s]', job)
is_running = True
@@ -286,44 +202,33 @@ class Migrate:
self.logger.info("Finished Re-index. Job=[%s]", job)
self.metrics['reindex_end'] = get_current_time()
+ # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data)
+ if self.delta_migration:
- # Dedup and re-index all of organization's apps
-
- app_ids = self.get_app_ids()
- for app_id in app_ids:
-
- # De-dep app
- # job = self.start_dedup(app_id)
- # self.metrics['dedup_start_' + app_id] = get_current_time()
- # self.logger.info('Started dedup. App=[%s], Job=[%s]', app_id, job)
- # is_running = True
- # while is_running:
- # time.sleep(STATUS_INTERVAL_SECONDS)
- # is_running = self.is_dedup_running(job)
- # if not is_running:
- # break
- #
- # self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
- # self.metrics['dedup_end_' + app_id] = get_current_time()
-
- # Re-index app
- job = self.start_app_reindex(app_id)
- self.metrics['reindex_start_' + app_id] = get_current_time()
- self.logger.info('Started Re-index. App=[%s], Job=[%s]', app_id, job)
- is_running = True
- while is_running:
+ self.logger.info('Delta option provided. Performing full data migration...')
+ if self.is_data_migrated():
+ self.reset_data_migration()
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ self.is_data_migrated()
+
+ # self.start_core_data_migration()
+ self.start_fulldata_migration()
+
+ self.metrics['full_data_migration_start'] = get_current_time()
+ self.logger.info("Full Data Migration Started")
+ is_migrated = False
+ while not is_migrated:
time.sleep(STATUS_INTERVAL_SECONDS)
- is_running = self.is_reindex_running(job)
- if not is_running:
+ is_migrated = self.is_data_migrated()
+ if is_migrated:
break
- self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
- self.metrics['reindex_end_' + app_id] = get_current_time()
+ self.metrics['full_data_migration_end'] = get_current_time()
+ self.logger.info("Full Data Migration completed")
self.log_metrics()
self.logger.info("Finished...")
-
except KeyboardInterrupt:
self.log_metrics()
self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')
@@ -332,10 +237,6 @@ class Migrate:
url = self.endpoint + '/system/database/setup'
return url
- def get_database_bootstrap_url(self):
- url = self.endpoint + '/system/database/bootstrap'
- return url
-
def get_migration_url(self):
url = self.endpoint + '/system/migrate/run'
return url
@@ -348,30 +249,28 @@ class Migrate:
url = self.endpoint + '/system/migrate/status'
return url
- def get_dedup_url(self):
- url = self.endpoint + '/system/connection/dedup'
- return url
-
def get_reindex_url(self):
url = self.endpoint + '/system/index/rebuild'
return url
def get_management_reindex_url(self):
- url = self.get_reindex_url() + "/management"
- return url
+ url = self.get_reindex_url() + "/management"
+ return url
+
def start_core_data_migration(self):
- try:
- r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.super_pass))
- response = r.json()
- return response
- except requests.exceptions.RequestException as e:
- self.logger.error('Failed to start migration, %s', e)
- exit_on_error(str(e))
+ try:
+ r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
+ response = r.json()
+ return response
+ except requests.exceptions.RequestException as e:
+ self.logger.error('Failed to start migration, %s', e)
+ exit_on_error(str(e))
+
def start_fulldata_migration(self):
try:
- r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.super_pass))
+ r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
response = r.json()
return response
except requests.exceptions.RequestException as e:
@@ -380,9 +279,9 @@ class Migrate:
def start_migration_system_update(self):
try:
- # TODO fix this URL
+ #TODO fix this URL
migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
- r = requests.put(url=migrateUrl, auth=(self.admin_user, self.super_pass))
+ r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
response = r.json()
return response
except requests.exceptions.RequestException as e:
@@ -392,7 +291,7 @@ class Migrate:
def run_database_setup(self):
try:
setupUrl = self.get_database_setup_url()
- r = requests.put(url=setupUrl, auth=(self.super_user, self.super_pass))
+ r = requests.put(url=setupUrl, auth=(self.admin_user, self.admin_pass))
if r.status_code != 200:
exit_on_error('Database Setup Failed')
@@ -400,21 +299,10 @@ class Migrate:
self.logger.error('Failed to run database setup, %s', e)
exit_on_error(str(e))
- def run_database_bootstrap(self):
- try:
- setupUrl = self.get_database_bootstrap_url()
- r = requests.put(url=setupUrl, auth=(self.super_user, self.super_pass))
- if r.status_code != 200:
- exit_on_error('Database Bootstrap Failed')
-
- except requests.exceptions.RequestException as e:
- self.logger.error('Failed to run database bootstrap, %s', e)
- exit_on_error(str(e))
-
def start_index_mapping_migration(self):
try:
migrateUrl = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
- r = requests.put(url=migrateUrl, auth=(self.super_user, self.super_pass))
+ r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
response = r.json()
return response
except requests.exceptions.RequestException as e:
@@ -435,7 +323,7 @@ class Migrate:
version = TARGET_ENTITY_DATA_VERSION - 1
body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version})
try:
- r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.super_user, self.super_pass))
+ r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
response = r.json()
self.logger.info('Resetting data migration versions to %s=[%s] '
'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version)
@@ -448,7 +336,7 @@ class Migrate:
version = TARGET_APPINFO_VERSION - 1
body = json.dumps({PLUGIN_APPINFO: version})
try:
- r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.super_user, self.super_pass))
+ r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
response = r.json()
self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO, version)
return response
@@ -527,7 +415,7 @@ class Migrate:
def check_data_migration_status(self):
try:
- r = requests.get(url=self.get_migration_status_url(), auth=(self.super_user, self.super_pass))
+ r = requests.get(url=self.get_migration_status_url(), auth=(self.admin_user, self.admin_pass))
if r.status_code == 200:
response = r.json()
return response
@@ -542,20 +430,20 @@ class Migrate:
status_url = self.get_reindex_url()+'/' + job
try:
- r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
+ r = requests.get(url=status_url, auth=(self.admin_user, self.admin_pass))
response = r.json()
return response['status']
except requests.exceptions.RequestException as e:
self.logger.error('Failed to get reindex status, %s', e)
# exit_on_error()
- def start_app_reindex(self, appId):
+ def start_reindex(self):
body = ""
if self.start_date is not None:
body = json.dumps({'updated': self.start_date})
try:
- r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.super_user, self.super_pass))
+ r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.admin_user, self.admin_pass))
if r.status_code == 200:
response = r.json()
@@ -575,41 +463,6 @@ class Migrate:
else:
return False
- def get_dedup_status(self, job):
- status_url = self.get_dedup_url()+'/' + job
- try:
- r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
- print r.text
- response = r.json()
- return response['status']
- except requests.exceptions.RequestException as e:
- self.logger.error('Failed to get dedup status, %s', e)
- # exit_on_error()
-
- def start_dedup(self, app_id):
- body = ""
- try:
- r = requests.post(url=self.get_dedup_url() + "/" + app_id, data=body, auth=(self.super_user, self.super_pass))
- if r.status_code == 200:
- response = r.json()
- print r.text
- return response['status']['jobStatusId']
- else:
- self.logger.error('Failed to start dedup, %s', r)
- exit_on_error(str(r))
-
- except requests.exceptions.RequestException as e:
- self.logger.error('Unable to make API request for dedup, %s', e)
- exit_on_error(str(e))
-
- def is_dedup_running(self, job):
- status = self.get_dedup_status(job)
- self.logger.info('Dedup status=[%s]', status)
- if status != "COMPLETE":
- return True
- else:
- return False
-
def is_endpoint_available(self):
try:
@@ -637,34 +490,6 @@ class Migrate:
)
- def get_app_ids(self):
-
- try:
-
- url = self.endpoint + "/management/token"
- body = json.dumps({"grant_type":"password","username":self.admin_user,"password":self.admin_pass})
- r = requests.post(url=url, data=body)
- if ( r.status_code != 200 ):
- print "Error logging in: " + r.text
- return
-
- access_token = r.json()["access_token"]
-
- url = self.endpoint + "/management/orgs/" + self.org + "/apps?access_token=" + access_token
- r = requests.get(url=url)
- if r.status_code != 200:
- exit_on_error('Cannot get app ids: ' + r.text)
-
- apps = r.json()["data"]
- app_ids = []
- for appId in apps.values():
- app_ids.append(appId)
-
- return app_ids;
-
- except requests.exceptions.RequestException as e:
- self.logger.error('Unable to get list of application ids, %s', e)
- exit_on_error(str(e))
def get_current_time():
return str(int(time.time()*1000))
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8af152e6/stack/scripts/multitenant_migrate.py
----------------------------------------------------------------------
diff --git a/stack/scripts/multitenant_migrate.py b/stack/scripts/multitenant_migrate.py
new file mode 100644
index 0000000..62e46af
--- /dev/null
+++ b/stack/scripts/multitenant_migrate.py
@@ -0,0 +1,703 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# To migrate multiple tenants within one cluster.
+#
+# STEP 1 - SETUP TENANT ONE TOMCAT RUNNING 2.1 NOT IN SERVICE AND INIT MIGRATION
+#
+# Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
+#
+# python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --init
+#
+# This command will setup and bootstrap the database, setup the migration system and update index mappings:
+# - /system/database/setup
+# - /system/database/bootstrap
+# - /system/migrate/run/migration-system
+# - /system/migrate/run/index_mapping_migration
+#
+# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
+# it will de-dup connections and re-index the app.
+#
+# Write down the 'Re-index start' timestamp when this is finished.
+#
+# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+# On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
+#
+# python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
+#
+# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
+# it will de-dup connections and re-index the app with a start-date specified so only data modified since
+# STEP 1 will be re-indexed.
+#
+# STEP 3 - SETUP TENANT TWO TOMCAT RUNNING 2.1 NOT IN SERVICE
+#
+# Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
+#
+# python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass>
+#
+# This command will migrate appinfos, re-index the management app and then for each of the specified org's apps
+# it will de-dup connections and re-index the app.
+#
+# Write down the 'Re-index start' timestamp when this is finished.
+
+# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+# On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
+#
+# python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
+#
+# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
+# it will de-dup connections and re-index the app with a start-date specified so only data modified since
+# STEP 1 will be re-indexed.
+#
+# STEP 5 - FULL DATA MIGRATION
+#
+# Login to any Tomcat instance in the cluster and run this command (admin user creds must be specified but will be ignored):
+#
+# python migrate_entity_data.py --super <user>:<pass> --admin <user>:<pass> --full
+#
+# This command will run the full data migration.
+#
+
+import sys
+import logging
+from logging.handlers import RotatingFileHandler
+import argparse
+import time
+import requests
+import json
+
+
+# Version expected in status response post-migration for entity and app-info data
+TARGET_APPINFO_VERSION=2
+TARGET_ENTITY_DATA_VERSION=2
+TARGET_CORE_DATA_VERSION=2
+TARGET_MIGRATION_SYSTEM_VERSION = 1
+TARGET_INDEX_MAPPING_VERSION = 2
+
+# Set an interval (in seconds) for checking if re-index and/or migration has finished
+STATUS_INTERVAL_SECONDS = 2
+
+# Set plugin names
+PLUGIN_MIGRATION_SYSTEM = 'migration-system'
+PLUGIN_APPINFO = 'appinfo-migration'
+PLUGIN_ENTITYDATA = 'collections-entity-data'
+PLUGIN_INDEX_MAPPING = 'index_mapping_migration'
+PLUGIN_CORE_DATA = 'core-data'
+
+MANAGEMENT_APP_ID = 'b6768a08-b5d5-11e3-a495-11ddb1de66c8'
+
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
+
+ parser.add_argument('--endpoint',
+ help='The endpoint to use for making API requests.',
+ type=str,
+ default='http://localhost:8080')
+
+ parser.add_argument('--super',
+ help='Superuser username and creds <user:pass>',
+ type=str,
+ required=True)
+
+ parser.add_argument('--admin',
+ help='Organization admin creds <user:pass>',
+ type=str,
+ required=True)
+
+ parser.add_argument('--init',
+ help='Init system and start first migration.',
+ action='store_true',
+ default=False)
+
+ parser.add_argument('--org',
+ help='Name of organization on which to run migration.',
+ type=str,
+ required=False)
+
+ parser.add_argument('--date',
+ help='A date from which to start the migration',
+ type=str)
+
+ parser.add_argument('--full',
+ help='Run full data migration (last step in cluster migration).',
+ action='store_true',
+ default=False)
+
+ my_args = parser.parse_args(sys.argv[1:])
+
+ arg_vars = vars(my_args)
+
+ creds = arg_vars['super'].split(':')
+ if len(creds) != 2:
+ print('Superuser credentials not properly specified. Must be "-u <user:pass>". Exiting...')
+ exit_on_error()
+ else:
+ arg_vars['superuser'] = creds[0]
+ arg_vars['superpass'] = creds[1]
+
+ creds = arg_vars['super'].split(':')
+ if len(creds) != 2:
+ print('Org admin credentials not properly specified. Must be "-u <user:pass>". Exiting...')
+ exit_on_error()
+ else:
+ arg_vars['adminuser'] = creds[0]
+ arg_vars['adminpass'] = creds[1]
+
+ return arg_vars
+
+
+
+class Migrate:
+ def __init__(self):
+ self.args = parse_args()
+ self.start_date = self.args['date']
+ self.endpoint = self.args['endpoint']
+ self.metrics = {'reindex_start': '',
+ 'reindex_end': '',
+ 'appinfo_migration_start': '',
+ 'appinfo_migration_end': '',
+ 'full_data_migration_start': '',
+ 'full_data_migration_end': ''}
+ self.logger = init_logging(self.__class__.__name__)
+ self.super_user = self.args['superuser']
+ self.super_pass = self.args['superpass']
+ self.admin_user = self.args['adminuser']
+ self.admin_pass = self.args['adminpass']
+ self.org = self.args['org']
+ self.init = self.args['init']
+ self.full = self.args['full']
+
+ def run(self):
+ self.logger.info('Initializing...')
+
+ if not self.is_endpoint_available():
+ exit_on_error('Endpoint is not available, aborting')
+
+ if self.start_date is not None:
+ self.logger.info("Date Provided. Re-index will run from date=[%s]", self.start_date)
+
+ try:
+
+ if self.full:
+
+ # Do full data migration and exit
+
+ self.start_fulldata_migration()
+
+ self.metrics['full_data_migration_start'] = get_current_time()
+ self.logger.info("Full Data Migration Started")
+ is_migrated = False
+ while not is_migrated:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ is_migrated = self.is_data_migrated()
+ if is_migrated:
+ break
+
+ self.metrics['full_data_migration_end'] = get_current_time()
+ self.logger.info("Full Data Migration completed")
+
+ self.log_metrics()
+ self.logger.info("Finished...")
+
+ return
+
+
+ if self.init:
+
+ # Init the migration system as this is the first migration done on the cluster
+
+ self.run_database_setup()
+
+ migration_system_updated = self.is_migration_system_updated()
+
+ if not migration_system_updated:
+ self.logger.info('Migration system needs to be updated. Updating migration system..')
+ self.start_migration_system_update()
+ while not migration_system_updated:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ migration_system_updated = self.is_migration_system_updated()
+ if migration_system_updated:
+ break
+
+ index_mapping_updated = self.is_index_mapping_updated()
+
+ if not index_mapping_updated:
+ self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
+ self.start_index_mapping_migration()
+ while not index_mapping_updated:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ index_mapping_updated = self.is_index_mapping_updated()
+ if index_mapping_updated:
+ break
+
+
+ # Migrate app info
+
+ if self.is_appinfo_migrated():
+ self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+ self.reset_appinfo_migration()
+ time.sleep(STATUS_INTERVAL_SECONDS)
+
+ self.start_appinfo_migration()
+ self.logger.info('AppInfo Migration Started.')
+ self.metrics['appinfo_migration_start'] = get_current_time()
+
+ is_appinfo_migrated = False
+ while not is_appinfo_migrated:
+ is_appinfo_migrated = self.is_appinfo_migrated()
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ if is_appinfo_migrated:
+ self.metrics['appinfo_migration_end'] = get_current_time()
+ break
+ self.logger.info('AppInfo Migration Ended.')
+
+
+ # Reindex management app
+
+ job = self.start_app_reindex(MANAGEMENT_APP_ID)
+ self.metrics['reindex_start'] = get_current_time()
+ self.logger.info('Started Re-index. Job=[%s]', job)
+ is_running = True
+ while is_running:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ is_running = self.is_reindex_running(job)
+ if not is_running:
+ break
+
+ self.logger.info("Finished Re-index. Job=[%s]", job)
+ self.metrics['reindex_end'] = get_current_time()
+
+
+ # Dedup and re-index all of organization's apps
+
+ app_ids = self.get_app_ids()
+ for app_id in app_ids:
+
+ # De-dup app
+ job = self.start_dedup(app_id)
+ self.logger.info('Started dedup. App=[%s], Job=[%s]', app_id, job)
+ is_running = True
+ while is_running:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ is_running = self.is_dedup_running(job)
+ if not is_running:
+ break
+
+ self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
+ self.metrics['dedup_end_' + app_id] = get_current_time()
+
+ # Re-index app
+ job = self.start_app_reindex(app_id)
+ self.metrics['reindex_start_' + app_id] = get_current_time()
+ self.logger.info('Started Re-index. App=[%s], Job=[%s]', app_id, job)
+ is_running = True
+ while is_running:
+ time.sleep(STATUS_INTERVAL_SECONDS)
+ is_running = self.is_reindex_running(job)
+ if not is_running:
+ break
+
+ self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
+ self.metrics['reindex_end_' + app_id] = get_current_time()
+
+ self.log_metrics()
+ self.logger.info("Finished...")
+
+
+ except KeyboardInterrupt:
+ self.log_metrics()
+ self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')
+
def get_database_setup_url(self):
    """Return the full URL of the /system/database/setup endpoint."""
    return self.endpoint + '/system/database/setup'
+
def get_database_bootstrap_url(self):
    """Return the full URL of the /system/database/bootstrap endpoint."""
    return self.endpoint + '/system/database/bootstrap'
+
def get_migration_url(self):
    """Return the full URL of the /system/migrate/run endpoint."""
    return self.endpoint + '/system/migrate/run'
+
def get_reset_migration_url(self):
    """Return the full URL of the /system/migrate/set endpoint."""
    return self.endpoint + '/system/migrate/set'
+
def get_migration_status_url(self):
    """Return the full URL of the /system/migrate/status endpoint."""
    return self.endpoint + '/system/migrate/status'
+
def get_dedup_url(self):
    """Return the full URL of the /system/connection/dedup endpoint."""
    return self.endpoint + '/system/connection/dedup'
+
def get_reindex_url(self):
    """Return the full URL of the /system/index/rebuild endpoint."""
    return self.endpoint + '/system/index/rebuild'
+
def get_management_reindex_url(self):
    """Return the rebuild URL scoped to the management application."""
    return self.get_reindex_url() + "/management"
+
def start_core_data_migration(self):
    """Kick off a data migration run via PUT /system/migrate/run.

    Returns the parsed JSON response on success; on a request error,
    logs the failure and exits the whole script.
    """
    try:
        # Use the superuser credential pair. The original passed
        # (admin_user, super_pass) -- an admin username with the
        # superuser's password -- which cannot authenticate; every
        # other /system/migrate call here uses the superuser pair.
        r = requests.put(url=self.get_migration_url(),
                         auth=(self.super_user, self.super_pass))
        return r.json()
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to start migration, %s', e)
        exit_on_error(str(e))
+
def start_fulldata_migration(self):
    """Kick off the full data migration via PUT /system/migrate/run.

    NOTE(review): identical to start_core_data_migration; both hit the
    same URL with no plugin suffix. Kept separate to preserve the API.

    Returns the parsed JSON response on success; on a request error,
    logs the failure and exits the whole script.
    """
    try:
        # Superuser credential pair; the original mixed admin_user with
        # super_pass, which cannot authenticate against the endpoint.
        r = requests.put(url=self.get_migration_url(),
                         auth=(self.super_user, self.super_pass))
        return r.json()
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to start migration, %s', e)
        exit_on_error(str(e))
+
def start_migration_system_update(self):
    """Run only the migration-system plugin (PUT /system/migrate/run/<plugin>).

    Returns the parsed JSON response on success; on a request error,
    logs the failure and exits the whole script.
    """
    try:
        # TODO fix this URL (carried over from the original author)
        migrate_url = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
        # Superuser credential pair, consistent with
        # start_index_mapping_migration; the original passed
        # (admin_user, super_pass), which cannot authenticate.
        r = requests.put(url=migrate_url,
                         auth=(self.super_user, self.super_pass))
        return r.json()
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to start migration, %s', e)
        exit_on_error(str(e))
+
def run_database_setup(self):
    """PUT the database setup endpoint; abort the script unless it returns 200."""
    try:
        response = requests.put(url=self.get_database_setup_url(),
                                auth=(self.super_user, self.super_pass))
        if response.status_code != 200:
            exit_on_error('Database Setup Failed')
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to run database setup, %s', e)
        exit_on_error(str(e))
+
def run_database_bootstrap(self):
    """PUT the database bootstrap endpoint; abort the script unless it returns 200."""
    try:
        response = requests.put(url=self.get_database_bootstrap_url(),
                                auth=(self.super_user, self.super_pass))
        if response.status_code != 200:
            exit_on_error('Database Bootstrap Failed')
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to run database bootstrap, %s', e)
        exit_on_error(str(e))
+
def start_index_mapping_migration(self):
    """Trigger the index-mapping migration plugin; return the JSON response.

    Logs and exits the script if the HTTP request itself fails.
    """
    plugin_url = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
    try:
        response = requests.put(url=plugin_url,
                                auth=(self.super_user, self.super_pass))
        return response.json()
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to start migration, %s', e)
        exit_on_error(str(e))
+
def start_appinfo_migration(self):
    """Trigger the appinfo migration plugin; return the JSON response.

    Authenticates with the org-admin credential pair. Logs and exits
    the script if the HTTP request itself fails.
    """
    plugin_url = self.get_migration_url() + '/' + PLUGIN_APPINFO
    try:
        response = requests.put(url=plugin_url,
                                auth=(self.admin_user, self.admin_pass))
        return response.json()
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to start migration, %s', e)
        exit_on_error(str(e))
+
def reset_data_migration(self):
    """Roll the entity-data and appinfo migration versions back one step.

    PUTs {plugin: target-1} to the migrate/set endpoint so a full
    re-migration can be run. Returns the JSON response, or exits the
    script on a request error.
    """
    previous_version = TARGET_ENTITY_DATA_VERSION - 1
    payload = json.dumps({PLUGIN_ENTITYDATA: previous_version,
                          PLUGIN_APPINFO: previous_version})
    try:
        response = requests.put(url=self.get_reset_migration_url(),
                                data=payload,
                                auth=(self.super_user, self.super_pass))
        parsed = response.json()
        self.logger.info('Resetting data migration versions to %s=[%s] '
                         'and %s=[%s]', PLUGIN_ENTITYDATA, previous_version,
                         PLUGIN_APPINFO, previous_version)
        return parsed
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to reset full data migration versions, %s', e)
        exit_on_error(str(e))
+
def reset_appinfo_migration(self):
    """Roll the appinfo migration version back one step.

    PUTs {appinfo-plugin: target-1} to the migrate/set endpoint so the
    appinfo migration can be re-run. Returns the JSON response, or
    exits the script on a request error.
    """
    previous_version = TARGET_APPINFO_VERSION - 1
    payload = json.dumps({PLUGIN_APPINFO: previous_version})
    try:
        response = requests.put(url=self.get_reset_migration_url(),
                                data=payload,
                                auth=(self.super_user, self.super_pass))
        parsed = response.json()
        self.logger.info('Resetting appinfo migration versions to %s=[%s]',
                         PLUGIN_APPINFO, previous_version)
        return parsed
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to reset appinfo migration version, %s', e)
        exit_on_error(str(e))
+
def is_data_migrated(self):
    """Report whether the full data migration has completed.

    Returns True when the entity-data, appinfo and core-data plugins
    all report their target versions; False otherwise -- including
    when the status endpoint could not be reached (the original fell
    off the end and returned an implicit None in that case).
    """
    status = self.check_data_migration_status()
    if status is None:
        # Status unavailable: treat as not migrated, explicitly.
        return False

    entity_version = status['data'][PLUGIN_ENTITYDATA]
    appinfo_version = status['data'][PLUGIN_APPINFO]
    core_data_version = status['data'][PLUGIN_CORE_DATA]

    if (entity_version == TARGET_ENTITY_DATA_VERSION
            and appinfo_version == TARGET_APPINFO_VERSION
            and core_data_version == TARGET_CORE_DATA_VERSION):
        self.logger.info('Full Data Migration status=[COMPLETE], %s=[%s], '
                         '%s=[%s], %s=%s',
                         PLUGIN_ENTITYDATA,
                         entity_version,
                         PLUGIN_APPINFO,
                         appinfo_version,
                         PLUGIN_CORE_DATA,
                         core_data_version)
        return True

    self.logger.info('Full Data Migration status=[NOTSTARTED/INPROGRESS]')
    return False
+
def is_appinfo_migrated(self):
    """Report whether the appinfo migration has completed.

    Returns True when the appinfo plugin reports its target version;
    False otherwise -- including when the status endpoint could not be
    reached (the original returned an implicit None in that case).
    """
    status = self.check_data_migration_status()
    if status is None:
        # Status unavailable: treat as not migrated, explicitly.
        return False

    appinfo_version = status['data'][PLUGIN_APPINFO]

    if appinfo_version == TARGET_APPINFO_VERSION:
        self.logger.info('AppInfo Migration status=[COMPLETE],'
                         '%s=[%s]',
                         PLUGIN_APPINFO,
                         appinfo_version)
        return True

    self.logger.info('AppInfo Migration status=[NOTSTARTED/INPROGRESS]')
    return False
+
def is_migration_system_updated(self):
    """Report whether the migration system itself is at the target version.

    Returns True/False; an unreachable status endpoint counts as False
    (the original returned an implicit None in that case).
    """
    status = self.check_data_migration_status()
    if status is None:
        # Status unavailable: treat as not updated, explicitly.
        return False

    migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM]

    if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION:
        self.logger.info('Migration System CURRENT, %s=[%s]',
                         PLUGIN_MIGRATION_SYSTEM,
                         migration_system_version)
        return True

    self.logger.info('Migration System OLD, %s=[%s]',
                     PLUGIN_MIGRATION_SYSTEM,
                     migration_system_version)
    return False
+
def is_index_mapping_updated(self):
    """Report whether the index mapping is at the target version.

    Returns True/False; an unreachable status endpoint counts as False
    (the original returned an implicit None in that case).
    """
    status = self.check_data_migration_status()
    if status is None:
        # Status unavailable: treat as not updated, explicitly.
        return False

    index_mapping_version = status['data'][PLUGIN_INDEX_MAPPING]

    if index_mapping_version == TARGET_INDEX_MAPPING_VERSION:
        self.logger.info('Index Mapping CURRENT, %s=[%s]',
                         PLUGIN_INDEX_MAPPING,
                         index_mapping_version)
        return True

    self.logger.info('Index Mapping OLD, %s=[%s]',
                     PLUGIN_INDEX_MAPPING,
                     index_mapping_version)
    return False
+
def check_data_migration_status(self):
    """GET the migration status document.

    Returns the parsed JSON on HTTP 200; returns None (after logging)
    when the request fails or the server answers with another status.
    """
    try:
        response = requests.get(url=self.get_migration_status_url(),
                                auth=(self.super_user, self.super_pass))
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to check migration status, %s', e)
        return None

    if response.status_code == 200:
        return response.json()

    self.logger.error('Failed to check migration status, %s', response)
    return None
+
def get_reindex_status(self, job):
    """Return the 'status' field of a reindex job, or None on request failure."""
    job_url = self.get_reindex_url() + '/' + job
    try:
        response = requests.get(url=job_url,
                                auth=(self.super_user, self.super_pass))
        return response.json()['status']
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to get reindex status, %s', e)
        return None
+
def start_app_reindex(self, appId):
    """Start a reindex job and return its jobId; exits the script on failure.

    NOTE(review): appId is accepted but never used -- the POST always
    goes to the bare rebuild URL, so every call (management app and
    per-app alike) targets the same endpoint. Presumably the app id
    should be appended to the URL (cf. start_dedup) or
    get_management_reindex_url() used for the management app; confirm
    against the /system/index/rebuild API before changing.
    """
    body = ""
    if self.start_date is not None:
        # Limit reindexing to entities updated on/after start_date.
        body = json.dumps({'updated': self.start_date})

    try:
        r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.super_user, self.super_pass))

        if r.status_code == 200:
            response = r.json()
            return response['jobId']
        else:
            self.logger.error('Failed to start reindex, %s', r)
            exit_on_error(str(r))
    except requests.exceptions.RequestException as e:
        self.logger.error('Unable to make API request for reindex, %s', e)
        exit_on_error(str(e))
+
def is_reindex_running(self, job):
    """True while the reindex job has not yet reported COMPLETE."""
    current_status = self.get_reindex_status(job)
    self.logger.info('Re-index status=[%s]', current_status)
    return current_status != "COMPLETE"
+
def get_dedup_status(self, job):
    """Return the nested status string of a dedup job, or None on request failure."""
    job_url = self.get_dedup_url() + '/' + job
    try:
        response = requests.get(url=job_url,
                                auth=(self.super_user, self.super_pass))
        return response.json()['status']['status']
    except requests.exceptions.RequestException as e:
        self.logger.error('Failed to get dedup status, %s', e)
        return None
+
def start_dedup(self, app_id):
    """Start a dedup job for one application.

    POSTs to .../dedup/<app_id> and returns the jobStatusId; logs and
    exits the script on a non-200 response or a request error.
    """
    dedup_url = self.get_dedup_url() + "/" + app_id
    try:
        response = requests.post(url=dedup_url, data="",
                                 auth=(self.super_user, self.super_pass))
        if response.status_code == 200:
            return response.json()['status']['jobStatusId']

        self.logger.error('Failed to start dedup, %s', response)
        exit_on_error(str(response))

    except requests.exceptions.RequestException as e:
        self.logger.error('Unable to make API request for dedup, %s', e)
        exit_on_error(str(e))
+
def is_dedup_running(self, job):
    """True while the dedup job has not yet reported COMPLETE."""
    current_status = self.get_dedup_status(job)
    self.logger.info('Dedup status=[%s]', current_status)
    return current_status != "COMPLETE"
+
def is_endpoint_available(self):
    """True when GET <endpoint>/status answers 200; False on any other
    status or on a connection error (which is logged)."""
    try:
        if requests.get(url=self.endpoint + '/status').status_code == 200:
            return True
    except requests.exceptions.RequestException as e:
        self.logger.error('Endpoint is unavailable, %s', str(e))
    return False
+
def log_metrics(self):
    """Log the collected phase timing metrics.

    Uses dict.get() so that a metric never recorded (e.g. when the run
    was interrupted before a phase finished) logs as None instead of
    raising KeyError -- this method is called from the
    KeyboardInterrupt handler, where several keys are legitimately
    missing.
    """
    self.logger.info(
        'Re-index start=[%s], ' +
        'Re-index end =[%s], ' +
        'Full Data Migration start=[%s], ' +
        'Full Data Migration end=[%s] ' +
        'AppInfo Migration start=[%s], ' +
        'AppInfo Migration end=[%s] ',
        self.metrics.get('reindex_start'),
        self.metrics.get('reindex_end'),
        self.metrics.get('full_data_migration_start'),
        self.metrics.get('full_data_migration_end'),
        self.metrics.get('appinfo_migration_start'),
        self.metrics.get('appinfo_migration_end')
    )
+
+ def get_app_ids(self):
+
+ try:
+
+ url = self.endpoint + "/management/token"
+ body = json.dumps({"grant_type":"password","username":self.admin_user,"password":self.admin_pass})
+ r = requests.post(url=url, data=body)
+ if ( r.status_code != 200 ):
+ print "Error logging in: " + r.text
+ return
+
+ access_token = r.json()["access_token"]
+
+ url = self.endpoint + "/management/orgs/" + self.org + "/apps?access_token=" + access_token
+ r = requests.get(url=url)
+ if r.status_code != 200:
+ exit_on_error('Cannot get app ids: ' + r.text)
+
+ apps = r.json()["data"]
+ app_ids = []
+ for appId in apps.values():
+ app_ids.append(appId)
+
+ print app_ids
+
+ return app_ids
+
+ except requests.exceptions.RequestException as e:
+ self.logger.error('Unable to get list of application ids, %s', e)
+ exit_on_error(str(e))
+
def get_current_time():
    """Current epoch time in whole milliseconds, formatted as a string."""
    millis = int(time.time() * 1000)
    return str(millis)
+
+
def exit_on_error(e=""):
    """Report the failure on stdout and abort the script with exit status 1."""
    message = 'Exiting migration script due to error: ' + str(e)
    print (message)
    sys.exit(1)
+
+
def init_logging(name):
    """Build a logger writing INFO-level records both to stdout and to a
    size-rotated ./migration.log (100 MB per file, 10 backups kept)."""
    formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] %(levelname)s %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')

    file_handler = logging.handlers.RotatingFileHandler(filename='./migration.log',
                                                        mode='a',
                                                        maxBytes=104857600,
                                                        backupCount=10)
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.INFO)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger
+
if __name__ == '__main__':
    # Script entry point: construct the migrator and run the full flow.
    Migrate().run()