You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2018/05/16 02:53:15 UTC

[08/12] impala git commit: IMPALA-4464: Remove /bin/remote_data_load.py

IMPALA-4464: Remove /bin/remote_data_load.py

This file was started before the ASF project was set up, and
committed as-is. However, it relies on some internal resources
not generally available to the external Apache community at large,
and so serves no purpose in that context.

Change-Id: I002efae6ad538d371680ce23099277708ed67e0e
Reviewed-by: Philip Zeyliger
Tested-by: David Knupp


Branch: refs/heads/2.x
Commit: 985d2d1c1b8f2ad801341eeed897e1c51bc16718
Parents: 9574a5c
Author: David Knupp
Authored: Mon May 14 01:03:31 2018 -0700
Committer: Impala Public Jenkins
Committed: Tue May 15 21:10:11 2018 +0000

 bin/remote_data_load.py | 560 -------------------------------------------
 1 file changed, 560 deletions(-)
diff --git a/bin/remote_data_load.py b/bin/remote_data_load.py
deleted file mode 100755
index 85a9f95..0000000
--- a/bin/remote_data_load.py
+++ /dev/null
@@ -1,560 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2015 Cloudera Inc.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# This is a setup script that will downloaded a test warehouse snapshot and
-# deploy it on a remote, CM-managed cluster. Once the data is loaded, it is
-# possible to run a subset of the Impala core / exhaustive tests on the
-# remote cluster.
-#   * This script should be executed from a machine that has the Impala
-#     development environment set up.
-#   * The cluster needs to be configured appropriately:
-#     - The following services need to be installed:
-#     - GPL Extras parcel needs to be installed
-#     - Metastore DB SERDE properties PARAM_VALUE needs to be altered to
-#       allow for wide tables (See HIVE-1364.)
-#     - The hive-warehouse path needs to be /test-warehouse
-# Usage: [options] cm_host
-#  Options:
-#    -h, --help           show this help message and exit
-#    --cm_user=CM_USER    Cloudera Manager admin user
-#    --cm_pass=CM_PASS    Cloudera Manager admin user password
-#    --no-load            Do not try to load the snapshot
-#    --test               Run end-to-end tests against cluster.
-#    --gateway=GATEWAY    Gateway host to upload the data from. If not set, uses
-#                         the CM host as gateway.
-#    --ssh_user=SSH_USER  System user on the remote machine with passwordless SSH
-#                         configured.
-import fnmatch
-import glob
-import logging
-import os
-import sh
-import shutil
-import sys
-import time
-from cm_api.api_client import ApiResource
-from functools import wraps
-from optparse import OptionParser
-from sh import ssh
-from tempfile import mkdtemp
-from urllib import quote as urlquote
-                     'HDFS',
-                     'HIVE',
-                     'IMPALA',
-                     'KUDU',
-                     'MAPREDUCE',
-                     'YARN',
-                     'ZOOKEEPER']
-# TODO: It's not currently possible to get the version from the cluster.
-# It would be nice to generate this dynamically.
-# (v14 happens to be the version that ships with CDH 5.9.x)
-# Impala's data loading and test framework assumes this Hive Warehouse Directory.
-# Making this configurable would be an invasive change, and therefore, we prefer to
-# re-configure the Hive service via the CM API before loading data and running tests.
-HIVE_WAREHOUSE_DIR = "/test-warehouse"
-logger = logging.getLogger("remote_data_load")
-# Goes to the file
-fh = logging.FileHandler("remote_data_load.log")
-# Goes to stdout
-ch = logging.StreamHandler()
-# create formatter and add it to the handlers
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-# add the handlers to the logger
-def timing(func):
-    """
-    A decorator for timing how much time a function takes.
-    We can modify this later to do something more intelligent than just logging.
-    """
-    @wraps(func)
-    def wrap(*args, **kwargs):
-        t1 = time.time()
-        result = func(*args, **kwargs)
-        t2 = time.time()
-        output = 'TIME: {name}() took: {t:.4f} seconds'
-, t=(t2-t1)))
-        return result
-    return wrap
-def tee(line):
-    """Output wrapper function used by sh to send the stdout or stderr to the
-    module's logger."""
-    logger.debug(line.strip())
-class RemoteDataLoad(object):
-    """This is an implementation of the process to load a test-warehouse snapshot on
-    a remote CM managed cluster. This script assumes that the warehouse snapshot was
-    already downloaded and was either passed in as a parameter, or can be found by
-    either inspecting the SNAPSHOT_DIR environment variable, or based on the WORKSPACE
-    environment variable on a Jenkins build slave.
-    The reason for the additional setup code is that in the local development
-    environment it is assumed that $USER is HDFS superuser, which is not the case for
-    remote deloyments.
-    """
-    def __init__(self, cm_host, options):
-"Starting remote data load...")
-        self.options = options
-        self.cm_host = cm_host
-        # Gateway host can be used if the CM host is not configured as a Hadoop gateway
-        self.gateway = options.gateway if options.gateway else cm_host
-        self.impala_home = os.environ["IMPALA_HOME"]
-        self.api = ApiResource(self.cm_host, username=options.cm_user,
-                               password=options.cm_pass)
-        # The API returns a list of clusters managed by the CM host. We're assuming
-        # that this CM host was set up for the purpose of Impala testing on one
-        # cluster, so the list should only have one value.
-        self.cluster = self.api.get_all_clusters()[0]
- = self.get_services()
-        self.config = self.get_service_client_configurations()
-"Retrieved service configuration")
-        self.prepare()
-"IMPALA_HOME: {0}".format(self.impala_home))
-    def get_hostname_for_ref(self, host_ref):
-        """Translate the HostRef instance into the hostname."""
-        return self.api.get_host(host_ref.hostId).hostname
-    @staticmethod
-    def get_or_default(config):
-        return config.value if config.value else config.default
-    def get_services(self):
-        """Confirm that all services are running, and return service dict."""
-        services = dict((s.type, s) for s in self.cluster.get_all_services())
-        if set(REQUIRED_SERVICES) != set(services.keys()):
-            missing_services = set(REQUIRED_SERVICES) - set(services.keys())
-            logger.error("Services not installed: {0}".format(list(missing_services)))
-            raise RuntimeError("Cluster not ready.")
-        if not all(services[s].serviceState == 'STARTED' for s in services):
-            stopped = [s for s in services if services[s].serviceState != "STARTED"]
-            logger.error("Not all services started: {0}".format(stopped))
-            raise RuntimeError("Cluster not ready.")
-        return services
-    @timing
-    def download_client_config(self, cluster, service):
-        """Download the client configuration zip for a particular cluster and service.
-        Since cm_api does not provide a way to download the archive we build the URL
-        manually and download the file. Once it downloaded the file the archive is
-        extracted and its content is copied to the Hadoop configuration directories
-        defined by Impala.
-        """
-"Downloading client configuration for {0}".format(
-        url = "http://{0}:7180/api/{1}/clusters/{2}/services/{3}/clientConfig".format(
-            self.cm_host, CM_API_VERSION, urlquote(, urlquote(
-        path = mkdtemp()
-        sh.curl(url, o=os.path.join(path, ""), _out=tee, _err=tee)
-        current = os.getcwd()
-        os.chdir(path)
-        sh.unzip("")
-        for root, _, file_names in os.walk("."):
-            for filename in fnmatch.filter(file_names, "*.xml"):
-                src = os.path.join(root, filename)
-                dst = os.path.join(self.impala_home, "fe", "src", "test", "resources")
-                logger.debug("Copying {0} to {1}".format(src, dst))
-                shutil.copy(src, dst)
-        os.chdir(current)
-    # TODO: this may be available in tests/comparison/
-    def set_hive_warehouse_dir(self, cluster, service):
-"Setting the Hive Warehouse Dir")
-        for service in self.api.get_all_clusters()[0].get_all_services():
-            if service.type == "HIVE":
-              hive_config = { "hive_warehouse_directory" : HIVE_WAREHOUSE_DIR }
-              service.update_config(hive_config)
-    # TODO: This functionality should be more generally available to other infrastructure
-    # code, rather than being quarantined in this script. See IMPALA-4367.
-    @timing
-    def get_service_client_configurations(self):
-        """Download the client configurations necessary to upload data to the remote
-        cluster. Unfortunately, the CM API does not allow downloading it so we have to
-        iterate over the services and download the config for all of them.
-        In addition, returns an options dictionary with settings required for data loading
-        like the HS2 server, Impala hosts, Name node etc.
-        Returns:
-            A client-configuration dictionary, e.g.:
-            {
-                'hive_warehouse_directory': '/test-warehouse',
-                'hs2': '',
-                'impalad': ['',
-                            '',
-                            ''],
-                'metastore': '',
-                'namenode': '',
-                'namenode_http': '',
-                'kudu_master': ''
-            }
-        """
-        # Iterate overs services and find the information we need
-        result = {}
-        for service_type, service in
-            if service_type == "IMPALA":
-                roles = service.get_roles_by_type("IMPALAD")
-                impalads = []
-                for r in roles:
-                    rc_config = r.get_config("full")
-                    hostname = self.get_hostname_for_ref(r.hostRef)
-                    hs2_port = self.get_or_default(rc_config["beeswax_port"])
-                    impalads.append("{0}:{1}".format(hostname, hs2_port))
-                    result["impalad"] = impalads
-            elif service_type == "HBASE":
-                self.download_client_config(self.cluster, service)
-            elif service_type == "HDFS":
-                self.download_client_config(self.cluster, service)
-                role = service.get_roles_by_type("NAMENODE")
-                config = role[0].get_config("full")
-                namenode = self.get_hostname_for_ref(role[0].hostRef)
-                result["namenode"] = namenode
-                result["namenode_http"] = "{0}:{1}".format(
-                    namenode,
-                    self.get_or_default(config["dfs_http_port"])
-                )
-            elif service_type == "HIVE":
-                self.set_hive_warehouse_dir(self.cluster, service)
-                self.download_client_config(self.cluster, service)
-                hs2 = service.get_roles_by_type("HIVESERVER2")[0]
-                rc_config = hs2.get_config("full")
-                result["hive_warehouse_directory"] = self.get_or_default(
-                    service.get_config("full")[0]["hive_warehouse_directory"])
-                hostname = self.get_hostname_for_ref(hs2.hostRef)
-                result["hs2"] = "{0}:{1}".format(hostname, self.get_or_default(
-                    rc_config["hs2_thrift_address_port"]))
-                # Get Metastore information
-                ms = service.get_roles_by_type("HIVEMETASTORE")[0]
-                rc_config = ms.get_config("full")
-                result["metastore"] = "{0}:{1}".format(
-                    self.get_hostname_for_ref(ms.hostRef),
-                    self.get_or_default(rc_config["hive_metastore_port"])
-                )
-            elif service_type == "KUDU":
-                # Service KUDU does not require a client configuration
-                result["kudu_master"] = self.cm_host
-        return result
-    # TODO: This functionality should be more generally available to other infrastructure
-    # code, rather than being quarantined in this script. See IMPALA-4367.
-    @staticmethod
-    def find_snapshot_file(snapshot_dir):
-        """Given snapshot_directory, walks the directory tree until it finds a file
-        matching the test-warehouse archive pattern."""
-        for root, _, file_names in os.walk(snapshot_dir):
-            for filename in fnmatch.filter(file_names, "test-warehouse-*-SNAPSHOT.tar.gz"):
-      "Found Snapshot file {0}".format(filename))
-                return os.path.join(root, filename)
-    @timing
-    def prepare(self):
-        """Populate the environment of the process with the necessary values.
-        In addition, it creates helper objects to run shell and SSH processes.
-        """
-        # Populate environment with required variables
-        os.environ["HS2_HOST_PORT"] = self.config["hs2"]
-        os.environ["HDFS_NN"] = self.config["namenode"]
-        os.environ["IMPALAD"] = self.config["impalad"][0]
-        os.environ["REMOTE_LOAD"] = "1"
-        os.environ["HADOOP_USER_NAME"] = "hdfs"
-        os.environ["TEST_WAREHOUSE_DIR"] = self.config["hive_warehouse_directory"]
-        os.environ["KUDU_MASTER"] = self.config["kudu_master"]
-        if self.options.snapshot_file is None:
-            if "SNAPSHOT_DIR" in os.environ:
-                snapshot_dir = os.environ["SNAPSHOT_DIR"]
-            else:
-                snapshot_dir = "{0}/testdata/test-warehouse-SNAPSHOT".format(
-                    os.getenv("WORKSPACE"))
-            if not os.path.isdir(snapshot_dir):
-                err_msg = 'Snapshot directory "{0}" is not a valid directory'
-                logger.error(err_msg.format(snapshot_dir))
-                raise OSError("Could not find test-warehouse snapshot file.")
-  "Snapshot directory: {0}".format(snapshot_dir))
-            self.snapshot_file = self.find_snapshot_file(snapshot_dir)
-        else:
-            self.snapshot_file = self.options.snapshot_file
-        # Prepare shortcuts for connecting to remote services
-        self.gtw_ssh = ssh.bake("{0}@{1}".format(self.options.ssh_user, self.gateway),
-                                "-oStrictHostKeyChecking=no",
-                                "-oUserKnownHostsFile=/dev/null",
-                                t=True, _out=tee, _err=tee)
-        self.beeline = sh.beeline.bake(silent=False, outputformat="csv2", n="impala",
-                                       u="jdbc:hive2://{0}/default".format(
-                                           self.config["hs2"]))
-        self.load_test_warehouse = sh.Command(
-            "{0}/testdata/bin/".format(
-                self.impala_home)).bake(
-            _out=tee, _err=tee)
-        self.create_load_data = sh.Command(
-            "{0}/testdata/bin/".format(self.impala_home))
-        self.main_impalad = self.config["impalad"][0]
-        self.impala_shell = sh.Command("").bake(i=self.main_impalad,
-                                                               _out=tee, _err=tee)
-        self.python = sh.Command("impala-python").bake(u=True)
-        self.compute_stats = sh.Command(
-            "{0}/testdata/bin/".format(self.impala_home)).bake(
-            _out=tee, _err=tee)
-    @timing
-    def load(self):
-        """This method performs the actual data load. First it removes any known artifacts
-        from the remote location. Next it drops potentially existing database from the
-        Hive Metastore. Now, it invokes the and
- scripts with the appropriate parameters. The most important
-        paramters are implicitly passed to the scripts as environment variables pointing
-        to the remote HDFS, Hive and Impala.
-        """
-        exploration_strategy = self.options.exploration_strategy
-"Removing other databases")
-        dblist = self.beeline(e="show databases;", _err=tee).stdout
-        database_list = dblist.split()[1:]  # The first element is the header string
-        for db in database_list:
-            if db.strip() != "default":
-                logger.debug("Dropping database %s", db)
-                self.impala_shell(q="drop database if exists {0} cascade;".format(db))
-"Invalidating metadata in Impala")
-        self.impala_shell(q="invalidate metadata;")
-"Removing previous remote {0}".format(
-            self.config["hive_warehouse_directory"]))
-        r = sh.hdfs.dfs("-rm", "-r", "-f", "{0}".format(
-            self.config["hive_warehouse_directory"]))
-"Expunging HDFS trash")
-        r = sh.hdfs.dfs("-expunge")
-"Uploading test warehouse snapshot")
-        self.load_test_warehouse(self.snapshot_file)
-        # TODO: We need to confirm that if we change any permissions, that we don't
-        # affect any running tests. See IMPALA-4375.
-"Changing warehouse ownership")
-        r = sh.hdfs.dfs("-chown", "-R", "impala:hdfs", "{0}".format(
-            self.config["hive_warehouse_directory"]))
-        sh.hdfs.dfs("-chmod", "-R", "g+rwx", "{0}".format(
-            self.config["hive_warehouse_directory"]))
-        sh.hdfs.dfs("-chmod", "1777", "{0}".format(
-            self.config["hive_warehouse_directory"]))
-        # The $USER variable is used in the script for beeline
-        # impersonation.
-        new_env = os.environ.copy()
-        new_env["LOGNAME"] = "impala"
-        new_env["USER"] = "impala"
-        new_env["USERNAME"] = "impala"
-        # Regardless of whether we are in fact skipping the snapshot load or not,
-        # we nonetheless always pass -skip_snapshot_load to
-        # This is because we have already loaded the snapshot earlier in this
-        # script, so we don't want to invoke
-        # again.
-        #
-        # It would actually be nice to be able to skip the snapshot load, but
-        # because of the existing messiness of, we can't.
-        # This invocation...
-        #
-        #    $ -skip_snapshot_load -exploration_strategy core
-        #
-        # ...results in this error:
-        #
-        #    Creating /test-warehouse HDFS directory \
-        #    (logging to create-test-warehouse-dir.log)... FAILED
-        #    'hadoop fs -mkdir /test-warehouse' failed. Tail of log:
-        #    Log for command 'hadoop fs -mkdir /test-warehouse'
-        #    mkdir: `/test-warehouse': File exists
-        #
-        # Similarly, even though we might pass in "core" as the exploration strategy,
-        # because we aren't loading a metadata snapshot (i.e., -skip_metadata_load is
-        # false), an exhaustive dataload will always be done. This again is the result
-        # of logic in, which itself ignores the value passed in
-        # for -exploration_strategy.
-        #
-        # See IMPALA-4399: " has bitrotted to some extent, and needs
-        #                   to be cleaned up"
-        create_load_data_args = ["-skip_snapshot_load", "-cm_host", self.cm_host,
-                                 "-snapshot_file", self.snapshot_file,
-                                 "-exploration_strategy", exploration_strategy]
-        self.create_load_data(*create_load_data_args, _env=new_env, _out=tee, _err=tee)
-        sh.hdfs.dfs("-chown", "-R", "impala:hdfs", "{0}".format(
-            self.config["hive_warehouse_directory"]))
-"Re-load HBase data")
-        # Manually load the HBase data last.
-        self.python("{0}/bin/".format(self.impala_home),
-                    "--hive_warehouse_dir={0}".format(
-                        self.config["hive_warehouse_directory"]),
-                    "--table_formats=hbase/none",
-                    "--hive_hs2_hostport={0}".format(self.config["hs2"]),
-                    "--hdfs_namenode={0}".format(self.config["namenode"]),
-                    "--exploration_strategy={0}".format(exploration_strategy),
-                    workloads="functional-query",
-                    force=True,
-                    impalad=self.main_impalad,
-                    _env=new_env,
-                    _out=tee,
-                    _err=tee)
-        self.compute_stats()
-"Load data finished")
-    # TODO: Should this be refactored out of this script? It has nothing to do with
-    # data loading per se. If tests rely on the environment on the client being set
-    # a certain way -- as in the prepare() method -- we may need to find another way
-    # to deal with that. See IMPALA-4376.
-    @timing
-    def test(self):
-        """Execute Impala's end-to-end tests against a remote cluster. All configuration
-        paramters are picked from the cluster configuration that was fetched via the
-        CM API."""
-        # TODO: Running tests via is currently not working against a remote
-        # cluster (although running directly via py.test seems to work.) This method
-        # may be refactored out of this file under IMPALA-4376, so for the time being,
-        # raise a NotImplementedError.
-        raise NotImplementedError
-        # Overwrite the username to match the service user on the remote system and deal
-        # with the assumption that in the local development environment the current user
-        # is HDFS superuser as well.
-        new_env = os.environ.copy()
-        new_env["LOGNAME"] = "impala"
-        new_env["USER"] = "impala"
-        new_env["USERNAME"] = "impala"
-        strategy = self.options.exploration_strategy
-"Running tests with exploration strategy {0}".format(strategy))
-        run_tests = sh.Command("{0}/tests/".format(self.impala_home))
-        run_tests("--skip_local_tests",
-                  "--exploration_strategy={0}".format(strategy),
-                  "--workload_exploration_strategy=functional-query:{0}".format(strategy),
-                  "--namenode_http_address={0}".format(self.config["namenode_http"]),
-                  "--hive_server2={0}".format(self.config["hs2"]),
-                  "--metastore_server={0}".format(self.config["metastore"]),
-                  "query_test",
-                  maxfail=10,
-                  impalad=",".join(self.config["impalad"]),
-                  _env=new_env,
-                  _out=tee,
-                  _err=tee)
-def parse_args():
-    parser = OptionParser()
-    parser.add_option("--snapshot-file", default=None,
-                      help="Path to the test-warehouse archive")
-    parser.add_option("--cm-user", default="admin", help="Cloudera Manager admin user")
-    parser.add_option("--cm-pass", default="admin",
-                      help="Cloudera Manager admin user password")
-    parser.add_option("--gateway", default=None,
-                      help=("Gateway host to upload the data from. If not set, uses the "
-                            "CM host as gateway."))
-    parser.add_option("--ssh-user", default="jenkins",
-                      help=("System user on the remote machine with passwordless "
-                            "SSH configured."))
-    parser.add_option("--no-load", action="store_false", default=True, dest="load",
-                      help="Do not try to load the snapshot")
-    parser.add_option("--exploration-strategy", default="core")
-    parser.add_option("--test", action="store_true", default=False,
-                      help="Run end-to-end tests against cluster")
-    parser.set_usage(" [options] cm_host")
-    options, args = parser.parse_args()
-    try:
-        return options, args[0]  # args[0] is the cm_host
-    except IndexError:
-        logger.error("You must supply the cm_host.")
-        parser.print_usage()
-        raise
-def main(cm_host, options):
-    """
-    Load data to a remote cluster (and/or run tests) as specified.
-    Args:
-        cm_host: FQDN or IP of the CM host machine
-        options: an optparse 'options' instance containing RemoteDataLoad
-                 values (though any object with the correct .attributes, e.g.
-                 a collections.namedtuple instance, would also work)
-    """
-    rd = RemoteDataLoad(cm_host, options)
-    if options.load:
-        rd.load()
-    if options.test:
-        rd.test()
-if __name__ == "__main__":
-    options, cm_host = parse_args()
-    main(cm_host=cm_host, options=options)