You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/09/26 22:26:38 UTC
[39/50] [abbrv] ambari git commit: AMBARI-22049. Solr Data Manager should use --line-delimited option by default (mgergely)
AMBARI-22049. Solr Data Manager should use --line-delimited option by default (mgergely)
Change-Id: Iae6c30e7a5c73ed93a68b58ee41bd5443cc91a79
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ee618e12
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ee618e12
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ee618e12
Branch: refs/heads/branch-3.0-ams
Commit: ee618e12490a20ab2f4f478700dbf929f7cbe6b6
Parents: af0db35
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Tue Sep 26 11:22:27 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Tue Sep 26 11:22:27 2017 +0200
----------------------------------------------------------------------
.../src/main/python/solrDataManager.py | 43 ++++++++++----------
1 file changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee618e12/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index d17aec7..18a4da7 100644
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -64,11 +64,11 @@ def parse_arguments():
default="%Y-%m-%dT%H:%M:%S.%fZ")
parser.add_option("-q", "--additional-filter", dest="additional_filter", type="string", help="additional solr filter")
- parser.add_option("--name", dest="name", type="string", help="name included in result files")
+ parser.add_option("-j", "--name", dest="name", type="string", help="name included in result files")
parser.add_option("-g", "--ignore-unfinished-uploading", dest="ignore_unfinished_uploading", action="store_true", default=False)
- parser.add_option("-j", "--line-delimited", dest="line_delimited", help="line delmited json output", action="store_true", default=False)
+ parser.add_option("--json-file", dest="json_file", help="create a json file instead of line delimited json", action="store_true", default=False)
parser.add_option("-z", "--compression", dest="compression", help="none | tar.gz | tar.bz2 | zip", default="tar.gz")
parser.add_option("-k", "--solr-keytab", dest="solr_keytab", type="string", help="the keytab for a kerberized solr")
@@ -188,7 +188,7 @@ def parse_arguments():
print(" solr-keytab: " + options.solr_keytab)
print(" solr-principal: " + options.solr_principal)
if options.mode == "save":
- print(" line-delimited: " + str(options.line_delimited))
+ print(" output: " + ("json" if options.json_file else "line-delimited-json"))
print(" compression: " + options.compression)
if (options.__dict__["hdfs_keytab"] is not None):
print(" hdfs-keytab: " + options.hdfs_keytab)
@@ -250,7 +250,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
def save(solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
- ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, line_delimited,
+ ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
solr_kinit_command = None
if solr_keytab:
@@ -269,7 +269,7 @@ def save(solr_url, collection, filter_field, id_field, range_end, read_block_siz
working_dir = get_working_dir(solr_url, collection)
handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end,
- read_block_size, write_block_size, working_dir, additional_filter, name, line_delimited, compression,
+ read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
@@ -340,12 +340,11 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
os.remove(command_json_path)
def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
- range_end, read_block_size, write_block_size, working_dir, additional_filter, name, line_delimited,
+ range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
logger.info("Starting to save data")
tmp_file_path = "{0}/tmp.json".format(working_dir)
- true = True # needed to be able to eval 'true' in the returned json
prev_lot_end_value = None
prev_lot_end_id = None
@@ -362,8 +361,7 @@ def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, col
total_records = 0
while not done:
results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
- id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id,
- line_delimited)
+ id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file)
done = results[0]
records = results[1]
prev_lot_end_value = results[2]
@@ -377,13 +375,13 @@ def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, col
logger.info("A total of %d records are saved", total_records)
def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
- write_block_size, prev_lot_end_value, prev_lot_end_id, line_delimited):
+ write_block_size, prev_lot_end_value, prev_lot_end_id, json_file):
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
tmp_file = open(tmp_file_path, 'w')
logger.debug("Created tmp file %s", tmp_file_path)
- init_file(tmp_file, line_delimited)
+ init_file(tmp_file, json_file)
records = 0
done = False
while records < write_block_size:
@@ -406,7 +404,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
for doc in rsp['response']['docs']:
last_doc = doc
- add_line(tmp_file, doc, line_delimited, records)
+ add_line(tmp_file, doc, json_file, records)
records += 1
if records == write_block_size:
break
@@ -419,26 +417,26 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
print
logger.debug("Collecting next lot of data")
- finish_file(tmp_file, line_delimited)
+ finish_file(tmp_file, json_file)
sys.stdout.write("\n")
logger.debug("Finished data collection")
return [done, records, prev_lot_end_value, prev_lot_end_id]
-def init_file(tmp_file, line_delimited):
- if not line_delimited:
+def init_file(tmp_file, json_file):
+ if json_file:
tmp_file.write("{\n")
-def add_line(tmp_file, doc, line_delimited, records):
+def add_line(tmp_file, doc, json_file, records):
if records > 0:
- if line_delimited:
- tmp_file.write("\n")
- else:
+ if json_file:
tmp_file.write(",\n")
+ else:
+ tmp_file.write("\n")
tmp_file.write(json.dumps(doc))
-def finish_file(tmp_file, line_delimited):
- if not line_delimited:
+def finish_file(tmp_file, json_file):
+ if json_file:
tmp_file.write("\n}")
def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
@@ -664,6 +662,7 @@ def query_solr(solr_kinit_command, url, curl_command, action):
logger.warn(str(err))
sys.exit()
+ true = True # needed to be able to eval 'true' in the returned json
rsp = eval(str(out))
if rsp["responseHeader"]["status"] != 0:
print
@@ -703,7 +702,7 @@ if __name__ == '__main__':
elif options.mode == "save":
save(options.solr_url, options.collection, options.filter_field, options.id_field, end, options.read_block_size,
options.write_block_size, options.ignore_unfinished_uploading, options.additional_filter, options.name,
- options.solr_keytab, options.solr_principal, options.line_delimited, options.compression,
+ options.solr_keytab, options.solr_principal, options.json_file, options.compression,
options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, options.key_file_path,
options.bucket, options.key_prefix, options.local_path)
else: