Posted to commits@ambari.apache.org by lp...@apache.org on 2017/09/14 14:38:53 UTC
[17/38] ambari git commit: AMBARI-21810. Create Utility Script to support Solr Collection Data Retention/Purging/Archiving (mgergely)
AMBARI-21810. Create Utility Script to support Solr Collection Data Retention/Purging/Archiving (mgergely)
Change-Id: Ie25e5187e6e96d5b14267491fb7a9a992529b405
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/153d0da7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/153d0da7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/153d0da7
Branch: refs/heads/feature-branch-AMBARI-21307
Commit: 153d0da771d1ad523ad90b487fa185aa2bf0d52d
Parents: 2a6dd58
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Sep 13 16:46:02 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Wed Sep 13 16:46:02 2017 +0200
----------------------------------------------------------------------
ambari-infra/ambari-infra-solr-client/build.xml | 1 +
ambari-infra/ambari-infra-solr-client/pom.xml | 5 +
.../apache/ambari/infra/solr/S3Uploader.java | 64 ++
.../src/main/python/solrDataManager.py | 715 +++++++++++++++++++
4 files changed, 785 insertions(+)
----------------------------------------------------------------------
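A rough sketch of how the new script might be invoked once packaged (the URL, collection
and field names below are illustrative values, not taken from this commit):

  # save: archive documents older than 30 days from a 'hadoop_logs' collection to HDFS
  python solrDataManager.py --mode save --solr-url http://localhost:8886/solr \
    --collection hadoop_logs --filter-field logtime --days 30 \
    --compression tar.gz --hdfs-user hdfs --hdfs-path /tmp/archive/

  # delete: purge the same range instead of archiving it
  python solrDataManager.py --mode delete --solr-url http://localhost:8886/solr \
    --collection hadoop_logs --filter-field logtime --days 30
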
http://git-wip-us.apache.org/repos/asf/ambari/blob/153d0da7/ambari-infra/ambari-infra-solr-client/build.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/build.xml b/ambari-infra/ambari-infra-solr-client/build.xml
index 9b8b6cc..25ff0cb 100644
--- a/ambari-infra/ambari-infra-solr-client/build.xml
+++ b/ambari-infra/ambari-infra-solr-client/build.xml
@@ -36,6 +36,7 @@
<copy todir="target/package" includeEmptyDirs="no">
<fileset file="src/main/resources/solrCloudCli.sh"/>
<fileset file="src/main/resources/solrIndexHelper.sh"/>
+ <fileset file="src/main/python/solrDataManager.py"/>
</copy>
<copy todir="target/package" includeEmptyDirs="no">
<fileset file="src/main/resources/log4j.properties"/>
http://git-wip-us.apache.org/repos/asf/ambari/blob/153d0da7/ambari-infra/ambari-infra-solr-client/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/pom.xml b/ambari-infra/ambari-infra-solr-client/pom.xml
index 3818aba..0d32374 100644
--- a/ambari-infra/ambari-infra-solr-client/pom.xml
+++ b/ambari-infra/ambari-infra-solr-client/pom.xml
@@ -82,6 +82,11 @@
<artifactId>log4j</artifactId>
</dependency>
<dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.11.5</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/ambari/blob/153d0da7/ambari-infra/ambari-infra-solr-client/src/main/java/org/apache/ambari/infra/solr/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/java/org/apache/ambari/infra/solr/S3Uploader.java b/ambari-infra/ambari-infra-solr-client/src/main/java/org/apache/ambari/infra/solr/S3Uploader.java
new file mode 100644
index 0000000..60b4e0a
--- /dev/null
+++ b/ambari-infra/ambari-infra-solr-client/src/main/java/org/apache/ambari/infra/solr/S3Uploader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.infra.solr;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+/**
+ * Uploads a file to S3, meant to be used by solrDataManager.py
+ */
+public class S3Uploader {
+ public static void main(String[] args) {
+ try {
+ String keyFilePath = args[0];
+ String bucketName = args[1];
+ String keyPrefix = args[2];
+ String filePath = args[3];
+
+ String keyFileContent = FileUtils.readFileToString(new File(keyFilePath)).trim();
+ String[] keys = keyFileContent.split(",");
+ String accessKey = keys[0];
+ String secretKey = keys[1];
+
+ BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
+ AmazonS3Client client = new AmazonS3Client(credentials);
+
+ File file = new File(filePath);
+ String key = keyPrefix + file.getName();
+
+ if (client.doesObjectExist(bucketName, key)) {
+ System.out.println("Object '" + key + "' already exists");
+ System.exit(0);
+ }
+
+ client.putObject(bucketName, key, file);
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ System.exit(1);
+ }
+
+ System.exit(0);
+ }
+}
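
For reference, a hypothetical standalone run of the uploader (key file, bucket and paths
are made-up values; solrDataManager.py assembles the same command in create_command_file):

  # the key file holds a single line: <accessKey>,<secretKey>
  java -cp "libs/*" org.apache.ambari.infra.solr.S3Uploader \
    /etc/ambari-infra/s3.key my-archive-bucket solr/hadoop_logs/ \
    /tmp/solrDataManager/1a2b3c/hadoop_logs.tar.gz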
http://git-wip-us.apache.org/repos/asf/ambari/blob/153d0da7/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
new file mode 100644
index 0000000..d17aec7
--- /dev/null
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -0,0 +1,715 @@
+#!/usr/bin/python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import hashlib
+import json
+import logging
+import optparse
+import os
+import time
+import signal
+import sys
+
+from datetime import datetime, timedelta
+from subprocess import call, Popen, PIPE
+from urllib import quote, unquote
+from zipfile import ZipFile, ZIP_DEFLATED
+import tarfile
+
+VERSION = "1.0"
+
+logger = logging.getLogger()
+handler = logging.StreamHandler()
+formatter = logging.Formatter("%(asctime)s - %(message)s")
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+verbose = False
+
+def parse_arguments():
+ parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))
+
+ parser.add_option("-m", "--mode", dest="mode", type="string", help="delete | save")
+ parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port")
+ parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
+ parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
+ parser.add_option("-r", "--read-block-size", dest="read_block_size", type="int", help="block size to use for reading from solr",
+ default=1000)
+ parser.add_option("-w", "--write-block-size", dest="write_block_size", type="int", help="number of records in the output files",
+ default=100000)
+ parser.add_option("-i", "--id-field", dest="id_field", type="string", help="the name of the id field", default="id")
+
+ end_group = optparse.OptionGroup(parser, "specifying the end of the range")
+ end_group.add_option("-e", "--end", dest="end", type="string", help="end of the range")
+ end_group.add_option("-d", "--days", dest="days", type="int", help="number of days to keep")
+ parser.add_option_group(end_group)
+
+ parser.add_option("-o", "--date-format", dest="date_format", type="string", help="the date format to use for --days",
+ default="%Y-%m-%dT%H:%M:%S.%fZ")
+
+ parser.add_option("-q", "--additional-filter", dest="additional_filter", type="string", help="additional solr filter")
+ parser.add_option("--name", dest="name", type="string", help="name included in result files")
+
+ parser.add_option("-g", "--ignore-unfinished-uploading", dest="ignore_unfinished_uploading", action="store_true", default=False)
+
+ parser.add_option("-j", "--line-delimited", dest="line_delimited", help="line delmited json output", action="store_true", default=False)
+ parser.add_option("-z", "--compression", dest="compression", help="none | tar.gz | tar.bz2 | zip", default="tar.gz")
+
+ parser.add_option("-k", "--solr-keytab", dest="solr_keytab", type="string", help="the keytab for a kerberized solr")
+ parser.add_option("-n", "--solr-principal", dest="solr_principal", type="string", help="the principal for a kerberized solr")
+
+ parser.add_option("-a", "--hdfs-keytab", dest="hdfs_keytab", type="string", help="the keytab for a kerberized hdfs")
+ parser.add_option("-l", "--hdfs-principal", dest="hdfs_principal", type="string", help="the principal for a kerberized hdfs")
+
+ parser.add_option("-u", "--hdfs-user", dest="hdfs_user", type="string", help="the user for accessing hdfs")
+ parser.add_option("-p", "--hdfs-path", dest="hdfs_path", type="string", help="the hdfs path to upload to")
+
+ parser.add_option("-t", "--key-file-path", dest="key_file_path", type="string", help="the file that contains S3 <accessKey>,<secretKey>")
+ parser.add_option("-b", "--bucket", dest="bucket", type="string", help="the bucket name for S3 upload")
+ parser.add_option("-y", "--key-prefix", dest="key_prefix", type="string", help="the key prefix for S3 upload")
+
+ parser.add_option("-x", "--local-path", dest="local_path", type="string", help="the local path to save the files to")
+
+ parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
+
+ (options, args) = parser.parse_args()
+
+ for r in ["mode", "solr_url", "collection", "filter_field"]:
+ if options.__dict__[r] is None:
+ print "argument '{0}' is mandatory".format(r)
+ parser.print_help()
+ sys.exit()
+
+ mode_values = ["delete", "save"]
+ if options.mode not in mode_values:
+ print "mode must be one of {0}".format(" | ".join(mode_values))
+ parser.print_help()
+ sys.exit()
+
+ if options.mode == "delete":
+ for r in ["name", "compression", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
+ if options.__dict__[r] is not None:
+ print "argument '{0}' may not be specified in delete mode".format(r)
+ parser.print_help()
+ sys.exit()
+
+ if options.__dict__["end"] is None and options.__dict__["days"] is None or \
+ options.__dict__["end"] is not None and options.__dict__["days"] is not None:
+ print "exactly one of 'end' or 'days' must be specfied"
+ parser.print_help()
+ sys.exit()
+
+ is_any_solr_kerberos_property = options.__dict__["solr_keytab"] is not None or options.__dict__["solr_principal"] is not None
+ is_all_solr_kerberos_property = options.__dict__["solr_keytab"] is not None and options.__dict__["solr_principal"] is not None
+ if is_any_solr_kerberos_property and not is_all_solr_kerberos_property:
+ print "either both 'solr-keytab' and 'solr-principal' must be specfied, or neither of them"
+ parser.print_help()
+ sys.exit()
+
+ compression_values = ["none", "tar.gz", "tar.bz2", "zip"]
+ if options.compression not in compression_values:
+ print "compression must be one of {0}".format(" | ".join(compression_values))
+ parser.print_help()
+ sys.exit()
+
+ is_any_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None or options.__dict__["hdfs_principal"] is not None
+ is_all_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_principal"] is not None
+ if is_any_hdfs_kerberos_property and not is_all_hdfs_kerberos_property:
+ print "either both 'hdfs_keytab' and 'hdfs_principal' must be specfied, or neither of them"
+ parser.print_help()
+ sys.exit()
+
+ is_any_hdfs_property = options.__dict__["hdfs_user"] is not None or options.__dict__["hdfs_path"] is not None
+ is_all_hdfs_property = options.__dict__["hdfs_user"] is not None and options.__dict__["hdfs_path"] is not None
+ if is_any_hdfs_property and not is_all_hdfs_property:
+ print "either both 'hdfs_user' and 'hdfs_path' must be specfied, or neither of them"
+ parser.print_help()
+ sys.exit()
+
+ is_any_s3_property = options.__dict__["key_file_path"] is not None or options.__dict__["bucket"] is not None or \
+ options.__dict__["key_prefix"] is not None
+ is_all_s3_property = options.__dict__["key_file_path"] is not None and options.__dict__["bucket"] is not None and \
+ options.__dict__["key_prefix"] is not None
+ if is_any_s3_property and not is_all_s3_property:
+ print "either all the S3 arguments ('key_file_path', 'bucket', 'key_prefix') must be specfied, or none of them"
+ parser.print_help()
+ sys.exit()
+
+ if options.mode == "save":
+ count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \
+ (1 if options.__dict__["local_path"] is not None else 0)
+ if count != 1:
+ print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the 'local_path' argument must be specified"
+ parser.print_help()
+ sys.exit()
+
+ if options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_user"] is None:
+ print "HDFS kerberos keytab and principal may only be specified if the upload target is HDFS"
+ parser.print_help()
+ sys.exit()
+
+ print("You are running Solr Data Manager {0} with arguments:".format(VERSION))
+ print(" mode: " + options.mode)
+ print(" solr-url: " + options.solr_url)
+ print(" collection: " + options.collection)
+ print(" filter-field: " + options.filter_field)
+ if options.mode == "save":
+ print(" id-field: " + options.id_field)
+ if options.__dict__["end"] is not None:
+ print(" end: " + options.end)
+ else:
+ print(" days: " + str(options.days))
+ print(" date-format: " + options.date_format)
+ if options.__dict__["additional_filter"] is not None:
+ print(" additional-filter: " + str(options.additional_filter))
+ if options.__dict__["name"] is not None:
+ print(" name: " + str(options.name))
+ if options.mode == "save":
+ print(" read-block-size: " + str(options.read_block_size))
+ print(" write-block-size: " + str(options.write_block_size))
+ print(" ignore-unfinished-uploading: " + str(options.ignore_unfinished_uploading))
+ if (options.__dict__["solr_keytab"] is not None):
+ print(" solr-keytab: " + options.solr_keytab)
+ print(" solr-principal: " + options.solr_principal)
+ if options.mode == "save":
+ print(" line-delimited: " + str(options.line_delimited))
+ print(" compression: " + options.compression)
+ if (options.__dict__["hdfs_keytab"] is not None):
+ print(" hdfs-keytab: " + options.hdfs_keytab)
+ print(" hdfs-principal: " + options.hdfs_principal)
+ if (options.__dict__["hdfs_user"] is not None):
+ print(" hdfs-user: " + options.hdfs_user)
+ print(" hdfs-path: " + options.hdfs_path)
+ if (options.__dict__["key_file_path"] is not None):
+ print(" key-file-path: " + options.key_file_path)
+ print(" bucket: " + options.bucket)
+ print(" key-prefix: " + options.key_prefix)
+ if (options.__dict__["local_path"] is not None):
+ print(" local-path: " + options.local_path)
+ print(" verbose: " + str(options.verbose))
+ print
+
+ if options.__dict__["additional_filter"] is not None and options.__dict__["name"] is None:
+ go = False
+ while not go:
+ sys.stdout.write("It is recommended to set --name in case of any additional filter is set.\n")
+ sys.stdout.write("Are you sure that you want to proceed without a name (yes/no)? ")
+ choice = raw_input().lower()
+ if choice in ['yes', 'ye', 'y']:
+ go = True
+ elif choice in ['no', 'n']:
+ sys.exit()
+
+ return options
+
+def set_log_level():
+ if verbose:
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.INFO)
+
+def get_end(options):
+ if options.end:
+ return options.end
+ else:
+ d = datetime.now() - timedelta(days=options.days)
+ end = d.strftime(options.date_format)
+ logger.info("The end date will be: %s", end)
+ return end
+
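+# Issues a Solr delete-by-query against the collection's /update handler; stream.body
+# carries the range query and commit=true makes the deletion visible immediately.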
+def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal):
+ logger.info("Deleting data where %s <= %s", filter_field, end)
+ solr_kinit_command = None
+ if solr_keytab:
+ solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
+ curl_prefix = "curl -k --negotiate -u : "
+ else:
+ curl_prefix = "curl -k"
+
+ delete_range = "{0}:[*+TO+\"{1}\"]".format(filter_field, end)
+ delete_query = quote(delete_range, safe="/+\"*")
+ delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
+ .format(solr_url, collection, delete_query)
+
+ query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
+
+def save(solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
+ ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, line_delimited,
+ compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+ solr_kinit_command = None
+ if solr_keytab:
+ solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
+ curl_prefix = "curl -k --negotiate -u : "
+ else:
+ curl_prefix = "curl -k"
+
+ hdfs_kinit_command = None
+ if hdfs_keytab:
+ hdfs_kinit_command = "sudo -u {0} kinit -kt {1} {2}".format(hdfs_user, hdfs_keytab, hdfs_principal)
+
+ if hdfs_path:
+ ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
+
+ working_dir = get_working_dir(solr_url, collection)
+ handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
+ save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end,
+ read_block_size, write_block_size, working_dir, additional_filter, name, line_delimited, compression,
+ hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
+
+def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
+ if hdfs_kinit_command:
+ run_kinit(hdfs_kinit_command, "HDFS")
+
+ try:
+ hdfs_create_dir_command = "sudo -u {0} hadoop fs -mkdir -p {1}".format(hdfs_user, hdfs_path)
+ logger.debug("Ensuring that the HDFS path %s exists:\n%s", hdfs_path, hdfs_create_dir_command)
+ result = call(hdfs_create_dir_command.split())
+ except Exception as e:
+ print
+ logger.warn("Could not execute hdfs ensure dir command:\n%s", hdfs_create_dir_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ if result != 0:
+ print
+ logger.warn("Could not ensure HDFS dir command:\n%s", hdfs_create_dir_command)
+ logger.warn(str(err))
+ sys.exit()
+
+def get_working_dir(solr_url, collection):
+ md5 = hashlib.md5()
+ md5.update(solr_url)
+ md5.update(collection)
+ hash = md5.hexdigest()
+ working_dir = "/tmp/solrDataManager/{0}".format(hash)
+
+ if not(os.path.isdir(working_dir)):
+ os.makedirs(working_dir)
+
+ logger.debug("Working directory is %s", working_dir)
+ return working_dir
+
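+# Crash recovery: before every upload/delete step save_data records a command.json
+# checkpoint in the working directory (see create_command_file). If a previous run died
+# mid-upload, the commands found there are replayed here before any new work starts.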
+def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading):
+ command_json_path = "{0}/command.json".format(working_dir)
+ if os.path.isfile(command_json_path):
+ with open(command_json_path) as command_file:
+ command = json.load(command_file)
+
+ if "upload" in command.keys() and ignore_unfinished_uploading:
+ logger.info("Ignoring unfinished uploading left by previous run")
+ os.remove(command_json_path)
+ return
+
+ if "upload" in command.keys():
+ logger.info("Previous run has left unfinished uploading")
+ logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
+
+ if command["upload"]["type"] == "hdfs":
+ upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
+ command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
+ elif command["upload"]["type"] == "s3":
+ upload_file_s3(command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["bucket"],
+ command["upload"]["key_prefix"])
+ elif command["upload"]["type"] == "local":
+ upload_file_local(command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["local_path"])
+ else:
+ logger.warn("Unknown upload type: %s", command["upload"]["type"])
+ sys.exit()
+
+ if "delete" in command.keys():
+ delete_data(solr_kinit_command, curl_prefix, command["delete"]["command"], command["delete"]["collection"],
+ command["delete"]["filter_field"], command["delete"]["id_field"], command["delete"]["prev_lot_end_value"],
+ command["delete"]["prev_lot_end_id"])
+
+ os.remove(command_json_path)
+
+def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+ range_end, read_block_size, write_block_size, working_dir, additional_filter, name, line_delimited,
+ compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+ logger.info("Starting to save data")
+
+ tmp_file_path = "{0}/tmp.json".format(working_dir)
+
+ prev_lot_end_value = None
+ prev_lot_end_id = None
+
+ if additional_filter:
+ q = quote("{0}+AND+{1}:[*+TO+\"{2}\"]".format(additional_filter, filter_field, range_end), safe="/+\"*")
+ else:
+ q = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, range_end), safe="/+\"*")
+
+ sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
+ solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)
+
+ done = False
+ total_records = 0
+ while not done:
+ results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
+ id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id,
+ line_delimited)
+ done = results[0]
+ records = results[1]
+ prev_lot_end_value = results[2]
+ prev_lot_end_id = results[3]
+
+ if records > 0:
+ upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+ working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
+ key_file_path, bucket, key_prefix, local_path, compression)
+ total_records += records
+ logger.info("A total of %d records are saved", total_records)
+
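+# Reads documents in read_block_size pages ordered by (filter_field, id_field). The last
+# (value, id) pair of the previous lot acts as a cursor: the next query fetches the
+# remaining ids at that exact filter value plus everything strictly after it, so no
+# document is skipped or read twice even when filter values are not unique.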
+def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
+ write_block_size, prev_lot_end_value, prev_lot_end_id, line_delimited):
+ if os.path.exists(tmp_file_path):
+ os.remove(tmp_file_path)
+ tmp_file = open(tmp_file_path, 'w')
+ logger.debug("Created tmp file %s", tmp_file_path)
+
+ init_file(tmp_file, line_delimited)
+ records = 0
+ done = False
+ while records < write_block_size:
+ if prev_lot_end_value:
+ fq_prev_end_rest = "({0}:\"{1}\"+AND+{2}:{{\"{3}\"+TO+*])".format(filter_field, prev_lot_end_value, id_field,
+ prev_lot_end_id)
+ fq_new = "{0}:{{\"{1}\"+TO+\"{2}\"]".format(filter_field, prev_lot_end_value, range_end)
+ fq = "{0}+OR+{1}".format(fq_prev_end_rest, fq_new)
+ else:
+ fq = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
+
+ url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
+ curl_command = "{0} {1}".format(curl_prefix, url)
+
+ rsp = query_solr(solr_kinit_command, url, curl_command, "Obtaining")
+
+ if rsp['response']['numFound'] == 0:
+ done = True
+ break
+
+ for doc in rsp['response']['docs']:
+ last_doc = doc
+ add_line(tmp_file, doc, line_delimited, records)
+ records += 1
+ if records == write_block_size:
+ break
+
+ prev_lot_end_value = last_doc[filter_field]
+ prev_lot_end_id = last_doc[id_field]
+ sys.stdout.write("\r{0} records are written".format(records))
+ sys.stdout.flush()
+ if verbose and records < write_block_size:
+ print
+ logger.debug("Collecting next lot of data")
+
+ finish_file(tmp_file, line_delimited)
+ sys.stdout.write("\n")
+ logger.debug("Finished data collection")
+ return [done, records, prev_lot_end_value, prev_lot_end_id]
+
+def init_file(tmp_file, line_delimited):
+ if not line_delimited:
+ tmp_file.write("{\n")
+
+def add_line(tmp_file, doc, line_delimited, records):
+ if records > 0:
+ if line_delimited:
+ tmp_file.write("\n")
+ else:
+ tmp_file.write(",\n")
+
+ tmp_file.write(json.dumps(doc))
+
+def finish_file(tmp_file, line_delimited):
+ if not line_delimited:
+ tmp_file.write("\n}")
+
+def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+ working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
+ key_file_path, bucket, key_prefix, local_path, compression):
+ if name:
+ file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
+ else:
+ file_name = "{0}_-_{1}_-_{2}".format(collection, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
+
+ upload_file_path = compress_file(working_dir, tmp_file_path, file_name, compression)
+
+ upload_command = create_command_file(True, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
+ prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket,
+ key_prefix, local_path)
+ if hdfs_user:
+ upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
+ elif key_file_path:
+ upload_file_s3(upload_command, upload_file_path, bucket, key_prefix)
+ elif local_path:
+ upload_file_local(upload_command, upload_file_path, local_path)
+ else:
+ logger.warn("Unknown upload destination")
+ sys.exit()
+
+ delete_command = create_command_file(False, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
+ prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
+ delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value, prev_lot_end_id)
+
+ os.remove("{0}/command.json".format(working_dir))
+
+def compress_file(working_dir, tmp_file_path, file_name, compression):
+ if compression == "none":
+ upload_file_path = "{0}/{1}.json".format(working_dir, file_name)
+ os.rename(tmp_file_path, upload_file_path)
+ elif compression == "tar.gz":
+ upload_file_path = "{0}/{1}.tar.gz".format(working_dir, file_name)
+ zipped_file_name = "{0}.json".format(file_name)
+ tar = tarfile.open(upload_file_path, mode="w:gz")
+ try:
+ tar.add(tmp_file_path, arcname=zipped_file_name)
+ finally:
+ tar.close()
+ elif compression == "tar.bz2":
+ upload_file_path = "{0}/{1}.tar.bz2".format(working_dir, file_name)
+ zipped_file_name = "{0}.json".format(file_name)
+ tar = tarfile.open(upload_file_path, mode="w:bz2")
+ try:
+ tar.add(tmp_file_path, arcname=zipped_file_name)
+ finally:
+ tar.close()
+ elif compression == "zip":
+ upload_file_path = "{0}/{1}.zip".format(working_dir, file_name)
+ zipped_file_name = "{0}.json".format(file_name)
+ zip_file = ZipFile(upload_file_path, 'w')
+ try:
+ zip_file.write(tmp_file_path, zipped_file_name, ZIP_DEFLATED)
+ finally:
+ zip_file.close()
+ logger.info("Created file %s", zipped_file_name)
+ else:
+ logger.warn("Unknown compression type")
+ sys.exit()
+
+ return upload_file_path
+
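+# The command file is written to a .tmp file and then renamed into place; since the
+# rename is atomic on POSIX filesystems, an interrupted run can never leave a
+# half-written command.json behind for handle_unfinished_uploading to misread.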
+def create_command_file(upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, prev_lot_end_value,
+ prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
+ commands = {}
+
+ if upload:
+ logger.debug("Creating command file with upload and delete instructions in case of an interruption")
+ else:
+ logger.debug("Creating command file with delete instructions in case of an interruption")
+
+ if upload:
+ if hdfs_path:
+ upload_command = "sudo -u {0} hadoop fs -put {1} {2}".format(hdfs_user, upload_file_path, hdfs_path)
+ upload_command_data = {}
+ upload_command_data["type"] = "hdfs"
+ upload_command_data["command"] = upload_command
+ upload_command_data["upload_file_path"] = upload_file_path
+ upload_command_data["hdfs_path"] = hdfs_path
+ upload_command_data["hdfs_user"] = hdfs_user
+ commands["upload"] = upload_command_data
+ elif key_file_path:
+ upload_command = "java -cp {0}/libs/* org.apache.ambari.infra.solr.S3Uploader {1} {2} {3} {4}".format( \
+ os.path.dirname(os.path.realpath(__file__)), key_file_path, bucket, key_prefix, upload_file_path)
+ upload_command_data = {}
+ upload_command_data["type"] = "s3"
+ upload_command_data["command"] = upload_command
+ upload_command_data["upload_file_path"] = upload_file_path
+ upload_command_data["bucket"] = bucket
+ upload_command_data["key_prefix"] = key_prefix
+ commands["upload"] = upload_command_data
+ elif local_path:
+ upload_command = "mv {0} {1}".format(upload_file_path, local_path)
+ upload_command_data = {}
+ upload_command_data["type"] = "local"
+ upload_command_data["command"] = upload_command
+ upload_command_data["upload_file_path"] = upload_file_path
+ upload_command_data["local_path"] = local_path
+ commands["upload"] = upload_command_data
+ else:
+ logger.warn("Unknown upload destination")
+ sys.exit()
+
+ delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
+ delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
+ delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*")
+ delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
+ .format(solr_url, collection, delete_query)
+ delete_command_data = {}
+ delete_command_data["command"] = delete_command
+ delete_command_data["collection"] = collection
+ delete_command_data["filter_field"] = filter_field
+ delete_command_data["id_field"] = id_field
+ delete_command_data["prev_lot_end_value"] = prev_lot_end_value
+ delete_command_data["prev_lot_end_id"] = prev_lot_end_id
+ commands["delete"] = delete_command_data
+
+ command_file_path = "{0}/command.json".format(working_dir)
+ command_file_path_tmp = "{0}.tmp".format(command_file_path)
+ cft = open(command_file_path_tmp, 'w')
+ cft.write(json.dumps(commands, indent=4))
+ cft.close()
+ os.rename(command_file_path_tmp, command_file_path)
+
+ logger.debug("Command file %s was created", command_file_path)
+
+ if upload:
+ return upload_command
+ else:
+ return delete_command
+
+def upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user):
+ if hdfs_kinit_command:
+ run_kinit(hdfs_kinit_command, "HDFS")
+
+ try:
+ hdfs_file_exists_command = "sudo -u {0} hadoop fs -test -e {1}".format(hdfs_user, hdfs_path + os.path.basename(upload_file_path))
+ logger.debug("Checking if file already exists on hdfs:\n%s", hdfs_file_exists_command)
+ hdfs_file_exists = (0 == call(hdfs_file_exists_command.split()))
+ except Exception as e:
+ print
+ logger.warn("Could not execute command to check if file already exists on HDFS:\n%s", hdfs_file_exists_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ if os.path.isfile(upload_file_path) and not hdfs_file_exists:
+ try:
+ logger.debug("Uploading file to hdfs:\n%s", upload_command)
+ result = call(upload_command.split())
+ except Exception as e:
+ print
+ logger.warn("Could not execute command to upload file to HDFS:\n%s", upload_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ if result != 0:
+ logger.warn("Could not upload file to HDFS with command:\n%s", upload_command)
+ sys.exit()
+
+ logger.info("File %s was uploaded to hdfs %s", os.path.basename(upload_file_path), hdfs_path)
+ os.remove(upload_file_path)
+
+def upload_file_s3(upload_command, upload_file_path, bucket, key_prefix):
+ if os.path.isfile(upload_file_path):
+ try:
+ logger.debug("Uploading file to s3:\n%s", upload_command)
+ result = call(upload_command.split())
+ except Exception as e:
+ print
+ logger.warn("Could not execute command to upload file to S3:\n%s", upload_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ if result != 0:
+ logger.warn("Could not upload file to S3 with command:\n%s", upload_command)
+ sys.exit()
+
+ logger.info("File %s was uploaded to s3 bucket '%s', key '%s'", os.path.basename(upload_file_path), bucket,
+ key_prefix + os.path.basename(upload_file_path))
+ os.remove(upload_file_path)
+
+def upload_file_local(upload_command, upload_file_path, local_path):
+ if os.path.exists(local_path) and not os.path.isdir(local_path):
+ logger.warn("Local path %s exists, but is not a directory, cannot save there", local_path)
+ sys.exit()
+
+ if not os.path.isdir(local_path):
+ os.mkdir(local_path)
+ logger.debug("Directory %s was created", local_path)
+
+ try:
+ logger.debug("Moving file to local directory %s with command\n%s", local_path, upload_command)
+ result = call(upload_command.split())
+ except Exception as e:
+ print
+ logger.warn("Could not execute move command:\n%s", upload_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ if result != 0:
+ logger.warn("Could not move file to local directory with command:\n%s", upload_command)
+ sys.exit()
+
+ logger.info("File %s was moved to local directory %s", os.path.basename(upload_file_path), local_path)
+
+def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
+ prev_lot_end_id):
+ query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
+ logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
+ prev_lot_end_id)
+
+def query_solr(solr_kinit_command, url, curl_command, action):
+ if solr_kinit_command:
+ run_kinit(solr_kinit_command, "Solr")
+
+ try:
+ logger.debug("%s data from solr:\n%s", action, curl_command)
+ process = Popen(curl_command.split(), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ except Exception as e:
+ print
+ logger.warn("Could not execute curl command:\n%s", curl_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ out, err = process.communicate()
+ if process.returncode != 0:
+ print
+ logger.warn("Could not execute curl command:\n%s", curl_command)
+ logger.warn(str(err))
+ sys.exit()
+
+ rsp = json.loads(out)
+ if rsp["responseHeader"]["status"] != 0:
+ print
+ logger.warn("Could not execute solr query:\n%s", unquote(url))
+ logger.warn(rsp["error"]["msg"])
+ sys.exit()
+
+ return rsp
+
+def run_kinit(kinit_command, program):
+ try:
+ logger.debug("Running kinit for %s:\n%s", program, kinit_command)
+ result = call(kinit_command.split())
+ except Exception as e:
+ print
+ logger.warn("Could not execute %s kinit command:\n%s", program, kinit_command)
+ logger.warn(str(e))
+ sys.exit()
+
+ if result != 0:
+ print
+ logger.warn("%s kinit command was not successful:\n%s", program, kinit_command)
+ sys.exit()
+
+if __name__ == '__main__':
+ try:
+ start_time = time.time()
+
+ options = parse_arguments()
+ verbose = options.verbose
+ set_log_level()
+
+ end = get_end(options)
+
+ if options.mode == "delete":
+ delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal)
+ elif options.mode == "save":
+ save(options.solr_url, options.collection, options.filter_field, options.id_field, end, options.read_block_size,
+ options.write_block_size, options.ignore_unfinished_uploading, options.additional_filter, options.name,
+ options.solr_keytab, options.solr_principal, options.line_delimited, options.compression,
+ options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, options.key_file_path,
+ options.bucket, options.key_prefix, options.local_path)
+ else:
+ logger.warn("Unknown mode: %s", options.mode)
+
+ print("--- %s seconds ---" % (time.time() - start_time))
+ except KeyboardInterrupt:
+ print
+ sys.exit(128 + signal.SIGINT)