Posted to commits@ambari.apache.org by ol...@apache.org on 2018/05/13 18:20:53 UTC

[ambari] branch trunk updated (7bedbef -> c17b9f8)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git.


    from 7bedbef  AMBARI-23424. Need to add new property for Ranger-Tagsync when enabling federation for Namenode-HA via UI wizard (akovalenko)
     new a3c06c4  AMBARI-23822. Infra Solr: Migration script does not backup/restore all of the cores.
     new c17b9f8  AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/python/migrationHelper.py             |  13 +-
 .../docker/knox/logsearch/1.0.0/service.xml        |   9 +-
 .../AMBARI_INFRA_SOLR/0.1.0/metainfo.xml           |   3 +-
 .../0.1.0/package/scripts/collection.py            | 268 ++++++++++++++----
 .../0.1.0/package/scripts/command_commons.py       | 315 +++++++++++++++++----
 .../0.1.0/package/scripts/migrate.py               |   5 -
 .../package/templates/infra-solr-security.json.j2  |   6 +
 7 files changed, 499 insertions(+), 120 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.

[ambari] 01/02: AMBARI-23822. Infra Solr: Migration script does not backup/restore all of the cores.

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit a3c06c4495f40783b3630f8c5aa5f85137646001
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Fri May 11 12:37:28 2018 +0200

    AMBARI-23822. Infra Solr: Migration script does not backup/restore all of the cores.
---
 .../src/main/python/migrationHelper.py             |   8 +-
 .../docker/knox/logsearch/1.0.0/service.xml        |   9 +-
 .../0.1.0/package/scripts/collection.py            | 215 +++++++++++++++-----
 .../0.1.0/package/scripts/command_commons.py       | 224 +++++++++++++++------
 .../0.1.0/package/scripts/migrate.py               |   5 -
 .../package/templates/infra-solr-security.json.j2  |   6 +
 6 files changed, 342 insertions(+), 125 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 447d66e..8c7f6b1 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -144,7 +144,10 @@ def fill_parameters(options):
     params['solr_check_hosts'] = False
   if options.core_filter:
     params['solr_core_filter'] = options.core_filter
-
+  if options.skip_cores:
+    params['solr_skip_cores'] = options.skip_cores
+  if options.solr_shards:
+    params['solr_shards'] = options.solr_shards
   return params
 
 def validte_common_options(options, parser):
@@ -230,7 +233,8 @@ if __name__=="__main__":
   parser.add_option("--solr-hosts", dest="solr_hosts", type="string", help="comma separated list of solr hosts")
   parser.add_option("--disable-solr-host-check", dest="disable_solr_host_check", action="store_true", default=False, help="Disable to check solr hosts are good for the collection backups")
   parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
-
+  parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
+  parser.add_option("--shards", dest="solr_shards", type="int", default=0, help="number of shards (required to set properly for restore)")
   (options, args) = parser.parse_args()
 
   protocol = 'https' if options.ssl else 'http'
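
The two new options map to the solr_skip_cores and solr_shards command parameters read by
command_commons.py. As a hedged usage sketch (the core names here are hypothetical and the
script's other required options are elided):

    python migrationHelper.py ... --core-filter ranger \
      --skip-cores old_core_node1,old_core_node2 --shards 2
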
diff --git a/ambari-logsearch/docker/knox/logsearch/1.0.0/service.xml b/ambari-logsearch/docker/knox/logsearch/1.0.0/service.xml
index c728b07..566092e 100644
--- a/ambari-logsearch/docker/knox/logsearch/1.0.0/service.xml
+++ b/ambari-logsearch/docker/knox/logsearch/1.0.0/service.xml
@@ -16,13 +16,6 @@
   limitations under the License.
 -->
 <service role="LOGSEARCH" name="logsearch" version="1.0.0">
-
-  <policies>
-    <policy role="webappsec"/>
-    <policy role="authentication" name="Anonymous"/>
-    <policy role="rewrite"/>
-    <policy role="authorization"/>
-  </policies>
   <routes>
 
     <route path="/logsearch">
@@ -44,4 +37,4 @@
 
   <dispatch classname="org.apache.hadoop.gateway.dispatch.PassAllHeadersDispatch"/>
 
-</service>
\ No newline at end of file
+</service>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index 316a232..e32bc1f 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -20,28 +20,49 @@ import time
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Directory, Execute, File
 from resource_management.libraries.functions.format import format
+from resource_management.libraries.resources.properties_file import PropertiesFile
 
-def backup_collection(env):
-    """
-    Backup collections using replication API (as Solr Cloud Backup API is not available in Solr 5)
-    """
-    import params, command_commons
-    env.set_params(command_commons)
-
-    Directory(command_commons.index_location,
-              mode=0755,
-              cd_access='a',
-              owner=params.infra_solr_user,
-              group=params.user_group
-              )
-    collection_available = command_commons.is_collection_available_on_host()
-    if not collection_available:
-      Logger.info(format("No any '{collection}' replica is used on {params.hostname} host"))
-      return
 
-    Logger.info(format("Backup Solr Collection {collection} to {index_location}"))
-
-    solr_request_path = format("{collection}/replication?command=BACKUP&location={index_location}&name={backup_name}&wt=json")
+def backup_collection(env):
+  """
+  Backup collections using replication API (as Solr Cloud Backup API is not available in Solr 5)
+  If the cluster is not kerberized, IP addresses need to be resolved to hostnames (as SOLR_HOST=`hostname -f` is not used by default in infra-solr-env)
+  """
+  import params, command_commons
+  env.set_params(command_commons)
+
+  Directory(command_commons.index_location,
+            mode=0755,
+            cd_access='a',
+            owner=params.infra_solr_user,
+            group=params.user_group
+            )
+  host_cores_data_map = command_commons.get_host_cores_for_collection()
+
+  Logger.info(format("Backup Solr Collection {collection} to {index_location}"))
+
+  host_core_map = host_cores_data_map[command_commons.HOST_CORES]
+
+  host_or_ip = params.hostname
+  # IP resolve - for unsecure cluster
+  host_ip_pairs = {}
+  if not params.security_enabled:
+    keys = host_core_map.keys()
+    for key in keys:
+      if command_commons.is_ip(key):
+        resolved_hostname = command_commons.resolve_ip_to_hostname(key)
+        host_ip_pairs[resolved_hostname] = key
+
+  if params.hostname in host_ip_pairs:
+    host_or_ip = host_ip_pairs[params.hostname]
+
+  cores = host_core_map[host_or_ip] if host_or_ip in host_core_map else []
+
+  for core in cores:
+    if core in command_commons.skip_cores:
+      Logger.info(format("Core '{core}' is filtered out."))
+      continue
+    solr_request_path = format("{core}/replication?command=BACKUP&location={index_location}&name={core}&wt=json")
     backup_api_cmd = command_commons.create_solr_api_request_command(solr_request_path)
 
     Execute(backup_api_cmd, user=params.infra_solr_user, logoutput=True)
@@ -50,41 +71,127 @@ def backup_collection(env):
       Logger.info("Sleep 5 seconds to wait until the backup request is executed.")
       time.sleep(5)
       Logger.info("Check backup status ...")
-      solr_status_request_path = format("{collection}/replication?command=details&wt=json")
+      solr_status_request_path = format("{core}/replication?command=details&wt=json")
       status_check_json_output = format("{index_location}/backup_status.json")
-      status_check_cmd = command_commons.create_solr_api_request_command(solr_status_request_path, status_check_json_output)
-      command_commons.snapshot_status_check(status_check_cmd, status_check_json_output, command_commons.backup_name, True,
-        log_output=command_commons.log_output, tries=command_commons.request_tries, time_interval=command_commons.request_time_interval)
-
-def restore_collection(env):
-    """
-    Restore collections using replication API (as Solr Cloud Backup API is not available in Solr 5)
-    """
-    import params, command_commons
-    env.set_params(command_commons)
-
-    collection_available = command_commons.is_collection_available_on_host()
-    if command_commons.check_hosts and not collection_available:
-      Logger.info(format("No any '{collection}' replica is used on {params.hostname} host"))
-      return
-
-    Logger.info(format("Remove write.lock files from folder '{index_location}'"))
-    for write_lock_file in command_commons.get_files_by_pattern(format("{index_location}"), 'write.lock'):
-      File(write_lock_file, action="delete")
+      status_check_cmd = command_commons.create_solr_api_request_command(solr_status_request_path,
+                                                                         status_check_json_output)
+      command_commons.snapshot_status_check(status_check_cmd, status_check_json_output, core, True,
+                                            log_output=command_commons.log_output, tries=command_commons.request_tries,
+                                            time_interval=command_commons.request_time_interval)
 
-    Logger.info(format("Restore Solr Collection {collection} from {index_location}"))
 
-    solr_request_path = format("{collection}/replication?command=RESTORE&location={index_location}&name={backup_name}&wt=json")
-    restore_api_cmd = command_commons.create_solr_api_request_command(solr_request_path)
-
-    Execute(restore_api_cmd, user=params.infra_solr_user, logoutput=True)
+def restore_collection(env):
+  """
+  Restore collections - copy snapshots into folders with a backup_* prefix, then remove the old cores and strip the backup_* prefix from the folder names.
+  """
+  import params, command_commons
+  env.set_params(command_commons)
+
+  host_cores_backup_map = command_commons.read_backup_json()
+  host_cores_map = command_commons.get_host_cores_for_collection(backup=False)
+
+  original_core_host_pairs = command_commons.sort_core_host_pairs(host_cores_backup_map[command_commons.CORE_HOST])
+  new_core_host_pairs = command_commons.sort_core_host_pairs(host_cores_map[command_commons.CORE_HOST])
+
+  core_pairs = command_commons.create_core_pairs(original_core_host_pairs, new_core_host_pairs)
+  Logger.info("Generated core pairs: " + str(core_pairs))
+
+  Logger.info(format("Remove write.lock files from folder '{index_location}'"))
+  for write_lock_file in command_commons.get_files_by_pattern(format("{index_location}"), 'write.lock'):
+    File(write_lock_file, action="delete")
+
+  Logger.info(format("Restore Solr Collection {collection} from {index_location} ..."))
+
+  if command_commons.collection in ["ranger_audits", "history", "hadoop_logs", "audit_logs",
+                                    "vertex_index", "edge_index",
+                                    "fulltext_index"]:  # Make sure ambari wont delete an important collection
+    raise Exeption(format(
+      "Selected collection for restore is: {collection}. It is not recommended to restore on default collections."))
+
+  for core_data in core_pairs:
+    src_core = core_data['src_core']
+    target_core = core_data['target_core']
+
+    if src_core in command_commons.skip_cores:
+      Logger.info(format("Core '{src_core}' (src) is filtered out."))
+      continue
+    elif target_core in command_commons.skip_cores:
+      Logger.info(format("Core '{target_core}' (target) is filtered out."))
+      continue
+
+    core_data = host_cores_map[command_commons.CORE_DATA]
+    only_if_cmd = format("test -d {index_location}/snapshot.{src_core}")
+    core_root_dir = format("{solr_datadir}/backup_{target_core}")
+    core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
+
+    Directory([format("{core_root_dir}/data/index"),
+               format("{core_root_dir}/data/tlog"),
+               format("{core_root_dir}/data/snapshot_metadata")],
+              mode=0755,
+              cd_access='a',
+              create_parents=True,
+              owner=params.infra_solr_user,
+              group=params.user_group,
+              only_if=only_if_cmd
+              )
 
-    if command_commons.request_async is False:
-      Logger.info("Sleep 5 seconds to wait until the restore request is executed.")
-      time.sleep(5)
-      Logger.info("Check restore status ...")
-      solr_status_request_path = format("{collection}/replication?command=restorestatus&wt=json")
-      status_check_json_output = format("{index_location}/restore_status.json")
-      status_check_cmd = command_commons.create_solr_api_request_command(solr_status_request_path, status_check_json_output)
-      command_commons.snapshot_status_check(status_check_cmd, status_check_json_output, command_commons.backup_name, False,
-        log_output=command_commons.log_output, tries=command_commons.request_tries, time_interval=command_commons.request_time_interval)
\ No newline at end of file
+    core_details = core_data[target_core]
+    core_properties = {}
+    core_properties['numShards'] = command_commons.solr_num_shards
+    core_properties['collection.configName'] = "ranger_audits"
+    core_properties['name'] = target_core
+    core_properties['replicaType'] = core_details['type']
+    core_properties['collection'] = command_commons.collection
+    core_properties['coreNodeName'] = core_details['node']
+    core_properties['shard'] = core_details['shard']
+
+    copy_cmd = format(
+      "cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+      else format("mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+    Execute(
+      copy_cmd, only_if=only_if_cmd,
+      user=params.infra_solr_user,
+      logoutput=True
+    )
+    PropertiesFile(
+      core_root_dir + '/core.properties',
+      properties=core_properties,
+      owner=params.infra_solr_user,
+      group=params.user_group,
+      mode=0644,
+      only_if=only_if_cmd
+    )
+
+  Execute(format("rm -rf {solr_datadir}/{collection}*"),
+          user=params.infra_solr_user,
+          logoutput=True)
+
+  for core_data in core_pairs:
+    src_core = core_data['src_core']
+    target_core = core_data['target_core']
+
+    if src_core in command_commons.skip_cores:
+      Logger.info(format("Core '{src_core}' (src) is filtered out."))
+      continue
+    elif target_core in command_commons.skip_cores:
+      Logger.info(format("Core '{target_core}' (target) is filtered out."))
+      continue
+
+    core_root_dir = format("{solr_datadir}/backup_{target_core}")
+    core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
+    Execute(
+      format("mv {core_root_dir} {core_root_without_backup_dir}"),
+      user=params.infra_solr_user,
+      logoutput=True,
+      only_if=format("test -d {core_root_dir}")
+    )
+
+    Directory([format("{core_root_without_backup_dir}")],
+              mode=0755,
+              cd_access='a',
+              create_parents=True,
+              owner=params.infra_solr_user,
+              group=params.user_group,
+              recursive_ownership=True,
+              only_if=format("test -d {core_root_without_backup_dir}")
+              )
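
A note on the restore flow above: backed-up cores are paired with the freshly created cores
purely by alphabetical order, not by host. A minimal standalone sketch of the idea behind
command_commons.create_core_pairs (core and host names are hypothetical):

    # (core, host) tuples from the backup metadata and from the new collection,
    # each sorted by core name, then zipped by position
    original = sorted({'old_core_b': 'host-1', 'old_core_a': 'host-2'}.items())
    new = sorted({'new_core_d': 'host-3', 'new_core_c': 'host-4'}.items())
    pairs = [{'src_core': o[0], 'src_host': o[1],
              'target_core': n[0], 'target_host': n[1]}
             for o, n in zip(original, new)]
    # old_core_a is restored into new_core_c, old_core_b into new_core_d
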
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index b07a9ea..f7dc92e 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -20,10 +20,10 @@ import fnmatch
 import json
 import os
 import params
+import socket
 import time
 import traceback
 
-from resource_management.core.exceptions import ExecutionFailed
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.default import default
@@ -51,6 +51,9 @@ debug = default("/commandParams/solr_migrate_debug", True)
 # used for filtering folders in backup location (like: if the filter is ranger, that will include snapshot.ranger folder but won't include snapshot.hadoop_logs)
 core_filter = default("/commandParams/solr_core_filter", None)
 
+# used to filter out comma separated cores - can be useful if backup/restore failed at some point
+skip_cores = default("/commandParams/solr_skip_cores", "").split(",")
+
 # delete write.lock file at the start of lucene index migration process
 delete_lock_on_start = default("/commandParams/solr_delete_lock_on_start", True)
 # if it used, then core filter will be used with snapshot.* folder pattern
@@ -70,7 +73,16 @@ check_hosts_default = True if params.security_enabled else False
 check_hosts = default("/commandParams/solr_check_hosts", check_hosts_default)
 
 solr_protocol = "https" if params.infra_solr_ssl_enabled else "http"
+solr_port = format("{params.infra_solr_port}")
 solr_base_url = format("{solr_protocol}://{params.hostname}:{params.infra_solr_port}/solr")
+solr_datadir = params.infra_solr_datadir
+
+solr_keep_backup=default("/commandParams/solr_keep_backup", False)
+
+solr_num_shards = int(default("/commandParams/solr_shards", "0"))
+
+if solr_num_shards == 0:
+  raise Exeption(format("The 'solr_shards' command parameter is required to set."))
 
 if params.security_enabled:
   keytab = params.infra_solr_kerberos_keytab
@@ -78,6 +90,11 @@ if params.security_enabled:
 
 hostname_suffix = params.hostname.replace(".", "_")
 
+HOST_CORES='host-cores'
+CORE_HOST='core-host'
+HOST_SHARDS='host-shards'
+CORE_DATA='core-data'
+
 if shared_fs:
   index_location = format("{index_location}_{hostname_suffix}")
 
@@ -92,8 +109,8 @@ def get_files_by_pattern(directory, pattern):
       if matched:
         yield os.path.join(root, basename)
 
-def create_solr_api_request_command(request_path, output=None):
-  solr_url = format("{solr_base_url}/{request_path}")
+def create_solr_api_request_command(request_path, output=None, override_solr_base_url=None):
+  solr_url = format("{solr_base_url}/{request_path}") if override_solr_base_url is None else format("{override_solr_base_url}/{request_path}")
   grep_cmd = " | grep 'solr_rs_status: 200'"
   api_cmd = format("kinit -kt {keytab} {principal} && curl -w'solr_rs_status: %{{http_code}}' -k --negotiate -u : '{solr_url}'") \
     if params.security_enabled else format("curl -w'solr_rs_status: %{{http_code}}' -k '{solr_url}'")
@@ -126,52 +143,63 @@ def snapshot_status_check(request_cmd, json_output, snapshot_name, backup=True,
         json_data = json.load(json_file)
         if backup:
           details = json_data['details']
-          backup_list = details['backup']
-          if log_output:
-            Logger.info(str(backup_list))
-
-          if type(backup_list) == type(list()): # support map and list format as well
-            backup_data = dict(backup_list[i:i+2] for i in range(0, len(backup_list), 2))
+          if 'backup' in details:
+            backup_list = details['backup']
+            if log_output:
+              Logger.info(str(backup_list))
+
+            if type(backup_list) == type(list()): # support map and list format as well
+              backup_data = dict(backup_list[i:i+2] for i in range(0, len(backup_list), 2))
+            else:
+              backup_data = backup_list
+
+            if (not 'snapshotName' in backup_data) or backup_data['snapshotName'] != snapshot_name:
+              snapshot = backup_data['snapshotName']
+              Logger.info(format("Snapshot name: {snapshot}, wait until {snapshot_name} will be available."))
+              time.sleep(time_interval)
+              continue
+
+            if backup_data['status'] == 'success':
+              Logger.info("Backup command status: success.")
+              failed = False
+            elif backup_data['status'] == 'failed':
+              Logger.info("Backup command status: failed.")
+            else:
+              Logger.info(format("Backup command is in progress... Sleep for {time_interval} seconds."))
+              time.sleep(time_interval)
+              continue
           else:
-            backup_data = backup_list
-
-          if (not 'snapshotName' in backup_data) or backup_data['snapshotName'] != snapshot_name:
-            snapshot = backup_data['snapshotName']
-            Logger.info(format("Snapshot name: {snapshot}, wait until {snapshot_name} will be available."))
-            time.sleep(time_interval)
-            continue
-
-          if backup_data['status'] == 'success':
-            Logger.info("Backup command status: success.")
-            failed = False
-          elif backup_data['status'] == 'failed':
-            Logger.info("Backup command status: failed.")
-          else:
-            Logger.info(format("Backup command is in progress... Sleep for {time_interval} seconds."))
+            Logger.info("Backup data is not found yet in details JSON response...")
             time.sleep(time_interval)
             continue
 
         else:
-          restorestatus_data = json_data['restorestatus']
-          if log_output:
-            Logger.info(str(restorestatus_data))
-
-          if (not 'snapshotName' in restorestatus_data) or restorestatus_data['snapshotName'] != format("snapshot.{snapshot_name}"):
-            snapshot = restorestatus_data['snapshotName']
-            Logger.info(format("Snapshot name: {snapshot}, wait until snapshot.{snapshot_name} will be available."))
-            time.sleep(time_interval)
-            continue
-
-          if restorestatus_data['status'] == 'success':
-            Logger.info("Restore command successfully finished.")
-            failed = False
-          elif restorestatus_data['status'] == 'failed':
-            Logger.info("Restore command failed.")
+          if 'restorestatus' in json_data:
+            restorestatus_data = json_data['restorestatus']
+            if log_output:
+              Logger.info(str(restorestatus_data))
+
+            if (not 'snapshotName' in restorestatus_data) or restorestatus_data['snapshotName'] != format("snapshot.{snapshot_name}"):
+              snapshot = restorestatus_data['snapshotName']
+              Logger.info(format("Snapshot name: {snapshot}, wait until snapshot.{snapshot_name} will be available."))
+              time.sleep(time_interval)
+              continue
+
+            if restorestatus_data['status'] == 'success':
+              Logger.info("Restore command successfully finished.")
+              failed = False
+            elif restorestatus_data['status'] == 'failed':
+              Logger.info("Restore command failed.")
+            else:
+              Logger.info(format("Restore command is in progress... Sleep for {time_interval} seconds."))
+              time.sleep(time_interval)
+              continue
           else:
-            Logger.info(format("Restore command is in progress... Sleep for {time_interval} seconds."))
+            Logger.info("Restore status data is not found yet in details JSON response...")
             time.sleep(time_interval)
             continue
 
+
     except Exception:
       traceback.print_exc()
       time.sleep(time_interval)
@@ -189,9 +217,16 @@ def __get_domain_name(url):
   dm = spltAr[i].split('/')[0].split(':')[0].lower()
   return dm
 
-def __read_hosts_from_clusterstate_json(json_path):
-  hosts = set()
-  with open(json_path) as json_file:
+def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores_path):
+  """
+  Fill (and write to file) a JSON object with core data from state.json (znode).
+  """
+  json_content={}
+  hosts_core_map={}
+  hosts_shard_map={}
+  core_host_map={}
+  core_data_map={}
+  with open(json_zk_state_path) as json_file:
     json_data = json.load(json_file)
     znode = json_data['znode']
     data = json.loads(znode['data'])
@@ -205,21 +240,98 @@ def __read_hosts_from_clusterstate_json(json_path):
         core_data = replicas[replica]
         core = core_data['core']
         base_url = core_data['base_url']
+        state = core_data['state']
+        leader = core_data['leader'] if 'leader' in core_data else 'false'
         domain = __get_domain_name(base_url)
-        hosts.add(domain)
-        Logger.info(format("Found replica: {replica} (core '{core}') in {shard} on {domain}"))
-    return hosts
-
-def __get_hosts_for_collection():
+        if state == 'active' and leader == 'true':
+          if domain not in hosts_core_map:
+            hosts_core_map[domain]=[]
+          if domain not in hosts_shard_map:
+            hosts_shard_map[domain]=[]
+          if core not in core_data_map:
+            core_data_map[core]={}
+          hosts_core_map[domain].append(core)
+          hosts_shard_map[domain].append(shard)
+          core_host_map[core]=domain
+          core_data_map[core]['host']=domain
+          core_data_map[core]['node']=replica
+          core_data_map[core]['type']=core_data['type']
+          core_data_map[core]['shard']=shard
+          Logger.info(format("Found leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
+        else:
+          Logger.info(format("Found non-leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
+  json_content[HOST_CORES]=hosts_core_map
+  json_content[CORE_HOST]=core_host_map
+  json_content[HOST_SHARDS]=hosts_shard_map
+  json_content[CORE_DATA]=core_data_map
+  with open(json_host_cores_path, 'w') as outfile:
+    json.dump(json_content, outfile)
+  return json_content
+
+def get_host_cores_for_collection(backup=True):
+  """
+  Gather core details into an object and write them to a file as well. The backup data will be used during restore.
+  :param backup: if enabled, save file into backup_host_cores.json, otherwise use restore_host_cores.json
+  :return: detailed json about the cores
+  """
   request_path = 'admin/zookeeper?wt=json&detail=true&path=%2Fclusterstate.json&view=graph'
-  json_path = format("{index_location}/zk_state.json")
-  api_request = create_solr_api_request_command(request_path, output=json_path)
+  json_folder = format("{index_location}")
+  json_zk_state_path = format("{json_folder}/zk_state.json")
+  if backup:
+    json_host_cores_path = format("{json_folder}/backup_host_cores.json")
+  else:
+    json_host_cores_path = format("{json_folder}/restore_host_cores.json")
+  api_request = create_solr_api_request_command(request_path, output=json_zk_state_path)
   Execute(api_request, user=params.infra_solr_user)
-  return __read_hosts_from_clusterstate_json(json_path)
+  return __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores_path)
+
+def read_backup_json():
+  with open(format("{index_location}/backup_host_cores.json")) as json_file:
+    json_data = json.load(json_file)
+    return json_data
 
-def is_collection_available_on_host():
-  if check_hosts:
-    hosts_set = __get_hosts_for_collection()
-    return params.hostname in hosts_set
+def create_core_pairs(original_cores, new_cores):
+  """
+  Create core pairs from the original and new cores (backups -> restored ones), using alphabetical order
+  """
+  core_pairs_data=[]
+  if len(new_cores) < len(original_cores):
+    raise Exception("Old collection core size is: " + str(len(new_cores)) +
+                    ". You will need at least: " + str(len(original_cores)))
   else:
-    return True
\ No newline at end of file
+    for index, core_data in enumerate(original_cores):
+      value={}
+      value['src_core']=core_data[0]
+      value['src_host']=core_data[1]
+      value['target_core']=new_cores[index][0]
+      value['target_host']=new_cores[index][1]
+      core_pairs_data.append(value)
+    with open(format("{index_location}/restore_core_pairs.json"), 'w') as outfile:
+      json.dump(core_pairs_data, outfile)
+    return core_pairs_data
+
+def sort_core_host_pairs(host_core_map):
+  """
+  Sort host core map by key
+  """
+  core_host_pairs=[]
+  for key in sorted(host_core_map):
+    core_host_pairs.append((key, host_core_map[key]))
+  return core_host_pairs
+
+def is_ip(addr):
+  try:
+    socket.inet_aton(addr)
+    return True
+  except socket.error:
+    return False
+
+def resolve_ip_to_hostname(ip):
+  try:
+    host_name = socket.gethostbyaddr(ip)[0].lower()
+    Logger.info(format("Resolved {ip} to {host_name}"))
+    fqdn_name = socket.getaddrinfo(host_name, 0, 0, 0, 0, socket.AI_CANONNAME)[0][3].lower()
+    return host_name if host_name == fqdn_name else fqdn_name
+  except socket.error:
+    pass
+  return ip
\ No newline at end of file
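
The socket helpers at the end of this file exist because, on unsecured clusters, Solr may
register replicas by IP address. Their behaviour can be illustrated with the stdlib alone
(the addresses are hypothetical and reverse lookups depend on the environment):

    import socket

    def is_ip(addr):
        try:
            socket.inet_aton(addr)  # parses IPv4 literals only
            return True
        except socket.error:
            return False

    print(is_ip('192.168.56.10'))      # True
    print(is_ip('c6401.example.com'))  # False
    # reverse lookup as used by resolve_ip_to_hostname(); it falls back to the IP on socket.error
    print(socket.gethostbyaddr('127.0.0.1')[0])
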
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/migrate.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/migrate.py
index 5d65d1d..1ff7d95 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/migrate.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/migrate.py
@@ -28,11 +28,6 @@ def migrate_index(env):
   import params, command_commons
   env.set_params(command_commons)
 
-  collection_available = command_commons.is_collection_available_on_host()
-  if not collection_available:
-    Logger.info(format("No any '{collection}' replica is used on {params.hostname} host"))
-    return
-
   index_migrate_cmd = format("{index_helper_script} upgrade-index -d {index_location} -v {index_version}")
 
   if command_commons.force is True:
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/templates/infra-solr-security.json.j2 b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/templates/infra-solr-security.json.j2
index b6c4e95..18d2340 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/templates/infra-solr-security.json.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/templates/infra-solr-security.json.j2
@@ -67,6 +67,12 @@
        "role": ["admin", "{{infra_solr_role_ranger_admin}}", "{{infra_solr_role_ranger_audit}}"],
        "name": "ranger-manager",
        "path": "/*"
+    },
+    {
+       "collection": "old_ranger_audits",
+       "role": ["admin", "{{infra_solr_role_ranger_admin}}", "{{infra_solr_role_ranger_audit}}"],
+       "name": "backup-ranger-manager",
+       "path": "/*"
     }]
   }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.

[ambari] 02/02: AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit c17b9f8d093447274ac2a9b90f705f3b5a943b17
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Sun May 13 19:12:56 2018 +0200

    AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.
---
 .../src/main/python/migrationHelper.py             |   7 ++
 .../AMBARI_INFRA_SOLR/0.1.0/metainfo.xml           |   3 +-
 .../0.1.0/package/scripts/collection.py            | 121 ++++++++++++++++-----
 .../0.1.0/package/scripts/command_commons.py       |  97 +++++++++++++++-
 4 files changed, 195 insertions(+), 33 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 8c7f6b1..4d7f1d2 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -148,6 +148,10 @@ def fill_parameters(options):
     params['solr_skip_cores'] = options.skip_cores
   if options.solr_shards:
     params['solr_shards'] = options.solr_shards
+  if options.solr_hdfs_path:
+    params['solr_hdfs_path'] = options.solr_hdfs_path
+  if options.solr_keep_backup:
+    params['solr_keep_backup'] = True
   return params
 
 def validte_common_options(options, parser):
@@ -235,6 +239,9 @@ if __name__=="__main__":
   parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
   parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
   parser.add_option("--shards", dest="solr_shards", type="int", default=0, help="number of shards (required to set properly for restore)")
+  parser.add_option("--solr-hdfs-path", dest="solr_hdfs_path", type="string", default=None, help="Base path of Solr (where collections are located) if HDFS is used (like /user/infra-solr)")
+  parser.add_option("--solr-keep-backup", dest="solr_keep_backup", default=False, action="store_true", help="If it is turned on, Snapshot Solr data will not be deleted from the filesystem during restore.")
+
   (options, args) = parser.parse_args()
 
   protocol = 'https' if options.ssl else 'http'
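
With these options an HDFS-backed restore might be driven as follows (a hedged sketch: the
base path is the example from the option's own help text, and the other required options
are elided):

    python migrationHelper.py ... --shards 2 --solr-hdfs-path /user/infra-solr --solr-keep-backup
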
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
index 015a7bb..de75d05 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
@@ -58,7 +58,8 @@
               <commandScript>
                 <script>scripts/infra_solr.py</script>
                 <scriptType>PYTHON</scriptType>
-                <timeout>1200</timeout>
+                <background>true</background>
+                <timeout>36000</timeout>
               </commandScript>
             </customCommand>
             <customCommand>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index e32bc1f..4f51071 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -105,9 +105,11 @@ def restore_collection(env):
   if command_commons.collection in ["ranger_audits", "history", "hadoop_logs", "audit_logs",
                                     "vertex_index", "edge_index",
                                     "fulltext_index"]:  # Make sure ambari wont delete an important collection
-    raise Exeption(format(
+    raise Exception(format(
       "Selected collection for restore is: {collection}. It is not recommended to restore on default collections."))
 
+  hdfs_cores_on_host=[]
+
   for core_data in core_pairs:
     src_core = core_data['src_core']
     target_core = core_data['target_core']
@@ -124,16 +126,26 @@ def restore_collection(env):
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
 
-    Directory([format("{core_root_dir}/data/index"),
-               format("{core_root_dir}/data/tlog"),
-               format("{core_root_dir}/data/snapshot_metadata")],
-              mode=0755,
-              cd_access='a',
-              create_parents=True,
-              owner=params.infra_solr_user,
-              group=params.user_group,
-              only_if=only_if_cmd
-              )
+    if command_commons.solr_hdfs_path:
+      Directory([core_root_dir],
+                mode=0755,
+                cd_access='a',
+                create_parents=True,
+                owner=params.infra_solr_user,
+                group=params.user_group,
+                only_if=only_if_cmd
+                )
+    else:
+      Directory([format("{core_root_dir}/data/index"),
+                 format("{core_root_dir}/data/tlog"),
+                 format("{core_root_dir}/data/snapshot_metadata")],
+                mode=0755,
+                cd_access='a',
+                create_parents=True,
+                owner=params.infra_solr_user,
+                group=params.user_group,
+                only_if=only_if_cmd
+                )
 
     core_details = core_data[target_core]
     core_properties = {}
@@ -144,15 +156,47 @@ def restore_collection(env):
     core_properties['collection'] = command_commons.collection
     core_properties['coreNodeName'] = core_details['node']
     core_properties['shard'] = core_details['shard']
+    if command_commons.solr_hdfs_path:
+      hdfs_solr_node_folder=command_commons.solr_hdfs_path + format("/backup_{collection}/") + core_details['node']
+      source_folder=format("{index_location}/snapshot.{src_core}/")
+      if command_commons.check_folder_exists(source_folder):
+        hdfs_cores_on_host.append(target_core)
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/index/"),
+                                   type="directory",
+                                   action="create_on_execute",
+                                   source=source_folder,
+                                   owner=params.infra_solr_user,
+                                   mode=0755,
+                                   recursive_chown=True,
+                                   recursive_chmod=True
+                                   )
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/tlog"),
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=params.infra_solr_user,
+                                   mode=0755
+                                   )
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/snapshot_metadata"),
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=params.infra_solr_user,
+                                   mode=0755
+                                   )
+        if not command_commons.solr_keep_backup:
+          Directory(format("{index_location}/snapshot.{src_core}"),
+                  action="delete",
+                  only_if=only_if_cmd,
+                  owner=params.infra_solr_user)
+    else:
+      copy_cmd = format(
+        "cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+        else format("mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+      Execute(
+        copy_cmd, only_if=only_if_cmd,
+        user=params.infra_solr_user,
+        logoutput=True
+      )
 
-    copy_cmd = format(
-      "cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
-      else format("mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
-    Execute(
-      copy_cmd, only_if=only_if_cmd,
-      user=params.infra_solr_user,
-      logoutput=True
-    )
     PropertiesFile(
       core_root_dir + '/core.properties',
       properties=core_properties,
@@ -165,7 +209,6 @@ def restore_collection(env):
   Execute(format("rm -rf {solr_datadir}/{collection}*"),
           user=params.infra_solr_user,
           logoutput=True)
-
   for core_data in core_pairs:
     src_core = core_data['src_core']
     target_core = core_data['target_core']
@@ -179,6 +222,25 @@ def restore_collection(env):
 
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
+
+    if command_commons.solr_hdfs_path:
+      if target_core in hdfs_cores_on_host:
+        Logger.info(format("Core data '{target_core}' is located on this host, processing..."))
+        core_data=host_cores_map[command_commons.CORE_DATA]
+        core_details=core_data[target_core]
+        core_node=core_details['node']
+        collection_core_dir=command_commons.solr_hdfs_path + format("/{collection}/{core_node}")
+        backup_collection_core_dir=command_commons.solr_hdfs_path + format("/backup_{collection}/{core_node}")
+        command_commons.HdfsResource(collection_core_dir,
+                               type="directory",
+                               action="delete_on_execute",
+                               owner=params.infra_solr_user
+                               )
+        if command_commons.check_hdfs_folder_exists(backup_collection_core_dir):
+          command_commons.move_hdfs_folder(backup_collection_core_dir, collection_core_dir)
+      else:
+        Logger.info(format("Core data '{target_core}' is not located on this host, skipping..."))
+
     Execute(
       format("mv {core_root_dir} {core_root_without_backup_dir}"),
       user=params.infra_solr_user,
@@ -186,12 +248,13 @@ def restore_collection(env):
       only_if=format("test -d {core_root_dir}")
     )
 
-    Directory([format("{core_root_without_backup_dir}")],
-              mode=0755,
-              cd_access='a',
-              create_parents=True,
-              owner=params.infra_solr_user,
-              group=params.user_group,
-              recursive_ownership=True,
-              only_if=format("test -d {core_root_without_backup_dir}")
-              )
+    Directory(
+      [format("{core_root_without_backup_dir}")],
+      mode=0755,
+      cd_access='a',
+      create_parents=True,
+      owner=params.infra_solr_user,
+      group=params.user_group,
+      recursive_ownership=True,
+      only_if=format("test -d {core_root_without_backup_dir}")
+    )
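
On the HDFS path the snapshot is first staged under backup_{collection}/<core node> and only
swapped into place in the second pass above, so the live collection directory is left intact
until the swap. Expressed as plain hdfs commands the swap is roughly the following (collection
and core node names are hypothetical; the delete actually goes through HdfsResource):

    hdfs dfs -rm -r /user/infra-solr/old_ranger_audits/core_node1
    hdfs dfs -mv /user/infra-solr/backup_old_ranger_audits/core_node1 \
                 /user/infra-solr/old_ranger_audits/core_node1
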
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index f7dc92e..a8a17e7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -24,10 +24,13 @@ import socket
 import time
 import traceback
 
+from resource_management.core.exceptions import Fail
+from resource_management.core.shell import call
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.functions.format import format
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
 
 index_helper_script = '/usr/lib/ambari-infra-solr-client/solrIndexHelper.sh'
 
@@ -81,8 +84,54 @@ solr_keep_backup=default("/commandParams/solr_keep_backup", False)
 
 solr_num_shards = int(default("/commandParams/solr_shards", "0"))
 
+solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
+
 if solr_num_shards == 0:
-  raise Exeption(format("The 'solr_shards' command parameter is required to set."))
+  raise Exception(format("The 'solr_shards' command parameter is required to set."))
+
+if solr_hdfs_path:
+
+  import functools
+  from resource_management.libraries.functions import conf_select
+  from resource_management.libraries.functions import stack_select
+  from resource_management.libraries.functions import get_klist_path
+  from resource_management.libraries.functions import get_kinit_path
+  from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+
+  klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None))
+  kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+  # hadoop default parameters
+  hdfs_user = params.config['configurations']['hadoop-env']['hdfs_user']
+  hadoop_bin = stack_select.get_hadoop_dir("sbin")
+  hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+  hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+  hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
+  hadoop_lib_home = stack_select.get_hadoop_dir("lib")
+  hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
+  hdfs_user_keytab = params.config['configurations']['hadoop-env']['hdfs_user_keytab']
+
+  dfs_type = default("/commandParams/dfs_type", "")
+
+  hdfs_site = params.config['configurations']['hdfs-site']
+  default_fs = params.config['configurations']['core-site']['fs.defaultFS']
+  #create partial functions with common arguments for every HdfsResource call
+  #to create/delete/copyfromlocal hdfs directories/files we need to call params.HdfsResource in code
+  HdfsResource = functools.partial(
+    HdfsResource,
+    user=params.infra_solr_user,
+    hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+    security_enabled = params.security_enabled,
+    keytab = hdfs_user_keytab,
+    kinit_path_local = kinit_path_local,
+    hadoop_bin_dir = hadoop_bin_dir,
+    hadoop_conf_dir = hadoop_conf_dir,
+    principal_name = hdfs_principal_name,
+    hdfs_site = hdfs_site,
+    default_fs = default_fs,
+    immutable_paths = get_not_managed_resources(),
+    dfs_type = dfs_type
+  )
 
 if params.security_enabled:
   keytab = params.infra_solr_kerberos_keytab
@@ -255,7 +304,10 @@ def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores
           core_host_map[core]=domain
           core_data_map[core]['host']=domain
           core_data_map[core]['node']=replica
-          core_data_map[core]['type']=core_data['type']
+          if 'type' in core_data:
+            core_data_map[core]['type']=core_data['type']
+          else:
+            core_data_map[core]['type']='NRT'
           core_data_map[core]['shard']=shard
           Logger.info(format("Found leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
         else:
@@ -334,4 +386,43 @@ def resolve_ip_to_hostname(ip):
     return host_name if host_name == fqdn_name else fqdn_name
   except socket.error:
     pass
-  return ip
\ No newline at end of file
+  return ip
+
+def create_command(command):
+  """
+  Create hdfs command. Append kinit to the command if required.
+  """
+  kinit_cmd = "{0} -kt {1} {2};".format(kinit_path_local, params.infra_solr_kerberos_keytab, params.infra_solr_kerberos_principal) if params.security_enabled else ""
+  return kinit_cmd + command
+
+def execute_command(command):
+  """
+  Run hdfs command as the infra-solr user
+  """
+  return call(command, user=params.infra_solr_user, timeout=300)
+
+def move_hdfs_folder(source_dir, target_dir):
+  cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
+  returncode, stdout = execute_command(cmd)
+  if returncode:
+    raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
+  return stdout.strip()
+
+def check_hdfs_folder_exists(hdfs_dir):
+  """
+  Check whether the hdfs folder exists
+  """
+  cmd=create_command(format("hdfs dfs -ls {hdfs_dir}"))
+  returncode, stdout = execute_command(cmd)
+  if returncode:
+    return False
+  return True
+
+def check_folder_exists(dir):
+  """
+  Check whether the folder exists
+  """
+  returncode, stdout = call(format("test -d {dir}"), user=params.infra_solr_user, timeout=300)
+  if returncode:
+    return False
+  return True

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.