You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/17 15:59:07 UTC

[ambari] 02/02: AMBARI-23945. Infra Solr migration - Add filter-cores support during restore

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 9f9042c17e3edbac68df2b6ba48b4b980e92b858
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Sun Jun 17 17:55:20 2018 +0200

    AMBARI-23945. Infra Solr migration - Add filter-cores support during restore
---
 .../src/main/python/migrationHelper.py             | 61 +++++++++++++++++++---
 1 file changed, 53 insertions(+), 8 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index ed8d34c..4b9c344 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -870,7 +870,38 @@ def upgrade_ranger_solrconfig_xml(options, config, service_filter):
     copy_znode(options, config, "{0}/configs/{1}/solrconfig.xml".format(solr_znode, ranger_config_set_name),
                "{0}/configs/{1}/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
 
-def update_state_json(collection, config, options):
+def generate_core_pairs(original_collection, collection, config, options):
+  """
+  Pair the leader cores of the restore collection with the leader cores of the
+  original (backed up) collection and persist the mapping as JSON.
+
+  Core names are read from the 'leaderCoreHostMap' entries of backup_collections.json
+  (original_collection) and restore_collections.json (collection), sorted, and
+  paired by position.
+
+  Returns a dict of {restore core name: original core name}; the same dict is
+  written to <collection>/restore_core_pairs.json in the collections data location.
+  Raises an Exception if the restore collection has fewer leader cores than the
+  original one. The config and options arguments are currently unused.
+  """
+  original_collections_data = get_collections_data(COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json"))
+  original_cores = original_collections_data.get(original_collection, {}).get('leaderCoreHostMap', {})
+
+  collections_data = get_collections_data(COLLECTIONS_DATA_JSON_LOCATION.format("restore_collections.json"))
+  new_cores = collections_data.get(collection, {}).get('leaderCoreHostMap', {})
+
+  if len(new_cores) < len(original_cores):
+    # Every original core needs its own restore core to be paired with.
+    raise Exception("Restore collection '{0}' has only {1} leader core(s), but original collection '{2}' has {3}. "
+                    "You will need at least: {3}".format(collection, len(new_cores), original_collection, len(original_cores)))
+
+  # Only the core names matter for pairing; zip stops after the last original core.
+  core_pairs_data = dict(zip(sorted(new_cores), sorted(original_cores)))
+  with open(COLLECTIONS_DATA_JSON_LOCATION.format(collection + "/restore_core_pairs.json"), 'w') as outfile:
+    json.dump(core_pairs_data, outfile)
+  return core_pairs_data
+
+def update_state_json(original_collection, collection, config, options):
   solr_znode='/infra-solr'
   if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
     solr_znode=config.get('infra_solr', 'znode')
@@ -884,17 +915,25 @@ def update_state_json(collection, config, options):
   json_file_list=glob.glob("{0}/*.json".format(coll_data_dir))
   logger.debug("Downloaded json files list: {0}".format(str(json_file_list)))
 
-  cores_data_json_list = [k for k in json_file_list if 'state.json' not in k]
-  state_json_list = [k for k in json_file_list if 'state.json' in k]
+  cores_data_json_list = [k for k in json_file_list if 'state.json' not in k and 'new_state.json' not in k and 'restore_core_pairs.json' not in k]
+  state_json_list = [k for k in json_file_list if '/state.json' in k]
 
   if not cores_data_json_list:
     raise Exception('Cannot find any downloaded restore core metadata for {0}'.format(collection))
   if not state_json_list:
     raise Exception('Cannot find any downloaded restore collection state metadata for {0}'.format(collection))
 
+  core_pairs = generate_core_pairs(original_collection, collection, config, options)
+  cores_to_skip = []
+  logger.debug("Generated core pairs: {0}".format(str(core_pairs)))
+  if options.skip_cores:
+    cores_to_skip = options.skip_cores.split(',')
+  logger.debug("Cores to skip: {0}".format(str(cores_to_skip)))
+
   state_json_file=state_json_list[0]
   state_data = read_json(state_json_file)
   core_json_data=[]
+
   for core_data_json_file in cores_data_json_list:
     core_json_data.append(read_json(core_data_json_file))
 
@@ -918,7 +957,9 @@ def update_state_json(collection, config, options):
       data_dir = core_data['dataDir'] if 'dataDir' in core_data else None
       ulog_dir = core_data['ulogDir'] if 'ulogDir' in core_data else None
 
-      if replica in core_details:
+      if cores_to_skip and (core in cores_to_skip or (core in core_pairs and core_pairs[core] in cores_to_skip)):
+        print "Skipping core '{0}' as it is in skip-cores list (or its original pair: '{1}')".format(core, core_pairs[core])
+      elif replica in core_details:
         old_core_node=core_details[replica]['core_node']
         new_core_node=core_details[replica]['new_core_node']
 
@@ -1269,25 +1310,29 @@ def update_state_jsons(options, accessor, parser, config, service_filter):
   collections=list_collections(options, config, COLLECTIONS_DATA_JSON_LOCATION.format("collections.json"))
   collections=filter_collections(options, collections)
   if is_ranger_available(config, service_filter):
+    original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name')
     backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
     if backup_ranger_collection in collections:
-      update_state_json(backup_ranger_collection, config, options)
+      update_state_json(original_ranger_collection, backup_ranger_collection, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection)
   if is_atlas_available(config, service_filter):
+    original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name')
     backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
     if backup_fulltext_index_name in collections:
-      update_state_json(backup_fulltext_index_name, config, options)
+      update_state_json(original_fulltext_index_name, backup_fulltext_index_name, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
+    original_edge_index_name = config.get('atlas_collections', 'edge_index_name')
     backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
     if backup_edge_index_name in collections:
-      update_state_json(backup_edge_index_name, config, options)
+      update_state_json(original_edge_index_name, backup_edge_index_name, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name)
+    original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name')
     backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
     if backup_vertex_index_name in collections:
-      update_state_json(backup_vertex_index_name, config, options)
+      update_state_json(original_vertex_index_name, backup_vertex_index_name, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
 

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.