You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text copy.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/04/02 22:07:08 UTC

[ambari] branch trunk updated: AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29dd230  AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)
29dd230 is described below

commit 29dd2302e6a7045503e149b8d945e4d21905432e
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Tue Apr 3 00:07:05 2018 +0200

    AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)
---
 .../src/main/python/solrDataManager.py             | 115 ++++++++++++++-------
 1 file changed, 78 insertions(+), 37 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index b8b06be..5dfbdf6 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -18,22 +18,21 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import gzip
 import hashlib
 import json
 import logging
 import optparse
 import os
-import time
+import shutil
 import signal
 import sys
-
+import tarfile
+import time
 from datetime import datetime, timedelta
 from subprocess import call, Popen, PIPE
 from urllib import quote, unquote
 from zipfile import ZipFile, ZIP_DEFLATED
-import tarfile
-import gzip
-import shutil
 
 VERSION = "1.0"
 
@@ -90,6 +89,9 @@ def parse_arguments():
 
   parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
 
+  parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
+  parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
+
   (options, args) = parser.parse_args()
 
   for r in ["mode", "solr_url", "collection", "filter_field"]:
@@ -130,6 +132,8 @@ def parse_arguments():
     parser.print_help()
     sys.exit()
 
+  is_any_solr_output_property = options.__dict__["solr_output_collection"] is not None
+
   is_any_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None or options.__dict__["hdfs_principal"] is not None
   is_all_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_principal"] is not None
   if is_any_hdfs_kerberos_property and not is_all_hdfs_kerberos_property:
@@ -154,10 +158,10 @@ def parse_arguments():
     sys.exit()
 
   if options.mode in ["archive", "save"]:
-    count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \
-            (1 if options.__dict__["local_path"] is not None else 0)
+    count = (1 if is_any_solr_output_property else 0) + (1 if is_any_hdfs_property else 0) + \
+            (1 if is_any_s3_property else 0) + (1 if options.__dict__["local_path"] is not None else 0)
     if count != 1:
-      print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the 'local_path' argument must be specified"
+      print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the solr arguments ('solr_output_collection') or the 'local_path' argument must be specified"
       parser.print_help()
       sys.exit()
 
@@ -173,6 +177,8 @@ def parse_arguments():
   print("  filter-field: " + options.filter_field)
   if options.mode in ["archive", "save"]:
     print("  id-field: " + options.id_field)
+  if options.__dict__["exclude_fields"] is not None:
+    print("  exclude fields: " + options.exclude_fields)
   if options.__dict__["end"] is not None:
     print("  end: " + options.end)
   else:
@@ -192,6 +198,8 @@ def parse_arguments():
   if options.mode in ["archive", "save"]:
     print("  output: " + ("json" if options.json_file else "line-delimited-json"))
     print("  compression: " + options.compression)
+  if options.__dict__["solr_output_collection"] is not None:
+    print("  solr output collection: " + options.solr_output_collection)
   if (options.__dict__["hdfs_keytab"] is not None):
     print("  hdfs-keytab: " + options.hdfs_keytab)
     print("  hdfs-principal: " + options.hdfs_principal)
@@ -244,16 +252,16 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
   else:
     curl_prefix = "curl -k"
 
-  delete_range = "{0}:[*+TO+\"{1}\"]".format(filter_field, end)
-  delete_query = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, end), safe="/+\"*")
-  delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
-    .format(solr_url, collection, delete_query)
+  delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
+  delete_command = "{0}/{1}/update?commit=true&wt=json".format(solr_url, collection)
+  delete_data = "<delete><query>{0}</query></delete>".format(delete_query)
 
-  query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
+  query_solr(solr_kinit_command, delete_command, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_command), "Deleting", delete_data)
 
 def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
          ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
-         compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+         compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
+         solr_output_collection, exclude_fields):
   solr_kinit_command = None
   if solr_keytab:
     solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -274,7 +282,7 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
 
   save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
             range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
-            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
+            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields)
 
 def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
   if hdfs_kinit_command:
@@ -324,7 +332,9 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
       logger.info("Previous run has left unfinished uploading")
       logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
 
-      if command["upload"]["type"] == "hdfs":
+      if command["upload"]["type"] == "solr":
+        upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["solr_output_collection"])
+      elif command["upload"]["type"] == "hdfs":
         upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
                          command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
       elif command["upload"]["type"] == "s3":
@@ -345,7 +355,7 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
 
 def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
               range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
-              compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+              compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields):
   logger.info("Starting to save data")
 
   tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -361,11 +371,14 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
   sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
   solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)
 
+  exclude_field_list = exclude_fields.split(',') if exclude_fields else None
+
   done = False
   total_records = 0
   while not done:
     results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
-                           id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file)
+                           id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file,
+                           exclude_field_list)
     done = results[0]
     records = results[1]
     prev_lot_end_value = results[2]
@@ -374,12 +387,12 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
     if records > 0:
       upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                    id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
-                   hdfs_path, key_file_path, bucket, key_prefix, local_path, compression)
+                   hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection)
       total_records += records
       logger.info("A total of %d records are saved", total_records)
 
 def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
-                 write_block_size, prev_lot_end_value, prev_lot_end_id, json_file):
+                 write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list):
   if os.path.exists(tmp_file_path):
     os.remove(tmp_file_path)
   tmp_file = open(tmp_file_path, 'w')
@@ -408,7 +421,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
 
     for doc in rsp['response']['docs']:
       last_doc = doc
-      add_line(tmp_file, doc, json_file, records)
+      add_line(tmp_file, doc, json_file, records, exclude_field_list)
       records += 1
       if records == write_block_size:
         break
@@ -430,12 +443,16 @@ def init_file(tmp_file, json_file):
   if json_file:
     tmp_file.write("{\n")
 
-def add_line(tmp_file, doc, json_file, records):
+def add_line(tmp_file, doc, json_file, records, exclude_fields):
   if records > 0:
     if json_file:
       tmp_file.write(",\n")
     else:
       tmp_file.write("\n")
+  if exclude_fields:
+    for exclude_field in exclude_fields:
+      if doc and exclude_field in doc:
+        del doc[exclude_field]
 
   tmp_file.write(json.dumps(doc))
 
@@ -445,7 +462,7 @@ def finish_file(tmp_file, json_file):
 
 def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                  id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                 key_file_path, bucket, key_prefix, local_path, compression):
+                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection):
   if name:
     file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
   else:
@@ -455,8 +472,10 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
 
   upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
                                        id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                                       key_file_path, bucket, key_prefix, local_path)
-  if hdfs_user:
+                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection)
+  if solr_output_collection:
+    upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
+  elif hdfs_user:
     upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
   elif key_file_path:
     upload_file_s3(upload_command, upload_file_path, bucket, key_prefix)
@@ -467,7 +486,7 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
     sys.exit()
 
   delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
-                                       id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
+                                       id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None)
   if mode == "archive":
     delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
                 prev_lot_end_id)
@@ -516,7 +535,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
 
 def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
                         prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
-                        local_path):
+                        local_path, solr_output_collection):
   commands = {}
 
   if upload:
@@ -525,7 +544,16 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
     logger.debug("Creating command file with delete instructions in case of an interruption")
 
   if upload:
-    if hdfs_path:
+    if solr_output_collection:
+      upload_command = "{0}/{1}/update/json/docs --data-binary @{2}"\
+        .format(solr_url, solr_output_collection, upload_file_path)
+      upload_command_data = {}
+      upload_command_data["type"] = "solr"
+      upload_command_data["command"] = upload_command
+      upload_command_data["upload_file_path"] = upload_file_path
+      upload_command_data["solr_output_collection"] = solr_output_collection
+      commands["upload"] = upload_command_data
+    elif hdfs_path:
       upload_command = "sudo -u {0} hadoop fs -put {1} {2}".format(hdfs_user, upload_file_path, hdfs_path)
       upload_command_data = {}
       upload_command_data["type"] = "hdfs"
@@ -562,8 +590,9 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
 
   delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
   delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
-  delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*")
-  delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
+  delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
+
+  delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
     .format(solr_url, collection, delete_query)
   if mode == "save":
     return delete_command
@@ -657,29 +686,40 @@ def upload_file_local(upload_command, upload_file_path, local_path):
     logger.warn(str(e))
     sys.exit()
 
+def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, collection):
+  if os.path.isfile(upload_file_path):
+    query_solr(solr_kinit_command, upload_command, "{0}-H Content-type:application/json {1}".format(curl_prefix, upload_command), "Saving")
+    logger.info("Save data to collection: %s", collection)
+
 def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
                 prev_lot_end_id):
-  query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
+  delete_cmd = delete_command.split(" --data-binary")[0]
+  delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
+  query_solr(solr_kinit_command, delete_cmd, "{0}-H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
   logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
               prev_lot_end_id)
 
-def query_solr(solr_kinit_command, url, curl_command, action):
+def query_solr(solr_kinit_command, url, curl_command, action, data=None):
   if solr_kinit_command:
     run_kinit(solr_kinit_command, "Solr")
 
   try:
-    logger.debug("%s data from solr:\n%s", action, curl_command)
-    process = Popen(curl_command.split(), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+    cmd = curl_command.split()
+    if data:
+      cmd.append("--data-binary")
+      cmd.append(data)
+    logger.debug("%s data from solr:\n%s", action, ' '.join(cmd))
+    process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
   except Exception as e:
     print
-    logger.warn("Could not execute curl command:\n%s", curl_command)
+    logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
     logger.warn(str(e))
     sys.exit()
 
   out, err = process.communicate()
   if process.returncode != 0:
     print
-    logger.warn("Could not execute curl command:\n%s", curl_command)
+    logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
     logger.warn(str(err))
     sys.exit()
 
@@ -725,7 +765,8 @@ if __name__ == '__main__':
            options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
            options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
            options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
-           options.key_file_path, options.bucket, options.key_prefix, options.local_path)
+           options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
+           options.exclude_fields)
     else:
       logger.warn("Unknown mode: %s", options.mode)
 

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.