Posted to commits@ambari.apache.org by ol...@apache.org on 2018/04/02 22:07:08 UTC
[ambari] branch trunk updated: AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29dd230 AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)
29dd230 is described below
commit 29dd2302e6a7045503e149b8d945e4d21905432e
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Tue Apr 3 00:07:05 2018 +0200
AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)
---
.../src/main/python/solrDataManager.py | 115 ++++++++++++++-------
1 file changed, 78 insertions(+), 37 deletions(-)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index b8b06be..5dfbdf6 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -18,22 +18,21 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import gzip
import hashlib
import json
import logging
import optparse
import os
-import time
+import shutil
import signal
import sys
-
+import tarfile
+import time
from datetime import datetime, timedelta
from subprocess import call, Popen, PIPE
from urllib import quote, unquote
from zipfile import ZipFile, ZIP_DEFLATED
-import tarfile
-import gzip
-import shutil
VERSION = "1.0"
@@ -90,6 +89,9 @@ def parse_arguments():
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
+ parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
+ parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
+
(options, args) = parser.parse_args()
for r in ["mode", "solr_url", "collection", "filter_field"]:
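For orientation, an illustrative invocation combining the two new options with the existing required arguments. Only --solr-output-collection and --exclude-fields appear verbatim in this diff, so the other long-option names are assumptions inferred from the option dests (mode, solr_url, collection, filter_field, id_field, end), and all values are placeholders:

    python solrDataManager.py --mode archive --solr-url http://localhost:8886/solr \
      --collection hadoop_logs --filter-field logtime --id-field id \
      --end 2018-04-01T00:00:00.000Z \
      --solr-output-collection hadoop_logs_archive --exclude-fields _version_

With --solr-output-collection set, each saved block is re-posted to the target collection instead of being written to HDFS, S3, or a local path.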
@@ -130,6 +132,8 @@ def parse_arguments():
parser.print_help()
sys.exit()
+ is_any_solr_output_property = options.__dict__["solr_output_collection"] is not None
+
is_any_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None or options.__dict__["hdfs_principal"] is not None
is_all_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_principal"] is not None
if is_any_hdfs_kerberos_property and not is_all_hdfs_kerberos_property:
@@ -154,10 +158,10 @@ def parse_arguments():
sys.exit()
if options.mode in ["archive", "save"]:
- count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \
- (1 if options.__dict__["local_path"] is not None else 0)
+ count = (1 if is_any_solr_output_property else 0) + (1 if is_any_hdfs_property else 0) + \
+ (1 if is_any_s3_property else 0) + (1 if options.__dict__["local_path"] is not None else 0)
if count != 1:
- print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the 'local_path' argument must be specified"
+ print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the solr arguments ('solr_output_collection') or the 'local_path' argument must be specified"
parser.print_help()
sys.exit()
@@ -173,6 +177,8 @@ def parse_arguments():
print(" filter-field: " + options.filter_field)
if options.mode in ["archive", "save"]:
print(" id-field: " + options.id_field)
+ if options.__dict__["exclude_fields"] is not None:
+ print(" exclude fields: " + options.exclude_fields)
if options.__dict__["end"] is not None:
print(" end: " + options.end)
else:
@@ -192,6 +198,8 @@ def parse_arguments():
if options.mode in ["archive", "save"]:
print(" output: " + ("json" if options.json_file else "line-delimited-json"))
print(" compression: " + options.compression)
+ if options.__dict__["solr_output_collection"] is not None:
+ print(" solr output collection: " + options.solr_output_collection)
if (options.__dict__["hdfs_keytab"] is not None):
print(" hdfs-keytab: " + options.hdfs_keytab)
print(" hdfs-principal: " + options.hdfs_principal)
@@ -244,16 +252,16 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
else:
curl_prefix = "curl -k"
- delete_range = "{0}:[*+TO+\"{1}\"]".format(filter_field, end)
- delete_query = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, end), safe="/+\"*")
- delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
- .format(solr_url, collection, delete_query)
+ delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
+ delete_command = "{0}/{1}/update?commit=true&wt=json".format(solr_url, collection)
+ delete_data = "<delete><query>{0}</query></delete>".format(delete_query)
- query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
+ query_solr(solr_kinit_command, delete_command, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_command), "Deleting", delete_data)
def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
- compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+ compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
+ solr_output_collection, exclude_fields):
solr_kinit_command = None
if solr_keytab:
solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -274,7 +282,7 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
- hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
+ hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields)
def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
if hdfs_kinit_command:
@@ -324,7 +332,9 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
logger.info("Previous run has left unfinished uploading")
logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
- if command["upload"]["type"] == "hdfs":
+ if command["upload"]["type"] == "solr":
+ upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["solr_output_collection"])
+ elif command["upload"]["type"] == "hdfs":
upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
elif command["upload"]["type"] == "s3":
@@ -345,7 +355,7 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
- compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+ compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields):
logger.info("Starting to save data")
tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -361,11 +371,14 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)
+ exclude_field_list = exclude_fields.split(',') if exclude_fields else None
+
done = False
total_records = 0
while not done:
results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
- id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file)
+ id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file,
+ exclude_field_list)
done = results[0]
records = results[1]
prev_lot_end_value = results[2]
@@ -374,12 +387,12 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
if records > 0:
upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
- hdfs_path, key_file_path, bucket, key_prefix, local_path, compression)
+ hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection)
total_records += records
logger.info("A total of %d records are saved", total_records)
def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
- write_block_size, prev_lot_end_value, prev_lot_end_id, json_file):
+ write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list):
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
tmp_file = open(tmp_file_path, 'w')
@@ -408,7 +421,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
for doc in rsp['response']['docs']:
last_doc = doc
- add_line(tmp_file, doc, json_file, records)
+ add_line(tmp_file, doc, json_file, records, exclude_field_list)
records += 1
if records == write_block_size:
break
@@ -430,12 +443,16 @@ def init_file(tmp_file, json_file):
if json_file:
tmp_file.write("{\n")
-def add_line(tmp_file, doc, json_file, records):
+def add_line(tmp_file, doc, json_file, records, exclude_fields):
if records > 0:
if json_file:
tmp_file.write(",\n")
else:
tmp_file.write("\n")
+ if exclude_fields:
+ for exclude_field in exclude_fields:
+ if doc and exclude_field in doc:
+ del doc[exclude_field]
tmp_file.write(json.dumps(doc))
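The new exclude_fields handling above simply drops the named keys from each document before it is serialized. A self-contained sketch (the sample document and field name are invented for illustration):

    import json

    doc = {"id": "doc_1", "logtime": "2018-03-31T23:59:59.000Z",
           "_version_": 1596000000000000000}
    exclude_field_list = "_version_".split(',')   # as produced from --exclude-fields

    for exclude_field in exclude_field_list:
        if doc and exclude_field in doc:
            del doc[exclude_field]

    print(json.dumps(doc))   # _version_ is gone; id and logtime remain

Stripping a field like _version_ before re-posting is likely what makes the solr-to-solr path workable, since Solr uses that field for optimistic concurrency.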
@@ -445,7 +462,7 @@ def finish_file(tmp_file, json_file):
def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path, compression):
+ key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection):
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
else:
@@ -455,8 +472,10 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
- key_file_path, bucket, key_prefix, local_path)
- if hdfs_user:
+ key_file_path, bucket, key_prefix, local_path, solr_output_collection)
+ if solr_output_collection:
+ upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
+ elif hdfs_user:
upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
elif key_file_path:
upload_file_s3(upload_command, upload_file_path, bucket, key_prefix)
@@ -467,7 +486,7 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
sys.exit()
delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
- id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
+ id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None)
if mode == "archive":
delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)
@@ -516,7 +535,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
- local_path):
+ local_path, solr_output_collection):
commands = {}
if upload:
@@ -525,7 +544,16 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
logger.debug("Creating command file with delete instructions in case of an interruption")
if upload:
- if hdfs_path:
+ if solr_output_collection:
+ upload_command = "{0}/{1}/update/json/docs --data-binary @{2}"\
+ .format(solr_url, solr_output_collection, upload_file_path)
+ upload_command_data = {}
+ upload_command_data["type"] = "solr"
+ upload_command_data["command"] = upload_command
+ upload_command_data["upload_file_path"] = upload_file_path
+ upload_command_data["solr_output_collection"] = solr_output_collection
+ commands["upload"] = upload_command_data
+ elif hdfs_path:
upload_command = "sudo -u {0} hadoop fs -put {1} {2}".format(hdfs_user, upload_file_path, hdfs_path)
upload_command_data = {}
upload_command_data["type"] = "hdfs"
@@ -562,8 +590,9 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
- delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*")
- delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
+ delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
+
+ delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
.format(solr_url, collection, delete_query)
if mode == "save":
return delete_command
@@ -657,29 +686,40 @@ def upload_file_local(upload_command, upload_file_path, local_path):
logger.warn(str(e))
sys.exit()
+def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, collection):
+ if os.path.isfile(upload_file_path):
+ query_solr(solr_kinit_command, upload_command, "{0} -H Content-type:application/json {1}".format(curl_prefix, upload_command), "Saving")
+ logger.info("Save data to collection: %s", collection)
+
def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id):
- query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
+ delete_cmd = delete_command.split(" --data-binary")[0]
+ delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
+ query_solr(solr_kinit_command, delete_cmd, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)
-def query_solr(solr_kinit_command, url, curl_command, action):
+def query_solr(solr_kinit_command, url, curl_command, action, data=None):
if solr_kinit_command:
run_kinit(solr_kinit_command, "Solr")
try:
- logger.debug("%s data from solr:\n%s", action, curl_command)
- process = Popen(curl_command.split(), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ cmd = curl_command.split()
+ if data:
+ cmd.append("--data-binary")
+ cmd.append(data)
+ logger.debug("%s data from solr:\n%s", action, ' '.join(cmd))
+ process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except Exception as e:
print
- logger.warn("Could not execute curl command:\n%s", curl_command)
+ logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
logger.warn(str(e))
sys.exit()
out, err = process.communicate()
if process.returncode != 0:
print
- logger.warn("Could not execute curl command:\n%s", curl_command)
+ logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
logger.warn(str(err))
sys.exit()
@@ -725,7 +765,8 @@ if __name__ == '__main__':
options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
- options.key_file_path, options.bucket, options.key_prefix, options.local_path)
+ options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
+ options.exclude_fields)
else:
logger.warn("Unknown mode: %s", options.mode)
--
To stop receiving notification emails like this one, please contact
oleewere@apache.org.