You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/27 22:57:10 UTC

[2/3] incubator-impala git commit: IMPALA-3980: qgen: re-enable Hive as a target database

IMPALA-3980: qgen: re-enable Hive as a target database

Changes:

* Added the Hive CLI options back in (they were removed in commit "Stress test: Various changes")
* Modifications so that if --use-hive is specified, a Hive connection is actually created
* A few minor bug fixes so that the RQG can be run locally
* Modified MiniCluster so that a new MiniHiveCluster subclass can read its configuration
from HADOOP_CONF_DIR and HIVE_CONF_DIR rather than hard-coded directories under IMPALA_HOME
* Fixed fe/src/test/resources/hive-default.xml so that it is a valid XML file; it was
missing a few element terminators, which caused an exception in the cluster.py file

Testing:

* Hive integration tested locally by invoking the data generator via the command:

./data-generator.py \
--db-name=functional \
--use-hive \
--min-row-count=50 \
--max-row-count=100 \
--storage-file-formats textfile \
--use-postgresql \
--postgresql-user stakiar

and the discrepancy checker via the command:

./discrepancy-checker.py \
--db-name=functional \
--use-hive \
--use-postgresql \
--postgresql-user stakiar \
--test-db-type HIVE \
--timeout 300 \
--query-count 50 \
--profile hive

* The output of the above two commands is essentially the same as the Impala output;
however, about 20% of the queries will fail when the discrepancy checker is run
* Regression testing done by running Leopard in a local VM running Ubuntu 14.04, and by
running the discrepancy checker against Impala while inside an Impala Docker container

Change-Id: Ifb1199b50a5b65c21de7876fb70cc03bda1a9b46
Reviewed-on: http://gerrit.cloudera.org:8080/4011
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Taras Bobrovytsky <tb...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0780d2c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0780d2c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0780d2c8

Branch: refs/heads/master
Commit: 0780d2c8af56f5439ad482319e4dd7f106e38047
Parents: 4d9c261
Author: Sahil Takiar <st...@cloudera.com>
Authored: Mon Aug 15 17:16:26 2016 -0700
Committer: Taras Bobrovytsky <tb...@cloudera.com>
Committed: Tue Sep 27 22:24:59 2016 +0000

----------------------------------------------------------------------
 fe/src/test/resources/hive-default.xml |  8 ++--
 tests/comparison/cli_options.py        | 23 ++++++++++-
 tests/comparison/cluster.py            | 62 ++++++++++++++++++++++++-----
 tests/comparison/data_generator.py     | 35 +++++++++++-----
 tests/comparison/db_connection.py      | 17 ++++++++
 5 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/fe/src/test/resources/hive-default.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/hive-default.xml b/fe/src/test/resources/hive-default.xml
index 84a44e2..08e102b 100644
--- a/fe/src/test/resources/hive-default.xml
+++ b/fe/src/test/resources/hive-default.xml
@@ -479,6 +479,7 @@
     If the user has set hive.merge.mapfiles to true and hive.merge.mapredfiles to false, the idea was the
     number of reducers are few, so the number of files anyway are small. However, with this optimization,
     we are increasing the number of files possibly by a big margin. So, we merge aggresively.
+  </description>
 </property>
 
 <property>
@@ -487,6 +488,7 @@
   <description>Whether the version of hadoop which is running supports sub-directories for tables/partitions.
     Many hive optimizations can be applied if the hadoop version supports sub-directories for
     tables/partitions. It was added by MAPREDUCE-1501
+  </description>
 </property>
 
 <property>
@@ -552,9 +554,9 @@
     to know how to construct the canonical path. It just gives user choice if they want to change 
     the default directory name.
     For example, there are 2 skewed column c1 and c2. 2 skewed value: (1,a) and (2,b). subdirectory:
-    <partition-dir>/c1=1/c2=a/
-    <partition-dir>/c1=2/c2=b/
-    <partition-dir>/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
+    &lt;partition-dir&gt;/c1=1/c2=a/
+    &lt;partition-dir&gt;/c1=2/c2=b/
+    &lt;partition-dir&gt;/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
     Note: This config won't impact users if they don't list bucketing.
 </description>
 </property>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 4e28832..92901f4 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -24,7 +24,8 @@ from getpass import getuser
 from tempfile import gettempdir
 
 import db_connection
-from cluster import CmCluster, MiniCluster
+from cluster import CmCluster, DEFAULT_HIVE_HOST, DEFAULT_HIVE_PORT, MiniCluster, \
+  MiniHiveCluster
 from db_types import TYPES
 
 def add_logging_options(section, default_debug_log_file=None):
@@ -119,8 +120,10 @@ def create_cluster(args):
     cluster = CmCluster(args.cm_host, user=args.cm_user, password=args.cm_password,
         cluster_name=args.cm_cluster_name, ssh_user=args.ssh_user, ssh_port=args.ssh_port,
         ssh_key_file=args.ssh_key_file)
+  elif args.use_hive:
+    cluster = MiniHiveCluster(args.hive_host, args.hive_port)
   else:
-    cluster = MiniCluster(args.minicluster_num_impalads)
+    cluster = MiniCluster(args.hive_host, args.hive_port, args.minicluster_num_impalads)
   cluster.hadoop_user_name = args.hadoop_user_name
   return cluster
 
@@ -143,6 +146,20 @@ def add_timeout_option(section):
 
 
 def add_connection_option_groups(parser):
+
+  group = parser.add_argument_group("Hive Options")
+  group.add_argument('--use-hive', action='store_true', default=False,
+      help='Use Hive (Impala will be skipped)')
+  group.add_argument('--hive-host', default=DEFAULT_HIVE_HOST,
+      help="The name of the host running the HS2")
+  group.add_argument("--hive-port", default=DEFAULT_HIVE_PORT, type=int,
+      help="The port of HiveServer2")
+  group.add_argument('--hive-user', default='hive',
+      help="The user name to use when connecting to HiveServer2")
+  group.add_argument('--hive-password', default='hive',
+      help="The password to use when connecting to HiveServer2")
+  parser.add_argument_group(group)
+
   group = parser.add_argument_group('MySQL Options')
   group.add_argument('--use-mysql', action='store_true',
       help='Use MySQL')
@@ -208,6 +225,8 @@ def create_connection(args, db_type=None, db_name=None):
     conn_class = db_connection.MySQLConnection
   elif db_type == db_connection.ORACLE:
     conn_class = db_connection.OracleConnection
+  elif db_type == db_connection.HIVE:
+    conn_class = db_connection.HiveConnection
   else:
     raise Exception('Unexpected db_type: %s; expected one of %s.'
         % (db_type, ', '.join([db_connection.POSTGRESQL, db_connection.MYSQL,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/cluster.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index c602df1..a009e92 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -40,6 +40,7 @@ from sys import maxint
 from tempfile import mkdtemp
 from threading import Lock
 from time import mktime, strptime
+from urlparse import urlparse
 from xml.etree.ElementTree import parse as parse_xml
 from zipfile import ZipFile
 
@@ -51,6 +52,8 @@ from tests.util.parse_util import parse_glog, parse_mem_to_mb
 
 LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
 
+DEFAULT_HIVE_HOST = '127.0.0.1'
+DEFAULT_HIVE_PORT = 11050
 DEFAULT_TIMEOUT = 300
 
 class Cluster(object):
@@ -72,7 +75,7 @@ class Cluster(object):
     self._hive = None
     self._impala = None
 
-  def get_hadoop_config(self, key):
+  def _load_hadoop_config(self):
     if not self._hadoop_configs:
       self._hadoop_configs = dict()
       for file_name in os.listdir(self.local_hadoop_conf_dir):
@@ -87,7 +90,17 @@ class Cluster(object):
           if value is None or value.text is None:
             continue
           self._hadoop_configs[name.text] = value.text
-    return self._hadoop_configs[key]
+
+  def get_hadoop_config(self, key, default=None):
+    """Returns the Hadoop Configuration value mapped to the given key. If a default is
+       specified, it is returned if the key is cannot be found. If no default is specified
+       and the key cannot be found, a 'No Such Key' error will be thrown.
+    """
+    self._load_hadoop_config()
+    result = self._hadoop_configs.get(key, default)
+    if result is None:
+      raise KeyError
+    return result
 
   @abstractproperty
   def shell(self, cmd, host_name, timeout_secs=DEFAULT_TIMEOUT):
@@ -152,8 +165,11 @@ class Cluster(object):
 
 class MiniCluster(Cluster):
 
-  def __init__(self, num_impalads=3):
+  def __init__(self, hive_host=DEFAULT_HIVE_HOST, hive_port=DEFAULT_HIVE_PORT,
+               num_impalads=3):
     Cluster.__init__(self)
+    self.hive_host = hive_host
+    self.hive_port = hive_port
     self.num_impalads = num_impalads
 
   def shell(self, cmd, unused_host_name, timeout_secs=DEFAULT_TIMEOUT):
@@ -162,21 +178,28 @@ class MiniCluster(Cluster):
   def _init_local_hadoop_conf_dir(self):
     self._local_hadoop_conf_dir = mkdtemp()
 
-    node_conf_dir = os.path.join(os.environ["IMPALA_HOME"], "testdata", "cluster",
-        "cdh%s" % os.environ["CDH_MAJOR_VERSION"], "node-1", "etc", "hadoop", "conf")
+    node_conf_dir = self._get_node_conf_dir()
     for file_name in os.listdir(node_conf_dir):
       shutil.copy(os.path.join(node_conf_dir, file_name), self._local_hadoop_conf_dir)
 
-    other_conf_dir = os.path.join(os.environ["IMPALA_HOME"], "fe", "src", "test",
-        "resources")
+    other_conf_dir = self._get_other_conf_dir()
     for file_name in ["hive-site.xml"]:
       shutil.copy(os.path.join(other_conf_dir, file_name), self._local_hadoop_conf_dir)
 
+  def _get_node_conf_dir(self):
+    return os.path.join(os.environ["IMPALA_HOME"], "testdata", "cluster",
+                        "cdh%s" % os.environ["CDH_MAJOR_VERSION"], "node-1",
+                        "etc", "hadoop", "conf")
+
+  def _get_other_conf_dir(self):
+    return os.path.join(os.environ["IMPALA_HOME"], "fe", "src", "test",
+                        "resources")
+
   def _init_hdfs(self):
     self._hdfs = Hdfs(self, self.hadoop_user_name)
 
   def _init_hive(self):
-    self._hive = Hive(self, "127.0.0.1", 11050)
+    self._hive = Hive(self, self.hive_host, self.hive_port)
 
   def _init_impala(self):
     hs2_base_port = 21050
@@ -185,6 +208,23 @@ class MiniCluster(Cluster):
                 for p in xrange(self.num_impalads)]
     self._impala = Impala(self, impalads)
 
+class MiniHiveCluster(MiniCluster):
+  """
+  A MiniCluster useful for running against Hive. It allows Hadoop configuration files
+  to be specified by HADOOP_CONF_DIR and Hive configuration files to be specified by
+  HIVE_CONF_DIR.
+  """
+
+  def __init__(self, hive_host=DEFAULT_HIVE_HOST, hive_port=DEFAULT_HIVE_PORT):
+    MiniCluster.__init__(self)
+    self.hive_host = hive_host
+    self.hive_port = hive_port
+
+  def _get_node_conf_dir(self):
+    return os.environ["HADOOP_CONF_DIR"]
+
+  def _get_other_conf_dir(self):
+    return os.environ["HIVE_CONF_DIR"]
 
 class CmCluster(Cluster):
 
@@ -315,7 +355,8 @@ class Hdfs(Service):
 
   def create_client(self, as_admin=False):
     """Returns an HdfsClient."""
-    endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address")
+    endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address",
+                                              "0.0.0.0:50070")
     if endpoint.startswith("0.0.0.0"):
       endpoint.replace("0.0.0.0", "127.0.0.1")
     return HdfsClient("http://%s" % endpoint, use_kerberos=False,
@@ -412,7 +453,8 @@ class Hive(Service):
   @property
   def warehouse_dir(self):
     if not self._warehouse_dir:
-      self._warehouse_dir = self.cluster.get_hadoop_config("hive.metastore.warehouse.dir")
+      self._warehouse_dir = urlparse(
+        self.cluster.get_hadoop_config("hive.metastore.warehouse.dir")).path
     return self._warehouse_dir
 
   def connect(self, db_name=None):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/data_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/data_generator.py b/tests/comparison/data_generator.py
index e919f98..e1f0a01 100755
--- a/tests/comparison/data_generator.py
+++ b/tests/comparison/data_generator.py
@@ -49,6 +49,7 @@ from db_types import (
     Timestamp,
     TYPES,
     VarChar)
+from tests.comparison import db_connection
 
 LOG = getLogger(__name__)
 
@@ -87,9 +88,10 @@ class DbPopulator(object):
 
   '''
 
-  def __init__(self):
+  def __init__(self, db_engine=db_connection.IMPALA):
     self.cluster = None
     self.db_name = None
+    self.db_engine = db_engine
 
     self.min_col_count = None
     self.max_col_count = None
@@ -147,9 +149,17 @@ class DbPopulator(object):
           cursor.execute('INSERT INTO %s SELECT * FROM %s'
               % (table.name, text_table.name))
           cursor.drop_table(text_table.name)
-    with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
-      cursor.invalidate_metadata()
-      cursor.compute_stats()
+    if self.db_engine is db_connection.IMPALA:
+      with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
+        cursor.invalidate_metadata()
+        cursor.compute_stats()
+    elif self.db_engine is db_connection.HIVE:
+      with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
+        cursor.invalidate_metadata()
+        cursor.compute_stats()
+    else:
+      raise ValueError("db_engine must be of type %s or %s", db_connection.IMPALA,
+                       db_connection.HIVE)
     if postgresql_conn:
       with postgresql_conn.cursor() as postgresql_cursor:
         index_tables_in_db_if_possible(postgresql_cursor)
@@ -298,7 +308,7 @@ if __name__ == '__main__':
 
   cluster = cli_options.create_cluster(args)
 
-  populator = DbPopulator()
+  populator = DbPopulator(db_connection.HIVE if args.use_hive else db_connection.IMPALA)
   if command == 'populate':
     populator.randomization_seed = args.randomization_seed
     populator.cluster = cluster
@@ -308,10 +318,17 @@ if __name__ == '__main__':
     populator.min_row_count = args.min_row_count
     populator.max_row_count = args.max_row_count
     populator.allowed_storage_formats = args.storage_file_formats.split(',')
-    with cluster.impala.connect() as conn:
-      with conn.cursor() as cursor:
-        cursor.invalidate_metadata()
-        cursor.ensure_empty_db(args.db_name)
+
+    if args.use_hive:
+      with cluster.hive.connect() as conn:
+        with conn.cursor() as cursor:
+          cursor.ensure_empty_db(args.db_name)
+    else:
+      with cluster.impala.connect() as conn:
+        with conn.cursor() as cursor:
+          cursor.invalidate_metadata()
+          cursor.ensure_empty_db(args.db_name)
+
     if args.use_postgresql:
       with cli_options.create_connection(args) as postgresql_conn:
         with postgresql_conn.cursor() as cursor:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py
index 2fe8174..13125c2 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -789,11 +789,28 @@ class ImpalaConnection(DbConnection):
         auth_mechanism=('GSSAPI' if self._use_kerberos else self._NON_KERBEROS_AUTH_MECH))
 
 
+class HiveCursor(ImpalaCursor):
+
+  def invalidate_metadata(self, table_name=None):
+    # There is no equivalent of "INVALIDATE METADATA" in Hive
+    pass
+
+  def compute_stats(self, table_name=None):
+    if table_name:
+      self.execute("ANALYZE TABLE %s COMPUTE STATISTICS" % table_name)
+      self.execute("ANALYZE TABLE %s COMPUTE STATISTICS FOR COLUMNS" % table_name)
+    else:
+      for table_name in self.list_table_names():
+        self.execute("ANALYZE TABLE %s COMPUTE STATISTICS" % table_name)
+        self.execute("ANALYZE TABLE %s COMPUTE STATISTICS FOR COLUMNS" % table_name)
+
+
 class HiveConnection(ImpalaConnection):
 
   PORT = 11050
 
   _DB_TYPE = HIVE
+  _CURSOR_CLASS = HiveCursor
   _NON_KERBEROS_AUTH_MECH = 'PLAIN'