You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/08 18:39:30 UTC
[ambari] branch trunk updated: AMBARI-23945. Infra Solr migration -
restored data can be deleted on Solr startup (if data on hdfs)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5529b29 AMBARI-23945. Infra Solr migration - restored data can be deleted on Solr startup (if data on hdfs)
5529b29 is described below
commit 5529b29b0ba8e20bac7a69ef541539cc9085e709
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Fri Jun 8 20:37:30 2018 +0200
AMBARI-23945. Infra Solr migration - restored data can be deleted on Solr startup (if data on hdfs)
---
.../libraries/functions/solr_cloud_util.py | 15 +++
ambari-infra/ambari-infra-solr-client/build.xml | 5 +
.../src/main/python/migrationHelper.py | 113 ++++++++++++++++++++-
.../src/main/resources/data/.keep | 15 +++
.../0.1.0/package/scripts/collection.py | 59 +++++++----
.../0.1.0/package/scripts/command_commons.py | 23 +++--
6 files changed, 203 insertions(+), 27 deletions(-)
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py b/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py
index bffbebd..a6ed9b3 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py
@@ -199,6 +199,21 @@ def remove_admin_handlers(zookeeper_quorum, solr_znode, java64_home, collection,
remove_admin_handlers_cmd = format('{solr_cli_prefix} --remove-admin-handlers --collection {collection} --retry {retry} --interval {interval}')
Execute(remove_admin_handlers_cmd)
+def copy_solr_znode(zookeeper_quorum, solr_znode, java64_home, jaas_file, src_znode, target_znode, retry = 5, interval = 10, java_opts=None):
+ solr_cli_prefix = __create_solr_cloud_cli_prefix(zookeeper_quorum, solr_znode, java64_home, java_opts, jaas_file)
+ copy_znode_cmd = format("{solr_cli_prefix} --transfer-znode --copy-src {src_znode} --copy-dest {target_znode} --retry {retry} --interval {interval}")
+ Execute(copy_znode_cmd)
+
+def copy_solr_znode_to_local(zookeeper_quorum, solr_znode, java64_home, jaas_file, src_znode, target, retry = 5, interval = 10, java_opts=None):
+ solr_cli_prefix = __create_solr_cloud_cli_prefix(zookeeper_quorum, solr_znode, java64_home, java_opts, jaas_file)
+ copy_znode_cmd = format("{solr_cli_prefix} --transfer-znode --transfer-mode copyToLocal --copy-src {src_znode} --copy-dest {target} --retry {retry} --interval {interval}")
+ Execute(copy_znode_cmd)
+
+def copy_solr_znode_from_local(zookeeper_quorum, solr_znode, java64_home, jaas_file, src, target_znode, retry = 5, interval = 10, java_opts=None):
+ solr_cli_prefix = __create_solr_cloud_cli_prefix(zookeeper_quorum, solr_znode, java64_home, java_opts, jaas_file)
+ copy_znode_cmd = format("{solr_cli_prefix} --transfer-znode --transfer-mode copyFromLocal --copy-src {src} --copy-dest {target_znode} --retry {retry} --interval {interval}")
+ Execute(copy_znode_cmd)
+
def default_config(config, name, default_value):
subdicts = filter(None, name.split('/'))
if not config:
diff --git a/ambari-infra/ambari-infra-solr-client/build.xml b/ambari-infra/ambari-infra-solr-client/build.xml
index 866e323..71c59e7 100644
--- a/ambari-infra/ambari-infra-solr-client/build.xml
+++ b/ambari-infra/ambari-infra-solr-client/build.xml
@@ -35,6 +35,7 @@
<fileset file="target/*.jar"/>
</copy>
<mkdir dir="target/migrate"/>
+ <mkdir dir="target/migrate/data"/>
<get src="${lucene6-core.url}" dest="target/migrate/${lucene6-core-jar.name}" usetimestamp="true"/>
<get src="${lucene6-backward-codecs.url}" dest="target/migrate/${lucene6-backward-codecs-jar.name}" usetimestamp="true"/>
<copy todir="target/package/migrate" includeEmptyDirs="no">
@@ -42,6 +43,9 @@
<fileset file="target/package/libs/lucene-core-${solr.version}.jar"/>
<fileset file="target/package/libs/lucene-backward-codecs-${solr.version}.jar"/>
<fileset file="src/main/resources/managed-schema"/>
+ </copy>
+ <copy todir="target/package/migrate/data" includeEmptyDirs="no">
+ <fileset file="src/main/resources/data/.keep"/>
</copy>
<copy todir="target/package" includeEmptyDirs="no">
<fileset file="src/main/resources/solrCloudCli.sh"/>
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 4a971b2..29ae933 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -18,6 +18,8 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import copy
+import glob
import logging
import os
import sys
@@ -800,8 +802,84 @@ def upgrade_ranger_solrconfig_xml(options, config, service_filter):
solr_znode=config.get('infra_solr', 'znode')
ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
backup_ranger_config_set_name = config.get('ranger_collection', 'backup_ranger_config_set_name')
- copy_znode(options, config, "{0}/configs/solrconfig.xml".format(solr_znode, ranger_config_set_name),
- "{0}/configs/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
+ copy_znode(options, config, "{0}/configs/{1}/solrconfig.xml".format(solr_znode, ranger_config_set_name),
+ "{0}/configs/{1}/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
+
+def update_state_json(collection, config, options):
+ solr_znode='/infra-solr'
+ if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
+ solr_znode=config.get('infra_solr', 'znode')
+ coll_data_dir = "{0}migrate/data/{1}".format(INFRA_SOLR_CLIENT_BASE_PATH, collection)
+ if not os.path.exists(coll_data_dir):
+ os.makedirs(coll_data_dir)
+
+ copy_znode(options, config, "{0}/collections/{1}/state.json".format(solr_znode, collection), "{0}/state.json".format(coll_data_dir), copy_to_local=True)
+ copy_znode(options, config, "{0}/restore_metadata/{1}".format(solr_znode, collection), coll_data_dir, copy_to_local=True)
+
+ json_file_list=glob.glob("{0}/*.json".format(coll_data_dir))
+ logger.debug("Downloaded json files list: {0}".format(str(json_file_list)))
+
+ cores_data_json_list = [k for k in json_file_list if os.path.basename(k) not in ('state.json', 'new_state.json')]
+ state_json_list = [k for k in json_file_list if os.path.basename(k) == 'state.json']
+
+ if not cores_data_json_list:
+ raise Exception('Cannot find any downloaded restore core metadata for {0}'.format(collection))
+ if not state_json_list:
+ raise Exception('Cannot find any downloaded restore collection state metadata for {0}'.format(collection))
+
+ state_json_file=state_json_list[0]
+ state_data = read_json(state_json_file)
+ core_json_data=[]
+ for core_data_json_file in cores_data_json_list:
+ core_json_data.append(read_json(core_data_json_file))
+
+ logger.debug("collection data content: {0}".format(str(state_data)))
+ core_details={}
+ for core in core_json_data:
+ core_details[core['core_node']]=core
+ logger.debug("core data contents: {0}".format(str(core_details)))
+
+ collection_data = state_data[collection]
+ shards = collection_data['shards']
+ new_state_json_data=copy.deepcopy(state_data)
+
+ for shard in shards:
+ replicas = shards[shard]['replicas']
+ for replica in replicas:
+ core_data = replicas[replica]
+ core = core_data['core']
+ base_url = core_data['base_url']
+ node_name = core_data['node_name']
+ data_dir = core_data.get('dataDir')
+ ulog_dir = core_data.get('ulogDir')
+
+ if replica in core_details:
+ old_core_node=core_details[replica]['core_node']
+ new_core_node=core_details[replica]['new_core_node']
+
+ new_state_core = copy.deepcopy(state_data[collection]['shards'][shard]['replicas'][replica])
+ new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]=new_state_core
+ if old_core_node != new_core_node:
+ if old_core_node in new_state_json_data[collection]['shards'][shard]['replicas']:
+ del new_state_json_data[collection]['shards'][shard]['replicas'][old_core_node]
+ if data_dir:
+ new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['dataDir']=data_dir.replace(old_core_node, new_core_node)
+ if ulog_dir:
+ new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['ulogDir']=ulog_dir.replace(old_core_node, new_core_node)
+ old_host=core_details[replica]['old_host']
+ new_host=core_details[replica]['new_host']
+ if old_host != new_host and old_core_node != new_core_node:
+ new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['base_url']=base_url.replace(old_host, new_host)
+ new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['node_name']=node_name.replace(old_host, new_host)
+ elif old_host != new_host:
+ new_state_json_data[collection]['shards'][shard]['replicas'][replica]['base_url']=base_url.replace(old_host, new_host)
+ new_state_json_data[collection]['shards'][shard]['replicas'][replica]['node_name']=node_name.replace(old_host, new_host)
+
+ with open("{0}/new_state.json".format(coll_data_dir), 'w') as outfile:
+ json.dump(new_state_json_data, outfile)
+
+ copy_znode(options, config, "{0}/new_state.json".format(coll_data_dir), "{0}/collections/{1}/state.json".format(solr_znode, collection), copy_from_local=True)
+
def delete_znodes(options, config, service_filter):
solr_znode='/infra-solr'
@@ -1113,6 +1191,33 @@ def rolling_restart_solr(options, accessor, parser, config):
post_json(accessor, BATCH_REQUEST_API_URL.format(cluster), request_body)
print "Rolling Restart Infra Solr Instances request sent. (check Ambari UI about the requests)"
+def update_state_jsons(options, accessor, parser, config, service_filter):
+ solr_urls = get_solr_urls(config)
+ collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+ collections=filter_collections(options, collections)
+ if is_ranger_available(config, service_filter):
+ backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+ if backup_ranger_collection in collections:
+ update_state_json(backup_ranger_collection, config, options)
+ else:
+ print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection)
+ if is_atlas_available(config, service_filter):
+ backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+ if backup_fulltext_index_name in collections:
+ update_state_json(backup_fulltext_index_name, config, options)
+ else:
+ print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
+ backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+ if backup_edge_index_name in collections:
+ update_state_json(backup_edge_index_name, config, options)
+ else:
+ print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name)
+ backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+ if backup_vertex_index_name in collections:
+ update_state_json(backup_vertex_index_name, config, options)
+ else:
+ print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_vertex_index_name)
+
if __name__=="__main__":
parser = optparse.OptionParser("usage: %prog [options]")
@@ -1187,6 +1292,10 @@ if __name__=="__main__":
upgrade_ranger_solrconfig_xml(options, config, service_filter)
create_backup_collections(options, accessor, parser, config, service_filter)
restore_collections(options, accessor, parser, config, service_filter)
+ update_state_jsons(options, accessor, parser, config, service_filter)
+ elif options.action.lower() == 'update-collection-state':
+ update_state_jsons(options, accessor, parser, config, service_filter)
+ elif options.action.lower() == 'reload':
reload_collections(options, accessor, parser, config, service_filter)
elif options.action.lower() == 'migrate':
migrate_snapshots(options, accessor, parser, config, service_filter)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/resources/data/.keep b/ambari-infra/ambari-infra-solr-client/src/main/resources/data/.keep
new file mode 100644
index 0000000..5c8bc35
--- /dev/null
+++ b/ambari-infra/ambari-infra-solr-client/src/main/resources/data/.keep
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Placeholder file: keeps the otherwise empty data directory in the source tree and package.
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index 46c4e0e..3d83e14 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -16,13 +16,15 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
import time
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Directory, Execute, File
from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import solr_cloud_util
from resource_management.libraries.resources.properties_file import PropertiesFile

def backup_collection(env):
"""
Backup collections using replication API (as Solr Cloud Backup API is not available in Solr 5)
@@ -158,7 +160,10 @@ def restore_collection(env):
core_properties['name'] = target_core
core_properties['replicaType'] = core_details['type']
core_properties['collection'] = command_commons.collection
- core_properties['coreNodeName'] = core_details['node']
+ if command_commons.solr_hdfs_path:
+ core_properties['coreNodeName'] = 'backup_' + core_details['node']
+ else:
+ core_properties['coreNodeName'] = core_details['node']
core_properties['shard'] = core_details['shard']
if command_commons.solr_hdfs_path:
hdfs_solr_node_folder=command_commons.solr_hdfs_path + format("/backup_{collection}/") + core_details['node']
@@ -174,30 +179,21 @@ def restore_collection(env):
recursive_chown=True,
recursive_chmod=True
)
- command_commons.HdfsResource(None, action="execute")
command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/tlog"),
type="directory",
action="create_on_execute",
owner=params.infra_solr_user,
mode=0755
)
- command_commons.HdfsResource(None, action="execute")
command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/snapshot_metadata"),
type="directory",
action="create_on_execute",
owner=params.infra_solr_user,
mode=0755
)
- command_commons.HdfsResource(None, action="execute")
- if command_commons.solr_keep_backup:
- Directory(format("{index_location}/snapshot.{src_core}"),
- action="delete",
- only_if=only_if_cmd,
- owner=params.infra_solr_user)
else:
- copy_cmd = format(
- "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
- else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+ copy_cmd = format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+ else format("mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
Execute(
copy_cmd, only_if=only_if_cmd,
user=params.infra_solr_user,
@@ -218,6 +214,7 @@ def restore_collection(env):
logoutput=True)
for core_data in core_pairs:
src_core = core_data['src_core']
+ src_host = core_data['src_host']
target_core = core_data['target_core']
if src_core in command_commons.skip_cores:
@@ -227,14 +224,34 @@ def restore_collection(env):
Logger.info(format("Core '{target_core}' (target) is filtered out."))
continue
+ if os.path.exists(format("{index_location}/snapshot.{src_core}")):
+ data_to_save = {}
+ host_core_data=host_cores_map[command_commons.CORE_DATA]
+ core_details=host_core_data[target_core]
+ core_node=core_details['node']
+ data_to_save['core']=target_core
+ data_to_save['core_node']=core_node
+ data_to_save['old_host']=core_data['target_host']
+ data_to_save['new_host']=src_host
+ if command_commons.solr_hdfs_path:
+ data_to_save['new_core_node']="backup_" + core_node
+ else:
+ data_to_save['new_core_node']=core_node
+
+ command_commons.write_core_file(target_core, data_to_save)
+ jaas_file = params.infra_solr_jaas_file if params.security_enabled else None
+ core_json_location = format("{index_location}/{target_core}.json")
+ znode_json_location = format("/restore_metadata/{collection}/{target_core}.json")
+ solr_cloud_util.copy_solr_znode_from_local(params.zookeeper_quorum, params.infra_solr_znode, params.java64_home, jaas_file, core_json_location, znode_json_location)
+
core_root_dir = format("{solr_datadir}/backup_{target_core}")
core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
if command_commons.solr_hdfs_path:
if target_core in hdfs_cores_on_host:
Logger.info(format("Core data '{target_core}' is located on this host, processing..."))
- core_data=host_cores_map[command_commons.CORE_DATA]
- core_details=core_data[target_core]
+ host_core_data=host_cores_map[command_commons.CORE_DATA]
+ core_details=host_core_data[target_core]
core_node=core_details['node']
collection_core_dir=command_commons.solr_hdfs_path + format("/{collection}/{core_node}")
backup_collection_core_dir=command_commons.solr_hdfs_path + format("/backup_{collection}/{core_node}")
@@ -243,9 +260,9 @@ def restore_collection(env):
action="delete_on_execute",
owner=params.infra_solr_user
)
- command_commons.HdfsResource(None, action="execute")
if command_commons.check_hdfs_folder_exists(backup_collection_core_dir):
- command_commons.move_hdfs_folder(backup_collection_core_dir, collection_core_dir)
+ collection_backup_core_dir=command_commons.solr_hdfs_path + format("/{collection}/backup_{core_node}")
+ command_commons.move_hdfs_folder(backup_collection_core_dir, collection_backup_core_dir)
else:
Logger.info(format("Core data '{target_core}' is not located on this host, skipping..."))
@@ -266,3 +283,10 @@ def restore_collection(env):
recursive_ownership=True,
only_if=format("test -d {core_root_without_backup_dir}")
)
+
+ if command_commons.solr_hdfs_path and not command_commons.solr_keep_backup:
+ only_if_cmd = format("test -d {index_location}/snapshot.{src_core}")
+ Directory(format("{index_location}/snapshot.{src_core}"),
+ action="delete",
+ only_if=only_if_cmd,
+ owner=params.infra_solr_user)
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index 5d3e897..8051d6c 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -87,6 +87,12 @@ solr_num_shards = int(default("/commandParams/solr_shards", "0"))
solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
+keytab = None
+principal = None
+if params.security_enabled:
+ keytab = params.infra_solr_kerberos_keytab
+ principal = params.infra_solr_kerberos_principal
+
if solr_hdfs_path:
import functools
@@ -120,21 +126,17 @@ if solr_hdfs_path:
user=params.infra_solr_user,
hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
security_enabled = params.security_enabled,
- keytab = hdfs_user_keytab,
+ keytab = keytab,
kinit_path_local = kinit_path_local,
hadoop_bin_dir = hadoop_bin_dir,
hadoop_conf_dir = hadoop_conf_dir,
- principal_name = hdfs_principal_name,
+ principal_name = principal,
hdfs_site = hdfs_site,
default_fs = default_fs,
immutable_paths = get_not_managed_resources(),
dfs_type = dfs_type
)
-if params.security_enabled:
- keytab = params.infra_solr_kerberos_keytab
- principal = params.infra_solr_kerberos_principal
-
hostname_suffix = params.hostname.replace(".", "_")
HOST_CORES='host-cores'
@@ -318,6 +320,11 @@ def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores
json.dump(json_content, outfile)
return json_content
+def write_core_file(core, core_data):
+ core_json_location = format("{index_location}/{core}.json")
+ with open(core_json_location, 'w') as outfile:
+ json.dump(core_data, outfile)
+
def __read_host_cores_from_file(json_host_cores_path):
"""
Read host cores from file, can be useful if you do not want to regenerate host core data (with that you can generate your own host core pairs for restore)
@@ -410,10 +417,10 @@ def execute_commad(command):
return call(command, user=params.infra_solr_user, timeout=300)
def move_hdfs_folder(source_dir, target_dir):
cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
returncode, stdout = execute_commad(cmd)
if returncode:
- raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
+ raise Exception("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
return stdout.strip()
def check_hdfs_folder_exists(hdfs_dir):
--
To stop receiving notification emails like this one, please contact
oleewere@apache.org.