You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/05/13 18:20:55 UTC
[ambari] 02/02: AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit c17b9f8d093447274ac2a9b90f705f3b5a943b17
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Sun May 13 19:12:56 2018 +0200
AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.
---
.../src/main/python/migrationHelper.py | 7 ++
.../AMBARI_INFRA_SOLR/0.1.0/metainfo.xml | 3 +-
.../0.1.0/package/scripts/collection.py | 121 ++++++++++++++++-----
.../0.1.0/package/scripts/command_commons.py | 96 +++++++++++++++-
4 files changed, 194 insertions(+), 33 deletions(-)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 8c7f6b1..4d7f1d2 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -148,6 +148,10 @@ def fill_parameters(options):
params['solr_skip_cores'] = options.skip_cores
if options.solr_shards:
params['solr_shards'] = options.solr_shards
+ if options.solr_hdfs_path:
+ params['solr_hdfs_path'] = options.solr_hdfs_path
+ if options.solr_keep_backup:
+ params['solr_keep_backup'] = True
return params
def validte_common_options(options, parser):
@@ -235,6 +239,9 @@ if __name__=="__main__":
parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
parser.add_option("--shards", dest="solr_shards", type="int", default=0, help="number of shards (required to set properly for restore)")
+ parser.add_option("--solr-hdfs-path", dest="solr_hdfs_path", type="string", default=None, help="Base path of Solr (where collections are located) if HDFS is used (like /user/infra-solr)")
+ parser.add_option("--solr-keep-backup", dest="solr_keep_backup", default=False, action="store_true", help="If it is turned on, Snapshot Solr data will not be deleted from the filesystem during restore.")
+
(options, args) = parser.parse_args()
protocol = 'https' if options.ssl else 'http'
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
index 015a7bb..de75d05 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
@@ -58,7 +58,8 @@
<commandScript>
<script>scripts/infra_solr.py</script>
<scriptType>PYTHON</scriptType>
- <timeout>1200</timeout>
+ <background>true</background>
+ <timeout>36000</timeout>
</commandScript>
</customCommand>
<customCommand>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index e32bc1f..4f51071 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -105,9 +105,11 @@ def restore_collection(env):
if command_commons.collection in ["ranger_audits", "history", "hadoop_logs", "audit_logs",
"vertex_index", "edge_index",
"fulltext_index"]: # Make sure ambari wont delete an important collection
- raise Exeption(format(
+ raise Exception(format(
"Selected collection for restore is: {collection}. It is not recommended to restore on default collections."))
+ hdfs_cores_on_host=[]
+
for core_data in core_pairs:
src_core = core_data['src_core']
target_core = core_data['target_core']
@@ -124,16 +126,26 @@ def restore_collection(env):
core_root_dir = format("{solr_datadir}/backup_{target_core}")
core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
- Directory([format("{core_root_dir}/data/index"),
- format("{core_root_dir}/data/tlog"),
- format("{core_root_dir}/data/snapshot_metadata")],
- mode=0755,
- cd_access='a',
- create_parents=True,
- owner=params.infra_solr_user,
- group=params.user_group,
- only_if=only_if_cmd
- )
+ if command_commons.solr_hdfs_path:
+ Directory([core_root_dir],
+ mode=0755,
+ cd_access='a',
+ create_parents=True,
+ owner=params.infra_solr_user,
+ group=params.user_group,
+ only_if=only_if_cmd
+ )
+ else:
+ Directory([format("{core_root_dir}/data/index"),
+ format("{core_root_dir}/data/tlog"),
+ format("{core_root_dir}/data/snapshot_metadata")],
+ mode=0755,
+ cd_access='a',
+ create_parents=True,
+ owner=params.infra_solr_user,
+ group=params.user_group,
+ only_if=only_if_cmd
+ )
core_details = core_data[target_core]
core_properties = {}
@@ -144,15 +156,47 @@ def restore_collection(env):
core_properties['collection'] = command_commons.collection
core_properties['coreNodeName'] = core_details['node']
core_properties['shard'] = core_details['shard']
+ if command_commons.solr_hdfs_path:
+ hdfs_solr_node_folder=command_commons.solr_hdfs_path + format("/backup_{collection}/") + core_details['node']
+ source_folder=format("{index_location}/snapshot.{src_core}/")
+ if command_commons.check_folder_exists(source_folder):
+ hdfs_cores_on_host.append(target_core)
+ command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/index/"),
+ type="directory",
+ action="create_on_execute",
+ source=source_folder,
+ owner=params.infra_solr_user,
+ mode=0755,
+ recursive_chown=True,
+ recursive_chmod=True
+ )
+ command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/tlog"),
+ type="directory",
+ action="create_on_execute",
+ owner=params.infra_solr_user,
+ mode=0755
+ )
+ command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/snapshot_metadata"),
+ type="directory",
+ action="create_on_execute",
+ owner=params.infra_solr_user,
+ mode=0755
+ )
+ if command_commons.solr_keep_backup:
+ Directory(format("{index_location}/snapshot.{src_core}"),
+ action="delete",
+ only_if=only_if_cmd,
+ owner=params.infra_solr_user)
+ else:
+ copy_cmd = format(
+ "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+ else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+ Execute(
+ copy_cmd, only_if=only_if_cmd,
+ user=params.infra_solr_user,
+ logoutput=True
+ )
- copy_cmd = format(
- "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
- else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
- Execute(
- copy_cmd, only_if=only_if_cmd,
- user=params.infra_solr_user,
- logoutput=True
- )
PropertiesFile(
core_root_dir + '/core.properties',
properties=core_properties,
@@ -165,7 +209,6 @@ def restore_collection(env):
Execute(format("rm -rf {solr_datadir}/{collection}*"),
user=params.infra_solr_user,
logoutput=True)
-
for core_data in core_pairs:
src_core = core_data['src_core']
target_core = core_data['target_core']
@@ -179,6 +222,25 @@ def restore_collection(env):
core_root_dir = format("{solr_datadir}/backup_{target_core}")
core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
+
+ if command_commons.solr_hdfs_path:
+ if target_core in hdfs_cores_on_host:
+ Logger.info(format("Core data '{target_core}' is located on this host, processing..."))
+ core_data=host_cores_map[command_commons.CORE_DATA]
+ core_details=core_data[target_core]
+ core_node=core_details['node']
+ collection_core_dir=command_commons.solr_hdfs_path + format("/{collection}/{core_node}")
+ backup_collection_core_dir=command_commons.solr_hdfs_path + format("/backup_{collection}/{core_node}")
+ command_commons.HdfsResource(collection_core_dir,
+ type="directory",
+ action="delete_on_execute",
+ owner=params.infra_solr_user
+ )
+ if command_commons.check_hdfs_folder_exists(backup_collection_core_dir):
+ command_commons.move_hdfs_folder(backup_collection_core_dir, collection_core_dir)
+ else:
+ Logger.info(format("Core data '{target_core}' is not located on this host, skipping..."))
+
Execute(
format("mv {core_root_dir} {core_root_without_backup_dir}"),
user=params.infra_solr_user,
@@ -186,12 +248,13 @@ def restore_collection(env):
only_if=format("test -d {core_root_dir}")
)
- Directory([format("{core_root_without_backup_dir}")],
- mode=0755,
- cd_access='a',
- create_parents=True,
- owner=params.infra_solr_user,
- group=params.user_group,
- recursive_ownership=True,
- only_if=format("test -d {core_root_without_backup_dir}")
- )
+ Directory(
+ [format("{core_root_without_backup_dir}")],
+ mode=0755,
+ cd_access='a',
+ create_parents=True,
+ owner=params.infra_solr_user,
+ group=params.user_group,
+ recursive_ownership=True,
+ only_if=format("test -d {core_root_without_backup_dir}")
+ )
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index f7dc92e..a8a17e7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -24,10 +24,12 @@ import socket
import time
import traceback
+from resource_management.core.shell import call
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions.format import format
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
index_helper_script = '/usr/lib/ambari-infra-solr-client/solrIndexHelper.sh'
@@ -81,8 +83,54 @@ solr_keep_backup=default("/commandParams/solr_keep_backup", False)
solr_num_shards = int(default("/commandParams/solr_shards", "0"))
+solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
+
if solr_num_shards == 0:
- raise Exeption(format("The 'solr_shards' command parameter is required to set."))
+ raise Exception(format("The 'solr_shards' command parameter is required to set."))
+
+if solr_hdfs_path:
+
+ import functools
+ from resource_management.libraries.functions import conf_select
+ from resource_management.libraries.functions import stack_select
+ from resource_management.libraries.functions import get_klist_path
+ from resource_management.libraries.functions import get_kinit_path
+ from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+
+ klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None))
+ kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+ # hadoop default parameters
+ hdfs_user = params.config['configurations']['hadoop-env']['hdfs_user']
+ hadoop_bin = stack_select.get_hadoop_dir("sbin")
+ hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+ hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+ hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
+ hadoop_lib_home = stack_select.get_hadoop_dir("lib")
+ hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
+ hdfs_user_keytab = params.config['configurations']['hadoop-env']['hdfs_user_keytab']
+
+ dfs_type = default("/commandParams/dfs_type", "")
+
+ hdfs_site = params.config['configurations']['hdfs-site']
+ default_fs = params.config['configurations']['core-site']['fs.defaultFS']
+ #create partial functions with common arguments for every HdfsResource call
+ #to create/delete/copyfromlocal hdfs directories/files we need to call params.HdfsResource in code
+ HdfsResource = functools.partial(
+ HdfsResource,
+ user=params.infra_solr_user,
+ hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+ security_enabled = params.security_enabled,
+ keytab = hdfs_user_keytab,
+ kinit_path_local = kinit_path_local,
+ hadoop_bin_dir = hadoop_bin_dir,
+ hadoop_conf_dir = hadoop_conf_dir,
+ principal_name = hdfs_principal_name,
+ hdfs_site = hdfs_site,
+ default_fs = default_fs,
+ immutable_paths = get_not_managed_resources(),
+ dfs_type = dfs_type
+ )
if params.security_enabled:
keytab = params.infra_solr_kerberos_keytab
@@ -255,7 +303,10 @@ def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores
core_host_map[core]=domain
core_data_map[core]['host']=domain
core_data_map[core]['node']=replica
- core_data_map[core]['type']=core_data['type']
+ if 'type' in core_data:
+ core_data_map[core]['type']=core_data['type']
+ else:
+ core_data_map[core]['type']='NRT'
core_data_map[core]['shard']=shard
Logger.info(format("Found leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
else:
@@ -334,4 +385,43 @@ def resolve_ip_to_hostname(ip):
return host_name if host_name == fqdn_name else fqdn_name
except socket.error:
pass
- return ip
\ No newline at end of file
+ return ip
+
+def create_command(command):
+ """
+ Create hdfs command. Append kinit to the command if required.
+ """
+ kinit_cmd = "{0} -kt {1} {2};".format(kinit_path_local, params.infra_solr_kerberos_keytab, params.infra_solr_kerberos_principal) if params.security_enabled else ""
+ return kinit_cmd + command
+
+def execute_commad(command):
+ """
+ Run hdfs command by infra-solr user
+ """
+ return call(command, user=params.infra_solr_user, timeout=300)
+
+def move_hdfs_folder(source_dir, target_dir):
+ cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
+ returncode, stdout = execute_commad(cmd)
+ if returncode:
+ raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
+ return stdout.strip()
+
+def check_hdfs_folder_exists(hdfs_dir):
+ """
+ Check that hdfs folder exists or not
+ """
+ cmd=create_command(format("hdfs dfs -ls {hdfs_dir}"))
+ returncode, stdout = execute_commad(cmd)
+ if returncode:
+ return False
+ return True
+
+def check_folder_exists(dir):
+ """
+ Check that folder exists or not
+ """
+ returncode, stdout = call(format("test -d {dir}"), user=params.infra_solr_user, timeout=300)
+ if returncode:
+ return False
+ return True
--
To stop receiving notification emails like this one, please contact
oleewere@apache.org.