You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/10/02 20:39:20 UTC
[09/50] [abbrv] ambari git commit: AMBARI-22061. Solr Data Manager script should provide non-destructive archive download option (mgergely)
AMBARI-22061. Solr Data Manager script should provide non-destructive archive download option (mgergely)
Change-Id: I2fd9ef9e2de840e25649feb5f39b95dd36e7cc7e
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9d802b7c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9d802b7c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9d802b7c
Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 9d802b7c11a62336fd4de8aa2695af02b061c625
Parents: dc419b4
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Sep 27 11:15:01 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Wed Sep 27 11:15:11 2017 +0200
----------------------------------------------------------------------
.../src/main/python/solrDataManager.py | 76 +++++++++++---------
1 file changed, 43 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9d802b7c/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index e0356bb..2675bd9 100644
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -47,7 +47,7 @@ verbose = False
def parse_arguments():
parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))
- parser.add_option("-m", "--mode", dest="mode", type="string", help="delete | save")
+ parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save")
parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port")
parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
@@ -98,14 +98,14 @@ def parse_arguments():
parser.print_help()
sys.exit()
- mode_values = ["delete", "save"]
+ mode_values = ["archive", "delete", "save"]
if options.mode not in mode_values:
print "mode must be one of {0}".format(" | ".join(mode_values))
parser.print_help()
sys.exit()
if options.mode == "delete":
- for r in ["name", "compression", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
+ for r in ["name", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
if options.__dict__[r] is not None:
print "argument '{0}' may not be specified in delete mode".format(r)
parser.print_help()
@@ -153,7 +153,7 @@ def parse_arguments():
parser.print_help()
sys.exit()
- if options.mode == "save":
+ if options.mode in ["archive", "save"]:
count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \
(1 if options.__dict__["local_path"] is not None else 0)
if count != 1:
@@ -171,7 +171,7 @@ def parse_arguments():
print(" solr-url: " + options.solr_url)
print(" collection: " + options.collection)
print(" filter-field: " + options.filter_field)
- if options.mode == "save":
+ if options.mode in ["archive", "save"]:
print(" id-field: " + options.id_field)
if options.__dict__["end"] is not None:
print(" end: " + options.end)
@@ -182,14 +182,14 @@ def parse_arguments():
print(" additional-filter: " + str(options.additional_filter))
if options.__dict__["name"] is not None:
print(" name: " + str(options.name))
- if options.mode == "save":
+ if options.mode in ["archive", "save"]:
print(" read-block-size: " + str(options.read_block_size))
print(" write-block-size: " + str(options.write_block_size))
print(" ignore-unfinished-uploading: " + str(options.ignore_unfinished_uploading))
if (options.__dict__["solr_keytab"] is not None):
print(" solr-keytab: " + options.solr_keytab)
print(" solr-principal: " + options.solr_principal)
- if options.mode == "save":
+ if options.mode in ["archive", "save"]:
print(" output: " + ("json" if options.json_file else "line-delimited-json"))
print(" compression: " + options.compression)
if (options.__dict__["hdfs_keytab"] is not None):
@@ -251,7 +251,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
-def save(solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
+def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
solr_kinit_command = None
@@ -269,9 +269,11 @@ def save(solr_url, collection, filter_field, id_field, range_end, read_block_siz
ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
working_dir = get_working_dir(solr_url, collection)
- handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
- save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end,
- read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
+ if mode == "archive":
+ handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
+
+ save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+ range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
@@ -341,7 +343,7 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
os.remove(command_json_path)
-def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
logger.info("Starting to save data")
@@ -370,9 +372,9 @@ def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, col
prev_lot_end_id = results[3]
if records > 0:
- upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
- working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path, compression)
+ upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
+ id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
+ hdfs_path, key_file_path, bucket, key_prefix, local_path, compression)
total_records += records
logger.info("A total of %d records are saved", total_records)
@@ -441,8 +443,8 @@ def finish_file(tmp_file, json_file):
if json_file:
tmp_file.write("\n}")
-def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
- working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
+def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
+ id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
key_file_path, bucket, key_prefix, local_path, compression):
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
@@ -451,9 +453,9 @@ def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url,
upload_file_path = compress_file(working_dir, tmp_file_path, file_name, compression)
- upload_command = create_command_file(True, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
- prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket,
- key_prefix, local_path)
+ upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
+ id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
+ key_file_path, bucket, key_prefix, local_path)
if hdfs_user:
upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
elif key_file_path:
@@ -464,11 +466,12 @@ def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url,
logger.warn("Unknown upload destination")
sys.exit()
- delete_command = create_command_file(False, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
- prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
- delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value, prev_lot_end_id)
-
- os.remove("{0}/command.json".format(working_dir))
+ delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
+ id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
+ if mode == "archive":
+ delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
+ prev_lot_end_id)
+ os.remove("{0}/command.json".format(working_dir))
def compress_file(working_dir, tmp_file_path, file_name, compression):
data_file_name = "{0}.json".format(file_name)
@@ -511,8 +514,9 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
return upload_file_path
-def create_command_file(upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, prev_lot_end_value,
- prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
+ prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
+ local_path):
commands = {}
if upload:
@@ -551,6 +555,9 @@ def create_command_file(upload, working_dir, upload_file_path, solr_url, collect
else:
logger.warn("Unknown upload destination")
sys.exit()
+
+ if mode == "save":
+ return upload_command
delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
@@ -558,6 +565,9 @@ def create_command_file(upload, working_dir, upload_file_path, solr_url, collect
delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*")
delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
.format(solr_url, collection, delete_query)
+ if mode == "save":
+ return delete_command
+
delete_command_data = {}
delete_command_data["command"] = delete_command
delete_command_data["collection"] = collection
@@ -710,12 +720,12 @@ if __name__ == '__main__':
if options.mode == "delete":
delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal)
- elif options.mode == "save":
- save(options.solr_url, options.collection, options.filter_field, options.id_field, end, options.read_block_size,
- options.write_block_size, options.ignore_unfinished_uploading, options.additional_filter, options.name,
- options.solr_keytab, options.solr_principal, options.json_file, options.compression,
- options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, options.key_file_path,
- options.bucket, options.key_prefix, options.local_path)
+ elif options.mode in ["archive", "save"]:
+ save(options.mode, options.solr_url, options.collection, options.filter_field, options.id_field, end,
+ options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
+ options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
+ options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
+ options.key_file_path, options.bucket, options.key_prefix, options.local_path)
else:
logger.warn("Unknown mode: %s", options.mode)