You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/11/06 21:38:25 UTC

[31/50] [abbrv] usergrid git commit: Rename new script to multitenant-migrate and restore old script.

Rename new script to multitenant-migrate and restore old script.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8af152e6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8af152e6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8af152e6

Branch: refs/heads/asf-site
Commit: 8af152e6bef74aa6259aff5a69428523da35d0bf
Parents: 57a613b
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 29 18:00:54 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 29 18:00:54 2015 -0400

----------------------------------------------------------------------
 stack/scripts/migrate_entity_data.py | 401 +++++------------
 stack/scripts/multitenant_migrate.py | 703 ++++++++++++++++++++++++++++++
 2 files changed, 816 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8af152e6/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py
index 0edd319..9c01270 100644
--- a/stack/scripts/migrate_entity_data.py
+++ b/stack/scripts/migrate_entity_data.py
@@ -16,64 +16,30 @@
 # under the License.
 #
 #
-# To migrate multiple tenants within one cluster.
 #
-# STEP 1 - SETUP TENANT ONE TOMCAT RUNNING 2.1 NOT IN SERVICE AND INIT MIGRATION
+# Usage from a machine running Usergrid with the new Usergrid version:
 #
-#   Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
+# ######################################################
+# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
 #
-#       python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --init
+# python migrate_entity_data.py --user adminuser:adminpass
 #
-#   This command will setup and bootstrap the database, setup the migration system and update index mappings:
-#   - /system/database/setup
-#   - /system/database/bootstrap
-#   - /system/migrate/run/migration-system
-#   - /system/migrate/run/index_mapping_migration
+# The above command performs an appinfo migration and system re-index only.  This creates indices in Elasticsearch with
+# the updated indexing strategy in the new Usergrid version.
 #
-#   Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
-#   it will de-dup connections and re-index the app.
+# ######################################################
+# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
 #
-#   Write down the 'Re-index start' timestamp when this is finished.
+# python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp>
 #
-# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
-#
-#   On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
-#
-#       python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
-#
-#   Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
-#   it will de-dup connections and re-index the app with a start-date specified so only data modified since
-#   STEP 1 will be re-indexed.
-#
-# STEP 3 - SETUP TENANT TWO TOMCAT RUNNING 2.1 NOT IN SERVICE
-#
-#   Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
-#
-#       python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass>
-#
-#   This command will migrate appinfos, re-index the management app and then for each of the specified org's apps
-#   it will de-dup connections and re-index the app.
-#
-#   Write down the 'Re-index start' timestamp when this is finished.
+# The above command performs an appinfo migration, system re-index using a start date, and full data migration which
+# includes entity data.  This step is necessary to ensure Usergrid starts reading and writing data from the latest
+# entity version, including delta indexing of any documents create during the time between STEP 1 and STEP 2.  If
+# all data has already been migrated (running this a 2nd, 3rd, etc. time), then the appinfo migration will be skipped.
+
 
-# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
-#
-#   On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
-#
-#       python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
-#
-#   Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
-#   it will de-dup connections and re-index the app with a start-date specified so only data modified since
-#   STEP 1 will be re-indexed.
-#
-# STEP 5 - FULL DATA MIGRATION
-#
-#   Login to any Tomcat instance in the cluster and run this command (admin user creds must be specificed but will be ignored):
-#
-#       python migrate_entity_data.py --super <user>:<pass> --admin <user>:<pass> --full
-#
-#   This command will run the full data migration.
-#
 
 import sys
 import logging
@@ -101,66 +67,40 @@ PLUGIN_ENTITYDATA = 'collections-entity-data'
 PLUGIN_INDEX_MAPPING = 'index_mapping_migration'
 PLUGIN_CORE_DATA = 'core-data'
 
-MANAGEMENT_APP_ID = 'b6768a08-b5d5-11e3-a495-11ddb1de66c8'
-
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
 
+    parser.add_argument('--date',
+                        help='A date from which to start the migration',
+                        type=str)
+
     parser.add_argument('--endpoint',
                         help='The endpoint to use for making API requests.',
                         type=str,
                         default='http://localhost:8080')
 
-    parser.add_argument('--super',
-                        help='Superuser username and creds <user:pass>',
+    parser.add_argument('--user',
+                        help='System Admin Credentials used to authenticate with Usergrid  <user:pass>',
                         type=str,
                         required=True)
 
-    parser.add_argument('--admin',
-                        help='Organization admin creds <user:pass>',
-                        type=str,
-                        required=True)
-
-    parser.add_argument('--init',
-                        help='Init system and start first migration.',
-                        action='store_true',
-                        default=False)
-
-    parser.add_argument('--org',
-                        help='Name of organization on which to run migration.',
-                        type=str,
-                        required=False)
-
-    parser.add_argument('--date',
-                        help='A date from which to start the migration',
-                        type=str)
-
-    parser.add_argument('--full',
-                        help='Run full data migration (last step in cluster migration).',
+    parser.add_argument('--delta',
+                        help='Run a delta migration.',
                         action='store_true',
                         default=False)
 
     my_args = parser.parse_args(sys.argv[1:])
 
     arg_vars = vars(my_args)
-
-    creds = arg_vars['super'].split(':')
-    if len(creds) != 2:
-        print('Superuser credentials not properly specified.  Must be "-u <user:pass>". Exiting...')
-        exit_on_error()
-    else:
-        arg_vars['superuser'] = creds[0]
-        arg_vars['superpass'] = creds[1]
-
-    creds = arg_vars['super'].split(':')
+    creds = arg_vars['user'].split(':')
     if len(creds) != 2:
-        print('Org admin credentials not properly specified.  Must be "-u <user:pass>". Exiting...')
+        print('Credentials not properly specified.  Must be "-u <user:pass>". Exiting...')
         exit_on_error()
     else:
-        arg_vars['adminuser'] = creds[0]
-        arg_vars['adminpass'] = creds[1]
+        arg_vars['user'] = creds[0]
+        arg_vars['pass'] = creds[1]
 
     return arg_vars
 
@@ -178,13 +118,9 @@ class Migrate:
                         'full_data_migration_start': '',
                         'full_data_migration_end': ''}
         self.logger = init_logging(self.__class__.__name__)
-        self.super_user = self.args['superuser']
-        self.super_pass = self.args['superpass']
-        self.admin_user = self.args['adminuser']
-        self.admin_pass = self.args['adminpass']
-        self.org = self.args['org']
-        self.init = self.args['init']
-        self.full = self.args['full']
+        self.admin_user = self.args['user']
+        self.admin_pass = self.args['pass']
+        self.delta_migration = self.args['delta']
 
     def run(self):
         self.logger.info('Initializing...')
@@ -197,83 +133,63 @@ class Migrate:
 
         try:
 
-            if self.full:
+            self.run_database_setup()
 
-                # Do full data migration and exit
+            # We need to check and roll the migration system to 1 if not already
+            migration_system_updated = self.is_migration_system_updated()
 
-                self.start_fulldata_migration()
-
-                self.metrics['full_data_migration_start'] = get_current_time()
-                self.logger.info("Full Data Migration Started")
-                is_migrated = False
-                while not is_migrated:
+            if not migration_system_updated:
+                self.logger.info('Migration system needs to be updated.  Updating migration system..')
+                self.start_migration_system_update()
+                while not migration_system_updated:
                     time.sleep(STATUS_INTERVAL_SECONDS)
-                    is_migrated = self.is_data_migrated()
-                    if is_migrated:
+                    migration_system_updated = self.is_migration_system_updated()
+                    if migration_system_updated:
                         break
 
-                self.metrics['full_data_migration_end'] = get_current_time()
-                self.logger.info("Full Data Migration completed")
+            index_mapping_updated = self.is_index_mapping_updated()
 
-                self.log_metrics()
-                self.logger.info("Finished...")
-
-                return
-
-
-            if self.init:
-
-                # Init the migration system as this is the first migration done on the cluster
-
-                self.run_database_setup()
-
-                migration_system_updated = self.is_migration_system_updated()
-
-                if not migration_system_updated:
-                    self.logger.info('Migration system needs to be updated.  Updating migration system..')
-                    self.start_migration_system_update()
-                    while not migration_system_updated:
-                        time.sleep(STATUS_INTERVAL_SECONDS)
-                        migration_system_updated = self.is_migration_system_updated()
-                        if migration_system_updated:
-                            break
+            if not index_mapping_updated:
+                self.logger.info('Index Mapping needs to be updated.  Updating index mapping..')
+                self.start_index_mapping_migration()
+                while not index_mapping_updated:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    index_mapping_updated = self.is_index_mapping_updated()
+                    if index_mapping_updated:
+                        break
 
-                index_mapping_updated = self.is_index_mapping_updated()
+            # Run AppInfo migration only when both appinfos and collection entity data have not been migrated
+            if not self.is_data_migrated():
 
-                if not index_mapping_updated:
-                    self.logger.info('Index Mapping needs to be updated.  Updating index mapping..')
-                    self.start_index_mapping_migration()
-                    while not index_mapping_updated:
-                        time.sleep(STATUS_INTERVAL_SECONDS)
-                        index_mapping_updated = self.is_index_mapping_updated()
-                        if index_mapping_updated:
-                            break
+                #Migrate app info
+                if self.is_appinfo_migrated():
+                    self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+                    self.reset_appinfo_migration()
+                    time.sleep(STATUS_INTERVAL_SECONDS)
 
+                self.start_appinfo_migration()
+                self.logger.info('AppInfo Migration Started.')
+                self.metrics['appinfo_migration_start'] = get_current_time()
 
-            # Migrate app info
+                is_appinfo_migrated = False
+                while not is_appinfo_migrated:
+                    is_appinfo_migrated = self.is_appinfo_migrated()
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    if is_appinfo_migrated:
+                        self.metrics['appinfo_migration_end'] = get_current_time()
+                        break
+                self.logger.info('AppInfo Migration Ended.')
 
-            if self.is_appinfo_migrated():
-                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
-                self.reset_appinfo_migration()
-                time.sleep(STATUS_INTERVAL_SECONDS)
 
-            self.start_appinfo_migration()
-            self.logger.info('AppInfo Migration Started.')
-            self.metrics['appinfo_migration_start'] = get_current_time()
+            else:
+                self.logger.info('Full Data Migration previously ran... skipping AppInfo migration.')
 
-            is_appinfo_migrated = False
-            while not is_appinfo_migrated:
-                is_appinfo_migrated = self.is_appinfo_migrated()
-                time.sleep(STATUS_INTERVAL_SECONDS)
-                if is_appinfo_migrated:
-                    self.metrics['appinfo_migration_end'] = get_current_time()
-                    break
-            self.logger.info('AppInfo Migration Ended.')
 
 
-            # Reindex management app
+            # We need to check and roll index mapping version to 1 if not already there
 
-            job = self.start_app_reindex(MANAGEMENT_APP_ID)
+            # Perform system re-index (it will grab date from input if provided)
+            job = self.start_reindex()
             self.metrics['reindex_start'] = get_current_time()
             self.logger.info('Started Re-index.  Job=[%s]', job)
             is_running = True
@@ -286,44 +202,33 @@ class Migrate:
             self.logger.info("Finished Re-index. Job=[%s]", job)
             self.metrics['reindex_end'] = get_current_time()
 
+            # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data)
+            if self.delta_migration:
 
-            # Dedup and re-index all of organization's apps
-
-            app_ids = self.get_app_ids()
-            for app_id in app_ids:
-
-                # De-dep app
-                # job = self.start_dedup(app_id)
-                # self.metrics['dedup_start_' + app_id] = get_current_time()
-                # self.logger.info('Started dedup.  App=[%s], Job=[%s]', app_id, job)
-                # is_running = True
-                # while is_running:
-                #     time.sleep(STATUS_INTERVAL_SECONDS)
-                #     is_running = self.is_dedup_running(job)
-                #     if not is_running:
-                #         break
-                #
-                # self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
-                # self.metrics['dedup_end_' + app_id] = get_current_time()
-
-                # Re-index app
-                job = self.start_app_reindex(app_id)
-                self.metrics['reindex_start_' + app_id] = get_current_time()
-                self.logger.info('Started Re-index.  App=[%s], Job=[%s]', app_id, job)
-                is_running = True
-                while is_running:
+                self.logger.info('Delta option provided. Performing full data migration...')
+                if self.is_data_migrated():
+                    self.reset_data_migration()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+                self.is_data_migrated()
+
+                # self.start_core_data_migration()
+                self.start_fulldata_migration()
+
+                self.metrics['full_data_migration_start'] = get_current_time()
+                self.logger.info("Full Data Migration Started")
+                is_migrated = False
+                while not is_migrated:
                     time.sleep(STATUS_INTERVAL_SECONDS)
-                    is_running = self.is_reindex_running(job)
-                    if not is_running:
+                    is_migrated = self.is_data_migrated()
+                    if is_migrated:
                         break
 
-                self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
-                self.metrics['reindex_end_' + app_id] = get_current_time()
+                self.metrics['full_data_migration_end'] = get_current_time()
+                self.logger.info("Full Data Migration completed")
 
             self.log_metrics()
             self.logger.info("Finished...")
 
-
         except KeyboardInterrupt:
             self.log_metrics()
             self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')
@@ -332,10 +237,6 @@ class Migrate:
         url = self.endpoint + '/system/database/setup'
         return url
 
-    def get_database_bootstrap_url(self):
-        url = self.endpoint + '/system/database/bootstrap'
-        return url
-
     def get_migration_url(self):
         url = self.endpoint + '/system/migrate/run'
         return url
@@ -348,30 +249,28 @@ class Migrate:
         url = self.endpoint + '/system/migrate/status'
         return url
 
-    def get_dedup_url(self):
-        url = self.endpoint + '/system/connection/dedup'
-        return url
-
     def get_reindex_url(self):
         url = self.endpoint + '/system/index/rebuild'
         return url
 
     def get_management_reindex_url(self):
-          url = self.get_reindex_url() + "/management"
-          return url
+        url = self.get_reindex_url() + "/management"
+        return url
+
 
     def start_core_data_migration(self):
-           try:
-               r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.super_pass))
-               response = r.json()
-               return response
-           except requests.exceptions.RequestException as e:
-               self.logger.error('Failed to start migration, %s', e)
-               exit_on_error(str(e))
+        try:
+            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
 
     def start_fulldata_migration(self):
         try:
-            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.super_pass))
+            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
             response = r.json()
             return response
         except requests.exceptions.RequestException as e:
@@ -380,9 +279,9 @@ class Migrate:
 
     def start_migration_system_update(self):
         try:
-            # TODO fix this URL
+            #TODO fix this URL
             migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
-            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.super_pass))
+            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
             response = r.json()
             return response
         except requests.exceptions.RequestException as e:
@@ -392,7 +291,7 @@ class Migrate:
     def run_database_setup(self):
         try:
             setupUrl = self.get_database_setup_url()
-            r = requests.put(url=setupUrl, auth=(self.super_user, self.super_pass))
+            r = requests.put(url=setupUrl, auth=(self.admin_user, self.admin_pass))
             if r.status_code != 200:
                 exit_on_error('Database Setup Failed')
 
@@ -400,21 +299,10 @@ class Migrate:
             self.logger.error('Failed to run database setup, %s', e)
             exit_on_error(str(e))
 
-    def run_database_bootstrap(self):
-        try:
-            setupUrl = self.get_database_bootstrap_url()
-            r = requests.put(url=setupUrl, auth=(self.super_user, self.super_pass))
-            if r.status_code != 200:
-                exit_on_error('Database Bootstrap Failed')
-
-        except requests.exceptions.RequestException as e:
-            self.logger.error('Failed to run database bootstrap, %s', e)
-            exit_on_error(str(e))
-
     def start_index_mapping_migration(self):
         try:
             migrateUrl = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
-            r = requests.put(url=migrateUrl, auth=(self.super_user, self.super_pass))
+            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
             response = r.json()
             return response
         except requests.exceptions.RequestException as e:
@@ -435,7 +323,7 @@ class Migrate:
         version = TARGET_ENTITY_DATA_VERSION - 1
         body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version})
         try:
-            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.super_user, self.super_pass))
+            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
             response = r.json()
             self.logger.info('Resetting data migration versions to %s=[%s] '
                              'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version)
@@ -448,7 +336,7 @@ class Migrate:
         version = TARGET_APPINFO_VERSION - 1
         body = json.dumps({PLUGIN_APPINFO: version})
         try:
-            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.super_user, self.super_pass))
+            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
             response = r.json()
             self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO, version)
             return response
@@ -527,7 +415,7 @@ class Migrate:
     def check_data_migration_status(self):
 
         try:
-            r = requests.get(url=self.get_migration_status_url(), auth=(self.super_user, self.super_pass))
+            r = requests.get(url=self.get_migration_status_url(), auth=(self.admin_user, self.admin_pass))
             if r.status_code == 200:
                 response = r.json()
                 return response
@@ -542,20 +430,20 @@ class Migrate:
         status_url = self.get_reindex_url()+'/' + job
 
         try:
-            r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
+            r = requests.get(url=status_url, auth=(self.admin_user, self.admin_pass))
             response = r.json()
             return response['status']
         except requests.exceptions.RequestException as e:
             self.logger.error('Failed to get reindex status, %s', e)
             # exit_on_error()
 
-    def start_app_reindex(self, appId):
+    def start_reindex(self):
         body = ""
         if self.start_date is not None:
             body = json.dumps({'updated': self.start_date})
 
         try:
-            r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.super_user, self.super_pass))
+            r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.admin_user, self.admin_pass))
 
             if r.status_code == 200:
                 response = r.json()
@@ -575,41 +463,6 @@ class Migrate:
         else:
             return False
 
-    def get_dedup_status(self, job):
-        status_url = self.get_dedup_url()+'/' + job
-        try:
-            r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
-            print r.text
-            response = r.json()
-            return response['status']
-        except requests.exceptions.RequestException as e:
-            self.logger.error('Failed to get dedup status, %s', e)
-            # exit_on_error()
-
-    def start_dedup(self, app_id):
-        body = ""
-        try:
-            r = requests.post(url=self.get_dedup_url() + "/" + app_id, data=body, auth=(self.super_user, self.super_pass))
-            if r.status_code == 200:
-                response = r.json()
-                print r.text
-                return response['status']['jobStatusId']
-            else:
-                self.logger.error('Failed to start dedup, %s', r)
-                exit_on_error(str(r))
-
-        except requests.exceptions.RequestException as e:
-            self.logger.error('Unable to make API request for dedup, %s', e)
-            exit_on_error(str(e))
-
-    def is_dedup_running(self, job):
-        status = self.get_dedup_status(job)
-        self.logger.info('Dedup status=[%s]', status)
-        if status != "COMPLETE":
-            return True
-        else:
-            return False
-
     def is_endpoint_available(self):
 
         try:
@@ -637,34 +490,6 @@ class Migrate:
 
         )
 
-    def get_app_ids(self):
-
-        try:
-
-            url = self.endpoint + "/management/token"
-            body = json.dumps({"grant_type":"password","username":self.admin_user,"password":self.admin_pass})
-            r = requests.post(url=url, data=body)
-            if ( r.status_code != 200 ):
-                print "Error logging in: " + r.text
-                return
-
-            access_token = r.json()["access_token"]
-
-            url = self.endpoint + "/management/orgs/" + self.org + "/apps?access_token=" + access_token
-            r = requests.get(url=url)
-            if r.status_code != 200:
-                exit_on_error('Cannot get app ids: ' + r.text)
-
-            apps = r.json()["data"]
-            app_ids = []
-            for appId in apps.values():
-                app_ids.append(appId)
-
-            return app_ids;
-
-        except requests.exceptions.RequestException as e:
-            self.logger.error('Unable to get list of application ids, %s', e)
-            exit_on_error(str(e))
 
 def get_current_time():
     return str(int(time.time()*1000))

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8af152e6/stack/scripts/multitenant_migrate.py
----------------------------------------------------------------------
diff --git a/stack/scripts/multitenant_migrate.py b/stack/scripts/multitenant_migrate.py
new file mode 100644
index 0000000..62e46af
--- /dev/null
+++ b/stack/scripts/multitenant_migrate.py
@@ -0,0 +1,703 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+# To migrate multiple tenants within one cluster.
+#
+# STEP 1 - SETUP TENANT ONE TOMCAT RUNNING 2.1 NOT IN SERVICE AND INIT MIGRATION
+#
+#   Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
+#
+#       python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --init
+#
+#   This command will setup and bootstrap the database, setup the migration system and update index mappings:
+#   - /system/database/setup
+#   - /system/database/bootstrap
+#   - /system/migrate/run/migration-system
+#   - /system/migrate/run/index_mapping_migration
+#
+#   Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
+#   it will de-dup connections and re-index the app.
+#
+#   Write down the 'Re-index start' timestamp when this is finished.
+#
+# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+#   On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
+#
+#       python migrate_entity_data.py --org <org1name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
+#
+#   Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
+#   it will de-dup connections and re-index the app with a start-date specified so only data modified since
+#   STEP 1 will be re-indexed.
+#
+# STEP 3 - SETUP TENANT TWO TOMCAT RUNNING 2.1 NOT IN SERVICE
+#
+#   Login to the Tomcat instance and run this command, specifying both superuser and tenant organization admin creds:
+#
+#       python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass>
+#
+#   This command will migrate appinfos, re-index the management app and then for each of the specified org's apps
+#   it will de-dup connections and re-index the app.
+#
+#   Write down the 'Re-index start' timestamp when this is finished.
+
+# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+#   On the same Tomcat instance and run this command with the --date timestamp you noted in the previous step:
+#
+#       python migrate_entity_data.py --org <org2name> --super <user>:<pass> --admin <user>:<pass> --date <timestamp>
+#
+#   Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
+#   it will de-dup connections and re-index the app with a start-date specified so only data modified since
+#   STEP 1 will be re-indexed.
+#
+# STEP 5 - FULL DATA MIGRATION
+#
+#   Login to any Tomcat instance in the cluster and run this command (admin user creds must be specificed but will be ignored):
+#
+#       python migrate_entity_data.py --super <user>:<pass> --admin <user>:<pass> --full
+#
+#   This command will run the full data migration.
+#
+
+import sys
+import logging
+from logging.handlers import RotatingFileHandler
+import argparse
+import time
+import requests
+import json
+
+
+# Version expected in status response post-migration for entity and app-info data
+TARGET_APPINFO_VERSION=2
+TARGET_ENTITY_DATA_VERSION=2
+TARGET_CORE_DATA_VERSION=2
+TARGET_MIGRATION_SYSTEM_VERSION = 1
+TARGET_INDEX_MAPPING_VERSION = 2
+
+# Set an interval (in seconds) for checking if re-index and/or migration has finished
+STATUS_INTERVAL_SECONDS = 2
+
+# Set plugin names
+PLUGIN_MIGRATION_SYSTEM = 'migration-system'
+PLUGIN_APPINFO = 'appinfo-migration'
+PLUGIN_ENTITYDATA = 'collections-entity-data'
+PLUGIN_INDEX_MAPPING = 'index_mapping_migration'
+PLUGIN_CORE_DATA = 'core-data'
+
+MANAGEMENT_APP_ID = 'b6768a08-b5d5-11e3-a495-11ddb1de66c8'
+
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
+
+    parser.add_argument('--endpoint',
+                        help='The endpoint to use for making API requests.',
+                        type=str,
+                        default='http://localhost:8080')
+
+    parser.add_argument('--super',
+                        help='Superuser username and creds <user:pass>',
+                        type=str,
+                        required=True)
+
+    parser.add_argument('--admin',
+                        help='Organization admin creds <user:pass>',
+                        type=str,
+                        required=True)
+
+    parser.add_argument('--init',
+                        help='Init system and start first migration.',
+                        action='store_true',
+                        default=False)
+
+    parser.add_argument('--org',
+                        help='Name of organization on which to run migration.',
+                        type=str,
+                        required=False)
+
+    parser.add_argument('--date',
+                        help='A date from which to start the migration',
+                        type=str)
+
+    parser.add_argument('--full',
+                        help='Run full data migration (last step in cluster migration).',
+                        action='store_true',
+                        default=False)
+
+    my_args = parser.parse_args(sys.argv[1:])
+
+    arg_vars = vars(my_args)
+
+    creds = arg_vars['super'].split(':')
+    if len(creds) != 2:
+        print('Superuser credentials not properly specified.  Must be "-u <user:pass>". Exiting...')
+        exit_on_error()
+    else:
+        arg_vars['superuser'] = creds[0]
+        arg_vars['superpass'] = creds[1]
+
+    creds = arg_vars['super'].split(':')
+    if len(creds) != 2:
+        print('Org admin credentials not properly specified.  Must be "-u <user:pass>". Exiting...')
+        exit_on_error()
+    else:
+        arg_vars['adminuser'] = creds[0]
+        arg_vars['adminpass'] = creds[1]
+
+    return arg_vars
+
+
+
+class Migrate:
+    def __init__(self):
+        self.args = parse_args()
+        self.start_date = self.args['date']
+        self.endpoint = self.args['endpoint']
+        self.metrics = {'reindex_start': '',
+                        'reindex_end': '',
+                        'appinfo_migration_start': '',
+                        'appinfo_migration_end': '',
+                        'full_data_migration_start': '',
+                        'full_data_migration_end': ''}
+        self.logger = init_logging(self.__class__.__name__)
+        self.super_user = self.args['superuser']
+        self.super_pass = self.args['superpass']
+        self.admin_user = self.args['adminuser']
+        self.admin_pass = self.args['adminpass']
+        self.org = self.args['org']
+        self.init = self.args['init']
+        self.full = self.args['full']
+
+    def run(self):
+        self.logger.info('Initializing...')
+
+        if not self.is_endpoint_available():
+            exit_on_error('Endpoint is not available, aborting')
+
+        if self.start_date is not None:
+            self.logger.info("Date Provided.  Re-index will run from date=[%s]", self.start_date)
+
+        try:
+
+            if self.full:
+
+                # Do full data migration and exit
+
+                self.start_fulldata_migration()
+
+                self.metrics['full_data_migration_start'] = get_current_time()
+                self.logger.info("Full Data Migration Started")
+                is_migrated = False
+                while not is_migrated:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_migrated = self.is_data_migrated()
+                    if is_migrated:
+                        break
+
+                self.metrics['full_data_migration_end'] = get_current_time()
+                self.logger.info("Full Data Migration completed")
+
+                self.log_metrics()
+                self.logger.info("Finished...")
+
+                return
+
+
+            if self.init:
+
+                # Init the migration system as this is the first migration done on the cluster
+
+                self.run_database_setup()
+
+                migration_system_updated = self.is_migration_system_updated()
+
+                if not migration_system_updated:
+                    self.logger.info('Migration system needs to be updated.  Updating migration system..')
+                    self.start_migration_system_update()
+                    while not migration_system_updated:
+                        time.sleep(STATUS_INTERVAL_SECONDS)
+                        migration_system_updated = self.is_migration_system_updated()
+                        if migration_system_updated:
+                            break
+
+                index_mapping_updated = self.is_index_mapping_updated()
+
+                if not index_mapping_updated:
+                    self.logger.info('Index Mapping needs to be updated.  Updating index mapping..')
+                    self.start_index_mapping_migration()
+                    while not index_mapping_updated:
+                        time.sleep(STATUS_INTERVAL_SECONDS)
+                        index_mapping_updated = self.is_index_mapping_updated()
+                        if index_mapping_updated:
+                            break
+
+
+            # Migrate app info
+
+            if self.is_appinfo_migrated():
+                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+                self.reset_appinfo_migration()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+
+            self.start_appinfo_migration()
+            self.logger.info('AppInfo Migration Started.')
+            self.metrics['appinfo_migration_start'] = get_current_time()
+
+            is_appinfo_migrated = False
+            while not is_appinfo_migrated:
+                is_appinfo_migrated = self.is_appinfo_migrated()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+                if is_appinfo_migrated:
+                    self.metrics['appinfo_migration_end'] = get_current_time()
+                    break
+            self.logger.info('AppInfo Migration Ended.')
+
+
+            # Reindex management app
+
+            job = self.start_app_reindex(MANAGEMENT_APP_ID)
+            self.metrics['reindex_start'] = get_current_time()
+            self.logger.info('Started Re-index.  Job=[%s]', job)
+            is_running = True
+            while is_running:
+                time.sleep(STATUS_INTERVAL_SECONDS)
+                is_running = self.is_reindex_running(job)
+                if not is_running:
+                    break
+
+            self.logger.info("Finished Re-index. Job=[%s]", job)
+            self.metrics['reindex_end'] = get_current_time()
+
+
+            # Dedup and re-index all of organization's apps
+
+            app_ids = self.get_app_ids()
+            for app_id in app_ids:
+
+                # De-dup app
+                job = self.start_dedup(app_id)
+                self.logger.info('Started dedup.  App=[%s], Job=[%s]', app_id, job)
+                is_running = True
+                while is_running:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_running = self.is_dedup_running(job)
+                    if not is_running:
+                        break
+
+                self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
+                self.metrics['dedup_end_' + app_id] = get_current_time()
+
+                # Re-index app
+                job = self.start_app_reindex(app_id)
+                self.metrics['reindex_start_' + app_id] = get_current_time()
+                self.logger.info('Started Re-index.  App=[%s], Job=[%s]', app_id, job)
+                is_running = True
+                while is_running:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_running = self.is_reindex_running(job)
+                    if not is_running:
+                        break
+
+                self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
+                self.metrics['reindex_end_' + app_id] = get_current_time()
+
+            self.log_metrics()
+            self.logger.info("Finished...")
+
+
+        except KeyboardInterrupt:
+            self.log_metrics()
+            self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')
+
+    def get_database_setup_url(self):
+        url = self.endpoint + '/system/database/setup'
+        return url
+
+    def get_database_bootstrap_url(self):
+        url = self.endpoint + '/system/database/bootstrap'
+        return url
+
+    def get_migration_url(self):
+        url = self.endpoint + '/system/migrate/run'
+        return url
+
+    def get_reset_migration_url(self):
+        url = self.endpoint + '/system/migrate/set'
+        return url
+
+    def get_migration_status_url(self):
+        url = self.endpoint + '/system/migrate/status'
+        return url
+
+    def get_dedup_url(self):
+        url = self.endpoint + '/system/connection/dedup'
+        return url
+
+    def get_reindex_url(self):
+        url = self.endpoint + '/system/index/rebuild'
+        return url
+
+    def get_management_reindex_url(self):
+          url = self.get_reindex_url() + "/management"
+          return url
+
+    def start_core_data_migration(self):
+           try:
+               r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.super_pass))
+               response = r.json()
+               return response
+           except requests.exceptions.RequestException as e:
+               self.logger.error('Failed to start migration, %s', e)
+               exit_on_error(str(e))
+
+    def start_fulldata_migration(self):
+        try:
+            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.super_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
+    def start_migration_system_update(self):
+        try:
+            # TODO fix this URL
+            migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
+            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.super_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
+    def run_database_setup(self):
+        try:
+            setupUrl = self.get_database_setup_url()
+            r = requests.put(url=setupUrl, auth=(self.super_user, self.super_pass))
+            if r.status_code != 200:
+                exit_on_error('Database Setup Failed')
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to run database setup, %s', e)
+            exit_on_error(str(e))
+
+    def run_database_bootstrap(self):
+        try:
+            setupUrl = self.get_database_bootstrap_url()
+            r = requests.put(url=setupUrl, auth=(self.super_user, self.super_pass))
+            if r.status_code != 200:
+                exit_on_error('Database Bootstrap Failed')
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to run database bootstrap, %s', e)
+            exit_on_error(str(e))
+
+    def start_index_mapping_migration(self):
+        try:
+            migrateUrl = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
+            r = requests.put(url=migrateUrl, auth=(self.super_user, self.super_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
+    def start_appinfo_migration(self):
+        try:
+            migrateUrl = self.get_migration_url() + '/' + PLUGIN_APPINFO
+            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
+    def reset_data_migration(self):
+        version = TARGET_ENTITY_DATA_VERSION - 1
+        body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version})
+        try:
+            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.super_user, self.super_pass))
+            response = r.json()
+            self.logger.info('Resetting data migration versions to %s=[%s] '
+                             'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version)
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to reset full data migration versions, %s', e)
+            exit_on_error(str(e))
+
+    def reset_appinfo_migration(self):
+        version = TARGET_APPINFO_VERSION - 1
+        body = json.dumps({PLUGIN_APPINFO: version})
+        try:
+            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.super_user, self.super_pass))
+            response = r.json()
+            self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO, version)
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to reset appinfo migration version, %s', e)
+            exit_on_error(str(e))
+
+    def is_data_migrated(self):
+        status = self.check_data_migration_status()
+        if status is not None:
+            entity_version = status['data'][PLUGIN_ENTITYDATA]
+            appinfo_version = status['data'][PLUGIN_APPINFO]
+            core_data_version = status['data'][PLUGIN_CORE_DATA]
+
+            if entity_version == TARGET_ENTITY_DATA_VERSION and appinfo_version == TARGET_APPINFO_VERSION and core_data_version == TARGET_CORE_DATA_VERSION:
+                self.logger.info('Full Data Migration status=[COMPLETE], %s=[%s], '
+                                 '%s=[%s], %s=%s',
+                                 PLUGIN_ENTITYDATA,
+                                 entity_version,
+                                 PLUGIN_APPINFO,
+                                 appinfo_version,
+                                 PLUGIN_CORE_DATA,
+                                 core_data_version)
+                return True
+            else:
+                self.logger.info('Full Data Migration status=[NOTSTARTED/INPROGRESS]')
+        return False
+
+    def is_appinfo_migrated(self):
+        status = self.check_data_migration_status()
+        if status is not None:
+            appinfo_version = status['data'][PLUGIN_APPINFO]
+
+            if appinfo_version == TARGET_APPINFO_VERSION:
+                self.logger.info('AppInfo Migration status=[COMPLETE],'
+                                 '%s=[%s]',
+                                 PLUGIN_APPINFO,
+                                 appinfo_version)
+                return True
+            else:
+                self.logger.info('AppInfo Migration status=[NOTSTARTED/INPROGRESS]')
+        return False
+
+    def is_migration_system_updated(self):
+        status = self.check_data_migration_status()
+        if status is not None:
+            migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM]
+
+            if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION:
+                self.logger.info('Migration System CURRENT, %s=[%s]',
+                                 PLUGIN_MIGRATION_SYSTEM,
+                                 migration_system_version)
+                return True
+            else:
+                self.logger.info('Migration System OLD, %s=[%s]',
+                                 PLUGIN_MIGRATION_SYSTEM,
+                                 migration_system_version)
+        return False
+
+    def is_index_mapping_updated(self):
+        status = self.check_data_migration_status()
+        if status is not None:
+            index_mapping_version = status['data'][PLUGIN_INDEX_MAPPING]
+
+            if index_mapping_version == TARGET_INDEX_MAPPING_VERSION:
+                self.logger.info('Index Mapping CURRENT, %s=[%s]',
+                                 PLUGIN_INDEX_MAPPING,
+                                 index_mapping_version)
+                return True
+            else:
+                self.logger.info('Index Mapping OLD, %s=[%s]',
+                                 PLUGIN_INDEX_MAPPING,
+                                 index_mapping_version)
+        return False
+
+    def check_data_migration_status(self):
+
+        try:
+            r = requests.get(url=self.get_migration_status_url(), auth=(self.super_user, self.super_pass))
+            if r.status_code == 200:
+                response = r.json()
+                return response
+            else:
+                self.logger.error('Failed to check migration status, %s', r)
+                return
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to check migration status, %s', e)
+            # exit_on_error()
+
+    def get_reindex_status(self, job):
+        status_url = self.get_reindex_url()+'/' + job
+
+        try:
+            r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
+            response = r.json()
+            return response['status']
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to get reindex status, %s', e)
+            # exit_on_error()
+
+    def start_app_reindex(self, appId):
+        body = ""
+        if self.start_date is not None:
+            body = json.dumps({'updated': self.start_date})
+
+        try:
+            r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.super_user, self.super_pass))
+
+            if r.status_code == 200:
+                response = r.json()
+                return response['jobId']
+            else:
+                self.logger.error('Failed to start reindex, %s', r)
+                exit_on_error(str(r))
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to make API request for reindex, %s', e)
+            exit_on_error(str(e))
+
+    def is_reindex_running(self, job):
+        status = self.get_reindex_status(job)
+        self.logger.info('Re-index status=[%s]', status)
+        if status != "COMPLETE":
+            return True
+        else:
+            return False
+
+    def get_dedup_status(self, job):
+        status_url = self.get_dedup_url()+'/' + job
+        try:
+            r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
+            response = r.json()
+            return response['status']['status']
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to get dedup status, %s', e)
+            # exit_on_error()
+
+    def start_dedup(self, app_id):
+        body = ""
+        try:
+            r = requests.post(url=self.get_dedup_url() + "/" + app_id, data=body, auth=(self.super_user, self.super_pass))
+            if r.status_code == 200:
+                response = r.json()
+                return response['status']['jobStatusId']
+            else:
+                self.logger.error('Failed to start dedup, %s', r)
+                exit_on_error(str(r))
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to make API request for dedup, %s', e)
+            exit_on_error(str(e))
+
+    def is_dedup_running(self, job):
+        status = self.get_dedup_status(job)
+        self.logger.info('Dedup status=[%s]', status)
+        if status != "COMPLETE":
+            return True
+        else:
+            return False
+
+    def is_endpoint_available(self):
+
+        try:
+            r = requests.get(url=self.endpoint+'/status')
+            if r.status_code == 200:
+                return True
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Endpoint is unavailable, %s', str(e))
+            return False
+
+    def log_metrics(self):
+        self.logger.info(
+            'Re-index start=[%s], ' +
+            'Re-index end =[%s], ' +
+            'Full Data Migration start=[%s], ' +
+            'Full Data Migration end=[%s] ' +
+            'AppInfo Migration start=[%s], ' +
+            'AppInfo Migration end=[%s] ',
+            self.metrics['reindex_start'],
+            self.metrics['reindex_end'],
+            self.metrics['full_data_migration_start'],
+            self.metrics['full_data_migration_end'],
+            self.metrics['appinfo_migration_start'],
+            self.metrics['appinfo_migration_end']
+
+        )
+
+    def get_app_ids(self):
+
+        try:
+
+            url = self.endpoint + "/management/token"
+            body = json.dumps({"grant_type":"password","username":self.admin_user,"password":self.admin_pass})
+            r = requests.post(url=url, data=body)
+            if ( r.status_code != 200 ):
+                print "Error logging in: " + r.text
+                return
+
+            access_token = r.json()["access_token"]
+
+            url = self.endpoint + "/management/orgs/" + self.org + "/apps?access_token=" + access_token
+            r = requests.get(url=url)
+            if r.status_code != 200:
+                exit_on_error('Cannot get app ids: ' + r.text)
+
+            apps = r.json()["data"]
+            app_ids = []
+            for appId in apps.values():
+                app_ids.append(appId)
+
+            print app_ids
+
+            return app_ids
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to get list of application ids, %s', e)
+            exit_on_error(str(e))
+
+def get_current_time():
+    return str(int(time.time()*1000))
+
+
+def exit_on_error(e=""):
+    print ('Exiting migration script due to error: ' + str(e))
+    sys.exit(1)
+
+
+def init_logging(name):
+
+    logger = logging.getLogger(name)
+    log_file_name = './migration.log'
+    log_formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] %(levelname)s %(message)s',
+                                      datefmt='%Y-%m-%d %H:%M:%S')
+
+    rotating_file = logging.handlers.RotatingFileHandler(filename=log_file_name,
+                                                         mode='a',
+                                                         maxBytes=104857600,
+                                                         backupCount=10)
+    rotating_file.setFormatter(log_formatter)
+    rotating_file.setLevel(logging.INFO)
+    logger.addHandler(rotating_file)
+    logger.setLevel(logging.INFO)
+
+    stdout_logger = logging.StreamHandler(sys.stdout)
+    stdout_logger.setFormatter(log_formatter)
+    stdout_logger.setLevel(logging.INFO)
+    logger.addHandler(stdout_logger)
+
+    return logger
+
+if __name__ == '__main__':
+
+    migration = Migrate()
+    migration.run()