Posted to commits@ambari.apache.org by ol...@apache.org on 2018/05/27 22:29:38 UTC

[ambari] branch trunk updated: AMBARI-23945. Re-implement migration helper (use params from ini file)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 59855d1  AMBARI-23945. Re-implement migration helper (use params from ini file)
59855d1 is described below

commit 59855d15ae93cbb8d5fba3b0498b614d49218314
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Mon May 28 00:00:44 2018 +0200

    AMBARI-23945. Re-implement migration helper (use params from ini file)
---
 .../src/main/python/migrationConfigGenerator.py    |  11 +-
 .../src/main/python/migrationHelper.py             | 816 +++++++++++++++++++--
 2 files changed, 749 insertions(+), 78 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationConfigGenerator.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationConfigGenerator.py
index 51d92be..487e577 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationConfigGenerator.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationConfigGenerator.py
@@ -88,7 +88,7 @@ def create_solr_api_request_command(request_url, user='infra-solr', kerberos_ena
   curl_prefix = "curl -k"
   if output is not None:
     curl_prefix+=" -o {0}".format(output)
-  api_cmd = '{0} kinit -kt {1} {2} && {3} --negotiate -u : "{4}"'.format(use_infra_solr_user, keytab, principal, curl_prefix, request_url) \
+  api_cmd = '{0} kinit -kt {1} {2} && {3} {4} --negotiate -u : "{5}"'.format(use_infra_solr_user, keytab, principal, use_infra_solr_user, curl_prefix, request_url) \
     if kerberos_enabled == 'true' else '{0} {1} "{2}"'.format(use_infra_solr_user, curl_prefix, request_url)
   return api_cmd
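
The hunk above fixes the kerberized request path: previously only the kinit ran under the infra-solr user, while the curl call ran as the invoking user. With illustrative keytab, principal and URL values, the generated command now looks roughly like:

    sudo -u infra-solr kinit -kt /etc/security/keytabs/ambari-infra-solr.service.keytab infra-solr/c6401.ambari.apache.org@EXAMPLE.COM && \
      sudo -u infra-solr curl -k --negotiate -u : "http://c6401.ambari.apache.org:8886/solr/admin/collections?action=LIST&wt=json"
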
 
@@ -96,7 +96,7 @@ def get_random_solr_url(solr_urls):
   splitted_solr_urls = solr_urls.split(',')
   random_index = randrange(0, len(splitted_solr_urls))
   result = splitted_solr_urls[random_index]
-  logger.debug("Use {0} for request ...".format(result))
+  logger.debug("Use {0} for request.".format(result))
   return result
 
 def retry(func, *args, **kwargs):
@@ -106,7 +106,7 @@ def retry(func, *args, **kwargs):
   for r in range(retry_count):
     try:
       result = func(*args, **kwargs)
-      if result: return result
+      if result is not None: return result
     except Exception as e:
       logger.debug("Error occurred during {0} operation: {1}".format(context, str(traceback.format_exc())))
     logger.info("{0}: waiting for {1} seconds before retyring again (retry count: {2})".format(context, delay, r+1))
@@ -318,7 +318,7 @@ def generate_ambari_solr_migration_ini_file(options, accessor, protocol):
     external_zk_connection_string = infra_solr_env_props['infra_solr_zookeeper_quorum'] if 'infra_solr_zookeeper_quorum' in infra_solr_env_props else default_zk_quorum
     if default_zk_quorum != external_zk_connection_string:
       print "Found external zk connection string: {0}".format(external_zk_connection_string)
-      config.set('infra_solr', 'external_zk_connection_string', external_zk_connection_string)
+      config.set('infra_solr', 'external_zk_connect_string', external_zk_connection_string)
     config.set('infra_solr', 'zk_principal_user', zk_principal_user)
 
   state_json_map = retry(get_state_json_map, solr_urls, infra_solr_user, security_enabled, infra_solr_kerberos_keytab, infra_solr_kerberos_principal, count=options.retry, delay=options.delay, context="Get clusterstate.json")
@@ -342,7 +342,7 @@ def generate_ambari_solr_migration_ini_file(options, accessor, protocol):
           config.set('ranger_collection', 'ranger_collection_shards', coll_shard_map[ranger_collection_name])
         if ranger_collection_name in max_shards_map:
            config.set('ranger_collection', 'ranger_collection_max_shards_per_node', max_shards_map[ranger_collection_name])
-        config.set('ranger_collection', 'backup_ranger_config_set', 'old_ranger_audits')
+        config.set('ranger_collection', 'backup_ranger_config_set_name', 'old_ranger_audits')
         config.set('ranger_collection', 'backup_ranger_collection_name', 'old_ranger_audits')
         print 'Ranger Solr collection: ' + ranger_collection_name
         ranger_backup_path = None
@@ -362,6 +362,7 @@ def generate_ambari_solr_migration_ini_file(options, accessor, protocol):
   if "ATLAS_SERVER" in installed_components and not options.skip_atlas:
     print "Service detected: " + colors.OKGREEN + "ATLAS" + colors.ENDC
     config.set('atlas_collections', 'enabled', 'true')
+    config.set('atlas_collections', 'config_set', 'atlas_configs')
     config.set('atlas_collections', 'fulltext_index_name', 'fulltext_index')
     config.set('atlas_collections', 'backup_fulltext_index_name', 'old_fulltext_index')
     if 'fulltext_index' in coll_shard_map:
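
For reference, the ini file written by migrationConfigGenerator.py (and consumed by migrationHelper.py below) contains sections like the following; all values here are illustrative:

    [ambari_server]
    host = c6401.ambari.apache.org
    port = 8080
    protocol = http
    username = admin
    password = admin
    cluster = cl1

    [infra_solr]
    urls = http://c6401.ambari.apache.org:8886/solr
    znode = /infra-solr
    user = infra-solr
    external_zk_connect_string = c6401.ambari.apache.org:2181
    zk_principal_user = zookeeper

    [ranger_collection]
    enabled = true
    ranger_config_set_name = ranger_audits
    backup_ranger_config_set_name = old_ranger_audits
    backup_ranger_collection_name = old_ranger_audits

    [atlas_collections]
    enabled = true
    config_set = atlas_configs
    fulltext_index_name = fulltext_index
    backup_fulltext_index_name = old_fulltext_index
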
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 0328106..d3e2de1 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -18,15 +18,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import logging
+import os
 import sys
 import urllib2
 import json
 import base64
 import optparse
+import time
+import traceback
+import ConfigParser
+
+from random import randrange
+from subprocess import Popen, PIPE
 
 HTTP_PROTOCOL = 'http'
 HTTPS_PROTOCOL = 'https'
 
+AMBARI_SUDO = "/var/lib/ambari-agent/ambari-sudo.sh"
+
 SOLR_SERVICE_NAME = 'AMBARI_INFRA_SOLR'
 
 SOLR_COMPONENT_NAME ='INFRA_SOLR'
@@ -37,13 +47,34 @@ GET_HOSTS_COMPONENTS_URL = '/services/{0}/components/{1}?fields=host_components'
 
 REQUESTS_API_URL = '/requests'
 
+LIST_SOLR_COLLECTION_URL = '{0}/admin/collections?action=LIST&wt=json'
+CREATE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=CREATE&name={1}&collection.configName={2}&numShards={3}&replicationFactor={4}&maxShardsPerNode={5}&wt=json'
+DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json'
+RELOAD_SOLR_COLLECTION_URL = '{0}/admin/collections?action=RELOAD&name={1}&wt=json'
+
+INFRA_SOLR_CLIENT_BASE_PATH = '/usr/lib/ambari-infra-solr-client/'
+RANGER_NEW_SCHEMA = 'migrate/managed-schema'
+SOLR_CLOUD_CLI_SCRIPT = 'solrCloudCli.sh'
+
+logger = logging.getLogger()
+handler = logging.StreamHandler()
+formatter = logging.Formatter("%(asctime)s - %(message)s")
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+class colors:
+  OKGREEN = '\033[92m'
+  WARNING = '\033[93m'
+  FAIL = '\033[91m'
+  ENDC = '\033[0m'
+
 def api_accessor(host, username, password, protocol, port):
   def do_request(api_url, request_type, request_body=''):
     try:
       url = '{0}://{1}:{2}{3}'.format(protocol, host, port, api_url)
-      print 'Execute {0} {1}'.format(request_type, url)
+      logger.debug('Execute {0} {1}'.format(request_type, url))
       if request_body:
-        print 'Request body: {0}'.format(request_body)
+        logger.debug('Request body: {0}'.format(request_body))
       admin_auth = base64.encodestring('%s:%s' % (username, password)).replace('\n', '')
       request = urllib2.Request(url)
       request.add_header('Authorization', 'Basic %s' % admin_auth)
@@ -57,6 +88,94 @@ def api_accessor(host, username, password, protocol, port):
     return response_body
   return do_request
 
+def set_log_level(verbose):
+  if verbose:
+    logger.setLevel(logging.DEBUG)
+  else:
+    logger.setLevel(logging.INFO)
+
+def retry(func, *args, **kwargs):
+  retry_count = kwargs.pop("count", 10)
+  delay = kwargs.pop("delay", 5)
+  context = kwargs.pop("context", "")
+  for r in range(retry_count):
+    try:
+      result = func(*args, **kwargs)
+      if result is not None: return result
+    except Exception as e:
+      logger.error("Error occurred during {0} operation: {1}".format(context, str(traceback.format_exc())))
+    logger.info("\n{0}: waiting for {1} seconds before retyring again (retry count: {2})".format(context, delay, r+1))
+    time.sleep(delay)
+  print '{0} operation {1}FAILED{2}'.format(context, colors.FAIL, colors.ENDC)
+  exit(1)
+
+def create_solr_api_request_command(request_url, config, output=None):
+  user='infra-solr'
+  kerberos_enabled='false'
+  keytab=None
+  principal=None
+  if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
+    kerberos_enabled=config.get('cluster', 'kerberos_enabled')
+
+  if config.has_section('infra_solr'):
+    if config.has_option('infra_solr', 'user'):
+      user=config.get('infra_solr', 'user')
+    if kerberos_enabled == 'true':
+      if config.has_option('infra_solr', 'keytab'):
+        keytab=config.get('infra_solr', 'keytab')
+      if config.has_option('infra_solr', 'principal'):
+        principal=config.get('infra_solr', 'principal')
+
+  use_infra_solr_user="sudo -u {0}".format(user)
+  curl_prefix = "curl -k"
+  if output is not None:
+    curl_prefix+=" -o {0}".format(output)
+  api_cmd = '{0} kinit -kt {1} {2} && {3} {4} --negotiate -u : "{5}"'.format(use_infra_solr_user, keytab, principal, use_infra_solr_user, curl_prefix, request_url) \
+    if kerberos_enabled == 'true' else '{0} {1} "{2}"'.format(use_infra_solr_user, curl_prefix, request_url)
+  logger.debug("Solr API command: {0}".format(api_cmd))
+  return api_cmd
+
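
Unlike the generator-side variant, this helper reads the Solr user, keytab and principal from the parsed ini file instead of taking them as arguments. A hypothetical call site:

    # 'config' is the ConfigParser object loaded from the --ini-file argument;
    # the Solr URL is illustrative
    request_url = LIST_SOLR_COLLECTION_URL.format('http://c6401.ambari.apache.org:8886/solr')
    api_cmd = create_solr_api_request_command(request_url, config)
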
+def create_infra_solr_client_command(options, config, command):
+  user='infra-solr'
+  kerberos_enabled='false'
+  infra_solr_cli_opts = ''
+  java_home=None
+  jaasOption=None
+  zkConnectString=None
+  if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
+    kerberos_enabled=config.get('cluster', 'kerberos_enabled')
+  if config.has_section('infra_solr'):
+    if config.has_option('infra_solr', 'user'):
+      user=config.get('infra_solr', 'user')
+    if config.has_option('infra_solr', 'external_zk_connect_string'):
+      zkConnectString=config.get('infra_solr', 'external_zk_connect_string')
+    elif config.has_option('infra_solr', 'zk_connect_string'):
+      zkConnectString=config.get('infra_solr', 'zk_connect_string')
+  if kerberos_enabled == 'true':
+    zk_principal_user = config.get('infra_solr', 'zk_principal_user') if config.has_option('infra_solr', 'zk_principal_user') else 'zookeeper'
+    infra_solr_cli_opts= '-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username={0} -Dzookeeper.sasl.clientconfig=Client'.format(zk_principal_user)
+    jaasOption=" --jaas-file /etc/ambari-infra-solr/conf/infra_solr_jaas.conf"
+    command+=jaasOption
+  if config.has_section('local') and config.has_option('local', 'java_home'):
+    java_home=config.get('local', 'java_home')
+  if not java_home:
+    raise Exception("'local' section or 'java_home' is missing (or empty) from the configuration")
+  if not zkConnectString:
+    raise Exception("'zk_connect_string' section or 'external_zk_connect_string' is missing (or empty) from the configuration")
+  set_java_home_= 'JAVA_HOME={0}'.format(java_home)
+  set_infra_solr_cli_opts = ' INFRA_SOLR_CLI_OPTS="{0}"'.format(infra_solr_cli_opts) if infra_solr_cli_opts != '' else ''
+  solr_cli_cmd = '{0} {1}{2} {3}{4} --zookeeper-connect-string {5} {6}'\
+    .format(AMBARI_SUDO, set_java_home_, set_infra_solr_cli_opts, INFRA_SOLR_CLIENT_BASE_PATH, SOLR_CLOUD_CLI_SCRIPT, zkConnectString, command)
+
+  return solr_cli_cmd
+
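
On a kerberized cluster the assembled command comes out roughly as below (JAVA_HOME, the quorum and the znode arguments are illustrative):

    /var/lib/ambari-agent/ambari-sudo.sh JAVA_HOME=/usr/jdk64/jdk1.8.0_112 \
      INFRA_SOLR_CLI_OPTS="-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username=zookeeper -Dzookeeper.sasl.clientconfig=Client" \
      /usr/lib/ambari-infra-solr-client/solrCloudCli.sh --zookeeper-connect-string c6401.ambari.apache.org:2181 \
      --delete-znode --znode /infra-solr/configs/hadoop_logs --jaas-file /etc/ambari-infra-solr/conf/infra_solr_jaas.conf
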
+def get_random_solr_url(solr_urls): # TODO: use Solr host filter
+  splitted_solr_urls = solr_urls.split(',')
+  random_index = randrange(0, len(splitted_solr_urls))
+  result = splitted_solr_urls[random_index]
+  logger.debug("Use {0} solr address for next request.".format(result))
+  return result
+
 def format_json(dictionary, tab_level=0):
   output = ''
   tab = ' ' * 2 * tab_level
@@ -76,17 +195,17 @@ def read_json(json_file):
 
 def get_json(accessor, url):
   response = accessor(url, 'GET')
-  print 'GET ' + url + ' response: '
-  print '----------------------------'
-  print response
+  logger.debug('GET ' + url + ' response: ')
+  logger.debug('----------------------------')
+  logger.debug(str(response))
   json_resp = json.loads(response)
   return json_resp
 
 def post_json(accessor, url, request_body):
   response = accessor(url, 'POST', json.dumps(request_body))
-  print 'POST ' + url + ' response: '
-  print '----------------------------'
-  print response
+  logger.debug('POST ' + url + ' response: ')
+  logger.debug( '----------------------------')
+  logger.debug(str(response))
   json_resp = json.loads(response)
   return json_resp
 
@@ -122,20 +241,19 @@ def create_command_request(command, parameters, hosts, cluster, context):
   request["Requests/resource_filters"] = resource_filters
   return request
 
-def fill_parameters(options):
+def fill_parameters(options, collection, index_location, shards=None):
   params = {}
-  if options.collection:
-    params['solr_collection'] = options.collection
-  if options.index_location:
-    params['solr_index_location'] = options.index_location
-  if options.backup_name:
-    params['solr_backup_name'] = options.backup_name
+  if collection:
+    params['solr_collection'] = collection
+    params['solr_backup_name'] = collection
+  if index_location:
+    params['solr_index_location'] = index_location
   if options.index_version:
     params['solr_index_version'] = options.index_version
   if options.force:
     params['solr_index_upgrade_force'] = options.force
   if options.request_async:
-    params['solr_request_async'] = options.async
+    params['solr_request_async'] = options.request_async
   if options.request_tries:
     params['solr_request_tries'] = options.request_tries
   if options.request_time_interval:
@@ -146,9 +264,9 @@ def fill_parameters(options):
     params['solr_core_filter'] = options.core_filter
   if options.skip_cores:
     params['solr_skip_cores'] = options.skip_cores
-  if options.solr_shards:
-    params['solr_shards'] = options.solr_shards
-  if options.solr_hdfs_path:
+  if shards:
+    params['solr_shards'] = shards
+  if options.solr_hdfs_path: # TODO get from ini file + handle shared_fs value
     params['solr_hdfs_path'] = options.solr_hdfs_path
   if options.solr_keep_backup:
     params['solr_keep_backup'] = True
@@ -156,7 +274,7 @@ def fill_parameters(options):
     params['solr_skip_generate_restore_host_cores'] = True
   return params
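
For a restore request the resulting parameter map might look like the following (values are illustrative; note that solr_backup_name is now derived from the collection name instead of a separate option):

    {'solr_collection': 'old_ranger_audits',
     'solr_backup_name': 'old_ranger_audits',
     'solr_index_location': '/tmp/solr-backup/ranger',
     'solr_shards': '2',
     'solr_index_version': '6.6.2'}
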
 
-def validte_common_options(options, parser):
+def validte_common_options(options, parser, config):
   if not options.index_location:
     parser.print_help()
     print 'index-location option is required'
@@ -167,99 +285,651 @@ def validte_common_options(options, parser):
     print 'collection option is required'
     sys.exit(1)
 
-def get_solr_hosts(options, accessor):
+def get_solr_hosts(options, accessor, cluster):
   if options.solr_hosts:
     component_hosts = options.solr_hosts.split(",")
   else:
-    host_components_json = get_json(accessor, CLUSTERS_URL.format(options.cluster) + GET_HOSTS_COMPONENTS_URL.format(SOLR_SERVICE_NAME, SOLR_COMPONENT_NAME))
+    host_components_json = get_json(accessor, CLUSTERS_URL.format(cluster) + GET_HOSTS_COMPONENTS_URL.format(SOLR_SERVICE_NAME, SOLR_COMPONENT_NAME))
     component_hosts = get_component_hosts(host_components_json)
   return component_hosts
 
-def restore(options, accessor, parser):
+def restore(options, accessor, parser, config, collection, index_location, shards):
   """
   Send restore solr collection custom command request to ambari-server
   """
-  validte_common_options(options, parser)
-  if not options.backup_name:
-    parser.print_help()
-    print 'backup-name option is required'
-    sys.exit(1)
-  component_hosts = get_solr_hosts(options, accessor)
-  parameters = fill_parameters(options)
+  cluster = config.get('ambari_server', 'cluster')
 
-  cmd_request = create_command_request("RESTORE", parameters, component_hosts, options.cluster, 'Restore Solr Collection: ' + options.collection)
-  post_json(accessor, CLUSTERS_URL.format(options.cluster) + REQUESTS_API_URL, cmd_request)
+  component_hosts = get_solr_hosts(options, accessor, cluster)
+  parameters = fill_parameters(options, collection, index_location, shards)
 
-def migrate(options, accessor, parser):
+  cmd_request = create_command_request("RESTORE", parameters, component_hosts, cluster, 'Restore Solr Collection: ' + collection)
+  return post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
+
+def migrate(options, accessor, parser, config, collection, index_location):
   """
   Send migrate lucene index custom command request to ambari-server
   """
-  validte_common_options(options, parser)
-  component_hosts = get_solr_hosts(options, accessor)
-  parameters = fill_parameters(options)
+  cluster = config.get('ambari_server', 'cluster')
+
+  component_hosts = get_solr_hosts(options, accessor, cluster)
+  parameters = fill_parameters(options, collection, index_location)
 
-  cmd_request = create_command_request("MIGRATE", parameters, component_hosts, options.cluster, 'Migrating Solr Collection: ' + options.collection)
-  post_json(accessor, CLUSTERS_URL.format(options.cluster) + REQUESTS_API_URL, cmd_request)
+  cmd_request = create_command_request("MIGRATE", parameters, component_hosts, cluster, 'Migrating Solr Collection: ' + collection)
+  return post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
 
-def backup(options, accessor, parser):
+def backup(options, accessor, parser, config, collection, index_location):
   """
   Send backup solr collection custom command request to ambari-server
   """
-  validte_common_options(options, parser)
-  if not options.backup_name:
-    parser.print_help()
-    print 'backup-name option is required'
+  cluster = config.get('ambari_server', 'cluster')
+
+  component_hosts = get_solr_hosts(options, accessor, cluster)
+  parameters = fill_parameters(options, collection, index_location)
+
+  cmd_request = create_command_request("BACKUP", parameters, component_hosts, cluster, 'Backup Solr Collection: ' + collection)
+  return post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
+
+def monitor_request(options, accessor, cluster, request_id, context):
+  while True:
+    request_response=get_json(accessor, "/api/v1/clusters/{0}{1}/{2}".format(cluster, REQUESTS_API_URL, request_id))
+    if 'Requests' in request_response and 'request_status' in request_response['Requests']:
+      request_status = request_response['Requests']['request_status']
+      logger.debug("\nMonitoring '{0}' request (id: '{1}') status is {2}".format(context, request_id, request_status))
+      if request_status in ['FAILED', 'TIMEDOUT', 'ABORTED', 'COMPLETED', 'SKIPPED_FAILED']:
+        if request_status == 'COMPLETED':
+          print "\nRequest (id: {0}) {1}COMPLETED{2}".format(request_id, colors.OKGREEN, colors.ENDC)
+          time.sleep(4)
+        else:
+          print "\nRequest (id: {0}) {1}FAILED{2} (checkout Ambari UI about the failed tasks)\n".format(request_id, colors.FAIL, colors.ENDC)
+          sys.exit(1)
+        break
+      else:
+        if not options.verbose:
+          sys.stdout.write(".")
+          sys.stdout.flush()
+        logger.debug("Sleep 5 seconds ...")
+        time.sleep(5)
+    else:
+      print "'Requests' or 'request_status' cannot be found in JSON response: {0}".format(request_response)
+      sys.exit(1)
+
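
monitor_request above and get_request_id below both rely on the shape of Ambari's Requests API response; a trimmed, illustrative example:

    {
      "Requests" : {
        "id" : 42,
        "request_status" : "IN_PROGRESS"
      }
    }
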
+def get_request_id(json_response):
+  if "Requests" in json_response:
+    if "id" in json_response['Requests']:
+      return json_response['Requests']['id']
+  raise Exception("Cannot access request id from Ambari response: {0}".format(json_response))
+
+def filter_collection(options, collections):
+  if options.collection is not None:
+    filtered_collections = []
+    if options.collection in collections:
+      filtered_collections.append(options.collection)
+    return filtered_collections
+  else:
+    return collections
+
+def get_solr_urls(config):
+  solr_urls = None
+  if config.has_section('infra_solr') and config.has_option('infra_solr', 'urls'):
+    return config.get('infra_solr', 'urls')
+  return solr_urls
+
+def delete_collection(options, config, collection, solr_urls):
+  request = DELETE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls), collection)
+  logger.debug("Solr request: {0}".format(request))
+  delete_collection_json_cmd=create_solr_api_request_command(request, config)
+  process = Popen(delete_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
+  out, err = process.communicate()
+  if process.returncode != 0:
+    raise Exception("{0} command failed: {1}".format(delete_collection_json_cmd, str(err)))
+  response=json.loads(str(out))
+  if 'success' in response:
+    print 'Deleting collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+    return collection
+  else:
+    raise Exception("DELETE collection ('{0}') failed. Response: {1}".format(collection, str(out)))
+
+def list_collections(options, config, solr_urls):
+  request = LIST_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls))
+  logger.debug("Solr request: {0}".format(request))
+  list_collection_json_cmd=create_solr_api_request_command(request, config)
+  process = Popen(list_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
+  out, err = process.communicate()
+  if process.returncode != 0:
+    raise Exception("{0} command failed: {1}".format(list_collection_json_cmd, str(err)))
+  response=json.loads(str(out))
+  if 'collections' in response:
+    return response['collections']
+  else:
+    raise Exception("LIST collections failed ({0}). Response: {1}".format(request, str(out)))
+
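
For reference, the Solr LIST response parsed here looks roughly like this (collection names are illustrative):

    {"responseHeader": {"status": 0, "QTime": 1},
     "collections": ["ranger_audits", "fulltext_index", "edge_index", "vertex_index"]}
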
+def create_collection(options, config, solr_urls, collection, config_set, shards, replica, max_shards_per_node):
+  request = CREATE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls), collection, config_set, shards, replica, max_shards_per_node)
+  logger.debug("Solr request: {0}".format(request))
+  create_collection_json_cmd=create_solr_api_request_command(request, config)
+  process = Popen(create_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
+  out, err = process.communicate()
+  if process.returncode != 0:
+    raise Exception("{0} command failed: {1}".format(create_collection_json_cmd, str(err)))
+  response=json.loads(str(out))
+  if 'success' in response:
+    print 'Creating collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+    return collection
+  else:
+    raise Exception("CREATE collection ('{0}') failed. ({1}) Response: {1}".format(collection, str(out)))
+
+def reload_collection(options, config, solr_urls, collection):
+  request = RELOAD_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls), collection)
+  logger.debug("Solr request: {0}".format(request))
+  reload_collection_json_cmd=create_solr_api_request_command(request, config)
+  process = Popen(reload_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
+  out, err = process.communicate()
+  if process.returncode != 0:
+    raise Exception("{0} command failed: {1}".format(reload_collection_json_cmd, str(err)))
+  response=json.loads(str(out))
+  if 'success' in response:
+    print 'Reloading collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+    return collection
+  else:
+    raise Exception("RELOAD collection ('{0}') failed. ({1}) Response: {1}".format(collection, str(out)))
+
+def delete_znode(options, config, znode):
+  solr_cli_command=create_infra_solr_client_command(options, config, '--delete-znode --znode {0}'.format(znode))
+  logger.debug("Solr cli command: {0}".format(solr_cli_command))
+  sys.stdout.write('Deleting znode {0} ... '.format(znode))
+  sys.stdout.flush()
+  process = Popen(solr_cli_command, stdout=PIPE, stderr=PIPE, shell=True)
+  out, err = process.communicate()
+  if process.returncode != 0:
+    sys.stdout.write(colors.FAIL + 'FAILED\n' + colors.ENDC)
+    sys.stdout.flush()
+    raise Exception("{0} command failed: {1}".format(solr_cli_command, str(err)))
+  sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
+  sys.stdout.flush()
+  logger.debug(str(out))
+
+def copy_znode(options, config, copy_src, copy_dest, copy_from_local=False, copy_to_local=False):
+  solr_cli_command=create_infra_solr_client_command(options, config, '--transfer-znode --copy-src {0} --copy-dest {1}'.format(copy_src, copy_dest))
+  if copy_from_local:
+    solr_cli_command+=" --transfer-mode copyFromLocal"
+  elif copy_to_local:
+    solr_cli_command+=" --transfer-mode copyToLocal"
+  logger.debug("Solr cli command: {0}".format(solr_cli_command))
+  sys.stdout.write('Transferring data from {0} to {1} ... '.format(copy_src, copy_dest))
+  sys.stdout.flush()
+  process = Popen(solr_cli_command, stdout=PIPE, stderr=PIPE, shell=True)
+  out, err = process.communicate()
+  if process.returncode != 0:
+    sys.stdout.write(colors.FAIL + 'FAILED\n' + colors.ENDC)
+    sys.stdout.flush()
+    raise Exception("{0} command failed: {1}".format(solr_cli_command, str(err)))
+  sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
+  sys.stdout.flush()
+  logger.debug(str(out))
+
+def delete_logsearch_collections(options, config, solr_urls, collections):
+  if config.has_section('logsearch_collections'):
+    if config.has_option('logsearch_collections', 'enabled') and config.get('logsearch_collections', 'enabled') == 'true':
+      service_logs_collection = config.get('logsearch_collections', 'hadoop_logs_collection_name')
+      audit_logs_collection = config.get('logsearch_collections', 'audit_logs_collection_name')
+      history_collection = config.get('logsearch_collections', 'history_collection_name')
+      if service_logs_collection in collections:
+        retry(delete_collection, options, config, service_logs_collection, solr_urls, context='[Delete {0} collection]'.format(service_logs_collection))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(service_logs_collection)
+      if audit_logs_collection in collections:
+        retry(delete_collection, options, config, audit_logs_collection, solr_urls, context='[Delete {0} collection]'.format(audit_logs_collection))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(audit_logs_collection)
+      if history_collection in collections:
+        retry(delete_collection, options, config, history_collection, solr_urls, context='[Delete {0} collection]'.format(history_collection))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(history_collection)
+
+def delete_atlas_collections(options, config, solr_urls, collections):
+  if config.has_section('atlas_collections'):
+    if config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true':
+      fulltext_collection = config.get('atlas_collections', 'fulltext_index_name')
+      edge_index_collection = config.get('atlas_collections', 'edge_index_name')
+      vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
+      if fulltext_collection in collections:
+        retry(delete_collection, options, config, fulltext_collection, solr_urls, context='[Delete {0} collection]'.format(fulltext_collection))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(fulltext_collection)
+      if edge_index_collection in collections:
+        retry(delete_collection, options, config, edge_index_collection, solr_urls, context='[Delete {0} collection]'.format(edge_index_collection))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(edge_index_collection)
+      if vertex_index_collection in collections:
+        retry(delete_collection, options, config, vertex_index_collection, solr_urls, context='[Delete {0} collection]'.format(vertex_index_collection))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(vertex_index_collection)
+
+def delete_ranger_collection(options, config, solr_urls, collections):
+  if config.has_section('ranger_collection'):
+    if config.has_option('ranger_collection', 'enabled') and config.get('ranger_collection', 'enabled') == 'true':
+      ranger_collection_name = config.get('ranger_collection', 'ranger_collection_name')
+      if ranger_collection_name in collections:
+        retry(delete_collection, options, config, ranger_collection_name, solr_urls, context='[Delete {0} collection]'.format(ranger_collection_name))
+      else:
+        print 'Collection {0} does not exist or was filtered out. Skipping delete operation.'.format(ranger_collection_name)
+
+def delete_collections(options, config, service_filter):
+  solr_urls = get_solr_urls(config)
+  collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+  collections=filter_collection(options, collections)
+  if 'RANGER' in service_filter:
+    delete_ranger_collection(options, config, solr_urls, collections)
+  if 'ATLAS' in service_filter:
+    delete_atlas_collections(options, config, solr_urls, collections)
+  if 'LOGSEARCH' in service_filter:
+    delete_logsearch_collections(options, config, solr_urls, collections)
+
+def upgrade_ranger_schema(options, config, service_filter):
+  solr_znode='/infra-solr'
+  if 'RANGER' in service_filter and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
+      solr_znode=config.get('infra_solr', 'znode')
+    ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
+    copy_znode(options, config, "{0}{1}".format(INFRA_SOLR_CLIENT_BASE_PATH, RANGER_NEW_SCHEMA),
+               "{0}/configs/{1}/managed-schema".format(solr_znode, ranger_config_set_name), copy_from_local=True)
+
+def backup_ranger_configs(options, config, service_filter):
+  solr_znode='/infra-solr'
+  if 'RANGER' in service_filter and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
+      solr_znode=config.get('infra_solr', 'znode')
+    ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
+    backup_ranger_config_set_name = config.get('ranger_collection', 'backup_ranger_config_set_name')
+    copy_znode(options, config, "{0}/configs/{1}".format(solr_znode, ranger_config_set_name),
+               "{0}/configs/{1}".format(solr_znode, backup_ranger_config_set_name))
+
+def upgrade_ranger_solrconfig_xml(options, config, service_filter):
+  solr_znode='/infra-solr'
+  if 'RANGER' in service_filter and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
+      solr_znode=config.get('infra_solr', 'znode')
+    ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
+    backup_ranger_config_set_name = config.get('ranger_collection', 'backup_ranger_config_set_name')
+    copy_znode(options, config, "{0}/configs/solrconfig.xml".format(solr_znode, ranger_config_set_name),
+               "{0}/configs/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
+
+def delete_znodes(options, config, service_filter):
+  solr_znode='/infra-solr'
+  if 'LOGSEARCH' in service_filter and config.has_option('logsearch_collections', 'enabled') \
+    and config.get('logsearch_collections', 'enabled') == 'true':
+    if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
+      solr_znode=config.get('infra_solr', 'znode')
+    delete_znode(options, config, "{0}/configs/hadoop_logs".format(solr_znode))
+    delete_znode(options, config, "{0}/configs/audit_logs".format(solr_znode))
+    delete_znode(options, config, "{0}/configs/history".format(solr_znode))
+
+def do_backup_request(options, accessor, parser, config, collection, index_location):
+  sys.stdout.write("Sending backup collection request ('{0}') to Ambari to process (backup destination: '{1}')..."
+                   .format(collection, index_location))
+  sys.stdout.flush()
+  response = backup(options, accessor, parser, config, collection, index_location)
+  request_id = get_request_id(response)
+  sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
+  sys.stdout.flush()
+  print 'Backup command request id: {0}'.format(request_id)
+  if options.async:
+    print "Backup {0} collection request sent to Ambari server. Check Ambari UI about the results.".format(collection)
+    sys.exit(0)
+  else:
+    sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
+    sys.stdout.flush()
+    cluster = config.get('ambari_server', 'cluster')
+    monitor_request(options, accessor, cluster, request_id, 'Backup Solr collection: ' + collection)
+    print "Backup collection '{0}'... {1}DONE{2}".format(collection, colors.OKGREEN, colors.ENDC)
+
+def do_migrate_request(options, accessor, parser, config, collection, index_location):
+  sys.stdout.write("Sending migrate collection request ('{0}') to Ambari to process (migrate folder: '{1}')..."
+                   .format(collection, index_location))
+  sys.stdout.flush()
+  response = migrate(options, accessor, parser, config, collection, index_location)
+  request_id = get_request_id(response)
+  sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
+  sys.stdout.flush()
+  print 'Migrate command request id: {0}'.format(request_id)
+  if options.async:
+    print "Migrate {0} collection index request sent to Ambari server. Check Ambari UI about the results.".format(collection)
+    sys.exit(0)
+  else:
+    sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
+    sys.stdout.flush()
+    cluster = config.get('ambari_server', 'cluster')
+    monitor_request(options, accessor, cluster, request_id, 'Migrate Solr collection index: ' + collection)
+    print "Migrate index '{0}'... {1}DONE{2}".format(collection, colors.OKGREEN, colors.ENDC)
+
+def do_restore_request(options, accessor, parser, config, collection, index_location, shards):
+  sys.stdout.write("Sending restore collection request ('{0}') to Ambari to process (backup location: '{1}')..."
+                   .format(collection, index_location))
+  sys.stdout.flush()
+  response = restore(options, accessor, parser, config, collection, index_location, shards)
+  request_id = get_request_id(response)
+  sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
+  sys.stdout.flush()
+  print 'Restore command request id: {0}'.format(request_id)
+  if options.async:
+    print "Restore {0} collection request sent to Ambari server. Check Ambari UI about the results.".format(collection)
+    sys.exit(0)
+  else:
+    sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
+    sys.stdout.flush()
+    cluster = config.get('ambari_server', 'cluster')
+    monitor_request(options, accessor, cluster, request_id, 'Restore Solr collection: ' + collection)
+    print "Restoring collection '{0}'... {1}DONE{2}".format(collection, colors.OKGREEN, colors.ENDC)
+
+def get_ranger_index_location(collection, config, options):
+  ranger_index_location = None
+  if options.index_location:
+    ranger_index_location = os.path.join(options.index_location, "ranger")
+  elif options.ranger_index_location:
+    ranger_index_location = options.ranger_index_location
+  elif config.has_option('ranger_collection', 'backup_path'):
+    ranger_index_location = config.get('ranger_collection', 'backup_path')
+  else:
+    print "'backup_path'is missing from config file and --index-location or --ranger-index-location options are missing as well. Backup collection {0} {1}FAILED{2}." \
+      .format(collection, colors.FAIL, colors.ENDC)
+    sys.exit(1)
+  return ranger_index_location
+
+def get_atlas_index_location(collection, config, options):
+  atlas_index_location = None
+  if options.index_location:
+    atlas_index_location = os.path.join(options.index_location, "atlas", collection)
+  elif options.atlas_index_location:
+    atlas_index_location = os.path.join(options.atlas_index_location, collection)
+  elif config.has_option('atlas_collections', 'backup_path'):
+    atlas_index_location = os.path.join(config.get('atlas_collections', 'backup_path'), collection)
+  else:
+    print "'backup_path'is missing from config file and --index-location or --atlas-index-location options are missing as well. Backup collection {0} {1}FAILED{2}." \
+      .format(collection, colors.FAIL, colors.ENDC)
     sys.exit(1)
-  component_hosts = get_solr_hosts(options, accessor)
-  parameters = fill_parameters(options)
+  return atlas_index_location
+
+def backup_collections(options, accessor, parser, config, service_filter):
+  solr_urls = get_solr_urls(config)
+  collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+  collections=filter_collection(options, collections)
+  if 'RANGER' in service_filter and config.has_section('ranger_collection') and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    collection_name = config.get('ranger_collection', 'ranger_collection_name')
+    if collection_name in collections:
+      ranger_index_location=get_ranger_index_location(collection_name, config, options)
+      do_backup_request(options, accessor, parser, config, collection_name, ranger_index_location)
+    else:
+      print 'Collection {0} does not exist or was filtered out. Skipping backup operation.'.format(collection_name)
+  if 'ATLAS' in service_filter and config.has_section('atlas_collections') \
+    and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true':
+    fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name')
+    if fulltext_index_collection in collections:
+      fulltext_index_location = get_atlas_index_location(fulltext_index_collection, config, options)
+      do_backup_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location)
+    else:
+      print 'Collection {0} does not exist or was filtered out. Skipping backup operation.'.format(fulltext_index_collection)
+    vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
+    if vertex_index_collection in collections:
+      vertex_index_location = get_atlas_index_location(vertex_index_collection, config, options)
+      do_backup_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location)
+    else:
+      print 'Collection {0} does not exist or was filtered out. Skipping backup operation.'.format(vertex_index_collection)
+    edge_index_collection = config.get('atlas_collections', 'edge_index_name')
+    if edge_index_collection in collections:
+      edge_index_location = get_atlas_index_location(edge_index_collection, config, options)
+      do_backup_request(options, accessor, parser, config, edge_index_collection, edge_index_location)
+    else:
+      print 'Collection {0} does not exist or was filtered out. Skipping backup operation.'.format(edge_index_collection)
+
+def migrate_snapshots(options, accessor, parser, config, service_filter):
+  if 'RANGER' in service_filter and config.has_section('ranger_collection') and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    collection_name = config.get('ranger_collection', 'ranger_collection_name')
+    if options.collection is None or options.collection == collection_name:
+      ranger_index_location=get_ranger_index_location(collection_name, config, options)
+      do_migrate_request(options, accessor, parser, config, collection_name, ranger_index_location)
+    else:
+      print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(collection_name)
+  if 'ATLAS' in service_filter and config.has_section('atlas_collections') \
+    and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true':
+    fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name')
+    if options.collection is None or options.collection == fulltext_index_collection:
+      fulltext_index_location=get_atlas_index_location(fulltext_index_collection, config, options)
+      do_migrate_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location)
+    else:
+      print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(fulltext_index_collection)
+    vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
+    if options.collection is None or options.collection == vertex_index_collection:
+      vertex_index_location=get_atlas_index_location(vertex_index_collection, config, options)
+      do_migrate_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location)
+    else:
+      print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(vertex_index_collection)
+    edge_index_collection = config.get('atlas_collections', 'edge_index_name')
+    if options.collection is None or options.collection == edge_index_collection:
+      edge_index_location=get_atlas_index_location(edge_index_collection, config, options)
+      do_migrate_request(options, accessor, parser, config, edge_index_collection, edge_index_location)
+    else:
+      print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(edge_index_collection)
+
+def create_backup_collections(options, accessor, parser, config, service_filter):
+  solr_urls = get_solr_urls(config)
+  collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+  replica_number = "1" # hard coded
+  if 'RANGER' in service_filter and config.has_section('ranger_collection') and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+    if backup_ranger_collection not in collections:
+      if options.collection is not None and options.collection != backup_ranger_collection:
+        print "Collection {0} has filtered out. Skipping create operation.".format(backup_ranger_collection)
+      else:
+        backup_ranger_config_set = config.get('ranger_collection', 'backup_ranger_config_set_name')
+        backup_ranger_shards = config.get('ranger_collection', 'ranger_collection_shards')
+        backup_ranger_max_shards = config.get('ranger_collection', 'ranger_collection_max_shards_per_node')
+        retry(create_collection, options, config, solr_urls, backup_ranger_collection, backup_ranger_config_set,
+                        backup_ranger_shards, replica_number, backup_ranger_max_shards, context="[Create Solr Collections]")
+    else:
+      print "Collection {0} has already exist. Skipping create operation.".format(backup_ranger_collection)
+  if 'ATLAS' in service_filter and config.has_section('atlas_collections') \
+    and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true':
+    backup_atlas_config_set = config.get('atlas_collections', 'config_set')
+    backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+    if backup_fulltext_index_name not in collections:
+      if options.collection is not None and options.collection != backup_fulltext_index_name:
+        print "Collection {0} has filtered out. Skipping create operation.".format(backup_fulltext_index_name)
+      else:
+        backup_fulltext_index_shards = config.get('atlas_collections', 'fulltext_index_shards')
+        backup_fulltext_index_max_shards = config.get('atlas_collections', 'fulltext_index_max_shards_per_node')
+        retry(create_collection, options, config, solr_urls, backup_fulltext_index_name, backup_atlas_config_set,
+                        backup_fulltext_index_shards, replica_number, backup_fulltext_index_max_shards, context="[Create Solr Collections]")
+    else:
+      print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name)
+    backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+    if backup_edge_index_name not in collections:
+      if options.collection is not None and options.collection != backup_edge_index_name:
+        print "Collection {0} has filtered out. Skipping create operation.".format(backup_edge_index_name)
+      else:
+        backup_edge_index_shards = config.get('atlas_collections', 'edge_index_shards')
+        backup_edge_index_max_shards = config.get('atlas_collections', 'edge_index_max_shards_per_node')
+        retry(create_collection, options, config, solr_urls, backup_edge_index_name, backup_atlas_config_set,
+                        backup_edge_index_shards, replica_number, backup_edge_index_max_shards, context="[Create Solr Collections]")
+    else:
+      print "Collection {0} has already exist. Skipping create operation.".format(backup_edge_index_name)
+    backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+    if backup_vertex_index_name not in collections:
+      if options.collection is not None and options.collection != backup_vertex_index_name:
+        print "Collection {0} has filtered out. Skipping create operation.".format(backup_vertex_index_name)
+      else:
+        backup_vertex_index_shards = config.get('atlas_collections', 'vertex_index_shards')
+        backup_vertex_index_max_shards = config.get('atlas_collections', 'vertex_index_max_shards_per_node')
+        retry(create_collection, options, config, solr_urls, backup_vertex_index_name, backup_atlas_config_set,
+                        backup_vertex_index_shards, replica_number, backup_vertex_index_max_shards, context="[Create Solr Collections]")
+    else:
+      print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name)
+
+def restore_collections(options, accessor, parser, config, service_filter):
+  solr_urls = get_solr_urls(config)
+  collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+  collections=filter_collection(options, collections)
+  if 'RANGER' in service_filter and config.has_section('ranger_collection') and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    collection_name = config.get('ranger_collection', 'ranger_collection_name')
+    backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+    if backup_ranger_collection in collections:
+      backup_ranger_shards = config.get('ranger_collection', 'ranger_collection_shards')
+      ranger_index_location=get_ranger_index_location(collection_name, config, options)
+      do_restore_request(options, accessor, parser, config, backup_ranger_collection, ranger_index_location, backup_ranger_shards)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(backup_ranger_collection)
+
+  if 'ATLAS' in service_filter and config.has_section('atlas_collections') \
+    and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true':
+
+    fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name')
+    backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+    if backup_fulltext_index_name in collections:
+      backup_fulltext_index_shards = config.get('atlas_collections', 'fulltext_index_shards')
+      fulltext_index_location=get_atlas_index_location(fulltext_index_collection, config, options)
+      do_restore_request(options, accessor, parser, config, backup_fulltext_index_name, fulltext_index_location, backup_fulltext_index_shards)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(fulltext_index_collection)
+
+    edge_index_collection = config.get('atlas_collections', 'edge_index_name')
+    backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+    if backup_edge_index_name in collections:
+      backup_edge_index_shards = config.get('atlas_collections', 'edge_index_shards')
+      edge_index_location=get_atlas_index_location(edge_index_collection, config, options)
+      do_restore_request(options, accessor, parser, config, backup_edge_index_name, edge_index_location, backup_edge_index_shards)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(edge_index_collection)
+
+    vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
+    backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+    if backup_vertex_index_name in collections:
+      backup_vertex_index_shards = config.get('atlas_collections', 'vertex_index_shards')
+      vertex_index_location=get_atlas_index_location(vertex_index_collection, config, options)
+      do_restore_request(options, accessor, parser, config, backup_vertex_index_name, vertex_index_location, backup_vertex_index_shards)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(vertex_index_collection)
+
+def reload_collections(options, accessor, parser, config, service_filter):
+  solr_urls = get_solr_urls(config)
+  collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+  collections=filter_collection(options, collections)
+  if 'RANGER' in service_filter and config.has_section('ranger_collection') and config.has_option('ranger_collection', 'enabled') \
+    and config.get('ranger_collection', 'enabled') == 'true':
+    backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+    if backup_ranger_collection in collections:
+      retry(reload_collection, options, config, solr_urls, backup_ranger_collection, context="[Reload Solr Collections]")
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_ranger_collection)
+  if 'ATLAS' in service_filter and config.has_section('atlas_collections') \
+    and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true':
+    backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+    if backup_fulltext_index_name in collections:
+      retry(reload_collection, options, config, solr_urls, backup_fulltext_index_name, context="[Reload Solr Collections]")
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name)
+    backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+    if backup_edge_index_name in collections:
+      retry(reload_collection, options, config, solr_urls, backup_edge_index_name, context="[Reload Solr Collections]")
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_edge_index_name)
+    backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+    if backup_vertex_index_name in collections:
+      retry(reload_collection, options, config, solr_urls, backup_vertex_index_name, context="[Reload Solr Collections]")
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name)
 
-  cmd_request = create_command_request("BACKUP", parameters, component_hosts, options.cluster, 'Backup Solr Collection: ' + options.collection)
-  post_json(accessor, CLUSTERS_URL.format(options.cluster) + REQUESTS_API_URL, cmd_request)
+def validate_ini_file(options, parser):
+  if options.ini_file is None:
+    parser.print_help()
+    print 'ini-file option is missing'
+    sys.exit(1)
+  elif not os.path.isfile(options.ini_file):
+    parser.print_help()
+    print 'ini file ({0}) does not exist'.format(options.ini_file)
+    sys.exit(1)
 
 if __name__=="__main__":
   parser = optparse.OptionParser("usage: %prog [options]")
-  parser.add_option("-H", "--host", dest="host", default="localhost", type="string", help="hostname for ambari server")
-  parser.add_option("-P", "--port", dest="port", default=8080, type="int", help="port number for ambari server")
-  parser.add_option("-c", "--cluster", dest="cluster", type="string", help="name cluster")
-  parser.add_option("-s", "--ssl", dest="ssl", action="store_true", help="use if ambari server using https")
-  parser.add_option("-u", "--username", dest="username", default="admin", type="string", help="username for accessing ambari server")
-  parser.add_option("-p", "--password", dest="password", default="admin", type="string", help="password for accessing ambari server")
-
-  parser.add_option("-a", "--action", dest="action", type="string", help="backup | restore | migrate ")
+
+  parser.add_option("-a", "--action", dest="action", type="string", help="delete-collections | backup | cleanup-znodes | backup-and-cleanup | restore | migrate ")
+  parser.add_option("-i", "--ini-file", dest="ini_file", type="string", help="Config ini file to parse (required)")
   parser.add_option("-f", "--force", dest="force", default=False, action="store_true", help="force index upgrade even if it's the right version")
-  parser.add_option("--index-location", dest="index_location", type="string", help="location of the index backups")
-  parser.add_option("--backup-name", dest="backup_name", type="string", help="backup name of the index")
-  parser.add_option("--collection", dest="collection", type="string", help="solr collection")
+  parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="use for verbose logging")
+  parser.add_option("-s", "--service-filter", dest="service_filter", default=None, type="string", help="run commands only selected services (comma separated: LOGSEARCH,ATLAS,RANGER)")
+  parser.add_option("-c", "--collection", dest="collection", default=None, type="string", help="selected collection to run an operation")
+  parser.add_option("--async", dest="async", action="store_true", default=False, help="async Ambari operations (backup | restore | migrate)")
+  parser.add_option("--index-location", dest="index_location", type="string", help="location of the index backups. add ranger/atlas prefix after the path. required only if no backup path in the ini file")
+  parser.add_option("--atlas-index-location", dest="atlas_index_location", type="string", help="location of the index backups (for atlas). required only if no backup path in the ini file")
+  parser.add_option("--ranger-index-location", dest="ranger_index_location", type="string", help="location of the index backups (for ranger). required only if no backup path in the ini file")
 
   parser.add_option("--version", dest="index_version", type="string", default="6.6.2", help="lucene index version for migration (6.6.2 or 7.3.1)")
   parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
-  parser.add_option("--request-async", dest="async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")
+  parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")
   parser.add_option("--shared-fs", dest="shared_fs", action="store_true", default=False, help="shared fs for storing backup (will create index location to <path><hostname>)")
   parser.add_option("--solr-hosts", dest="solr_hosts", type="string", help="comma separated list of solr hosts")
   parser.add_option("--disable-solr-host-check", dest="disable_solr_host_check", action="store_true", default=False, help="Disable to check solr hosts are good for the collection backups")
   parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
   parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
   parser.add_option("--skip-generate-restore-host-cores", dest="skip_generate_restore_host_cores", default=False, action="store_true", help="Skip the generation of restore_host_cores.json, just read the file itself, can be useful if command failed at some point.")
-  parser.add_option("--shards", dest="solr_shards", type="int", default=0, help="number of shards (required to set properly for restore)")
   parser.add_option("--solr-hdfs-path", dest="solr_hdfs_path", type="string", default=None, help="Base path of Solr (where collections are located) if HDFS is used (like /user/infra-solr)")
   parser.add_option("--solr-keep-backup", dest="solr_keep_backup", default=False, action="store_true", help="If it is turned on, Snapshot Solr data will not be deleted from the filesystem during restore.")
 
-
   (options, args) = parser.parse_args()
+
+  set_log_level(options.verbose)
+
+  validate_ini_file(options, parser)
+
+  config = ConfigParser.RawConfigParser()
+  config.read(options.ini_file)
+
+  service_filter=options.service_filter.upper().split(',') if options.service_filter is not None else ['LOGSEARCH', 'ATLAS', 'RANGER']
+
   if options.action is None:
      parser.print_help()
      print 'action option is missing'
+     sys.exit(1)
   else:
-    protocol = 'https' if options.ssl else 'http'
-    accessor = api_accessor(options.host, options.username, options.password, protocol, options.port)
-    print 'Inputs: ' + str(options)
-    if options.action.lower() == 'backup':
-      backup(options, accessor, parser)
-    elif options.action.lower() == 'restore':
-      restore(options, accessor, parser)
-    elif options.action.lower() == 'migrate':
-      migrate(options, accessor, parser)
-    else:
-      parser.print_help()
-      print 'action option is invalid (available actions: backup | restore | migrate)'
+    if config.has_section('ambari_server'):
+      host = config.get('ambari_server', 'host')
+      port = config.get('ambari_server', 'port')
+      protocol = config.get('ambari_server', 'protocol')
+      username = config.get('ambari_server', 'username')
+      password = config.get('ambari_server', 'password')
+      accessor = api_accessor(host, username, password, protocol, port)
+      if options.action.lower() == 'backup':
+        backup_ranger_configs(options, config, service_filter)
+        backup_collections(options, accessor, parser, config, service_filter)
+      elif options.action.lower() == 'delete-collections':
+        delete_collections(options, config, service_filter)
+        delete_znodes(options, config, service_filter)
+        upgrade_ranger_schema(options, config, service_filter)
+      elif options.action.lower() == 'cleanup-znodes':
+        delete_znodes(options, config, service_filter)
+        upgrade_ranger_schema(options, config, service_filter)
+      elif options.action.lower() == 'backup-and-cleanup':
+        backup_ranger_configs(options, config, service_filter)
+        backup_collections(options, accessor, parser, config, service_filter)
+        delete_collections(options, config, service_filter)
+        delete_znodes(options, config, service_filter)
+        upgrade_ranger_schema(options, config, service_filter)
+      elif options.action.lower() == 'restore':
+        upgrade_ranger_solrconfig_xml(options, config, service_filter)
+        create_backup_collections(options, accessor, parser, config, service_filter)
+        restore_collections(options, accessor, parser, config, service_filter)
+        reload_collections(options, accessor, parser, config, service_filter)
+      elif options.action.lower() == 'migrate':
+        migrate_snapshots(options, accessor, parser, config, service_filter)
+      else:
+        parser.print_help()
+        print 'action option is invalid (available actions: delete-collections | backup | cleanup-znodes | backup-and-cleanup | restore | migrate)'
+        sys.exit(1)
+
+      print "Migration helper command {0}FINISHED{1}".format(colors.OKGREEN, colors.ENDC)
\ No newline at end of file
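
Taken together, a migration driven by the generated ini file could be run as a sequence like the following (the script path and ini file name are illustrative):

    python /usr/lib/ambari-infra-solr-client/migrationHelper.py --ini-file ambari_solr_migration.ini --action backup-and-cleanup
    python /usr/lib/ambari-infra-solr-client/migrationHelper.py --ini-file ambari_solr_migration.ini --action migrate
    python /usr/lib/ambari-infra-solr-client/migrationHelper.py --ini-file ambari_solr_migration.ini --action restore
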

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.