Posted to commits@ambari.apache.org by ol...@apache.org on 2018/07/09 20:48:27 UTC

[ambari] branch branch-2.7 updated: AMBARI-23945. Infra Solr migration: use async request for collection deletion

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new b8c2108  AMBARI-23945. Infra Solr migration: use async request for collection deletion
b8c2108 is described below

commit b8c2108cad777c4daad716ae11868b9c006592b9
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Mon Jul 9 22:44:06 2018 +0200

    AMBARI-23945. Infra Solr migration: use async request for collection deletion
---
 ambari-infra/ambari-infra-solr-client/README.md    |   3 +
 .../src/main/python/migrationHelper.py             | 112 ++++++++++++++++++---
 2 files changed, 102 insertions(+), 13 deletions(-)
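
For context, the patch moves collection deletion from a blocking Collections API call to Solr's async request pattern: the DELETE call now carries an async id and returns immediately, and migrationHelper.py then polls REQUESTSTATUS until the request reports completed or failed. Below is a minimal standalone sketch of that flow, not the script's own code; the Solr base URL, the collection name and the direct curl-via-subprocess call are illustrative assumptions, while the URL templates, the 5-second poll interval and the 400-try default come from the diff that follows.

    import json
    import subprocess
    import time
    from random import randint

    SOLR_URL = "http://localhost:8886/solr"   # assumed Infra Solr base URL
    COLLECTION = "hadoop_logs"                # assumed collection name

    def solr_get(url):
      # migrationHelper.py shells out to a curl command built by
      # create_solr_api_request_command(); this sketch calls curl directly.
      return json.loads(subprocess.check_output(["curl", "-s", url]))

    async_id = str(randint(1000, 100000))
    solr_get("{0}/admin/collections?action=DELETE&name={1}&wt=json&async={2}"
             .format(SOLR_URL, COLLECTION, async_id))   # returns immediately

    status_url = ("{0}/admin/collections?action=REQUESTSTATUS&requestid={1}&wt=json"
                  .format(SOLR_URL, async_id))
    for attempt in range(400):                # mirrors --solr-async-request-tries default
      state = solr_get(status_url)["status"]["state"]
      if state in ("completed", "failed"):
        print("Async delete finished with state: " + state)
        break
      time.sleep(5)                           # same poll interval as the patch
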

diff --git a/ambari-infra/ambari-infra-solr-client/README.md b/ambari-infra/ambari-infra-solr-client/README.md
index b4c4614..a14f92a 100644
--- a/ambari-infra/ambari-infra-solr-client/README.md
+++ b/ambari-infra/ambari-infra-solr-client/README.md
@@ -853,6 +853,9 @@ Options:
                         only if no backup path in the ini file
   --version=INDEX_VERSION
                         lucene index version for migration (6.6.2 or 7.3.1)
+  --solr-async-request-tries=SOLR_ASYNC_REQUEST_TRIES
+                        maximum number of tries for async Solr requests (e.g.
+                        delete operation)
   --request-tries=REQUEST_TRIES
                         number of tries for BACKUP/RESTORE status api calls in
                         the request
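
Each REQUESTSTATUS poll in the code below is followed by a 5-second sleep, so the try count translates directly into a timeout budget per delete request. A quick sanity check of the default (a sketch, not part of the patch):

    tries = 400            # --solr-async-request-tries default
    poll_interval_sec = 5  # sleep between REQUESTSTATUS polls in monitor_solr_async_request
    print("max wait per delete: ~{0} minutes".format(tries * poll_interval_sec // 60))

With the default, a delete request gets roughly 33 minutes before it is reported as timed out; raise --solr-async-request-tries if large collections need longer.
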
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 56ab9ad..1842333 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -34,7 +34,7 @@ import ConfigParser
 import solrDataManager as solr_data_manager
 
 from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, randint
 from subprocess import Popen, PIPE
 
 HTTP_PROTOCOL = 'http'
@@ -68,8 +68,9 @@ CREATE_CONFIGURATIONS_URL = '/configurations'
 
 LIST_SOLR_COLLECTION_URL = '{0}/admin/collections?action=LIST&wt=json'
 CREATE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=CREATE&name={1}&collection.configName={2}&numShards={3}&replicationFactor={4}&maxShardsPerNode={5}&wt=json'
-DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json'
+DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json&async={2}'
 RELOAD_SOLR_COLLECTION_URL = '{0}/admin/collections?action=RELOAD&name={1}&wt=json'
+REQUEST_STATUS_SOLR_COLLECTION_URL = '{0}/admin/collections?action=REQUESTSTATUS&requestid={1}&wt=json'
 CORE_DETAILS_URL = '{0}replication?command=details&wt=json'
 
 INFRA_SOLR_CLIENT_BASE_PATH = '/usr/lib/ambari-infra-solr-client/'
@@ -818,8 +819,69 @@ def is_logsearch_available(config, service_filter):
   return 'LOGSEARCH' in service_filter and config.has_section('logsearch_collections') \
     and config.has_option('logsearch_collections', 'enabled') and config.get('logsearch_collections', 'enabled') == 'true'
 
-def delete_collection(options, config, collection, solr_urls):
-  request = DELETE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options), collection)
+def monitor_solr_async_request(options, config, status_request, request_id):
+  request_status_json_cmd=create_solr_api_request_command(status_request, config)
+  logger.debug("Solr request: {0}".format(status_request))
+  async_request_success_msg = "Async Solr request (id: {0}) {1}COMPLETED{2}".format(request_id, colors.OKGREEN, colors.ENDC)
+  async_request_fail_msg = "Async Solr request (id: {0}) {1}FAILED{2}".format(request_id, colors.FAIL, colors.ENDC)
+  async_request_timeout_msg = "\nAsync Solr request (id: {0}) {1}TIMED OUT{2} (increase --solr-async-request-tries if required, default is 400)".format(request_id, colors.FAIL, colors.ENDC)
+  max_tries = options.solr_async_request_tries if options.solr_async_request_tries else 400
+  tries = 0
+  sys.stdout.write("Start monitoring Solr request with id {0} ...".format(request_id))
+  sys.stdout.flush()
+  async_request_finished = False
+  async_request_failed = False
+  async_request_timed_out = False
+  while not async_request_finished:
+    tries = tries + 1
+    process = Popen(request_status_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
+    out, err = process.communicate()
+    if process.returncode != 0:
+      raise Exception("{0} command failed: {1}".format(request_status_json_cmd, str(err)))
+    else:
+      response=json.loads(str(out))
+      logger.debug(response)
+      if 'status' in response:
+        async_state=response['status']['state']
+        async_msg=response['status']['msg']
+        if async_state == "completed":
+          async_request_finished = True
+          sys.stdout.write("\nSolr response message: {0}\n".format(async_msg))
+          sys.stdout.flush()
+        elif async_state == "failed":
+          async_request_finished = True
+          async_request_failed = True
+          sys.stdout.write("\nSolr response message: {0}\n".format(async_msg))
+          sys.stdout.flush()
+        else:
+          if not options.verbose:
+            sys.stdout.write(".")
+            sys.stdout.flush()
+          logger.debug(str(async_msg))
+          logger.debug("Sleep 5 seconds ...")
+          time.sleep(5)
+      else:
+        raise Exception("The 'status' field is missing from the response: {0}".format(response))
+    if not async_request_finished and tries == max_tries:
+      async_request_finished = True
+      async_request_timed_out = True
+
+  if async_request_timed_out:
+    print async_request_timeout_msg
+    sys.exit(1)
+  elif async_request_failed:
+    print async_request_fail_msg
+    sys.exit(1)
+  else:
+    print async_request_success_msg
+
+  return request_id
+
+
+def delete_collection(options, config, collection, solr_urls, response_data_map):
+  async_id = str(randint(1000,100000))
+  solr_url = get_random_solr_url(solr_urls, options)
+  request = DELETE_SOLR_COLLECTION_URL.format(solr_url, collection, async_id)
   logger.debug("Solr request: {0}".format(request))
   delete_collection_json_cmd=create_solr_api_request_command(request, config)
   process = Popen(delete_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
@@ -827,8 +889,10 @@ def delete_collection(options, config, collection, solr_urls):
   if process.returncode != 0:
     raise Exception("{0} command failed: {1}".format(delete_collection_json_cmd, str(err)))
   response=json.loads(str(out))
-  if 'success' in response:
-    print 'Deleting collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+  if 'requestid' in response:
+    print 'Deleting collection {0} request sent. {1}DONE{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+    response_data_map['request_id']=response['requestid']
+    response_data_map['status_request']=REQUEST_STATUS_SOLR_COLLECTION_URL.format(solr_url, response['requestid'])
     return collection
   else:
     raise Exception("DELETE collection ('{0}') failed. Response: {1}".format(collection, str(out)))
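
For reference, sketches of the two JSON payloads the code above consumes; the field values are illustrative, not captured from a real cluster. delete_collection() only needs 'requestid' from the DELETE response, and monitor_solr_async_request() only reads status.state and status.msg from the REQUESTSTATUS response.

    # illustrative payloads; values are made up for the sketch
    delete_response = {
      "responseHeader": {"status": 0, "QTime": 2},
      "requestid": "42123"          # stored as response_data_map['request_id']
    }
    request_status_response = {
      "responseHeader": {"status": 0, "QTime": 1},
      "status": {                   # branch read by monitor_solr_async_request
        "state": "completed",       # the loop stops on "completed" or "failed"
        "msg": "found [42123] in completed tasks"
      }
    }
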
@@ -980,17 +1044,26 @@ def delete_logsearch_collections(options, config, collections_json_location, col
   history_collection = config.get('logsearch_collections', 'history_collection_name')
   if service_logs_collection in collections:
     solr_urls = get_solr_urls(options, config, service_logs_collection, collections_json_location)
-    retry(delete_collection, options, config, service_logs_collection, solr_urls, context='[Delete {0} collection]'.format(service_logs_collection))
+    response_map={}
+    retry(delete_collection, options, config, service_logs_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(service_logs_collection))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(service_logs_collection)
   if audit_logs_collection in collections:
     solr_urls = get_solr_urls(options, config, audit_logs_collection, collections_json_location)
-    retry(delete_collection, options, config, audit_logs_collection, solr_urls, context='[Delete {0} collection]'.format(audit_logs_collection))
+    response_map={}
+    retry(delete_collection, options, config, audit_logs_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(audit_logs_collection))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(audit_logs_collection)
   if history_collection in collections:
     solr_urls = get_solr_urls(options, config, history_collection, collections_json_location)
-    retry(delete_collection, options, config, history_collection, solr_urls, context='[Delete {0} collection]'.format(history_collection))
+    response_map={}
+    retry(delete_collection, options, config, history_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(history_collection))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(history_collection)
 
@@ -1000,17 +1073,26 @@ def delete_atlas_collections(options, config, collections_json_location, collect
   vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
   if fulltext_collection in collections:
     solr_urls = get_solr_urls(options, config, fulltext_collection, collections_json_location)
-    retry(delete_collection, options, config, fulltext_collection, solr_urls, context='[Delete {0} collection]'.format(fulltext_collection))
+    response_map={}
+    retry(delete_collection, options, config, fulltext_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(fulltext_collection))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(fulltext_collection)
   if edge_index_collection in collections:
     solr_urls = get_solr_urls(options, config, edge_index_collection, collections_json_location)
-    retry(delete_collection, options, config, edge_index_collection, solr_urls, context='[Delete {0} collection]'.format(edge_index_collection))
+    response_map={}
+    retry(delete_collection, options, config, edge_index_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(edge_index_collection))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(edge_index_collection)
   if vertex_index_collection in collections:
     solr_urls = get_solr_urls(options, config, vertex_index_collection, collections_json_location)
-    retry(delete_collection, options, config, vertex_index_collection, solr_urls, context='[Delete {0} collection]'.format(vertex_index_collection))
+    response_map={}
+    retry(delete_collection, options, config, vertex_index_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(vertex_index_collection))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(vertex_index_collection)
 
@@ -1018,7 +1100,10 @@ def delete_ranger_collection(options, config, collections_json_location, collect
   ranger_collection_name = config.get('ranger_collection', 'ranger_collection_name')
   if ranger_collection_name in collections:
     solr_urls = get_solr_urls(options, config, ranger_collection_name, collections_json_location)
-    retry(delete_collection, options, config, ranger_collection_name, solr_urls, context='[Delete {0} collection]'.format(ranger_collection_name))
+    response_map={}
+    retry(delete_collection, options, config, ranger_collection_name, solr_urls, response_map, context='[Delete {0} collection]'.format(ranger_collection_name))
+    retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+          context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
   else:
     print 'Collection {0} does not exist or filtered out. Skipping delete operation'.format(ranger_collection_name)
 
@@ -1824,6 +1909,7 @@ if __name__=="__main__":
   parser.add_option("--ranger-index-location", dest="ranger_index_location", type="string", help="location of the index backups (for ranger). required only if no backup path in the ini file")
 
   parser.add_option("--version", dest="index_version", type="string", default="6.6.2", help="lucene index version for migration (6.6.2 or 7.3.1)")
+  parser.add_option("--solr-async-request-tries", dest="solr_async_request_tries", type="int", default=400, help="maximum number of tries for async Solr requests (e.g. delete operation)")
   parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")