Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/22 10:09:34 UTC

[ambari] branch trunk updated: AMBARI-24163. Infra Solr: solrDataManager.py should support transport data without timestamp field. (#1594)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 939a33c  AMBARI-24163. Infra Solr: solrDataManager.py should support transport data without timestamp field. (#1594)
939a33c is described below

commit 939a33c5590f5e26b7fa9940737dd419ae076db4
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Fri Jun 22 12:09:32 2018 +0200

    AMBARI-24163. Infra Solr: solrDataManager.py should support transport data without timestamp field. (#1594)
---
 .../src/main/python/solrDataManager.py             | 104 ++++++++++++++-------
 1 file changed, 68 insertions(+), 36 deletions(-)
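
In short, the patch adds a --skip-date-usage option: when it is set, the date filter field (and the end/days range) is no longer mandatory, the base query matches all documents instead of a date range, sorting and block paging run on the id field alone, and per-block deletes are bounded by the last saved id. Below is a minimal sketch of the two query/sort shapes that save_data() builds after this change (Python 2, like the script itself; build_query is a hypothetical helper, not part of the script):

    from urllib import quote

    SAFE = "/+\"*"  # characters the script leaves unescaped in its Solr query URLs

    def build_query(skip_date_usage, filter_field, id_field, range_end, additional_filter=None):
        if skip_date_usage:
            # No timestamp field: match everything and page on the id field alone.
            q = "*:*+AND+{0}".format(additional_filter) if additional_filter else "*:*"
            sort = "{0}+asc".format(id_field)
        else:
            # Timestamp available: bound the range, sort by date first and id second.
            q = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
            if additional_filter:
                q = "{0}+AND+{1}".format(additional_filter, q)
            sort = "{0}+asc,{1}+asc".format(filter_field, id_field)
        return quote(q, safe=SAFE), quote(sort, safe=SAFE)

    # build_query(True, None, "id", None)
    #   -> ('*%3A*', 'id+asc')
    # build_query(False, "logtime", "id", "2018-06-22T00:00:00.000Z")[1]
    #   -> 'logtime+asc%2Cid+asc'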

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index 5d2d2fa..b34873d 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -91,15 +91,22 @@ def parse_arguments():
 
   parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
   parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
+  parser.add_option("--skip-date-usage", dest="skip_date_usage", action="store_true", default=False, help="datestamp field won't be used for queries (sort based on id field)")
 
   (options, args) = parser.parse_args()
 
-  for r in ["mode", "solr_url", "collection", "filter_field"]:
+  for r in ["mode", "solr_url", "collection"]:
     if options.__dict__[r] is None:
       print "argument '{0}' is mandatory".format(r)
       parser.print_help()
       sys.exit()
 
+  if not options.skip_date_usage:
+    if options.filter_field is None:
+      print "argument 'filter_field' is mandatory"
+      parser.print_help()
+      sys.exit()
+
   mode_values = ["archive", "delete", "save"]
   if options.mode not in mode_values:
     print "mode must be one of {0}".format(" | ".join(mode_values))
@@ -113,7 +120,7 @@ def parse_arguments():
         parser.print_help()
         sys.exit()
 
-  if options.__dict__["end"] is None and options.__dict__["days"] is None or \
+  if not options.skip_date_usage and options.__dict__["end"] is None and options.__dict__["days"] is None or \
           options.__dict__["end"] is not None and options.__dict__["days"] is not None:
     print "exactly one of 'end' or 'days' must be specfied"
     parser.print_help()
@@ -174,7 +181,8 @@ def parse_arguments():
   print("  mode: " + options.mode)
   print("  solr-url: " + options.solr_url)
   print("  collection: " + options.collection)
-  print("  filter-field: " + options.filter_field)
+  if options.__dict__["filter_field"] is not None:
+    print("  filter-field: " + options.filter_field)
   if options.mode in ["archive", "save"]:
     print("  id-field: " + options.id_field)
   if options.__dict__["exclude_fields"] is not None:
@@ -212,6 +220,7 @@ def parse_arguments():
     print("  key-prefix: " + options.key_prefix)
   if (options.__dict__["local_path"] is not None):
     print("  local-path: " + options.local_path)
+  print ("  skip-date-usage: " + str(options.skip_date_usage))
   print("  verbose: " + str(options.verbose))
   print
 
@@ -243,7 +252,7 @@ def get_end(options):
     logger.info("The end date will be: %s", end)
     return end
 
-def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal):
+def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal, skip_date_usage):
   logger.info("Deleting data where %s <= %s", filter_field, end)
   solr_kinit_command = None
   if solr_keytab:
@@ -251,8 +260,10 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
     curl_prefix = "curl -k --negotiate -u : "
   else:
     curl_prefix = "curl -k"
-
-  delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
+  if skip_date_usage:
+    delete_query = "*:*"
+  else:
+    delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
   delete_command = "{0}/{1}/update?commit=true&wt=json".format(solr_url, collection)
   delete_data = "<delete><query>{0}</query></delete>".format(delete_query)
 
@@ -261,7 +272,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
 def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
          ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
          compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
-         solr_output_collection, exclude_fields):
+         solr_output_collection, exclude_fields, skip_date_usage):
   solr_kinit_command = None
   if solr_keytab:
     solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -282,7 +293,8 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
 
   save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
             range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
-            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields)
+            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields,
+            skip_date_usage)
 
 def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
   if hdfs_kinit_command:
@@ -355,7 +367,8 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
 
 def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
               range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
-              compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields):
+              compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection,
+              exclude_fields, skip_date_usage):
   logger.info("Starting to save data")
 
   tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -363,12 +376,19 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
   prev_lot_end_value = None
   prev_lot_end_id = None
 
-  if additional_filter:
-    q = quote("{0}+AND+{1}:[*+TO+\"{2}\"]".format(additional_filter, filter_field, range_end), safe="/+\"*")
+  if skip_date_usage:
+    if additional_filter:
+      q = quote("*:*+AND+{0}".format(additional_filter), safe="/+\"*")
+    else:
+      q = quote("*:*", safe="/+\"*")
+    sort = quote("{0}+asc".format(id_field), safe="/+\"*")
   else:
-    q = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, range_end), safe="/+\"*")
+    if additional_filter:
+      q = quote("{0}+AND+{1}:[*+TO+\"{2}\"]".format(additional_filter, filter_field, range_end), safe="/+\"*")
+    else:
+      q = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, range_end), safe="/+\"*")
+    sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
 
-  sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
   solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)
 
   exclude_field_list = exclude_fields.split(',') if exclude_fields else None
@@ -380,7 +400,7 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
   while not done:
     results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
                            id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file,
-                           exclude_field_list)
+                           exclude_field_list, skip_date_usage)
     done = results[0]
     records = results[1]
     prev_lot_end_value = results[2]
@@ -389,12 +409,13 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
     if records > 0:
       upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                    id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
-                   hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection)
+                   hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection,
+                   skip_date_usage)
       total_records += records
       logger.info("A total of %d records are saved", total_records)
 
 def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
-                 write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list):
+                 write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list, skip_date_usage):
   if os.path.exists(tmp_file_path):
     os.remove(tmp_file_path)
   tmp_file = open(tmp_file_path, 'w')
@@ -404,15 +425,23 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
   records = 0
   done = False
   while records < write_block_size:
-    if prev_lot_end_value:
-      fq_prev_end_rest = "({0}:\"{1}\"+AND+{2}:{{\"{3}\"+TO+*])".format(filter_field, prev_lot_end_value, id_field,
-                                                                        prev_lot_end_id)
-      fq_new = "{0}:{{\"{1}\"+TO+\"{2}\"]".format(filter_field, prev_lot_end_value, range_end)
-      fq = "{0}+OR+{1}".format(fq_prev_end_rest, fq_new)
+    if skip_date_usage:
+      if prev_lot_end_id:
+        fq = "({0}:{{\"{1}\"+TO+*])".format(id_field, prev_lot_end_id)
+        url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
+      else:
+        url = "{0}".format(solr_query_url_prefix)
     else:
-      fq = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
+      if prev_lot_end_value:
+        fq_prev_end_rest = "({0}:\"{1}\"+AND+{2}:{{\"{3}\"+TO+*])".format(filter_field, prev_lot_end_value, id_field,
+                                                                        prev_lot_end_id)
+        fq_new = "{0}:{{\"{1}\"+TO+\"{2}\"]".format(filter_field, prev_lot_end_value, range_end)
+        fq = "{0}+OR+{1}".format(fq_prev_end_rest, fq_new)
+      else:
+        fq = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
+
+      url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
 
-    url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
     curl_command = "{0} {1}".format(curl_prefix, url)
 
     rsp = query_solr(solr_kinit_command, url, curl_command, "Obtaining")
@@ -428,7 +457,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
       if records == write_block_size:
         break
 
-    prev_lot_end_value = last_doc[filter_field]
+    prev_lot_end_value = last_doc[filter_field] if not skip_date_usage else prev_lot_end_value
     prev_lot_end_id = last_doc[id_field]
     sys.stdout.write("\r{0} records are written".format(records))
     sys.stdout.flush()
@@ -464,7 +493,7 @@ def finish_file(tmp_file, json_file):
 
 def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                  id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection):
+                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, skip_date_usage):
   if name:
     file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
   else:
@@ -474,7 +503,7 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
 
   upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
                                        id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection)
+                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection, skip_date_usage)
   if solr_output_collection:
     upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
   elif hdfs_user:
@@ -488,7 +517,8 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
     sys.exit()
 
   delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
-                                       id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None)
+                                       id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None,
+                                       skip_date_usage)
   if mode == "archive":
     delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
                 prev_lot_end_id)
@@ -537,7 +567,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
 
 def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
                         prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
-                        local_path, solr_output_collection):
+                        local_path, solr_output_collection, skip_date_usage):
   commands = {}
 
   if upload:
@@ -547,7 +577,7 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
 
   if upload:
     if solr_output_collection:
-      upload_command = "{0}/{1}/update/json/docs --data-binary @{2}"\
+      upload_command = "{0}/{1}/update/json/docs?commit=true&wt=json --data-binary @{2}"\
         .format(solr_url, solr_output_collection, upload_file_path)
       upload_command_data = {}
       upload_command_data["type"] = "solr"
@@ -589,10 +619,12 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
     if mode == "save":
       return upload_command
 
-
-  delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
-  delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
-  delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
+  if skip_date_usage:
+    delete_query = "({0}:[*+TO+\"{1}\"])".format(id_field, prev_lot_end_id)
+  else:
+    delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
+    delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
+    delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
 
   delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
     .format(solr_url, collection, delete_query)
@@ -758,17 +790,17 @@ if __name__ == '__main__':
     verbose = options.verbose
     set_log_level()
 
-    end = get_end(options)
+    end = get_end(options) if not options.skip_date_usage else None
 
     if options.mode == "delete":
-      delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal)
+      delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal, options.skip_date_usage)
     elif options.mode in ["archive", "save"]:
       save(options.mode, options.solr_url, options.collection, options.filter_field, options.id_field, end,
            options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
            options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
            options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
            options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
-           options.exclude_fields)
+           options.exclude_fields, options.skip_date_usage)
     else:
       logger.warn("Unknown mode: %s", options.mode)