Posted to commits@ambari.apache.org by ol...@apache.org on 2018/05/13 18:20:55 UTC

[ambari] 02/02: AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit c17b9f8d093447274ac2a9b90f705f3b5a943b17
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Sun May 13 19:12:56 2018 +0200

    AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.
---
 .../src/main/python/migrationHelper.py             |   7 ++
 .../AMBARI_INFRA_SOLR/0.1.0/metainfo.xml           |   3 +-
 .../0.1.0/package/scripts/collection.py            | 121 ++++++++++++++++-----
 .../0.1.0/package/scripts/command_commons.py       |  97 +++++++++++++++-
 4 files changed, 195 insertions(+), 33 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 8c7f6b1..4d7f1d2 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -148,6 +148,10 @@ def fill_parameters(options):
     params['solr_skip_cores'] = options.skip_cores
   if options.solr_shards:
     params['solr_shards'] = options.solr_shards
+  if options.solr_hdfs_path:
+    params['solr_hdfs_path'] = options.solr_hdfs_path
+  if options.solr_keep_backup:
+    params['solr_keep_backup'] = True
   return params
 
 def validte_common_options(options, parser):
@@ -235,6 +239,9 @@ if __name__=="__main__":
   parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
   parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
   parser.add_option("--shards", dest="solr_shards", type="int", default=0, help="number of shards (required to set properly for restore)")
+  parser.add_option("--solr-hdfs-path", dest="solr_hdfs_path", type="string", default=None, help="Base path of Solr (where collections are located) if HDFS is used (like /user/infra-solr)")
+  parser.add_option("--solr-keep-backup", dest="solr_keep_backup", default=False, action="store_true", help="If it is turned on, Snapshot Solr data will not be deleted from the filesystem during restore.")
+
   (options, args) = parser.parse_args()
 
   protocol = 'https' if options.ssl else 'http'
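
For orientation, a hypothetical restore invocation using the two new options
could look like the following (only --shards, --solr-hdfs-path and
--solr-keep-backup appear in this diff; the action and collection flags are
illustrative assumptions about the rest of migrationHelper.py's interface,
which is not shown here):

    # hypothetical invocation; --action/--collection flag names are assumptions
    /usr/lib/ambari-infra-solr-client/migrationHelper.py --action restore \
      --collection old_ranger_audits --shards 2 \
      --solr-hdfs-path /user/infra-solr \
      --solr-keep-backup
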
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
index 015a7bb..de75d05 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
@@ -58,7 +58,8 @@
               <commandScript>
                 <script>scripts/infra_solr.py</script>
                 <scriptType>PYTHON</scriptType>
-                <timeout>1200</timeout>
+                <background>true</background>
+                <timeout>36000</timeout>
               </commandScript>
             </customCommand>
             <customCommand>
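
(The custom command above is switched to run as a background command and its
timeout is raised from 1200 to 36000 seconds, presumably so that long backup
or restore operations against HDFS are not killed by the previous
20-minute limit.)
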
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index e32bc1f..4f51071 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -105,9 +105,11 @@ def restore_collection(env):
   if command_commons.collection in ["ranger_audits", "history", "hadoop_logs", "audit_logs",
                                     "vertex_index", "edge_index",
                                     "fulltext_index"]:  # Make sure ambari wont delete an important collection
-    raise Exeption(format(
+    raise Exception(format(
       "Selected collection for restore is: {collection}. It is not recommended to restore on default collections."))
 
+  hdfs_cores_on_host=[]
+
   for core_data in core_pairs:
     src_core = core_data['src_core']
     target_core = core_data['target_core']
@@ -124,16 +126,26 @@ def restore_collection(env):
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
 
-    Directory([format("{core_root_dir}/data/index"),
-               format("{core_root_dir}/data/tlog"),
-               format("{core_root_dir}/data/snapshot_metadata")],
-              mode=0755,
-              cd_access='a',
-              create_parents=True,
-              owner=params.infra_solr_user,
-              group=params.user_group,
-              only_if=only_if_cmd
-              )
+    if command_commons.solr_hdfs_path:
+      Directory([core_root_dir],
+                mode=0755,
+                cd_access='a',
+                create_parents=True,
+                owner=params.infra_solr_user,
+                group=params.user_group,
+                only_if=only_if_cmd
+                )
+    else:
+      Directory([format("{core_root_dir}/data/index"),
+                 format("{core_root_dir}/data/tlog"),
+                 format("{core_root_dir}/data/snapshot_metadata")],
+                mode=0755,
+                cd_access='a',
+                create_parents=True,
+                owner=params.infra_solr_user,
+                group=params.user_group,
+                only_if=only_if_cmd
+                )
 
     core_details = core_data[target_core]
     core_properties = {}
@@ -144,15 +156,47 @@ def restore_collection(env):
     core_properties['collection'] = command_commons.collection
     core_properties['coreNodeName'] = core_details['node']
     core_properties['shard'] = core_details['shard']
+    if command_commons.solr_hdfs_path:
+      hdfs_solr_node_folder=command_commons.solr_hdfs_path + format("/backup_{collection}/") + core_details['node']
+      source_folder=format("{index_location}/snapshot.{src_core}/")
+      if command_commons.check_folder_exists(source_folder):
+        hdfs_cores_on_host.append(target_core)
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/index/"),
+                                   type="directory",
+                                   action="create_on_execute",
+                                   source=source_folder,
+                                   owner=params.infra_solr_user,
+                                   mode=0755,
+                                   recursive_chown=True,
+                                   recursive_chmod=True
+                                   )
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/tlog"),
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=params.infra_solr_user,
+                                   mode=0755
+                                   )
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/snapshot_metadata"),
+                                   type="directory",
+                                   action="create_on_execute",
+                                   owner=params.infra_solr_user,
+                                   mode=0755
+                                   )
+        if not command_commons.solr_keep_backup:
+          Directory(format("{index_location}/snapshot.{src_core}"),
+                    action="delete",
+                    only_if=only_if_cmd,
+                    owner=params.infra_solr_user)
+    else:
+      copy_cmd = format(
+        "cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+        else format("mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+      Execute(
+        copy_cmd, only_if=only_if_cmd,
+        user=params.infra_solr_user,
+        logoutput=True
+      )
 
-    copy_cmd = format(
-      "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
-      else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
-    Execute(
-      copy_cmd, only_if=only_if_cmd,
-      user=params.infra_solr_user,
-      logoutput=True
-    )
     PropertiesFile(
       core_root_dir + '/core.properties',
       properties=core_properties,
@@ -165,7 +209,6 @@ def restore_collection(env):
   Execute(format("rm -rf {solr_datadir}/{collection}*"),
           user=params.infra_solr_user,
           logoutput=True)
-
   for core_data in core_pairs:
     src_core = core_data['src_core']
     target_core = core_data['target_core']
@@ -179,6 +222,25 @@ def restore_collection(env):
 
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
+
+    if command_commons.solr_hdfs_path:
+      if target_core in hdfs_cores_on_host:
+        Logger.info(format("Core data '{target_core}' is located on this host, processing..."))
+        core_data=host_cores_map[command_commons.CORE_DATA]
+        core_details=core_data[target_core]
+        core_node=core_details['node']
+        collection_core_dir=command_commons.solr_hdfs_path + format("/{collection}/{core_node}")
+        backup_collection_core_dir=command_commons.solr_hdfs_path + format("/backup_{collection}/{core_node}")
+        command_commons.HdfsResource(collection_core_dir,
+                               type="directory",
+                               action="delete_on_execute",
+                               owner=params.infra_solr_user
+                               )
+        if command_commons.check_hdfs_folder_exists(backup_collection_core_dir):
+          command_commons.move_hdfs_folder(backup_collection_core_dir, collection_core_dir)
+      else:
+        Logger.info(format("Core data '{target_core}' is not located on this host, skipping..."))
+
     Execute(
       format("mv {core_root_dir} {core_root_without_backup_dir}"),
       user=params.infra_solr_user,
@@ -186,12 +248,13 @@ def restore_collection(env):
       only_if=format("test -d {core_root_dir}")
     )
 
-    Directory([format("{core_root_without_backup_dir}")],
-              mode=0755,
-              cd_access='a',
-              create_parents=True,
-              owner=params.infra_solr_user,
-              group=params.user_group,
-              recursive_ownership=True,
-              only_if=format("test -d {core_root_without_backup_dir}")
-              )
+    Directory(
+      [format("{core_root_without_backup_dir}")],
+      mode=0755,
+      cd_access='a',
+      create_parents=True,
+      owner=params.infra_solr_user,
+      group=params.user_group,
+      recursive_ownership=True,
+      only_if=format("test -d {core_root_without_backup_dir}")
+    )
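
In the HDFS branch above, the local snapshot directory is pushed into HDFS via
Ambari's HdfsResource, whose source argument uploads a local directory; the
connection arguments are pre-bound in command_commons.py below. A minimal
sketch of the pattern, with made-up paths and the pre-bound keyword arguments
omitted:

    # Sketch of the HdfsResource upload pattern used above; paths are made up,
    # and keytab/principal/hdfs-site/default_fs etc. are pre-bound via
    # functools.partial in command_commons.py.
    HdfsResource("/user/infra-solr/backup_mycoll/core_node1/data/index/",
                 type="directory",
                 action="create_on_execute",  # queued until an execute action runs
                 source="/tmp/solr_backup/snapshot.mycoll_shard1_replica1/",
                 owner="infra-solr",
                 mode=0755,
                 recursive_chown=True,
                 recursive_chmod=True)
    HdfsResource(None, action="execute")      # flushes the queued operations

(An explicit execute flush is shown for completeness; this diff itself does
not show where the queued operations are flushed.)
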
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index f7dc92e..a8a17e7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -24,10 +24,13 @@ import socket
 import time
 import traceback
 
+from resource_management.core.exceptions import Fail
+from resource_management.core.shell import call
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.functions.format import format
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
 
 index_helper_script = '/usr/lib/ambari-infra-solr-client/solrIndexHelper.sh'
 
@@ -81,8 +84,54 @@ solr_keep_backup=default("/commandParams/solr_keep_backup", False)
 
 solr_num_shards = int(default("/commandParams/solr_shards", "0"))
 
+solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
+
 if solr_num_shards == 0:
-  raise Exeption(format("The 'solr_shards' command parameter is required to set."))
+  raise Exception(format("The 'solr_shards' command parameter must be set."))
+
+if solr_hdfs_path:
+
+  import functools
+  from resource_management.libraries.functions import conf_select
+  from resource_management.libraries.functions import stack_select
+  from resource_management.libraries.functions import get_klist_path
+  from resource_management.libraries.functions import get_kinit_path
+  from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+
+  klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None))
+  kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+  # hadoop default parameters
+  hdfs_user = params.config['configurations']['hadoop-env']['hdfs_user']
+  hadoop_bin = stack_select.get_hadoop_dir("sbin")
+  hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+  hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+  hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
+  hadoop_lib_home = stack_select.get_hadoop_dir("lib")
+  hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
+  hdfs_user_keytab = params.config['configurations']['hadoop-env']['hdfs_user_keytab']
+
+  dfs_type = default("/commandParams/dfs_type", "")
+
+  hdfs_site = params.config['configurations']['hdfs-site']
+  default_fs = params.config['configurations']['core-site']['fs.defaultFS']
+  # Create a partial function with common arguments pre-bound for every HdfsResource call;
+  # creating/deleting/copying HDFS directories and files then only needs the per-call arguments.
+  HdfsResource = functools.partial(
+    HdfsResource,
+    user=params.infra_solr_user,
+    hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+    security_enabled = params.security_enabled,
+    keytab = hdfs_user_keytab,
+    kinit_path_local = kinit_path_local,
+    hadoop_bin_dir = hadoop_bin_dir,
+    hadoop_conf_dir = hadoop_conf_dir,
+    principal_name = hdfs_principal_name,
+    hdfs_site = hdfs_site,
+    default_fs = default_fs,
+    immutable_paths = get_not_managed_resources(),
+    dfs_type = dfs_type
+  )
 
 if params.security_enabled:
   keytab = params.infra_solr_kerberos_keytab
@@ -255,7 +304,10 @@ def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores
           core_host_map[core]=domain
           core_data_map[core]['host']=domain
           core_data_map[core]['node']=replica
-          core_data_map[core]['type']=core_data['type']
+          if 'type' in core_data:
+            core_data_map[core]['type']=core_data['type']
+          else:
+            core_data_map[core]['type']='NRT'
           core_data_map[core]['shard']=shard
           Logger.info(format("Found leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
         else:
@@ -334,4 +386,43 @@ def resolve_ip_to_hostname(ip):
     return host_name if host_name == fqdn_name else fqdn_name
   except socket.error:
     pass
-  return ip
\ No newline at end of file
+  return ip
+
+def create_command(command):
+  """
+  Create an HDFS command, prefixing it with kinit if security is enabled.
+  """
+  kinit_cmd = "{0} -kt {1} {2};".format(kinit_path_local, params.infra_solr_kerberos_keytab, params.infra_solr_kerberos_principal) if params.security_enabled else ""
+  return kinit_cmd + command
+
+def execute_command(command):
+  """
+  Run an HDFS command as the infra-solr user.
+  """
+  return call(command, user=params.infra_solr_user, timeout=300)
+
+def move_hdfs_folder(source_dir, target_dir):
+  cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
+  returncode, stdout = execute_command(cmd)
+  if returncode:
+    raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
+  return stdout.strip()
+
+def check_hdfs_folder_exists(hdfs_dir):
+  """
+  Check whether an HDFS folder exists.
+  """
+  cmd=create_command(format("hdfs dfs -ls {hdfs_dir}"))
+  returncode, stdout = execute_command(cmd)
+  if returncode:
+    return False
+  return True
+
+def check_folder_exists(folder):
+  """
+  Check whether a local folder exists.
+  """
+  returncode, stdout = call(format("test -d {folder}"), user=params.infra_solr_user, timeout=300)
+  if returncode:
+    return False
+  return True
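
The helpers above compose into the HDFS move-back step that collection.py
performs after the cores are re-created; a short usage sketch with made-up
paths:

    # illustrative composition of the helpers above (paths are made up)
    backup_dir = "/user/infra-solr/backup_mycoll/core_node1"
    live_dir = "/user/infra-solr/mycoll/core_node1"
    if check_hdfs_folder_exists(backup_dir):
      move_hdfs_folder(backup_dir, live_dir)  # runs 'hdfs dfs -mv', kinit'd when kerberized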

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.