You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/09/26 22:26:38 UTC

[39/50] [abbrv] ambari git commit: AMBARI-22049. Solr Data Manager should use --line-delimited option by default (mgergely)

AMBARI-22049. Solr Data Manager should use --line-delimited option by default (mgergely)

Change-Id: Iae6c30e7a5c73ed93a68b58ee41bd5443cc91a79


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ee618e12
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ee618e12
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ee618e12

Branch: refs/heads/branch-3.0-ams
Commit: ee618e12490a20ab2f4f478700dbf929f7cbe6b6
Parents: af0db35
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Tue Sep 26 11:22:27 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Tue Sep 26 11:22:27 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/solrDataManager.py          | 43 ++++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ee618e12/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index d17aec7..18a4da7 100644
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -64,11 +64,11 @@ def parse_arguments():
                     default="%Y-%m-%dT%H:%M:%S.%fZ")
   
   parser.add_option("-q", "--additional-filter", dest="additional_filter", type="string", help="additional solr filter")
-  parser.add_option("--name", dest="name", type="string", help="name included in result files")
+  parser.add_option("-j", "--name", dest="name", type="string", help="name included in result files")
   
   parser.add_option("-g", "--ignore-unfinished-uploading", dest="ignore_unfinished_uploading", action="store_true", default=False)
   
-  parser.add_option("-j", "--line-delimited", dest="line_delimited", help="line delmited json output", action="store_true", default=False)
+  parser.add_option("--json-file", dest="json_file", help="create a json file instead of line delimited json", action="store_true", default=False)
   parser.add_option("-z", "--compression", dest="compression", help="none | tar.gz | tar.bz2 | zip", default="tar.gz")
   
   parser.add_option("-k", "--solr-keytab", dest="solr_keytab", type="string", help="the keytab for a kerberized solr")
@@ -188,7 +188,7 @@ def parse_arguments():
     print("  solr-keytab: " + options.solr_keytab)
     print("  solr-principal: " + options.solr_principal)
   if options.mode == "save":
-    print("  line-delimited: " + str(options.line_delimited))
+    print("  output: " + ("json" if options.json_file else "line-delimited-json"))
     print("  compression: " + options.compression)
   if (options.__dict__["hdfs_keytab"] is not None):
     print("  hdfs-keytab: " + options.hdfs_keytab)
@@ -250,7 +250,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
   query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
 
 def save(solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
-         ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, line_delimited,
+         ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
          compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
   solr_kinit_command = None
   if solr_keytab:
@@ -269,7 +269,7 @@ def save(solr_url, collection, filter_field, id_field, range_end, read_block_siz
   working_dir = get_working_dir(solr_url, collection)
   handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
   save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end,
-            read_block_size, write_block_size, working_dir, additional_filter, name, line_delimited, compression,
+            read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
             hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
 
 def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
@@ -340,12 +340,11 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
     os.remove(command_json_path)
 
 def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
-              range_end, read_block_size, write_block_size, working_dir, additional_filter, name, line_delimited,
+              range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
               compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
   logger.info("Starting to save data")
   
   tmp_file_path = "{0}/tmp.json".format(working_dir)
-  true = True # needed to be able to eval 'true' in the returned json
   
   prev_lot_end_value = None
   prev_lot_end_id = None
@@ -362,8 +361,7 @@ def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, col
   total_records = 0
   while not done:
     results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
-                           id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id,
-                           line_delimited)
+                           id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file)
     done = results[0]
     records = results[1]
     prev_lot_end_value = results[2]
@@ -377,13 +375,13 @@ def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, col
       logger.info("A total of %d records are saved", total_records)
 
 def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
-                 write_block_size, prev_lot_end_value, prev_lot_end_id, line_delimited):
+                 write_block_size, prev_lot_end_value, prev_lot_end_id, json_file):
   if os.path.exists(tmp_file_path):
     os.remove(tmp_file_path)
   tmp_file = open(tmp_file_path, 'w')
   logger.debug("Created tmp file %s", tmp_file_path)
   
-  init_file(tmp_file, line_delimited)
+  init_file(tmp_file, json_file)
   records = 0
   done = False
   while records < write_block_size:
@@ -406,7 +404,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
 
     for doc in rsp['response']['docs']:
       last_doc = doc
-      add_line(tmp_file, doc, line_delimited, records)
+      add_line(tmp_file, doc, json_file, records)
       records += 1
       if records == write_block_size:
         break
@@ -419,26 +417,26 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_
       print
       logger.debug("Collecting next lot of data")
 
-  finish_file(tmp_file, line_delimited)
+  finish_file(tmp_file, json_file)
   sys.stdout.write("\n")
   logger.debug("Finished data collection")
   return [done, records, prev_lot_end_value, prev_lot_end_id]
 
-def init_file(tmp_file, line_delimited):
-  if not line_delimited:
+def init_file(tmp_file, json_file):
+  if json_file:
     tmp_file.write("{\n")
 
-def add_line(tmp_file, doc, line_delimited, records):
+def add_line(tmp_file, doc, json_file, records):
   if records > 0:
-    if line_delimited:
-      tmp_file.write("\n")
-    else:
+    if json_file:
       tmp_file.write(",\n")
+    else:
+      tmp_file.write("\n")
     
   tmp_file.write(json.dumps(doc))
 
-def finish_file(tmp_file, line_delimited):
-  if not line_delimited:
+def finish_file(tmp_file, json_file):
+  if json_file:
     tmp_file.write("\n}")
 
 def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
@@ -664,6 +662,7 @@ def query_solr(solr_kinit_command, url, curl_command, action):
     logger.warn(str(err))
     sys.exit()
 
+  true = True # needed to be able to eval 'true' in the returned json
   rsp = eval(str(out))
   if rsp["responseHeader"]["status"] != 0:
     print
@@ -703,7 +702,7 @@ if __name__ == '__main__':
     elif options.mode == "save":
       save(options.solr_url, options.collection, options.filter_field, options.id_field, end, options.read_block_size,
            options.write_block_size, options.ignore_unfinished_uploading, options.additional_filter, options.name,
-           options.solr_keytab, options.solr_principal, options.line_delimited, options.compression,
+           options.solr_keytab, options.solr_principal, options.json_file, options.compression,
            options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, options.key_file_path,
            options.bucket, options.key_prefix, options.local_path)
     else: