Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/11/06 21:38:16 UTC

[22/50] [abbrv] usergrid git commit: First and untested stab at multi-tenant migration.

First and untested stab at multi-tenant migration.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d79bf4c3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d79bf4c3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d79bf4c3

Branch: refs/heads/asf-site
Commit: d79bf4c36810fdab578a86e4de0ae725f1aa6e75
Parents: 2686054
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 29 08:56:14 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 29 08:56:14 2015 -0400

----------------------------------------------------------------------
 stack/scripts/create_test_data.py    | 213 +++++++++++++++++++++
 stack/scripts/migrate_entity_data.py | 301 +++++++++++++++++++++---------
 2 files changed, 429 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d79bf4c3/stack/scripts/create_test_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/create_test_data.py b/stack/scripts/create_test_data.py
new file mode 100644
index 0000000..de85da0
--- /dev/null
+++ b/stack/scripts/create_test_data.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import logging
+from logging.handlers import RotatingFileHandler
+import argparse
+import time
+import requests
+import json
+
+# Creates two organizations, each with two apps; each app gets two users and three
+# collections, and each collection is populated with 100 entities. Entities in col0
+# are then connected to entities in col1, and col1 entities to col2 entities.
+# Caller must provide a "slug" string which will be used as a prefix for all names.
+#
+# For example, if the slug is mytest then:
+#   Orgs will be named mytest_org0 and mytest_org1
+#   Apps will be named mytest_org0_app0 and so on
+#   Collections will be named mytest_org0_app0_col0 and so on
+#   Entities will be named mytest_org0_app0_col0_entity0 and so on
+#   Org admins will be named mytest_org0_admin and mytest_org1_admin (both with password test)
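+#
+# A typical invocation (credentials below are placeholders):
+#   python create_test_data.py --user superuser:superpass --slug mytest --endpoint http://localhost:8080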
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="Usergrid Test Data Creation Tool")
+
+    parser.add_argument("--endpoint",
+                        help="The endpoint to use for making API requests.",
+                        type=str,
+                        default="http://localhost:8080")
+
+    parser.add_argument("--user",
+                        help="System admin credentials used to authenticate with Usergrid <user:pass>",
+                        type=str,
+                        required=True)
+
+    parser.add_argument("--slug",
+                        help="Unique string used to name the organizations, applications and other objects created",
+                        type=str,
+                        required=True)
+
+    my_args = parser.parse_args(sys.argv[1:])
+
+    arg_vars = vars(my_args)
+    creds = arg_vars["user"].split(":")
+    if len(creds) != 2:
+        print("Credentials not properly specified.  Must be '--user <user:pass>'. Exiting...")
+        exit_on_error()
+    else:
+        arg_vars["user"] = creds[0]
+        arg_vars["pass"] = creds[1]
+
+    return arg_vars
+
+
+class Creator:
+    def __init__(self):
+        self.args = parse_args()
+        self.endpoint = self.args["endpoint"]
+        self.logger = init_logging(self.__class__.__name__)
+        self.admin_user = self.args["user"]
+        self.admin_pass = self.args["pass"]
+        self.slug = self.args["slug"]
+
+    def run(self):
+        self.logger.info("Initializing...")
+
+        if not self.is_endpoint_available():
+            exit_on_error("Endpoint is not available, aborting")
+
+        for orgIndex in range(2):
+            orgName = self.slug + "_org" + str(orgIndex)
+            orgUser = orgName + "_admin"
+            orgEmail = orgUser + "@example.com"
+
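+            # creating an org via /management/orgs also creates its first admin user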
+            url = self.endpoint + "/management/orgs"
+            body = json.dumps({"username":orgUser, "email":orgEmail, "password":"test", "organization":orgName })
+            r = requests.post(url=url, data=body, auth=(self.admin_user, self.admin_pass))
+            if ( r.status_code >= 400 ):
+                print "Error creating organization " + orgName + ": " + r.text
+                return
+
+            print "Created org " + orgName
+
+            url = self.endpoint + "/management/token"
+            body = json.dumps({"grant_type":"password","username":orgUser,"password":"test"})
+            r = requests.post(url=url, data=body)
+            if ( r.status_code != 200 ):
+                print "Error logging into organization " + orgName + ": " + r.text
+                return
+            
+            accessToken = r.json()["access_token"]
+
+            for appIndex in range(2):
+                appName = orgName + "_app" + str(appIndex)
+
+                url = self.endpoint + "/management/orgs/" + orgName + "/apps?access_token=" + accessToken
+                body = json.dumps({"name":appName})
+                r = requests.post(url=url, data=body, auth=(self.admin_user, self.admin_pass))
+                if ( r.status_code >= 400 ):
+                    print "Error creating application " + appName + ": " + r.text
+                    return
+
+                print "   Created app: " + orgName + "/" + appName
+                appUrl = self.endpoint + "/" + orgName + "/" + appName
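+                # brief pause so the newly created app has time to become available
+                # (assumption: app creation may not be immediately consistent)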
+                time.sleep(2) 
+
+                for userIndex in range(2):
+                    userName = appName + "_user" + str(userIndex)
+                    email = userName + "@example.com" 
+
+                    url = appUrl + "/users?access_token=" + accessToken
+                    body = json.dumps({"name":userName, "username":userName, "email":email, "password":"test"})
+                    r = requests.post(url=url, data=body)
+                    if ( r.status_code >= 400 ):
+                        print "Error creating user " + userName + ": " + r.text
+                        return
+
+                for colIndex in range(3):
+                    colName = appName + "_col" + str(colIndex)
+                    print "      Creating collection: " + colName
+
+                    for entityIndex in range(100):
+                        entityName = colName + "_entity" + str(entityIndex)
+
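+                        # Usergrid collection URLs use the plural form, hence the trailing "s" on colName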
+                        url = appUrl + "/" + colName + "s?access_token=" + accessToken
+                        body = json.dumps({"name":entityName})
+                        r = requests.post(url=url, data=body)
+                        if ( r.status_code >= 400 ):
+                            print "Error creating entity " + entityName + ": " + r.text
+                            return
+
+                # connect entities in collection 0 to collection 1
+                for entityIndex in range(100):
+                    sourceCollection = appName + "_col0s"
+                    sourceName = appName + "_col0_entity" + str(entityIndex)
+                    targetName = appName + "_col1_entity" + str(entityIndex)
+                    targetType = appName + "_col1"
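+                    # resulting path: /<org>/<app>/<app>_col0s/<app>_col0_entity<N>/has/<app>_col1/<app>_col1_entity<N>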
+                    url = appUrl + "/" + sourceCollection + "/" + sourceName + "/has/" + targetType + "/" + targetName
+                    r = requests.post(url=url + "?access_token=" + accessToken)
+                    if ( r.status_code >= 400 ):
+                        print "Error connecting entity " + sourceName + " to " + targetName + ": " + r.text
+                        print "url is: " + url
+                        return
+
+                # connect entities in collection 1 to collection 2
+                for entityIndex in range(100):
+                    sourceCollection = appName + "_col1s"
+                    sourceName = appName + "_col1_entity" + str(entityIndex)
+                    targetName = appName + "_col2_entity" + str(entityIndex)
+                    targetType = appName + "_col2"
+                    url = appUrl + "/" + sourceCollection + "/" + sourceName + "/has/" + targetType + "/" + targetName
+                    r = requests.post(url=url + "?access_token=" + accessToken)
+                    if ( r.status_code >= 400 ):
+                        print "Error connecting entity " + sourceName + " to " + targetName + ": " + r.text
+                        print "url is: " + url
+                        return
+
+    def is_endpoint_available(self):
+
+        try:
+            r = requests.get(url=self.endpoint + "/status")
+            if r.status_code == 200:
+                return True
+            return False
+        except requests.exceptions.RequestException as e:
+            self.logger.error("Endpoint is unavailable, %s", str(e))
+            return False
+
+
+def exit_on_error(e=""):
+    print ("Exiting script due to error: " + str(e))
+    sys.exit(1)
+
+
+def init_logging(name):
+
+    logger = logging.getLogger(name)
+    log_file_name = "./create-test-data.log"
+    log_formatter = logging.Formatter(fmt="%(asctime)s [%(name)s] %(levelname)s %(message)s",
+                                      datefmt="%Y-%m-%d %H:%M:%S")
+
+    rotating_file = RotatingFileHandler(filename=log_file_name,
+                                        mode="a",
+                                        maxBytes=104857600,
+                                        backupCount=10)
+    rotating_file.setFormatter(log_formatter)
+    rotating_file.setLevel(logging.INFO)
+    logger.addHandler(rotating_file)
+    logger.setLevel(logging.INFO)
+
+    stdout_logger = logging.StreamHandler(sys.stdout)
+    stdout_logger.setFormatter(log_formatter)
+    stdout_logger.setLevel(logging.INFO)
+    logger.addHandler(stdout_logger)
+
+    return logger
+
+if __name__ == "__main__":
+
+    creator = Creator()
+    creator.run()

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d79bf4c3/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py
index 36f73c0..f576108 100644
--- a/stack/scripts/migrate_entity_data.py
+++ b/stack/scripts/migrate_entity_data.py
@@ -16,30 +16,50 @@
 # under the License.
 #
 #
+# To migrate multiple tenants within one cluster, follow the steps below.
 #
-# Usage from a machine running Usergrid with the new Usergrid version:
+# STEP 1 - SET UP TENANT ONE TOMCATS RUNNING 2.1 (NOT IN SERVICE) AND INIT MIGRATION
 #
-# ######################################################
-# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION
-# ######################################################
+#   python migrate_entity_data.py --org <org1name> --user <superuser>:<superpass> --init
 #
-# python migrate_entity_data.py --user adminuser:adminpass
+#   This command will set up and bootstrap the database, set up the migration system and update index mappings:
+#   - /system/database/setup
+#   - /system/database/bootstrap
+#   - /system/migrate/run/migration-system
+#   - /system/migrate/run/index_mapping_migration
 #
-# The above command performs an appinfo migration and system re-index only.  This creates indices in Elasticsearch with
-# the updated indexing strategy in the new Usergrid version.
+#   Then it will migrate appinfos, re-index the management app and, for each of the specified
+#   org's apps, de-dup connections and re-index the app.
 #
-# ######################################################
-# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION
-# ######################################################
+# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
 #
-# python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp>
+#   python migrate_entity_data.py --org <org1name> --user <superuser>:<superpass> --date <timestamp>
+#
+#   This command will migrate appinfos, re-index the management app and then, for each of the
+#   specified org's apps, de-dup connections and re-index the app with a start date so that only
+#   data modified since STEP 1 will be re-indexed.
+#
+# STEP 3 - SET UP TENANT TWO TOMCATS RUNNING 2.1 (NOT IN SERVICE)
+#
+#   python migrate_entity_data.py --org <org2name> --user <superuser>:<superpass>
+#
+#   This command will migrate appinfos, re-index the management app and, for each of the
+#   specified org's apps, de-dup connections and re-index the app.
+#
+# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+#   python migrate_entity_data.py --org <org2name> --user <superuser>:<superpass> --date <timestamp>
+#
+#   This command will migrate appinfos, re-index the management app and then, for each of the
+#   specified org's apps, de-dup connections and re-index the app with a start date so that only
+#   data modified since STEP 3 will be re-indexed.
+#
+# STEP 5 - FULL DATA MIGRATION
+#
+#   python migrate_entity_data.py --user <superuser>:<superpass> --full
+#
+#   This command will run the full data migration.
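+#
+#   Progress of any step can be checked with a GET against /system/migrate/status.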
 #
-# The above command performs an appinfo migration, system re-index using a start date, and full data migration which
-# includes entity data.  This step is necessary to ensure Usergrid starts reading and writing data from the latest
-# entity version, including delta indexing of any documents create during the time between STEP 1 and STEP 2.  If
-# all data has already been migrated (running this a 2nd, 3rd, etc. time), then the appinfo migration will be skipped.
-
-
 
 import sys
 import logging
@@ -72,10 +92,6 @@ PLUGIN_CORE_DATA = 'core-data'
 def parse_args():
     parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
 
-    parser.add_argument('--date',
-                        help='A date from which to start the migration',
-                        type=str)
-
     parser.add_argument('--endpoint',
                         help='The endpoint to use for making API requests.',
                         type=str,
@@ -86,8 +102,22 @@ def parse_args():
                         type=str,
                         required=True)
 
-    parser.add_argument('--delta',
-                        help='Run a delta migration.',
+    parser.add_argument('--init',
+                        help='Init system and start first migration.',
+                        action='store_true',
+                        default=False)
+
+    parser.add_argument('--org',
+                        help='Name of organization on which to run migration.',
+                        type=str,
+                        required=False)
+
+    parser.add_argument('--date',
+                        help='A date from which to start the migration',
+                        type=str)
+
+    parser.add_argument('--full',
+                        help='Run full data migration (last step in cluster migration).',
                         action='store_true',
                         default=False)
 
@@ -120,7 +150,9 @@ class Migrate:
         self.logger = init_logging(self.__class__.__name__)
         self.admin_user = self.args['user']
         self.admin_pass = self.args['pass']
-        self.delta_migration = self.args['delta']
+        self.org = self.args['org']
+        self.init = self.args['init']
+        self.full = self.args['full']
 
     def run(self):
         self.logger.info('Initializing...')
@@ -133,62 +165,82 @@ class Migrate:
 
         try:
 
-            self.run_database_setup()
+            if self.full:
 
-            # We need to check and roll the migration system to 1 if not already
-            migration_system_updated = self.is_migration_system_updated()
+                # Do full data migration and exit
 
-            if not migration_system_updated:
-                self.logger.info('Migration system needs to be updated.  Updating migration system..')
-                self.start_migration_system_update()
-                while not migration_system_updated:
+                self.start_fulldata_migration()
+
+                self.metrics['full_data_migration_start'] = get_current_time()
+                self.logger.info("Full Data Migration Started")
+                is_migrated = False
+                while not is_migrated:
                     time.sleep(STATUS_INTERVAL_SECONDS)
-                    migration_system_updated = self.is_migration_system_updated()
-                    if migration_system_updated:
+                    is_migrated = self.is_data_migrated()
+                    if is_migrated:
                         break
 
-            index_mapping_updated = self.is_index_mapping_updated()
+                self.metrics['full_data_migration_end'] = get_current_time()
+                self.logger.info("Full Data Migration completed")
 
-            if not index_mapping_updated:
-                self.logger.info('Index Mapping needs to be updated.  Updating index mapping..')
-                self.start_index_mapping_migration()
-                while not index_mapping_updated:
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    index_mapping_updated = self.is_index_mapping_updated()
-                    if index_mapping_updated:
-                        break
+                self.log_metrics()
+                self.logger.info("Finished...")
 
-            # Run AppInfo migration only when both appinfos and collection entity data have not been migrated
-            if not self.is_data_migrated():
+                return
 
-                #Migrate app info
-                if self.is_appinfo_migrated():
-                    self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
-                    self.reset_appinfo_migration()
-                    time.sleep(STATUS_INTERVAL_SECONDS)
 
-                self.start_appinfo_migration()
-                self.logger.info('AppInfo Migration Started.')
-                self.metrics['appinfo_migration_start'] = get_current_time()
+            if self.init:
 
-                is_appinfo_migrated = False
-                while not is_appinfo_migrated:
-                    is_appinfo_migrated = self.is_appinfo_migrated()
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    if is_appinfo_migrated:
-                        self.metrics['appinfo_migration_end'] = get_current_time()
-                        break
-                self.logger.info('AppInfo Migration Ended.')
+                # Init the migration system as this is the first migration done on the cluster
 
+                self.run_database_setup()
 
-            else:
-                self.logger.info('Full Data Migration previously ran... skipping AppInfo migration.')
+                migration_system_updated = self.is_migration_system_updated()
 
+                if not migration_system_updated:
+                    self.logger.info('Migration system needs to be updated.  Updating migration system..')
+                    self.start_migration_system_update()
+                    while not migration_system_updated:
+                        time.sleep(STATUS_INTERVAL_SECONDS)
+                        migration_system_updated = self.is_migration_system_updated()
+                        if migration_system_updated:
+                            break
 
+                index_mapping_updated = self.is_index_mapping_updated()
 
-            # We need to check and roll index mapping version to 1 if not already there
+                if not index_mapping_updated:
+                    self.logger.info('Index Mapping needs to be updated.  Updating index mapping..')
+                    self.start_index_mapping_migration()
+                    while not index_mapping_updated:
+                        time.sleep(STATUS_INTERVAL_SECONDS)
+                        index_mapping_updated = self.is_index_mapping_updated()
+                        if index_mapping_updated:
+                            break
+
+
+            # Migrate app info
+
+            if self.is_appinfo_migrated():
+                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+                self.reset_appinfo_migration()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+
+            self.start_appinfo_migration()
+            self.logger.info('AppInfo Migration Started.')
+            self.metrics['appinfo_migration_start'] = get_current_time()
+
+            is_appinfo_migrated = False
+            while not is_appinfo_migrated:
+                is_appinfo_migrated = self.is_appinfo_migrated()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+                if is_appinfo_migrated:
+                    self.metrics['appinfo_migration_end'] = get_current_time()
+                    break
+            self.logger.info('AppInfo Migration Ended.')
+
+
+            # Reindex management app
 
-            # Perform system re-index (it will grab date from input if provided)
-            job = self.start_reindex()
+            job = self.start_app_reindex("management")
             self.metrics['reindex_start'] = get_current_time()
             self.logger.info('Started Re-index.  Job=[%s]', job)
@@ -202,33 +254,44 @@ class Migrate:
             self.logger.info("Finished Re-index. Job=[%s]", job)
             self.metrics['reindex_end'] = get_current_time()
 
-            # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data)
-            if self.delta_migration:
 
-                self.logger.info('Delta option provided. Performing full data migration...')
-                if self.is_data_migrated():
-                    self.reset_data_migration()
-                time.sleep(STATUS_INTERVAL_SECONDS)
-                self.is_data_migrated()
+            # De-dup and re-index all of the specified org's apps
 
-                # self.start_core_data_migration()
-                self.start_fulldata_migration()
+            app_ids = self.get_app_ids()
+            for app_id in app_ids:
 
-                self.metrics['full_data_migration_start'] = get_current_time()
-                self.logger.info("Full Data Migration Started")
-                is_migrated = False
-                while not is_migrated:
+                # De-dup app
+                job = self.start_dedup(app_id)
+                self.metrics['dedup_start_' + app_id] = get_current_time()
+                self.logger.info('Started dedup.  App=[%s], Job=[%s]', app_id, job)
+                is_running = True
+                while is_running:
                     time.sleep(STATUS_INTERVAL_SECONDS)
-                    is_migrated = self.is_data_migrated()
-                    if is_migrated:
+                    is_running = self.is_dedup_running(job)
+                    if not is_running:
                         break
 
-                self.metrics['full_data_migration_end'] = get_current_time()
-                self.logger.info("Full Data Migration completed")
+                self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
+                self.metrics['dedup_end_' + app_id] = get_current_time()
+
+                # Re-index app
+                job = self.start_app_reindex(app_id)
+                self.metrics['reindex_start_' + app_id] = get_current_time()
+                self.logger.info('Started Re-index.  App=[%s], Job=[%s]', app_id, job)
+                is_running = True
+                while is_running:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_running = self.is_reindex_running(job)
+                    if not is_running:
+                        break
+
+                self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
+                self.metrics['reindex_end_' + app_id] = get_current_time()
 
             self.log_metrics()
             self.logger.info("Finished...")
 
+
         except KeyboardInterrupt:
             self.log_metrics()
             self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')
@@ -237,6 +300,10 @@ class Migrate:
         url = self.endpoint + '/system/database/setup'
         return url
 
+    def get_database_bootstrap_url(self):
+        url = self.endpoint + '/system/database/bootstrap'
+        return url
+
     def get_migration_url(self):
         url = self.endpoint + '/system/migrate/run'
         return url
@@ -249,6 +316,10 @@ class Migrate:
         url = self.endpoint + '/system/migrate/status'
         return url
 
+    def get_dedup_url(self):
+        url = self.endpoint + '/system/connection/dedup'
+        return url
+
     def get_reindex_url(self):
         url = self.endpoint + '/system/index/rebuild'
         return url
@@ -257,7 +328,6 @@ class Migrate:
           url = self.get_reindex_url() + "/management"
           return url
 
-
     def start_core_data_migration(self):
            try:
                r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
@@ -267,7 +337,6 @@ class Migrate:
                self.logger.error('Failed to start migration, %s', e)
                exit_on_error(str(e))
 
-
     def start_fulldata_migration(self):
         try:
             r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
@@ -279,7 +348,7 @@ class Migrate:
 
     def start_migration_system_update(self):
         try:
-            #TODO fix this URL
+            # TODO fix this URL
             migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
             r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
             response = r.json()
@@ -299,6 +368,17 @@ class Migrate:
             self.logger.error('Failed to run database setup, %s', e)
             exit_on_error(str(e))
 
+    def run_database_bootstrap(self):
+        try:
+            setupUrl = self.get_database_bootstrap_url()
+            r = requests.put(url=setupUrl, auth=(self.admin_user, self.admin_pass))
+            if r.status_code != 200:
+                exit_on_error('Database Bootstrap Failed')
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to run database bootstrap, %s', e)
+            exit_on_error(str(e))
+
     def start_index_mapping_migration(self):
         try:
             migrateUrl = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
@@ -437,7 +517,7 @@ class Migrate:
             self.logger.error('Failed to get reindex status, %s', e)
             # exit_on_error()
 
-    def start_reindex(self):
+    def start_app_reindex(self, appId):
         body = ""
         if self.start_date is not None:
             body = json.dumps({'updated': self.start_date})
@@ -463,6 +543,39 @@ class Migrate:
         else:
             return False
 
+    def get_dedup_status(self, job):
+        status_url = self.get_dedup_url() + '/' + job
+        try:
+            r = requests.get(url=status_url, auth=(self.admin_user, self.admin_pass))
+            response = r.json()
+            return response['status']
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to get dedup status, %s', e)
+            # exit_on_error()
+
+    def start_dedup(self, app_id):
+        body = ""
+        try:
+            r = requests.post(url=self.get_dedup_url() + "/" + app_id, data=body, auth=(self.admin_user, self.admin_pass))
+            if r.status_code == 200:
+                response = r.json()
+                return response['jobId']
+            else:
+                self.logger.error('Failed to start dedup, %s', r)
+                exit_on_error(str(r))
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to make API request for dedup, %s', e)
+            exit_on_error(str(e))
+
+    def is_dedup_running(self, job):
+        status = self.get_dedup_status(job)
+        self.logger.info('Dedup status=[%s]', status)
+        if status != "COMPLETE":
+            return True
+        else:
+            return False
+
     def is_endpoint_available(self):
 
         try:
@@ -490,6 +603,24 @@ class Migrate:
 
         )
 
+    def get_app_ids(self):
+
+        try:
+            url = self.endpoint + "/management/orgs/" + self.org + "/apps"
+            r = requests.get(url=url, auth=(self.admin_user, self.admin_pass))
+            if r.status_code != 200:
+                exit_on_error('Failed to get list of apps for org ' + self.org)
+
+            # the "data" field of the response is expected to map app names to app UUIDs
+            apps = r.json()["data"]
+            return list(apps.values())
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to get list of application ids, %s', e)
+            exit_on_error(str(e))
 
 def get_current_time():
     return str(int(time.time()*1000))