Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/27 13:40:36 UTC
[ambari] 01/02: AMBARI-23945. Simplify old Solr data transport
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit d10f8cc061ee1206983ef1a4ac6ec657a568067e
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Wed Jun 27 15:05:53 2018 +0200
AMBARI-23945. Simplify old Solr data transport
---
.../src/main/python/migrationHelper.py | 111 +++++++++++++++++++--
.../src/main/python/solrDataManager.py | 55 ++++++----
.../src/main/resources/ambariSolrMigration.sh | 13 ++-
3 files changed, 146 insertions(+), 33 deletions(-)
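For orientation: the new transport path reads documents out of the backup_* collections and re-indexes them into the freshly created collections through Solr's /update/json/docs handler. The real solrDataManager.py stages records in local JSON files and uploads them with curl; the minimal, hypothetical sketch below (Python 2, placeholder URLs and collection names, no Kerberos) only illustrates that read-and-repost flow.

    import json
    import urllib2

    def copy_collection(src_url, dst_url, src_coll, dst_coll, rows=10000):
        # Hypothetical sketch: page through the source collection and re-post
        # each batch to the target collection's JSON docs update handler.
        start = 0
        while True:
            query = "{0}/{1}/select?q=*:*&sort=id+asc&start={2}&rows={3}&wt=json".format(
                src_url, src_coll, start, rows)
            docs = json.load(urllib2.urlopen(query))["response"]["docs"]
            if not docs:
                break
            for doc in docs:
                doc.pop("_version_", None)  # the real script drops this via --exclude-fields
            update = "{0}/{1}/update/json/docs?commit=true&wt=json".format(dst_url, dst_coll)
            request = urllib2.Request(update, json.dumps(docs),
                                      {"Content-Type": "application/json"})
            urllib2.urlopen(request)
            start += len(docs)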
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index fb82ac2..56ab9ad 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -31,7 +31,9 @@ import socket
import time
import traceback
import ConfigParser
+import solrDataManager as solr_data_manager
+from datetime import datetime, timedelta
from random import randrange
from subprocess import Popen, PIPE
@@ -133,11 +135,10 @@ def retry(func, *args, **kwargs):
logger.info("\n{0}: waiting for {1} seconds before retrying again (retry count: {2})".format(context, delay, r+1))
time.sleep(delay)
print '{0} operation {1}FAILED{2}'.format(context, colors.FAIL, colors.ENDC)
- exit(1)
+ sys.exit(1)
-def create_solr_api_request_command(request_url, config, output=None):
- user='infra-solr'
- kerberos_enabled='false'
+def get_keytab_and_principal(config):
+ kerberos_enabled = 'false'
keytab=None
principal=None
if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
@@ -151,7 +152,14 @@ def create_solr_api_request_command(request_url, config, output=None):
keytab=config.get('infra_solr', 'keytab')
if config.has_option('infra_solr', 'principal'):
principal=config.get('infra_solr', 'principal')
+ return keytab, principal
+def create_solr_api_request_command(request_url, config, output=None):
+ user='infra-solr'
+ kerberos_enabled='false'
+ if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
+ kerberos_enabled=config.get('cluster', 'kerberos_enabled')
+ keytab, principal=get_keytab_and_principal(config)
use_infra_solr_user="sudo -u {0}".format(user)
curl_prefix = "curl -k"
if output is not None:
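The extracted get_keytab_and_principal() is reused by the new transport path as well, where the pair ends up in the kinit command that solrDataManager builds. A short sketch of that hand-off, assuming config is the already-parsed ini file:

    # Sketch: mirrors how solrDataManager.save() turns the pair into a kinit command
    keytab, principal = get_keytab_and_principal(config)
    solr_kinit_command = None
    if keytab:
        solr_kinit_command = "kinit -kt {0} {1}".format(keytab, principal)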
@@ -202,7 +210,7 @@ def create_infra_solr_client_command(options, config, command, appendZnode=False
return solr_cli_cmd
-def get_random_solr_url(solr_urls, options):
+def get_random_solr_url(solr_urls, options = None):
random_index = randrange(0, len(solr_urls))
result = solr_urls[random_index]
logger.debug("Use {0} solr address for next request.".format(result))
@@ -753,7 +761,7 @@ def get_solr_urls(options, config, collection, collections_json):
solr_hosts = config.get('infra_solr', 'hosts')
splitted_solr_hosts = solr_hosts.split(',')
- filter_solr_hosts_if_match_any(splitted_solr_hosts, collection, collections_json)
+ splitted_solr_hosts = filter_solr_hosts_if_match_any(splitted_solr_hosts, collection, collections_json)
if options.include_solr_hosts:
# keep only included ones, do not override any
include_solr_hosts_list = options.include_solr_hosts.split(',')
@@ -778,6 +786,26 @@ def get_solr_urls(options, config, collection, collections_json):
return solr_urls
+def get_input_output_solr_url(src_solr_urls, target_solr_urls):
+ """
+ Choose random solr urls for the source and target collections, prefer localhost and common urls
+ """
+ def intersect(a, b):
+ return list(set(a) & set(b))
+ input_solr_urls = src_solr_urls
+ output_solr_urls = target_solr_urls
+ hostname = socket.getfqdn()
+ if any(hostname in s for s in input_solr_urls):
+ input_solr_urls = filter(lambda x: hostname in x, input_solr_urls)
+ if any(hostname in s for s in output_solr_urls):
+ output_solr_urls = filter(lambda x: hostname in x, output_solr_urls)
+ common_url_list = intersect(input_solr_urls, output_solr_urls)
+ if common_url_list:
+ input_solr_urls = common_url_list
+ output_solr_urls = common_url_list
+
+ return get_random_solr_url(input_solr_urls), get_random_solr_url(output_solr_urls)
+
def is_atlas_available(config, service_filter):
return 'ATLAS' in service_filter and config.has_section('atlas_collections') \
and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true'
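get_input_output_solr_url() prefers the local host on both sides and, where possible, a URL common to both lists, so the read and the write go through the same Solr node. A small usage example with made-up host names, assuming the script runs on c6401.ambari.apache.org:

    src = ["http://c6401.ambari.apache.org:8886/solr",
           "http://c6402.ambari.apache.org:8886/solr"]
    dst = ["http://c6401.ambari.apache.org:8886/solr"]
    # Both lists are narrowed to the local host and their intersection is the
    # c6401 URL, so both picks land on the same node:
    input_url, output_url = get_input_output_solr_url(src, dst)
    # input_url == output_url == "http://c6401.ambari.apache.org:8886/solr"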
@@ -1040,6 +1068,7 @@ def evaluate_check_shard_result(collection, result, skip_index_size = False):
all_shards = result['all_shards']
warnings = 0
print 30 * "-"
+ print "Number of shards: {0}".format(str(len(all_shards)))
for shard in all_shards:
if shard in active_shards:
print "{0}OK{1}: Found active leader replica for {2}" \
@@ -1241,7 +1270,6 @@ def update_state_json(original_collection, collection, config, options):
copy_znode(options, config, "{0}/new_state.json".format(coll_data_dir), "{0}/collections/{1}/state.json".format(solr_znode, collection), copy_from_local=True)
-
def delete_znodes(options, config, service_filter):
solr_znode='/infra-solr'
if is_logsearch_available(config, service_filter):
@@ -1726,13 +1754,65 @@ def check_docs(options, accessor, parser, config):
else:
print "Check number of documents - Not found any collections."
+def run_solr_data_manager_on_collection(options, config, collections, src_collection, target_collection,
+ collections_json_location, num_docs, skip_date_usage = True):
+ if target_collection in collections and src_collection in collections:
+ source_solr_urls = get_solr_urls(options, config, src_collection, collections_json_location)
+ target_solr_urls = get_solr_urls(options, config, target_collection, collections_json_location)
+ if is_collection_empty(num_docs, src_collection):
+ print "Collection '{0}' is empty. Skipping transport data operation.".format(target_collection)
+ else:
+ src_solr_url, target_solr_url = get_input_output_solr_url(source_solr_urls, target_solr_urls)
+ keytab, principal = get_keytab_and_principal(config)
+ date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
+ d = datetime.now() + timedelta(days=365)
+ end = d.strftime(date_format)
+ print "Running solrDataManager.py (solr input collection: {0}, solr output collection: {1})"\
+ .format(src_collection, target_collection)
+ solr_data_manager.verbose = options.verbose
+ solr_data_manager.set_log_level(True)
+ solr_data_manager.save("archive", src_solr_url, src_collection, "evtTime", "id", end,
+ options.transport_read_block_size, options.transport_write_block_size,
+ False, None, None, keytab, principal, False, "none", None, None, None,
+ None, None, None, None, None, target_collection,
+ target_solr_url, "_version_", skip_date_usage)
+ else:
+ print "Collection '{0}' or {1} does not exist or filtered out. Skipping transport data operation.".format(target_collection, src_collection)
+
+def transfer_old_data(options, accessor, parser, config):
+ collections_json_location = COLLECTIONS_DATA_JSON_LOCATION.format("transport_collections.json")
+ collections=list_collections(options, config, collections_json_location, include_number_of_docs=True)
+ collections=filter_collections(options, collections)
+ docs_map = get_number_of_docs_map(collections_json_location) if collections else {}
+ if is_ranger_available(config, service_filter):
+ original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name')
+ backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+ run_solr_data_manager_on_collection(options, config, collections, backup_ranger_collection,
+ original_ranger_collection, collections_json_location, docs_map, skip_date_usage=False)
+ if is_atlas_available(config, service_filter):
+ original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name')
+ backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+ run_solr_data_manager_on_collection(options, config, collections, backup_fulltext_index_name,
+ original_fulltext_index_name, collections_json_location, docs_map)
+
+ original_edge_index_name = config.get('atlas_collections', 'edge_index_name')
+ backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+ run_solr_data_manager_on_collection(options, config, collections, backup_edge_index_name,
+ original_edge_index_name, collections_json_location, docs_map)
+
+ original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name')
+ backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+ run_solr_data_manager_on_collection(options, config, collections, backup_vertex_index_name,
+ original_vertex_index_name, collections_json_location, docs_map)
+
+
if __name__=="__main__":
parser = optparse.OptionParser("usage: %prog [options]")
parser.add_option("-a", "--action", dest="action", type="string", help="delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore |' \
' rolling-restart-solr | rolling-restart-atlas | rolling-restart-ranger | check-shards | check-backup-shards | enable-solr-authorization | disable-solr-authorization |'\
' fix-solr5-kerberos-config | fix-solr7-kerberos-config | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal | upgrade-logfeeders | stop-logsearch |'\
- ' restart-solr |restart-logsearch | restart-ranger | restart-atlas")
+ ' restart-solr | restart-logsearch | restart-ranger | restart-atlas | transport-old-data")
parser.add_option("-i", "--ini-file", dest="ini_file", type="string", help="Config ini file to parse (required)")
parser.add_option("-f", "--force", dest="force", default=False, action="store_true", help="force index upgrade even if it's the right version")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="use for verbose logging")
@@ -1747,6 +1827,8 @@ if __name__=="__main__":
parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")
+ parser.add_option("--transport-read-block-size", dest="transport_read_block_size", type="string", help="block size to use for reading from solr during transport",default=10000)
+ parser.add_option("--transport-write-block-size", dest="transport_write_block_size", type="string", help="number of records in the output files during transport", default=100000)
parser.add_option("--include-solr-hosts", dest="include_solr_hosts", type="string", help="comma separated list of included solr hosts")
parser.add_option("--exclude-solr-hosts", dest="exclude_solr_hosts", type="string", help="comma separated list of excluded solr hosts")
parser.add_option("--disable-solr-host-check", dest="disable_solr_host_check", action="store_true", default=False, help="Disable to check solr hosts are good for the collection backups")
@@ -1790,6 +1872,13 @@ if __name__=="__main__":
username = config.get('ambari_server', 'username')
password = config.get('ambari_server', 'password')
accessor = api_accessor(host, username, password, protocol, port)
+
+ if config.has_section('infra_solr') and config.has_option('infra_solr', 'hosts'):
+ local_host = socket.getfqdn()
+ solr_hosts = config.get('infra_solr', 'hosts')
+ if solr_hosts and local_host not in solr_hosts.split(","):
+ print "{0}WARNING{1}: Host '{2}' is not found in Infra Solr hosts ({3}). Migration commands won't work from here." \
+ .format(colors.WARNING, colors.ENDC, local_host, solr_hosts)
if options.action.lower() == 'backup':
backup_ranger_configs(options, config, service_filter)
backup_collections(options, accessor, parser, config, service_filter)
@@ -1881,13 +1970,17 @@ if __name__=="__main__":
check_shards(options, accessor, parser, config, backup_shards=True)
elif options.action.lower() == 'check-docs':
check_docs(options, accessor, parser, config)
+ elif options.action.lower() == 'transport-old-data':
+ check_docs(options, accessor, parser, config)
+ transfer_old_data(options, accessor, parser, config)
+ check_docs(options, accessor, parser, config)
else:
parser.print_help()
print 'action option is invalid (available actions: delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore |' \
' rolling-restart-solr | rolling-restart-ranger | rolling-restart-atlas | check-shards | check-backup-shards | check-docs | enable-solr-authorization |'\
' disable-solr-authorization | fix-solr5-kerberos-config | fix-solr7-kerberos-config | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal |' \
' upgrade-logfeeders | stop-logsearch | restart-solr |' \
- ' restart-logsearch | restart-ranger | restart-atlas)'
+ ' restart-logsearch | restart-ranger | restart-atlas | transport-old-data)'
sys.exit(1)
command_elapsed_time = time.time() - command_start_time
time_to_print = time.strftime("%H:%M:%S", time.gmtime(command_elapsed_time))
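The transport-old-data action runs check-docs before and after the transfer so the document counts of the backup and the restored collections can be compared. A hedged sketch of driving it from Python (the ini path is a placeholder; the flags come from the option definitions above):

    import subprocess

    cmd = ["python", "/usr/lib/ambari-infra-solr-client/migrationHelper.py",
           "--ini-file", "/path/to/ambari_solr_migration.ini",   # placeholder
           "--action", "transport-old-data",
           "--transport-read-block-size", "10000",
           "--transport-write-block-size", "100000",
           "--verbose"]
    subprocess.check_call(cmd)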
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index b34873d..e02c491 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -47,7 +47,7 @@ def parse_arguments():
parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))
parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save")
- parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port")
+ parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port and protocol")
parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
parser.add_option("-r", "--read-block-size", dest="read_block_size", type="int", help="block size to use for reading from solr",
@@ -90,6 +90,7 @@ def parse_arguments():
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
+ parser.add_option("--solr-output-url", dest="solr_output_url", default=None, type="string", help="the url of the output solr server including the port and protocol")
parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
parser.add_option("--skip-date-usage", dest="skip_date_usage", action="store_true", default=False, help="datestamp field won't be used for queries (sort based on id field)")
@@ -208,6 +209,8 @@ def parse_arguments():
print(" compression: " + options.compression)
if options.__dict__["solr_output_collection"] is not None:
print(" solr output collection: " + options.solr_output_collection)
+ if options.__dict__["solr_output_url"] is not None:
+ print(" solr output url: " + options.solr_output_collection)
if (options.__dict__["hdfs_keytab"] is not None):
print(" hdfs-keytab: " + options.hdfs_keytab)
print(" hdfs-principal: " + options.hdfs_principal)
@@ -237,11 +240,13 @@ def parse_arguments():
return options
-def set_log_level():
+def set_log_level(disable=False):
if verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
+ if disable:
+ logger.removeHandler(handler)
def get_end(options):
if options.end:
@@ -272,7 +277,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal,
def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
- solr_output_collection, exclude_fields, skip_date_usage):
+ solr_output_collection, solr_output_url, exclude_fields, skip_date_usage):
solr_kinit_command = None
if solr_keytab:
solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -284,17 +289,18 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
if hdfs_keytab:
hdfs_kinit_command = "sudo -u {0} kinit -kt {1} {2}".format(hdfs_user, hdfs_keytab, hdfs_principal)
- if options.hdfs_path:
+ if hdfs_path:
ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
working_dir = get_working_dir(solr_url, collection)
if mode == "archive":
- handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
+ handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir,
+ ignore_unfinished_uploading, skip_date_usage)
save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
- hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields,
- skip_date_usage)
+ hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
+ exclude_fields, skip_date_usage)
def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
if hdfs_kinit_command:
@@ -329,7 +335,7 @@ def get_working_dir(solr_url, collection):
logger.debug("Working directory is %s", working_dir)
return working_dir
-def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading):
+def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading, skip_date_usage):
command_json_path = "{0}/command.json".format(working_dir)
if os.path.isfile(command_json_path):
with open(command_json_path) as command_file:
@@ -345,7 +351,8 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
if command["upload"]["type"] == "solr":
- upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["solr_output_collection"])
+ upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"],
+ command["upload"]["solr_output_collection"])
elif command["upload"]["type"] == "hdfs":
upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
@@ -361,14 +368,14 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
if "delete" in command.keys():
delete_data(solr_kinit_command, curl_prefix, command["delete"]["command"], command["delete"]["collection"],
command["delete"]["filter_field"], command["delete"]["id_field"], command["delete"]["prev_lot_end_value"],
- command["delete"]["prev_lot_end_id"])
+ command["delete"]["prev_lot_end_id"], skip_date_usage)
os.remove(command_json_path)
def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection,
- exclude_fields, skip_date_usage):
+ solr_output_url, exclude_fields, skip_date_usage):
logger.info("Starting to save data")
tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -410,7 +417,7 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection,
- skip_date_usage)
+ solr_output_url, skip_date_usage)
total_records += records
logger.info("A total of %d records are saved", total_records)
@@ -493,7 +500,8 @@ def finish_file(tmp_file, json_file):
def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, skip_date_usage):
+ key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, solr_output_url,
+ skip_date_usage):
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
else:
@@ -503,7 +511,8 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path, solr_output_collection, skip_date_usage)
+ key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
+ skip_date_usage)
if solr_output_collection:
upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
elif hdfs_user:
@@ -518,10 +527,10 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None,
- skip_date_usage)
+ None, skip_date_usage)
if mode == "archive":
delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
- prev_lot_end_id)
+ prev_lot_end_id, skip_date_usage)
os.remove("{0}/command.json".format(working_dir))
def compress_file(working_dir, tmp_file_path, file_name, compression):
@@ -567,7 +576,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
- local_path, solr_output_collection, skip_date_usage):
+ local_path, solr_output_collection, solr_output_url, skip_date_usage):
commands = {}
if upload:
@@ -577,8 +586,9 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
if upload:
if solr_output_collection:
+ command_url = solr_output_url if solr_output_url else solr_url
upload_command = "{0}/{1}/update/json/docs?commit=true&wt=json --data-binary @{2}"\
- .format(solr_url, solr_output_collection, upload_file_path)
+ .format(command_url, solr_output_collection, upload_file_path)
upload_command_data = {}
upload_command_data["type"] = "solr"
upload_command_data["command"] = upload_command
@@ -726,11 +736,14 @@ def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_
logger.info("Save data to collection: %s", collection)
def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
- prev_lot_end_id):
+ prev_lot_end_id, skip_date_usage):
delete_cmd = delete_command.split(" --data-binary")[0]
delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
query_solr(solr_kinit_command, delete_cmd, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
- logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
+ if skip_date_usage:
+ logger.info("Deleted data from collection %s where %s < %s", collection, id_field, prev_lot_end_id)
+ else:
+ logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)
def query_solr(solr_kinit_command, url, curl_command, action, data=None):
@@ -800,7 +813,7 @@ if __name__ == '__main__':
options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
- options.exclude_fields, options.skip_date_usage)
+ options.solr_output_url, options.exclude_fields, options.skip_date_usage)
else:
logger.warn("Unknown mode: %s", options.mode)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh b/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
index 938d649..e054a89 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
+++ b/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
@@ -18,7 +18,7 @@ function print_help() {
cat << EOF
Usage: /usr/lib/ambari-infra-solr-client/ambariSolrMigration.sh --mode <MODE> --ini-file <ini_file> [additional options]
- -m, --mode <MODE> available migration modes: delete-only | backup-only | migrate-restore | all
+ -m, --mode <MODE> available migration modes: delete-only | backup-only | migrate-restore | all | transport
-i, --ini-file <INI_FILE> ini-file location (used by migrationHelper.py)
-s, --migration-script-location <file> migrateHelper.py location (default: /usr/lib/ambari-infra-solr-client/migrationHelper.py)
-w, --wait-between-steps <seconds> wait between different migration steps in seconds (default: 15)
@@ -89,6 +89,13 @@ function run_migrate_commands() {
start_date=$(date +%s)
+ # execute on: transport
+ if [[ "$mode" == "transport" ]] ; then
+ log_command "$python_location $script_location --ini-file $ini_file --action transport-old-data $verbose_val"
+ $python_location $script_location --ini-file $ini_file --action transport-old-data $verbose_val
+ handle_result "$?" "Transport Old Solr Data" "$python_location" "$start_date"
+ fi
+
# execute on: backup - all
if [[ "$mode" == "backup" || "$mode" == "all" ]] ; then
log_command "$python_location $script_location --ini-file $ini_file --action check-shards $verbose_val $skip_warnings_val"
@@ -97,7 +104,7 @@ function run_migrate_commands() {
fi
# execute on: backup - delete - all
- if [[ "$mode" != "migrate-restore" ]] ; then
+ if [[ "$mode" == "delete" || "$mode" == "backup" || "$mode" == "all" ]] ; then
if [[ "$skip_solr_client_upgrade" != "true" ]]; then
log_command "$python_location $script_location --ini-file $ini_file --action upgrade-solr-clients $verbose_val"
$python_location $script_location --ini-file $ini_file --action upgrade-solr-clients $verbose_val
@@ -302,7 +309,7 @@ function main() {
print_help
exit 1
else
- if [[ "$MODE" == "delete" || "$MODE" == "backup" || "$MODE" == "migrate-restore" || "$MODE" == "all" ]]; then
+ if [[ "$MODE" == "delete" || "$MODE" == "backup" || "$MODE" == "migrate-restore" || "$MODE" == "all" || "$MODE" == "transport" ]]; then
run_migrate_commands "$MODE" "$SCRIPT_LOCATION" "$PYTHON_PATH_FOR_MIGRATION" "$INI_FILE" "$WAIT" "$SKIP_SOLR_CLIENT_UPGRADE" "$SKIP_SOLR_SERVER_UPGRADE" "$SKIP_LOGSEARCH_UPGRADE" "$SKIP_WARNINGS" "$BATCH_INTERVAL" "$KEEP_BACKUP" "$VERBOSE"
else
echo "mode '$MODE' is not supported"