You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/22 10:09:34 UTC
[ambari] branch trunk updated: AMBARI-24163. Infra Solr:
solrDataManager.py should support transport data without timestamp field.
(#1594)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 939a33c AMBARI-24163. Infra Solr: solrDataManager.py should support transport data without timestamp field. (#1594)
939a33c is described below
commit 939a33c5590f5e26b7fa9940737dd419ae076db4
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Fri Jun 22 12:09:32 2018 +0200
AMBARI-24163. Infra Solr: solrDataManager.py should support transport data without timestamp field. (#1594)
---
.../src/main/python/solrDataManager.py | 104 ++++++++++++++-------
1 file changed, 68 insertions(+), 36 deletions(-)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index 5d2d2fa..b34873d 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -91,15 +91,22 @@ def parse_arguments():
parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
+ parser.add_option("--skip-date-usage", dest="skip_date_usage", action="store_true", default=False, help="datestamp field won't be used for queries (sort based on id field)")
(options, args) = parser.parse_args()
- for r in ["mode", "solr_url", "collection", "filter_field"]:
+ for r in ["mode", "solr_url", "collection"]:
if options.__dict__[r] is None:
print "argument '{0}' is mandatory".format(r)
parser.print_help()
sys.exit()
+ if not options.skip_date_usage:
+ if options.filter_field is None:
+ print "argument 'filter_field' is mandatory"
+ parser.print_help()
+ sys.exit()
+
mode_values = ["archive", "delete", "save"]
if options.mode not in mode_values:
print "mode must be one of {0}".format(" | ".join(mode_values))
@@ -113,7 +120,7 @@ def parse_arguments():
parser.print_help()
sys.exit()
- if options.__dict__["end"] is None and options.__dict__["days"] is None or \
+ if not options.skip_date_usage and options.__dict__["end"] is None and options.__dict__["days"] is None or \
options.__dict__["end"] is not None and options.__dict__["days"] is not None:
print "exactly one of 'end' or 'days' must be specfied"
parser.print_help()
@@ -174,7 +181,8 @@ def parse_arguments():
print(" mode: " + options.mode)
print(" solr-url: " + options.solr_url)
print(" collection: " + options.collection)
- print(" filter-field: " + options.filter_field)
+ if options.__dict__["filter_field"] is not None:
+ print(" filter-field: " + options.filter_field)
if options.mode in ["archive", "save"]:
print(" id-field: " + options.id_field)
if options.__dict__["exclude_fields"] is not None:
@@ -212,6 +220,7 @@ def parse_arguments():
print(" key-prefix: " + options.key_prefix)
if (options.__dict__["local_path"] is not None):
print(" local-path: " + options.local_path)
+ print (" skip-date-usage: " + str(options.skip_date_usage))
print(" verbose: " + str(options.verbose))
print
@@ -243,7 +252,7 @@ def get_end(options):
logger.info("The end date will be: %s", end)
return end
-def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal):
+def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal, skip_date_usage):
logger.info("Deleting data where %s <= %s", filter_field, end)
solr_kinit_command = None
if solr_keytab:
@@ -251,8 +260,10 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
curl_prefix = "curl -k --negotiate -u : "
else:
curl_prefix = "curl -k"
-
- delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
+ if skip_date_usage:
+ delete_query = "*:*"
+ else:
+ delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
delete_command = "{0}/{1}/update?commit=true&wt=json".format(solr_url, collection)
delete_data = "<delete><query>{0}</query></delete>".format(delete_query)
@@ -261,7 +272,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
- solr_output_collection, exclude_fields):
+ solr_output_collection, exclude_fields, skip_date_usage):
solr_kinit_command = None
if solr_keytab:
solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -282,7 +293,8 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
- hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields)
+ hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields,
+ skip_date_usage)
def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
if hdfs_kinit_command:
@@ -355,7 +367,8 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
- compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields):
+ compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection,
+ exclude_fields, skip_date_usage):
logger.info("Starting to save data")
tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -363,12 +376,19 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
prev_lot_end_value = None
prev_lot_end_id = None
- if additional_filter:
- q = quote("{0}+AND+{1}:[*+TO+\"{2}\"]".format(additional_filter, filter_field, range_end), safe="/+\"*")
+ if skip_date_usage:
+ if additional_filter:
+ q = quote("*:*+AND+{0}".format(additional_filter), safe="/+\"*")
+ else:
+ q = quote("*:*", safe="/+\"*")
+ sort = quote("{0}+asc".format(id_field), safe="/+\"*")
else:
- q = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, range_end), safe="/+\"*")
+ if additional_filter:
+ q = quote("{0}+AND+{1}:[*+TO+\"{2}\"]".format(additional_filter, filter_field, range_end), safe="/+\"*")
+ else:
+ q = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, range_end), safe="/+\"*")
+ sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
- sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)
exclude_field_list = exclude_fields.split(',') if exclude_fields else None
@@ -380,7 +400,7 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
while not done:
results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file,
- exclude_field_list)
+ exclude_field_list, skip_date_usage)
done = results[0]
records = results[1]
prev_lot_end_value = results[2]
@@ -389,12 +409,13 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
if records > 0:
upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
- hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection)
+ hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection,
+ skip_date_usage)
total_records += records
logger.info("A total of %d records are saved", total_records)
def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
- write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list):
+ write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list, skip_date_usage):
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
tmp_file = open(tmp_file_path, 'w')
@@ -404,15 +425,23 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
records = 0
done = False
while records < write_block_size:
- if prev_lot_end_value:
- fq_prev_end_rest = "({0}:\"{1}\"+AND+{2}:{{\"{3}\"+TO+*])".format(filter_field, prev_lot_end_value, id_field,
- prev_lot_end_id)
- fq_new = "{0}:{{\"{1}\"+TO+\"{2}\"]".format(filter_field, prev_lot_end_value, range_end)
- fq = "{0}+OR+{1}".format(fq_prev_end_rest, fq_new)
+ if skip_date_usage:
+ if prev_lot_end_id:
+ fq = "({0}:{{\"{1}\"+TO+*])".format(id_field, prev_lot_end_id)
+ url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
+ else:
+ url = "{0}".format(solr_query_url_prefix)
else:
- fq = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
+ if prev_lot_end_value:
+ fq_prev_end_rest = "({0}:\"{1}\"+AND+{2}:{{\"{3}\"+TO+*])".format(filter_field, prev_lot_end_value, id_field,
+ prev_lot_end_id)
+ fq_new = "{0}:{{\"{1}\"+TO+\"{2}\"]".format(filter_field, prev_lot_end_value, range_end)
+ fq = "{0}+OR+{1}".format(fq_prev_end_rest, fq_new)
+ else:
+ fq = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
+
+ url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
- url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
curl_command = "{0} {1}".format(curl_prefix, url)
rsp = query_solr(solr_kinit_command, url, curl_command, "Obtaining")
@@ -428,7 +457,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
if records == write_block_size:
break
- prev_lot_end_value = last_doc[filter_field]
+ prev_lot_end_value = last_doc[filter_field] if not skip_date_usage else prev_lot_end_value
prev_lot_end_id = last_doc[id_field]
sys.stdout.write("\r{0} records are written".format(records))
sys.stdout.flush()
@@ -464,7 +493,7 @@ def finish_file(tmp_file, json_file):
def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection):
+ key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, skip_date_usage):
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
else:
@@ -474,7 +503,7 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path, solr_output_collection)
+ key_file_path, bucket, key_prefix, local_path, solr_output_collection, skip_date_usage)
if solr_output_collection:
upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
elif hdfs_user:
@@ -488,7 +517,8 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
sys.exit()
delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
- id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None)
+ id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None,
+ skip_date_usage)
if mode == "archive":
delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)
@@ -537,7 +567,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
- local_path, solr_output_collection):
+ local_path, solr_output_collection, skip_date_usage):
commands = {}
if upload:
@@ -547,7 +577,7 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
if upload:
if solr_output_collection:
- upload_command = "{0}/{1}/update/json/docs --data-binary @{2}"\
+ upload_command = "{0}/{1}/update/json/docs?commit=true&wt=json --data-binary @{2}"\
.format(solr_url, solr_output_collection, upload_file_path)
upload_command_data = {}
upload_command_data["type"] = "solr"
@@ -589,10 +619,12 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
if mode == "save":
return upload_command
-
- delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
- delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
- delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
+ if skip_date_usage:
+ delete_query = "({0}:[*+TO+\"{1}\"])".format(id_field, prev_lot_end_id)
+ else:
+ delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
+ delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
+ delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
.format(solr_url, collection, delete_query)
@@ -758,17 +790,17 @@ if __name__ == '__main__':
verbose = options.verbose
set_log_level()
- end = get_end(options)
+ end = get_end(options) if not options.skip_date_usage else None
if options.mode == "delete":
- delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal)
+ delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal, options.skip_date_usage)
elif options.mode in ["archive", "save"]:
save(options.mode, options.solr_url, options.collection, options.filter_field, options.id_field, end,
options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
- options.exclude_fields)
+ options.exclude_fields, options.skip_date_usage)
else:
logger.warn("Unknown mode: %s", options.mode)