You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/27 13:40:36 UTC

[ambari] 01/02: AMBARI-23945. Simplify old Solr data transport

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit d10f8cc061ee1206983ef1a4ac6ec657a568067e
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Wed Jun 27 15:05:53 2018 +0200

    AMBARI-23945. Simplify old Solr data transport
---
 .../src/main/python/migrationHelper.py             | 111 +++++++++++++++++++--
 .../src/main/python/solrDataManager.py             |  55 ++++++----
 .../src/main/resources/ambariSolrMigration.sh      |  13 ++-
 3 files changed, 146 insertions(+), 33 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index fb82ac2..56ab9ad 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -31,7 +31,9 @@ import socket
 import time
 import traceback
 import ConfigParser
+import solrDataManager as solr_data_manager
 
+from datetime import datetime, timedelta
 from random import randrange
 from subprocess import Popen, PIPE
 
@@ -133,11 +135,10 @@ def retry(func, *args, **kwargs):
     logger.info("\n{0}: waiting for {1} seconds before retyring again (retry count: {2})".format(context, delay, r+1))
     time.sleep(delay)
   print '{0} operation {1}FAILED{2}'.format(context, colors.FAIL, colors.ENDC)
-  exit(1)
+  sys.exit(1)
 
-def create_solr_api_request_command(request_url, config, output=None):
-  user='infra-solr'
-  kerberos_enabled='false'
+def get_keytab_and_principal(config):
+  kerberos_enabled = 'false'
   keytab=None
   principal=None
   if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
@@ -151,7 +152,14 @@ def create_solr_api_request_command(request_url, config, output=None):
         keytab=config.get('infra_solr', 'keytab')
       if config.has_option('infra_solr', 'principal'):
         principal=config.get('infra_solr', 'principal')
+  return keytab, principal
 
+def create_solr_api_request_command(request_url, config, output=None):
+  user='infra-solr'
+  kerberos_enabled='false'
+  if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
+    kerberos_enabled=config.get('cluster', 'kerberos_enabled')
+  keytab, principal=get_keytab_and_principal(config)
   use_infra_solr_user="sudo -u {0}".format(user)
   curl_prefix = "curl -k"
   if output is not None:
@@ -202,7 +210,7 @@ def create_infra_solr_client_command(options, config, command, appendZnode=False
 
   return solr_cli_cmd
 
-def get_random_solr_url(solr_urls, options):
+def get_random_solr_url(solr_urls, options = None):
   random_index = randrange(0, len(solr_urls))
   result = solr_urls[random_index]
   logger.debug("Use {0} solr address for next request.".format(result))
@@ -753,7 +761,7 @@ def get_solr_urls(options, config, collection, collections_json):
     solr_hosts = config.get('infra_solr', 'hosts')
 
   splitted_solr_hosts = solr_hosts.split(',')
-  filter_solr_hosts_if_match_any(splitted_solr_hosts, collection, collections_json)
+  splitted_solr_hosts = filter_solr_hosts_if_match_any(splitted_solr_hosts, collection, collections_json)
   if options.include_solr_hosts:
     # keep only included ones, do not override any
     include_solr_hosts_list = options.include_solr_hosts.split(',')
@@ -778,6 +786,26 @@ def get_solr_urls(options, config, collection, collections_json):
 
   return solr_urls
 
+def get_input_output_solr_url(src_solr_urls, target_solr_urls):
+  """
+  Choose random solr urls for the source and target collections, prefer localhost and common urls
+  """
+  def intersect(a, b):
+    return list(set(a) & set(b))
+  input_solr_urls = src_solr_urls
+  output_solr_urls = target_solr_urls
+  hostname = socket.getfqdn()
+  if any(hostname in s for s in input_solr_urls):
+    input_solr_urls = filter(lambda x: hostname in x, input_solr_urls)
+  if any(hostname in s for s in output_solr_urls):
+    output_solr_urls = filter(lambda x: hostname in x, output_solr_urls)
+  common_url_list = intersect(input_solr_urls, output_solr_urls)
+  if common_url_list:
+    input_solr_urls = common_url_list
+    output_solr_urls = common_url_list
+
+  return get_random_solr_url(input_solr_urls), get_random_solr_url(output_solr_urls)
+
 def is_atlas_available(config, service_filter):
   return 'ATLAS' in service_filter and config.has_section('atlas_collections') \
     and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true'
@@ -1040,6 +1068,7 @@ def evaluate_check_shard_result(collection, result, skip_index_size = False):
   all_shards = result['all_shards']
   warnings = 0
   print 30 * "-"
+  print "Number of shards: {0}".format(str(len(all_shards)))
   for shard in all_shards:
     if shard in active_shards:
       print "{0}OK{1}: Found active leader replica for {2}" \
@@ -1241,7 +1270,6 @@ def update_state_json(original_collection, collection, config, options):
 
   copy_znode(options, config, "{0}/new_state.json".format(coll_data_dir), "{0}/collections/{1}/state.json".format(solr_znode, collection), copy_from_local=True)
 
-
 def delete_znodes(options, config, service_filter):
   solr_znode='/infra-solr'
   if is_logsearch_available(config, service_filter):
@@ -1726,13 +1754,65 @@ def check_docs(options, accessor, parser, config):
   else:
     print "Check number of documents - Not found any collections."
 
+def run_solr_data_manager_on_collection(options, config, collections, src_collection, target_collection,
+                                        collections_json_location, num_docs, skip_date_usage = True):
+  if target_collection in collections and src_collection in collections:
+    source_solr_urls = get_solr_urls(options, config, src_collection, collections_json_location)
+    target_solr_urls = get_solr_urls(options, config, target_collection, collections_json_location)
+    if is_collection_empty(num_docs, src_collection):
+      print "Collection '{0}' is empty. Skipping transport data operation.".format(target_collection)
+    else:
+      src_solr_url, target_solr_url = get_input_output_solr_url(source_solr_urls, target_solr_urls)
+      keytab, principal = get_keytab_and_principal(config)
+      date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
+      d = datetime.now() + timedelta(days=365)
+      end = d.strftime(date_format)
+      print "Running solrDataManager.py (solr input collection: {0}, solr output collection: {1})"\
+        .format(src_collection, target_collection)
+      solr_data_manager.verbose = options.verbose
+      solr_data_manager.set_log_level(True)
+      solr_data_manager.save("archive", src_solr_url, src_collection, "evtTime", "id", end,
+                             options.transport_read_block_size, options.transport_write_block_size,
+                             False, None, None, keytab, principal, False, "none", None, None, None,
+                             None, None, None, None, None, target_collection,
+                             target_solr_url, "_version_", skip_date_usage)
+  else:
+    print "Collection '{0}' or {1} does not exist or filtered out. Skipping transport data operation.".format(target_collection, src_collection)
+
+def transfer_old_data(options, accessor, parser, config):
+  collections_json_location = COLLECTIONS_DATA_JSON_LOCATION.format("transport_collections.json")
+  collections=list_collections(options, config, collections_json_location, include_number_of_docs=True)
+  collections=filter_collections(options, collections)
+  docs_map = get_number_of_docs_map(collections_json_location) if collections else {}
+  if is_ranger_available(config, service_filter):
+    original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name')
+    backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_ranger_collection,
+                                        original_ranger_collection, collections_json_location, docs_map, skip_date_usage=False)
+  if is_atlas_available(config, service_filter):
+    original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name')
+    backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_fulltext_index_name,
+                                        original_fulltext_index_name, collections_json_location, docs_map)
+
+    original_edge_index_name = config.get('atlas_collections', 'edge_index_name')
+    backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_edge_index_name,
+                                        original_edge_index_name, collections_json_location, docs_map)
+
+    original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name')
+    backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_vertex_index_name,
+                                        original_vertex_index_name, collections_json_location, docs_map)
+
+
 if __name__=="__main__":
   parser = optparse.OptionParser("usage: %prog [options]")
 
   parser.add_option("-a", "--action", dest="action", type="string", help="delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore |' \
               ' rolling-restart-solr | rolling-restart-atlas | rolling-restart-ranger | check-shards | check-backup-shards | enable-solr-authorization | disable-solr-authorization |'\
               ' fix-solr5-kerberos-config | fix-solr7-kerberos-config | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal | upgrade-logfeeders | stop-logsearch |'\
-              ' restart-solr |restart-logsearch | restart-ranger | restart-atlas")
+              ' restart-solr |restart-logsearch | restart-ranger | restart-atlas | transport-old-data")
   parser.add_option("-i", "--ini-file", dest="ini_file", type="string", help="Config ini file to parse (required)")
   parser.add_option("-f", "--force", dest="force", default=False, action="store_true", help="force index upgrade even if it's the right version")
   parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="use for verbose logging")
@@ -1747,6 +1827,8 @@ if __name__=="__main__":
   parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")
+  parser.add_option("--transport-read-block-size", dest="transport_read_block_size", type="string", help="block size to use for reading from solr during transport",default=10000)
+  parser.add_option("--transport-write-block-size", dest="transport_write_block_size", type="string", help="number of records in the output files during transport", default=100000)
   parser.add_option("--include-solr-hosts", dest="include_solr_hosts", type="string", help="comma separated list of included solr hosts")
   parser.add_option("--exclude-solr-hosts", dest="exclude_solr_hosts", type="string", help="comma separated list of excluded solr hosts")
   parser.add_option("--disable-solr-host-check", dest="disable_solr_host_check", action="store_true", default=False, help="Disable to check solr hosts are good for the collection backups")
@@ -1790,6 +1872,13 @@ if __name__=="__main__":
       username = config.get('ambari_server', 'username')
       password = config.get('ambari_server', 'password')
       accessor = api_accessor(host, username, password, protocol, port)
+
+      if config.has_section('infra_solr') and config.has_option('infra_solr', 'hosts'):
+        local_host = socket.getfqdn()
+        solr_hosts = config.get('infra_solr', 'hosts')
+        if solr_hosts and local_host not in solr_hosts.split(","):
+          print "{0}WARNING{1}: Host '{2}' is not found in Infra Solr hosts ({3}). Migration commands won't work from here." \
+            .format(colors.WARNING, colors.ENDC, local_host, solr_hosts)
       if options.action.lower() == 'backup':
         backup_ranger_configs(options, config, service_filter)
         backup_collections(options, accessor, parser, config, service_filter)
@@ -1881,13 +1970,17 @@ if __name__=="__main__":
         check_shards(options, accessor, parser, config, backup_shards=True)
       elif options.action.lower() == 'check-docs':
         check_docs(options, accessor, parser, config)
+      elif options.action.lower() == 'transport-old-data':
+        check_docs(options, accessor, parser, config)
+        transfer_old_data(options, accessor, parser, config)
+        check_docs(options, accessor, parser, config)
       else:
         parser.print_help()
         print 'action option is invalid (available actions: delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore |' \
               ' rolling-restart-solr | rolling-restart-ranger | rolling-restart-atlas | check-shards | check-backup-shards | check-docs | enable-solr-authorization |'\
               ' disable-solr-authorization | fix-solr5-kerberos-config | fix-solr7-kerberos-config | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal |' \
               ' upgrade-logfeeders | stop-logsearch | restart-solr |' \
-              ' restart-logsearch | restart-ranger | restart-atlas)'
+              ' restart-logsearch | restart-ranger | restart-atlas | transport-old-data )'
         sys.exit(1)
       command_elapsed_time = time.time() - command_start_time
       time_to_print = time.strftime("%H:%M:%S", time.gmtime(command_elapsed_time))
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index b34873d..e02c491 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -47,7 +47,7 @@ def parse_arguments():
   parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))
 
   parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save")
-  parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port")
+  parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port and protocol")
   parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
   parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
   parser.add_option("-r", "--read-block-size", dest="read_block_size", type="int", help="block size to use for reading from solr",
@@ -90,6 +90,7 @@ def parse_arguments():
   parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
 
   parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
+  parser.add_option("--solr-output-url", dest="solr_output_url", default=None, type="string", help="the url of the output solr server including the port and protocol")
   parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
   parser.add_option("--skip-date-usage", dest="skip_date_usage", action="store_true", default=False, help="datestamp field won't be used for queries (sort based on id field)")
 
@@ -208,6 +209,8 @@ def parse_arguments():
     print("  compression: " + options.compression)
   if options.__dict__["solr_output_collection"] is not None:
     print("  solr output collection: " + options.solr_output_collection)
+  if options.__dict__["solr_output_url"] is not None:
+    print("  solr output url: " + options.solr_output_collection)
   if (options.__dict__["hdfs_keytab"] is not None):
     print("  hdfs-keytab: " + options.hdfs_keytab)
     print("  hdfs-principal: " + options.hdfs_principal)
@@ -237,11 +240,13 @@ def parse_arguments():
 
   return options
 
-def set_log_level():
+def set_log_level(disable=False):
   if verbose:
     logger.setLevel(logging.DEBUG)
   else:
     logger.setLevel(logging.INFO)
+  if disable:
+    logger.removeHandler(handler)
 
 def get_end(options):
   if options.end:
@@ -272,7 +277,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal,
 def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
          ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
          compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
-         solr_output_collection, exclude_fields, skip_date_usage):
+         solr_output_collection, solr_output_url, exclude_fields, skip_date_usage):
   solr_kinit_command = None
   if solr_keytab:
     solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -284,17 +289,18 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
   if hdfs_keytab:
     hdfs_kinit_command = "sudo -u {0} kinit -kt {1} {2}".format(hdfs_user, hdfs_keytab, hdfs_principal)
 
-  if options.hdfs_path:
+  if hdfs_path:
     ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
 
   working_dir = get_working_dir(solr_url, collection)
   if mode == "archive":
-    handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
+    handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir,
+                                ignore_unfinished_uploading, skip_date_usage)
 
   save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
             range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
-            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields,
-            skip_date_usage)
+            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
+            exclude_fields, skip_date_usage)
 
 def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
   if hdfs_kinit_command:
@@ -329,7 +335,7 @@ def get_working_dir(solr_url, collection):
   logger.debug("Working directory is %s", working_dir)
   return working_dir
 
-def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading):
+def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading, skip_date_usage):
   command_json_path = "{0}/command.json".format(working_dir)
   if os.path.isfile(command_json_path):
     with open(command_json_path) as command_file:
@@ -345,7 +351,8 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
       logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
 
       if command["upload"]["type"] == "solr":
-        upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["solr_output_collection"])
+        upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"],
+                            command["upload"]["solr_output_collection"])
       elif command["upload"]["type"] == "hdfs":
         upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
                          command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
@@ -361,14 +368,14 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
     if "delete" in command.keys():
       delete_data(solr_kinit_command, curl_prefix, command["delete"]["command"], command["delete"]["collection"],
                   command["delete"]["filter_field"], command["delete"]["id_field"], command["delete"]["prev_lot_end_value"],
-                  command["delete"]["prev_lot_end_id"])
+                  command["delete"]["prev_lot_end_id"], skip_date_usage)
 
     os.remove(command_json_path)
 
 def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
               range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
               compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection,
-              exclude_fields, skip_date_usage):
+              solr_output_url, exclude_fields, skip_date_usage):
   logger.info("Starting to save data")
 
   tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -410,7 +417,7 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
       upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                    id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
                    hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection,
-                   skip_date_usage)
+                   solr_output_url, skip_date_usage)
       total_records += records
       logger.info("A total of %d records are saved", total_records)
 
@@ -493,7 +500,8 @@ def finish_file(tmp_file, json_file):
 
 def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                  id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, skip_date_usage):
+                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, solr_output_url,
+                 skip_date_usage):
   if name:
     file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
   else:
@@ -503,7 +511,8 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
 
   upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
                                        id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection, skip_date_usage)
+                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
+                                       skip_date_usage)
   if solr_output_collection:
     upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
   elif hdfs_user:
@@ -518,10 +527,10 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
 
   delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
                                        id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None,
-                                       skip_date_usage)
+                                       None, skip_date_usage)
   if mode == "archive":
     delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
-                prev_lot_end_id)
+                prev_lot_end_id, skip_date_usage)
     os.remove("{0}/command.json".format(working_dir))
 
 def compress_file(working_dir, tmp_file_path, file_name, compression):
@@ -567,7 +576,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
 
 def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
                         prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
-                        local_path, solr_output_collection, skip_date_usage):
+                        local_path, solr_output_collection, solr_output_url, skip_date_usage):
   commands = {}
 
   if upload:
@@ -577,8 +586,9 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
 
   if upload:
     if solr_output_collection:
+      command_url = solr_output_url if solr_output_url else solr_url
       upload_command = "{0}/{1}/update/json/docs?commit=true&wt=json --data-binary @{2}"\
-        .format(solr_url, solr_output_collection, upload_file_path)
+        .format(command_url, solr_output_collection, upload_file_path)
       upload_command_data = {}
       upload_command_data["type"] = "solr"
       upload_command_data["command"] = upload_command
@@ -726,11 +736,14 @@ def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_
     logger.info("Save data to collection: %s", collection)
 
 def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
-                prev_lot_end_id):
+                prev_lot_end_id, skip_date_usage):
   delete_cmd = delete_command.split(" --data-binary")[0]
   delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
   query_solr(solr_kinit_command, delete_cmd, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
-  logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
+  if skip_date_usage:
+    logger.info("Deleted data from collection %s where %s < %s", collection, id_field, prev_lot_end_id)
+  else:
+    logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
               prev_lot_end_id)
 
 def query_solr(solr_kinit_command, url, curl_command, action, data=None):
@@ -800,7 +813,7 @@ if __name__ == '__main__':
            options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
            options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
            options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
-           options.exclude_fields, options.skip_date_usage)
+           options.solr_output_url, options.exclude_fields, options.skip_date_usage)
     else:
       logger.warn("Unknown mode: %s", options.mode)
 
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh b/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
index 938d649..e054a89 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
+++ b/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
@@ -18,7 +18,7 @@ function print_help() {
   cat << EOF
    Usage: /usr/lib/ambari-infra-solr-client/ambariSolrMigration.sh --mode <MODE> --ini-file <ini_file> [additional options]
 
-   -m, --mode  <MODE>                     available migration modes: delete-only | backup-only | migrate-restore | all
+   -m, --mode  <MODE>                     available migration modes: delete-only | backup-only | migrate-restore | all | transport
    -i, --ini-file <INI_FILE>              ini-file location (used by migrationHelper.py)
    -s, --migration-script-location <file> migrateHelper.py location (default: /usr/lib/ambari-infra-solr-client/migrationHelper.py)
    -w, --wait-between-steps <seconds>     wait between different migration steps in seconds (default: 15)
@@ -89,6 +89,13 @@ function run_migrate_commands() {
 
   start_date=$(date +%s)
 
+  # execute on: transport
+  if [[ "$mode" == "transport" ]] ; then
+    log_command "$python_location $script_location --ini-file $ini_file --action transport-old-data $verbose_val"
+    $python_location $script_location --ini-file $ini_file --action transport-old-data $verbose_val
+    handle_result "$?" "Transport Old Solr Data" "$python_location" "$start_date"
+  fi
+
   # execute on: backup - all
   if [[ "$mode" == "backup" || "$mode" == "all" ]] ; then
     log_command "$python_location $script_location --ini-file $ini_file --action check-shards $verbose_val $skip_warnings_val"
@@ -97,7 +104,7 @@ function run_migrate_commands() {
   fi
 
   # execute on: backup - delete - all
-  if [[ "$mode" != "migrate-restore" ]] ; then
+  if [[ "$mode" == "delete" || "$mode" == "backup" || "$mode" == "all" ]] ; then
     if [[ "$skip_solr_client_upgrade" != "true" ]]; then
       log_command "$python_location $script_location --ini-file $ini_file --action upgrade-solr-clients $verbose_val"
       $python_location $script_location --ini-file $ini_file --action upgrade-solr-clients $verbose_val
@@ -302,7 +309,7 @@ function main() {
     print_help
     exit 1
   else
-    if [[ "$MODE" == "delete" || "$MODE" == "backup" || "$MODE" == "migrate-restore" || "$MODE" == "all" ]]; then
+    if [[ "$MODE" == "delete" || "$MODE" == "backup" || "$MODE" == "migrate-restore" || "$MODE" == "all" || "$MODE" == "transport" ]]; then
       run_migrate_commands "$MODE" "$SCRIPT_LOCATION" "$PYTHON_PATH_FOR_MIGRATION" "$INI_FILE" "$WAIT" "$SKIP_SOLR_CLIENT_UPGRADE" "$SKIP_SOLR_SERVER_UPGRADE" "$SKIP_LOGSEARCH_UPGRADE" "$SKIP_WARNINGS" "$BATCH_INTERVAL" "$KEEP_BACKUP" "$VERBOSE"
     else
       echo "mode '$MODE' is not supported"