You are viewing a plain text version of this content. The canonical link for it is not available in this plain-text rendering (the original hyperlink was removed during conversion).
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/17 15:59:07 UTC
[ambari] 02/02: AMBARI-23945. Infra Solr migration - Add
filter-cores support during restore
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 9f9042c17e3edbac68df2b6ba48b4b980e92b858
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Sun Jun 17 17:55:20 2018 +0200
AMBARI-23945. Infra Solr migration - Add filter-cores support during restore
---
.../src/main/python/migrationHelper.py | 61 +++++++++++++++++++---
1 file changed, 53 insertions(+), 8 deletions(-)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index ed8d34c..4b9c344 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -870,7 +870,38 @@ def upgrade_ranger_solrconfig_xml(options, config, service_filter):
copy_znode(options, config, "{0}/configs/{1}/solrconfig.xml".format(solr_znode, ranger_config_set_name),
"{0}/configs/{1}/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
-def update_state_json(collection, config, options):
+def generate_core_pairs(original_collection, collection, config, options):
+ core_pairs_data={}
+
+ original_cores={}
+ original_collections_data = get_collections_data(COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json"))
+ if original_collection in original_collections_data and 'leaderCoreHostMap' in original_collections_data[original_collection]:
+ original_cores = original_collections_data[original_collection]['leaderCoreHostMap']
+
+ sorted_original_cores=[]
+ for key in sorted(original_cores):
+ sorted_original_cores.append((key, original_cores[key]))
+
+ new_cores={}
+ collections_data = get_collections_data(COLLECTIONS_DATA_JSON_LOCATION.format("restore_collections.json"))
+ if collection in collections_data and 'leaderCoreHostMap' in collections_data[collection]:
+ new_cores = collections_data[collection]['leaderCoreHostMap']
+
+ sorted_new_cores=[]
+ for key in sorted(new_cores):
+ sorted_new_cores.append((key, new_cores[key]))
+
+ if len(new_cores) < len(original_cores):
+ raise Exception("Old collection core size is: " + str(len(new_cores)) +
+ ". You will need at least: " + str(len(original_cores)))
+ else:
+ for index, original_core_data in enumerate(sorted_original_cores):
+ core_pairs_data[sorted_new_cores[index][0]]=original_core_data[0]
+ with open(COLLECTIONS_DATA_JSON_LOCATION.format(collection + "/restore_core_pairs.json"), 'w') as outfile:
+ json.dump(core_pairs_data, outfile)
+ return core_pairs_data
+
+def update_state_json(original_collection, collection, config, options):
solr_znode='/infra-solr'
if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
solr_znode=config.get('infra_solr', 'znode')
@@ -884,17 +915,25 @@ def update_state_json(collection, config, options):
json_file_list=glob.glob("{0}/*.json".format(coll_data_dir))
logger.debug("Downloaded json files list: {0}".format(str(json_file_list)))
- cores_data_json_list = [k for k in json_file_list if 'state.json' not in k]
- state_json_list = [k for k in json_file_list if 'state.json' in k]
+ cores_data_json_list = [k for k in json_file_list if 'state.json' not in k and 'new_state.json' not in k and 'restore_core_pairs.json' not in k]
+ state_json_list = [k for k in json_file_list if '/state.json' in k]
if not cores_data_json_list:
raise Exception('Cannot find any downloaded restore core metadata for {0}'.format(collection))
if not state_json_list:
raise Exception('Cannot find any downloaded restore collection state metadata for {0}'.format(collection))
+ core_pairs = generate_core_pairs(original_collection, collection, config, options)
+ cores_to_skip = []
+ logger.debug("Generated core pairs: {0}".format(str(core_pairs)))
+ if options.skip_cores:
+ cores_to_skip = options.skip_cores.split(',')
+ logger.debug("Cores to skip: {0}".format(str(cores_to_skip)))
+
state_json_file=state_json_list[0]
state_data = read_json(state_json_file)
core_json_data=[]
+
for core_data_json_file in cores_data_json_list:
core_json_data.append(read_json(core_data_json_file))
@@ -918,7 +957,9 @@ def update_state_json(collection, config, options):
data_dir = core_data['dataDir'] if 'dataDir' in core_data else None
ulog_dir = core_data['ulogDir'] if 'ulogDir' in core_data else None
- if replica in core_details:
+ if cores_to_skip and (core in cores_to_skip or (core in core_pairs and core_pairs[core] in cores_to_skip)):
+ print "Skipping core '{0}' as it is in skip-cores list (or its original pair: '{1}')".format(core, core_pairs[core])
+ elif replica in core_details:
old_core_node=core_details[replica]['core_node']
new_core_node=core_details[replica]['new_core_node']
@@ -1269,25 +1310,29 @@ def update_state_jsons(options, accessor, parser, config, service_filter):
collections=list_collections(options, config, COLLECTIONS_DATA_JSON_LOCATION.format("collections.json"))
collections=filter_collections(options, collections)
if is_ranger_available(config, service_filter):
+ original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name')
backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
if backup_ranger_collection in collections:
- update_state_json(backup_ranger_collection, config, options)
+ update_state_json(original_ranger_collection, backup_ranger_collection, config, options)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection)
if is_atlas_available(config, service_filter):
+ original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name')
backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
if backup_fulltext_index_name in collections:
- update_state_json(backup_fulltext_index_name, config, options)
+ update_state_json(original_fulltext_index_name, backup_fulltext_index_name, config, options)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
+ original_edge_index_name = config.get('atlas_collections', 'edge_index_name')
backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
if backup_edge_index_name in collections:
- update_state_json(backup_edge_index_name, config, options)
+ update_state_json(original_edge_index_name, backup_edge_index_name, config, options)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name)
+ original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name')
backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
if backup_vertex_index_name in collections:
- update_state_json(backup_vertex_index_name, config, options)
+ update_state_json(original_vertex_index_name, backup_vertex_index_name, config, options)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
--
To stop receiving notification emails like this one, please contact
oleewere@apache.org.