Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/16 02:53:15 UTC
[08/12] impala git commit: IMPALA-4464: Remove /bin/remote_data_load.py
IMPALA-4464: Remove /bin/remote_data_load.py
This file was started before the ASF project was set up, and was
committed as-is. However, it relies on internal resources that are
not generally available to the Apache community at large, and so it
serves no purpose in that context.
Change-Id: I002efae6ad538d371680ce23099277708ed67e0e
Reviewed-on: http://gerrit.cloudera.org:8080/10388
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: David Knupp <dk...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/985d2d1c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/985d2d1c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/985d2d1c
Branch: refs/heads/2.x
Commit: 985d2d1c1b8f2ad801341eeed897e1c51bc16718
Parents: 9574a5c
Author: David Knupp <dk...@cloudera.com>
Authored: Mon May 14 01:03:31 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 15 21:10:11 2018 +0000
----------------------------------------------------------------------
bin/remote_data_load.py | 560 -------------------------------------------
1 file changed, 560 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/985d2d1c/bin/remote_data_load.py
----------------------------------------------------------------------
diff --git a/bin/remote_data_load.py b/bin/remote_data_load.py
deleted file mode 100755
index 85a9f95..0000000
--- a/bin/remote_data_load.py
+++ /dev/null
@@ -1,560 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2015 Cloudera Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-# This is a setup script that will download a test warehouse snapshot and
-# deploy it on a remote, CM-managed cluster. Once the data is loaded, it is
-# possible to run a subset of the Impala core / exhaustive tests on the
-# remote cluster.
-#
-# * This script should be executed from a machine that has the Impala
-#   development environment set up.
-#
-# * The cluster needs to be configured appropriately:
-#   - The following services need to be installed:
-#     HDFS, YARN, HIVE, IMPALA, MAPREDUCE, KUDU, HBASE, ZOOKEEPER
-#   - The GPL Extras parcel needs to be installed
-#   - The Metastore DB's SERDE properties PARAM_VALUE column needs to be
-#     altered to allow for wide tables (see HIVE-1364.)
-#   - The hive-warehouse path needs to be /test-warehouse
-#
-# Usage: remote_data_load.py [options] cm_host
-#
-# Options:
-#   -h, --help            show this help message and exit
-#   --cm_user=CM_USER     Cloudera Manager admin user
-#   --cm_pass=CM_PASS     Cloudera Manager admin user password
-#   --no-load             Do not try to load the snapshot
-#   --test                Run end-to-end tests against cluster.
-#   --gateway=GATEWAY     Gateway host to upload the data from. If not set, uses
-#                         the CM host as gateway.
-#   --ssh_user=SSH_USER   System user on the remote machine with passwordless SSH
-#                         configured.
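-#
-# For illustration only, a typical invocation might look like the following
-# (the hostnames here are hypothetical):
-#
-#   remote_data_load.py --gateway=gateway.example.com --ssh-user=jenkins \
-#       --test cm-host.example.com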
-#
-import fnmatch
-import logging
-import os
-import sh
-import shutil
-import sys
-import time
-
-from cm_api.api_client import ApiResource
-from functools import wraps
-from optparse import OptionParser
-from sh import ssh
-from tempfile import mkdtemp
-from urllib import quote as urlquote
-
-
-REQUIRED_SERVICES = ['HBASE',
-                     'HDFS',
-                     'HIVE',
-                     'IMPALA',
-                     'KUDU',
-                     'MAPREDUCE',
-                     'YARN',
-                     'ZOOKEEPER']
-
-# TODO: It's not currently possible to get the version from the cluster.
-# It would be nice to generate this dynamically.
-# (v14 happens to be the version that ships with CDH 5.9.x)
-CM_API_VERSION = 'v14'
-
-# Impala's data loading and test framework assumes this Hive Warehouse Directory.
-# Making this configurable would be an invasive change, and therefore, we prefer to
-# re-configure the Hive service via the CM API before loading data and running tests.
-HIVE_WAREHOUSE_DIR = "/test-warehouse"
-
-logger = logging.getLogger("remote_data_load")
-logger.setLevel(logging.DEBUG)
-
-# Goes to the file
-fh = logging.FileHandler("remote_data_load.log")
-fh.setLevel(logging.DEBUG)
-
-# Goes to the console (logging.StreamHandler() defaults to stderr)
-ch = logging.StreamHandler()
-ch.setLevel(logging.INFO)
-
-# create formatter and add it to the handlers
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-fh.setFormatter(formatter)
-ch.setFormatter(formatter)
-
-# add the handlers to the logger
-logger.addHandler(fh)
-logger.addHandler(ch)
-
-
-def timing(func):
-  """
-  A decorator for measuring how much time a function takes.
-
-  We can modify this later to do something more intelligent than just logging.
-  """
-  @wraps(func)
-  def wrap(*args, **kwargs):
-    t1 = time.time()
-    result = func(*args, **kwargs)
-    t2 = time.time()
-
-    output = 'TIME: {name}() took: {t:.4f} seconds'
-    logger.info(output.format(name=func.__name__, t=(t2 - t1)))
-    return result
-  return wrap
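-
-
-# A minimal illustration of the @timing decorator above; load_step() is a
-# hypothetical function, not part of the original script:
-#
-#   @timing
-#   def load_step():
-#     time.sleep(1)
-#
-#   load_step()  # logs e.g.: TIME: load_step() took: 1.0012 seconds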
-
-
-def tee(line):
-  """Output wrapper function used by sh to send the stdout or stderr to the
-  module's logger."""
-  logger.debug(line.strip())
-
-
-class RemoteDataLoad(object):
-  """This is an implementation of the process to load a test-warehouse snapshot on
-  a remote, CM-managed cluster. This script assumes that the warehouse snapshot was
-  already downloaded and was either passed in as a parameter, or can be found by
-  inspecting the SNAPSHOT_DIR environment variable, or else located relative to the
-  WORKSPACE environment variable on a Jenkins build slave.
-
-  The reason for the additional setup code is that in the local development
-  environment it is assumed that $USER is the HDFS superuser, which is not the case
-  for remote deployments.
-  """
-
-  def __init__(self, cm_host, options):
-    logger.info("Starting remote data load...")
-    self.options = options
-    self.cm_host = cm_host
-
-    # Gateway host can be used if the CM host is not configured as a Hadoop gateway
-    self.gateway = options.gateway if options.gateway else cm_host
-    self.impala_home = os.environ["IMPALA_HOME"]
-    self.api = ApiResource(self.cm_host, username=options.cm_user,
-                           password=options.cm_pass)
-
-    # The API returns a list of clusters managed by the CM host. We're assuming
-    # that this CM host was set up for the purpose of Impala testing on one
-    # cluster, so the list should only have one value.
-    self.cluster = self.api.get_all_clusters()[0]
-    self.services = self.get_services()
-
-    self.config = self.get_service_client_configurations()
-    logger.info("Retrieved service configuration")
-    logger.info(str(self.config))
-    self.prepare()
-    logger.info("IMPALA_HOME: {0}".format(self.impala_home))
-
-  def get_hostname_for_ref(self, host_ref):
-    """Translate the HostRef instance into the hostname."""
-    return self.api.get_host(host_ref.hostId).hostname
-
-  @staticmethod
-  def get_or_default(config):
-    return config.value if config.value else config.default
-
-  def get_services(self):
-    """Confirm that all services are running, and return service dict."""
-    services = dict((s.type, s) for s in self.cluster.get_all_services())
-
-    if set(REQUIRED_SERVICES) != set(services.keys()):
-      missing_services = set(REQUIRED_SERVICES) - set(services.keys())
-      logger.error("Services not installed: {0}".format(list(missing_services)))
-      raise RuntimeError("Cluster not ready.")
-
-    if not all(services[s].serviceState == 'STARTED' for s in services):
-      stopped = [s for s in services if services[s].serviceState != "STARTED"]
-      logger.error("Not all services started: {0}".format(stopped))
-      raise RuntimeError("Cluster not ready.")
-
-    return services
-
-  @timing
-  def download_client_config(self, cluster, service):
-    """Download the client configuration zip for a particular cluster and service.
-
-    Since cm_api does not provide a way to download the archive, we build the URL
-    manually and download the file. Once the file is downloaded, the archive is
-    extracted and its contents are copied to the Hadoop configuration directories
-    defined by Impala.
-    """
-    logger.info("Downloading client configuration for {0}".format(service.name))
-    url = "http://{0}:7180/api/{1}/clusters/{2}/services/{3}/clientConfig".format(
-        self.cm_host, CM_API_VERSION, urlquote(cluster.name), urlquote(service.name))
-    path = mkdtemp()
-    sh.curl(url, o=os.path.join(path, "clientConfig.zip"), _out=tee, _err=tee)
-    current = os.getcwd()
-    os.chdir(path)
-    sh.unzip("clientConfig.zip")
-    for root, _, file_names in os.walk("."):
-      for filename in fnmatch.filter(file_names, "*.xml"):
-        src = os.path.join(root, filename)
-        dst = os.path.join(self.impala_home, "fe", "src", "test", "resources")
-        logger.debug("Copying {0} to {1}".format(src, dst))
-        shutil.copy(src, dst)
-    os.chdir(current)
-
-  # TODO: this may be available in tests/comparison/cluster.py
-  def set_hive_warehouse_dir(self, cluster, service):
-    """Reconfigure the HIVE service to use HIVE_WAREHOUSE_DIR as its warehouse.
-
-    Note that the cluster and service arguments are currently unused; the HIVE
-    service is looked up again via the CM API."""
-    logger.info("Setting the Hive Warehouse Dir")
-    for svc in self.api.get_all_clusters()[0].get_all_services():
-      logger.info(svc)
-      if svc.type == "HIVE":
-        hive_config = {"hive_warehouse_directory": HIVE_WAREHOUSE_DIR}
-        svc.update_config(hive_config)
-
-  # TODO: This functionality should be more generally available to other
-  # infrastructure code, rather than being quarantined in this script. See IMPALA-4367.
-  @timing
-  def get_service_client_configurations(self):
-    """Download the client configurations necessary to upload data to the remote
-    cluster. Unfortunately, the CM API does not allow downloading them all at once,
-    so we have to iterate over the services and download the config for each of them.
-
-    In addition, this returns an options dictionary with settings required for data
-    loading, such as the HS2 server, the Impala hosts, the Namenode, etc.
-
-    Returns:
-      A client-configuration dictionary, e.g.:
-
-      {
-        'hive_warehouse_directory': '/test-warehouse',
-        'hs2': 'impala-test-cluster-1.gce.cloudera.com:10000',
-        'impalad': ['impala-test-cluster-4.gce.cloudera.com:21000',
-                    'impala-test-cluster-2.gce.cloudera.com:21000',
-                    'impala-test-cluster-3.gce.cloudera.com:21000'],
-        'metastore': 'impala-test-cluster-1.gce.cloudera.com:9083',
-        'namenode': 'impala-test-cluster-1.gce.cloudera.com',
-        'namenode_http': 'impala-test-cluster-1.gce.cloudera.com:20101',
-        'kudu_master': 'impala-test-cluster-1.gce.cloudera.com'
-      }
-    """
-    # Iterate over the services and find the information we need
-    result = {}
-    for service_type, service in self.services.iteritems():
-      if service_type == "IMPALA":
-        roles = service.get_roles_by_type("IMPALAD")
-        impalads = []
-        for r in roles:
-          rc_config = r.get_config("full")
-          hostname = self.get_hostname_for_ref(r.hostRef)
-          beeswax_port = self.get_or_default(rc_config["beeswax_port"])
-          impalads.append("{0}:{1}".format(hostname, beeswax_port))
-        result["impalad"] = impalads
-      elif service_type == "HBASE":
-        self.download_client_config(self.cluster, service)
-      elif service_type == "HDFS":
-        self.download_client_config(self.cluster, service)
-        role = service.get_roles_by_type("NAMENODE")
-        config = role[0].get_config("full")
-        namenode = self.get_hostname_for_ref(role[0].hostRef)
-        result["namenode"] = namenode
-        result["namenode_http"] = "{0}:{1}".format(
-            namenode, self.get_or_default(config["dfs_http_port"]))
-      elif service_type == "HIVE":
-        self.set_hive_warehouse_dir(self.cluster, service)
-        self.download_client_config(self.cluster, service)
-        hs2 = service.get_roles_by_type("HIVESERVER2")[0]
-        rc_config = hs2.get_config("full")
-        result["hive_warehouse_directory"] = self.get_or_default(
-            service.get_config("full")[0]["hive_warehouse_directory"])
-        hostname = self.get_hostname_for_ref(hs2.hostRef)
-        result["hs2"] = "{0}:{1}".format(hostname, self.get_or_default(
-            rc_config["hs2_thrift_address_port"]))
-
-        # Get Metastore information
-        ms = service.get_roles_by_type("HIVEMETASTORE")[0]
-        rc_config = ms.get_config("full")
-        result["metastore"] = "{0}:{1}".format(
-            self.get_hostname_for_ref(ms.hostRef),
-            self.get_or_default(rc_config["hive_metastore_port"]))
-      elif service_type == "KUDU":
-        # The KUDU service does not require a client configuration
-        result["kudu_master"] = self.cm_host
-
-    return result
-
-  # TODO: This functionality should be more generally available to other
-  # infrastructure code, rather than being quarantined in this script. See IMPALA-4367.
-  @staticmethod
-  def find_snapshot_file(snapshot_dir):
-    """Given snapshot_dir, walks the directory tree until it finds a file
-    matching the test-warehouse archive pattern."""
-    for root, _, file_names in os.walk(snapshot_dir):
-      for filename in fnmatch.filter(file_names, "test-warehouse-*-SNAPSHOT.tar.gz"):
-        logger.info("Found Snapshot file {0}".format(filename))
-        return os.path.join(root, filename)
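-
-    # For illustration: with a hypothetical version string, the pattern above
-    # would match a file named test-warehouse-2.8.0-SNAPSHOT.tar.gz anywhere
-    # under snapshot_dir.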
-
-  @timing
-  def prepare(self):
-    """Populate the environment of the process with the necessary values.
-
-    In addition, it creates helper objects to run shell and SSH processes.
-    """
-    # Populate environment with required variables
-    os.environ["HS2_HOST_PORT"] = self.config["hs2"]
-    os.environ["HDFS_NN"] = self.config["namenode"]
-    os.environ["IMPALAD"] = self.config["impalad"][0]
-    os.environ["REMOTE_LOAD"] = "1"
-    os.environ["HADOOP_USER_NAME"] = "hdfs"
-    os.environ["TEST_WAREHOUSE_DIR"] = self.config["hive_warehouse_directory"]
-    os.environ["KUDU_MASTER"] = self.config["kudu_master"]
-
-    if self.options.snapshot_file is None:
-      if "SNAPSHOT_DIR" in os.environ:
-        snapshot_dir = os.environ["SNAPSHOT_DIR"]
-      else:
-        snapshot_dir = "{0}/testdata/test-warehouse-SNAPSHOT".format(
-            os.getenv("WORKSPACE"))
-      if not os.path.isdir(snapshot_dir):
-        err_msg = 'Snapshot directory "{0}" is not a valid directory'
-        logger.error(err_msg.format(snapshot_dir))
-        raise OSError("Could not find test-warehouse snapshot file.")
-
-      logger.info("Snapshot directory: {0}".format(snapshot_dir))
-      self.snapshot_file = self.find_snapshot_file(snapshot_dir)
-    else:
-      self.snapshot_file = self.options.snapshot_file
-
-    # Prepare shortcuts for connecting to remote services
-    self.gtw_ssh = ssh.bake("{0}@{1}".format(self.options.ssh_user, self.gateway),
-                            "-oStrictHostKeyChecking=no",
-                            "-oUserKnownHostsFile=/dev/null",
-                            t=True, _out=tee, _err=tee)
-
-    self.beeline = sh.beeline.bake(silent=False, outputformat="csv2", n="impala",
-                                   u="jdbc:hive2://{0}/default".format(
-                                       self.config["hs2"]))
-
-    self.load_test_warehouse = sh.Command(
-        "{0}/testdata/bin/load-test-warehouse-snapshot.sh".format(
-            self.impala_home)).bake(_out=tee, _err=tee)
-
-    self.create_load_data = sh.Command(
-        "{0}/testdata/bin/create-load-data.sh".format(self.impala_home))
-
-    self.main_impalad = self.config["impalad"][0]
-    self.impala_shell = sh.Command("impala-shell.sh").bake(i=self.main_impalad,
-                                                           _out=tee, _err=tee)
-
-    self.python = sh.Command("impala-python").bake(u=True)
-    self.compute_stats = sh.Command(
-        "{0}/testdata/bin/compute-table-stats.sh".format(self.impala_home)).bake(
-            _out=tee, _err=tee)
-
-  @timing
-  def load(self):
-    """This method performs the actual data load. First it removes any known
-    artifacts from the remote location. Next it drops any existing databases from
-    the Hive Metastore. It then invokes the load-test-warehouse-snapshot.sh and
-    create-load-data.sh scripts with the appropriate parameters. The most important
-    parameters are implicitly passed to the scripts as environment variables
-    pointing to the remote HDFS, Hive and Impala.
-    """
-    exploration_strategy = self.options.exploration_strategy
-
-    logger.info("Removing other databases")
-    dblist = self.beeline(e="show databases;", _err=tee).stdout
-    database_list = dblist.split()[1:]  # The first element is the header string
-    for db in database_list:
-      if db.strip() != "default":
-        logger.debug("Dropping database %s", db)
-        self.impala_shell(q="drop database if exists {0} cascade;".format(db))
-
-    logger.info("Invalidating metadata in Impala")
-    self.impala_shell(q="invalidate metadata;")
-
-    logger.info("Removing previous remote {0}".format(
-        self.config["hive_warehouse_directory"]))
-    sh.hdfs.dfs("-rm", "-r", "-f", self.config["hive_warehouse_directory"])
-
-    logger.info("Expunging HDFS trash")
-    sh.hdfs.dfs("-expunge")
-
-    logger.info("Uploading test warehouse snapshot")
-    self.load_test_warehouse(self.snapshot_file)
-
-    # TODO: We need to confirm that if we change any permissions, we don't
-    # affect any running tests. See IMPALA-4375.
-    logger.info("Changing warehouse ownership")
-    sh.hdfs.dfs("-chown", "-R", "impala:hdfs",
-                self.config["hive_warehouse_directory"])
-    sh.hdfs.dfs("-chmod", "-R", "g+rwx", self.config["hive_warehouse_directory"])
-    sh.hdfs.dfs("-chmod", "1777", self.config["hive_warehouse_directory"])
-
-    logger.info("Calling create-load-data.sh")
-    # The $USER variable is used in the create-load-data.sh script for beeline
-    # impersonation.
-    new_env = os.environ.copy()
-    new_env["LOGNAME"] = "impala"
-    new_env["USER"] = "impala"
-    new_env["USERNAME"] = "impala"
-
-    # Regardless of whether we are in fact skipping the snapshot load or not,
-    # we nonetheless always pass -skip_snapshot_load to create-load-data.sh.
-    # This is because we have already loaded the snapshot earlier in this
-    # script, so we don't want create-load-data.sh to invoke
-    # load-test-warehouse-snapshot.sh again.
-    #
-    # It would actually be nice to be able to skip the snapshot load, but
-    # because of the existing messiness of create-load-data.sh, we can't.
-    # This invocation...
-    #
-    #   $ create-load-data.sh -skip_snapshot_load -exploration_strategy core
-    #
-    # ...results in this error:
-    #
-    #   Creating /test-warehouse HDFS directory \
-    #       (logging to create-test-warehouse-dir.log)... FAILED
-    #   'hadoop fs -mkdir /test-warehouse' failed. Tail of log:
-    #   Log for command 'hadoop fs -mkdir /test-warehouse'
-    #   mkdir: `/test-warehouse': File exists
-    #
-    # Similarly, even though we might pass in "core" as the exploration strategy,
-    # because we aren't loading a metadata snapshot (i.e., -skip_metadata_load is
-    # false), an exhaustive data load will always be done. This again is the result
-    # of logic in create-load-data.sh, which itself ignores the value passed in
-    # for -exploration_strategy.
-    #
-    # See IMPALA-4399: "create-load-data.sh has bitrotted to some extent, and needs
-    # to be cleaned up"
-    create_load_data_args = ["-skip_snapshot_load", "-cm_host", self.cm_host,
-                             "-snapshot_file", self.snapshot_file,
-                             "-exploration_strategy", exploration_strategy]
-
-    self.create_load_data(*create_load_data_args, _env=new_env, _out=tee, _err=tee)
-
-    sh.hdfs.dfs("-chown", "-R", "impala:hdfs",
-                self.config["hive_warehouse_directory"])
-
-    logger.info("Re-loading HBase data")
-    # Manually load the HBase data last.
-    self.python("{0}/bin/load-data.py".format(self.impala_home),
-                "--hive_warehouse_dir={0}".format(
-                    self.config["hive_warehouse_directory"]),
-                "--table_formats=hbase/none",
-                "--hive_hs2_hostport={0}".format(self.config["hs2"]),
-                "--hdfs_namenode={0}".format(self.config["namenode"]),
-                "--exploration_strategy={0}".format(exploration_strategy),
-                workloads="functional-query",
-                force=True,
-                impalad=self.main_impalad,
-                _env=new_env,
-                _out=tee,
-                _err=tee)
-
-    self.compute_stats()
-    logger.info("Load data finished")
-
-  # TODO: Should this be refactored out of this script? It has nothing to do with
-  # data loading per se. If tests rely on the environment on the client being set
-  # a certain way -- as in the prepare() method -- we may need to find another way
-  # to deal with that. See IMPALA-4376.
-  @timing
-  def test(self):
-    """Execute Impala's end-to-end tests against a remote cluster. All configuration
-    parameters are picked up from the cluster configuration that was fetched via the
-    CM API."""
-
-    # TODO: Running tests via run-tests.py is currently not working against a remote
-    # cluster (although running directly via py.test seems to work.) This method
-    # may be refactored out of this file under IMPALA-4376, so for the time being,
-    # raise a NotImplementedError.
-    raise NotImplementedError
-
-    # Overwrite the username to match the service user on the remote system and deal
-    # with the assumption that in the local development environment the current user
-    # is the HDFS superuser as well.
-    new_env = os.environ.copy()
-    new_env["LOGNAME"] = "impala"
-    new_env["USER"] = "impala"
-    new_env["USERNAME"] = "impala"
-
-    strategy = self.options.exploration_strategy
-    logger.info("Running tests with exploration strategy {0}".format(strategy))
-    run_tests = sh.Command("{0}/tests/run-tests.py".format(self.impala_home))
-    run_tests("--skip_local_tests",
-              "--exploration_strategy={0}".format(strategy),
-              "--workload_exploration_strategy=functional-query:{0}".format(strategy),
-              "--namenode_http_address={0}".format(self.config["namenode_http"]),
-              "--hive_server2={0}".format(self.config["hs2"]),
-              "--metastore_server={0}".format(self.config["metastore"]),
-              "query_test",
-              maxfail=10,
-              impalad=",".join(self.config["impalad"]),
-              _env=new_env,
-              _out=tee,
-              _err=tee)
-
-
-def parse_args():
-  parser = OptionParser()
-  parser.add_option("--snapshot-file", default=None,
-                    help="Path to the test-warehouse archive")
-  parser.add_option("--cm-user", default="admin", help="Cloudera Manager admin user")
-  parser.add_option("--cm-pass", default="admin",
-                    help="Cloudera Manager admin user password")
-  parser.add_option("--gateway", default=None,
-                    help=("Gateway host to upload the data from. If not set, uses the "
-                          "CM host as gateway."))
-  parser.add_option("--ssh-user", default="jenkins",
-                    help=("System user on the remote machine with passwordless "
-                          "SSH configured."))
-  parser.add_option("--no-load", action="store_false", default=True, dest="load",
-                    help="Do not try to load the snapshot")
-  parser.add_option("--exploration-strategy", default="core")
-  parser.add_option("--test", action="store_true", default=False,
-                    help="Run end-to-end tests against cluster")
-  parser.set_usage("remote_data_load.py [options] cm_host")
-
-  options, args = parser.parse_args()
-
-  try:
-    return options, args[0]  # args[0] is the cm_host
-  except IndexError:
-    logger.error("You must supply the cm_host.")
-    parser.print_usage()
-    raise
-
-
-def main(cm_host, options):
-  """
-  Load data to a remote cluster (and/or run tests) as specified.
-
-  Args:
-    cm_host: FQDN or IP of the CM host machine
-    options: an optparse 'options' instance containing RemoteDataLoad
-      values (though any object with the correct attributes, e.g.
-      a collections.namedtuple instance, would also work)
-  """
-  rd = RemoteDataLoad(cm_host, options)
-
-  if options.load:
-    rd.load()
-  if options.test:
-    rd.test()
-
-
-if __name__ == "__main__":
-  options, cm_host = parse_args()
-  main(cm_host=cm_host, options=options)