You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/14 00:47:57 UTC

[1/3] incubator-impala git commit: IMPALA-3949: Log the error message in FileSystemUtil.copyToLocal()

Repository: incubator-impala
Updated Branches:
  refs/heads/master 64ffbc4b5 -> 0c874189e


IMPALA-3949: Log the error message in FileSystemUtil.copyToLocal()

To improve supportability, this commit logs the stack trace of the actual
error that causes FileSystemUtil.copyToLocal() to fail. Additionally, this
also cleans up the FileSystemUtil.isPathReachable() method to throw an
exception on failures rather than returning false and then returning
the error message as a string to callers.

Change-Id: I5664a75aa837951de1d5dcc90e43bd8f313736b8
Reviewed-on: http://gerrit.cloudera.org:8080/4125
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6c446b35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6c446b35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6c446b35

Branch: refs/heads/master
Commit: 6c446b35b951ba27b4a458c23f408379495f85ff
Parents: 64ffbc4
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Thu Aug 25 12:45:31 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Sep 13 21:36:28 2016 +0000

----------------------------------------------------------------------
 .../com/cloudera/impala/analysis/HdfsUri.java   |  9 +++++-
 .../impala/catalog/CatalogServiceCatalog.java   |  6 ++--
 .../cloudera/impala/common/FileSystemUtil.java  | 32 +++-----------------
 .../cloudera/impala/util/AvroSchemaUtils.java   |  5 ++-
 4 files changed, 19 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c446b35/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java b/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
index 77c0369..9fbe467 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
@@ -88,8 +88,15 @@ public class HdfsUri {
     Path parentPath = uriPath_.getParent();
     try {
       FileSystem fs = uriPath_.getFileSystem(FileSystemUtil.getConfiguration());
+      boolean pathExists = false;
       StringBuilder errorMsg = new StringBuilder();
-      if (!FileSystemUtil.isPathReachable(parentPath, fs, errorMsg)) {
+      try {
+        pathExists = fs.exists(parentPath);
+        if (!pathExists) errorMsg.append("Path does not exist.");
+      } catch (Exception e) {
+        errorMsg.append(e.getMessage());
+      }
+      if (!pathExists) {
         analyzer.addWarning(String.format("Path '%s' cannot be reached: %s",
             parentPath, errorMsg.toString()));
       } else if (perm != FsAction.NONE) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c446b35/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
index f43938b..27d25e1 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
@@ -458,11 +458,13 @@ public class CatalogServiceCatalog extends Catalog {
     try {
       Path localJarPath = new Path(LOCAL_LIBRARY_PATH,
           UUID.randomUUID().toString() + ".jar");
-      if (!FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath)) {
+      try {
+        FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
+      } catch (IOException e) {
         String errorMsg = "Error loading Java function: " + db + "." +
             function.getFunctionName() + ". Couldn't copy " + jarUri +
             " to local path: " + localJarPath.toString();
-        LOG.error(errorMsg);
+        LOG.error(errorMsg, e);
         throw new ImpalaRuntimeException(errorMsg);
       }
       URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c446b35/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
index b0771e0..2239853 100644
--- a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
@@ -371,34 +371,12 @@ public class FileSystemUtil {
   }
 
   /**
-   * Copies the source file to a destination path on the local filesystem and returns true
-   * if successful.
+   * Copies the source file to a destination path on the local filesystem.
+   * Throws IOException on failure.
    */
-   public static boolean copyToLocal(Path source, Path dest) {
-     try {
-       FileSystem fs = source.getFileSystem(CONF);
-       fs.copyToLocalFile(source, dest);
-     } catch (IOException e) {
-       return false;
-     }
-     return true;
-   }
-
-  /**
-   * Return true if the path can be reached, false for all other cases
-   * File doesn't exist, cannot access the FileSystem, etc.
-   */
-  public static boolean isPathReachable(Path path, FileSystem fs, StringBuilder error_msg) {
-    try {
-      if (fs.exists(path)) {
-        return true;
-      } else {
-        error_msg.append("Path does not exist.");
-      }
-    } catch (Exception e) {
-      error_msg.append(e.getMessage());
-    }
-    return false;
+  public static void copyToLocal(Path source, Path dest) throws IOException {
+    FileSystem fs = source.getFileSystem(CONF);
+    fs.copyToLocalFile(source, dest);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c446b35/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
index b1e7910..f86c347 100644
--- a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
+++ b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
@@ -83,10 +83,9 @@ public class AvroSchemaUtils {
         Path path = new Path(url);
         FileSystem fs = null;
         fs = path.getFileSystem(FileSystemUtil.getConfiguration());
-        StringBuilder errorMsg = new StringBuilder();
-        if (!FileSystemUtil.isPathReachable(path, fs, errorMsg)) {
+        if (!fs.exists(path)) {
           throw new AnalysisException(String.format(
-              "Invalid avro.schema.url: %s. %s", url, errorMsg));
+              "Invalid avro.schema.url: %s. Path does not exist.", url));
         }
         schema = FileSystemUtil.readFile(path);
       }


[2/3] incubator-impala git commit: IMPALA-3491: Use unique db in test_scanners.py and test_aggregation.py

Posted by ab...@apache.org.
IMPALA-3491: Use unique db in test_scanners.py and test_aggregation.py

Testing: Ran the tests locally in a loop on exhaustive.
Did a private debug/exhaustive run.

Change-Id: Ided0848c138bdc1d43694a12222010c48e23ee1c
Reviewed-on: http://gerrit.cloudera.org:8080/4339
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d3798758
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d3798758
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d3798758

Branch: refs/heads/master
Commit: d379875806daed018cdbc93f82d21c85ea5b212a
Parents: 6c446b3
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Sep 7 10:56:38 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Sep 13 21:57:36 2016 +0000

----------------------------------------------------------------------
 .../QueryTest/aggregation_no_codegen_only.test  |  6 +--
 tests/query_test/test_aggregation.py            |  7 ++--
 tests/query_test/test_scanners.py               | 42 +++++++-------------
 3 files changed, 20 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3798758/testdata/workloads/functional-query/queries/QueryTest/aggregation_no_codegen_only.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation_no_codegen_only.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation_no_codegen_only.test
index b648e30..cdf0eb1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation_no_codegen_only.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation_no_codegen_only.test
@@ -1,12 +1,10 @@
 ====
 ---- QUERY
 # Regression test for IMPALA-901.
-DROP TABLE IF EXISTS functional.imp_901;
-CREATE TABLE functional.imp_901 (col tinyint);
 # The second value is carefully chosen to be equal to (int8_t)HashUtil::FNV_SEED, which
 # causes the hash collision that caused the bug.
-INSERT INTO functional.imp_901 VALUES(-59), (NULL);
-SELECT col FROM functional.imp_901 GROUP BY 1
+INSERT INTO imp_901 VALUES(-59), (NULL);
+SELECT col FROM imp_901 GROUP BY 1;
 ---- TYPES
 tinyint
 ---- RESULTS

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3798758/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 1b2ef12..3535637 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -110,14 +110,15 @@ class TestAggregationQueries(ImpalaTestSuite):
     if cls.exploration_strategy() == 'core':
       cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 
-  @pytest.mark.execute_serially
-  def test_non_codegen_tinyint_grouping(self, vector):
+  def test_non_codegen_tinyint_grouping(self, vector, unique_database):
     # Regression for IMPALA-901. The test includes an INSERT statement, so can only be run
     # on INSERT-able formats - text only in this case, since the bug doesn't depend on the
     # file format.
     if vector.get_value('table_format').file_format == 'text' \
         and vector.get_value('table_format').compression_codec == 'none':
-      self.run_test_case('QueryTest/aggregation_no_codegen_only', vector)
+      self.client.execute("create table %s.imp_901 (col tinyint)" % unique_database)
+      self.run_test_case('QueryTest/aggregation_no_codegen_only', vector,
+          unique_database)
 
   def test_aggregation(self, vector):
     if vector.get_value('table_format').file_format == 'hbase':

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3798758/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 3de56f4..28bae70 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -28,7 +28,7 @@ import re
 import tempfile
 from copy import deepcopy
 from parquet.ttypes import ConvertedType
-from subprocess import call, check_call
+from subprocess import check_call
 
 from testdata.common import widetable
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
@@ -546,10 +546,7 @@ class TestTextScanRangeLengths(ImpalaTestSuite):
 @SkipIfS3.hive
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
-@pytest.mark.execute_serially
 class TestScanTruncatedFiles(ImpalaTestSuite):
-  TEST_DB = 'test_truncated_file'
-
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -570,36 +567,25 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
     else:
       cls.TestMatrix.add_constraint(lambda v: False)
 
-  def setup_method(self, method):
-    self.cleanup_db(TestScanTruncatedFiles.TEST_DB)
-    self.client.execute("create database %s location '%s/%s.db'" %
-        (TestScanTruncatedFiles.TEST_DB, WAREHOUSE,
-        TestScanTruncatedFiles.TEST_DB))
-
-  def teardown_method(self, method):
-    self.cleanup_db(TestScanTruncatedFiles.TEST_DB)
-
-  def test_scan_truncated_file_empty(self, vector):
-    self.scan_truncated_file(0)
+  def test_scan_truncated_file_empty(self, vector, unique_database):
+    self.scan_truncated_file(0, unique_database)
 
-  def test_scan_truncated_file(self, vector):
-    self.scan_truncated_file(10)
+  def test_scan_truncated_file(self, vector, unique_database):
+    self.scan_truncated_file(10, unique_database)
 
-  def scan_truncated_file(self, num_rows):
-    db_name = TestScanTruncatedFiles.TEST_DB
-    tbl_name = "tbl"
-    self.execute_query("use %s" % db_name)
-    self.execute_query("create table %s (s string)" % tbl_name)
-    call(["hive", "-e", "INSERT OVERWRITE TABLE %s.%s SELECT string_col from "\
-        "functional.alltypes" % (db_name, tbl_name)])
+  def scan_truncated_file(self, num_rows, db_name):
+    fq_tbl_name = db_name + ".truncated_file_test"
+    self.execute_query("create table %s (s string)" % fq_tbl_name)
+    self.run_stmt_in_hive("insert overwrite table %s select string_col from "
+        "functional.alltypes" % fq_tbl_name)
 
     # Update the Impala metadata
-    self.execute_query("refresh %s" % tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name)
 
     # Insert overwrite with a truncated file
-    call(["hive", "-e", "INSERT OVERWRITE TABLE %s.%s SELECT string_col from "\
-        "functional.alltypes limit %s" % (db_name, tbl_name, num_rows)])
+    self.run_stmt_in_hive("insert overwrite table %s select string_col from "
+        "functional.alltypes limit %s" % (fq_tbl_name, num_rows))
 
-    result = self.execute_query("select count(*) from %s" % tbl_name)
+    result = self.execute_query("select count(*) from %s" % fq_tbl_name)
     assert(len(result.data) == 1)
     assert(result.data[0] == str(num_rows))


[3/3] incubator-impala git commit: IMPALA-4122: qgen: fix bitrotted cluster unit tests

Posted by ab...@apache.org.
IMPALA-4122: qgen: fix bitrotted cluster unit tests

There's a small set of pytest-style tests and associated conftest for
testing some of the cluster-related test infrastructure Python objects.
Going forward, I want unit tests for the query generator to be run as
part of patch acceptance (CI isn't necessary at this time).

This patch fixes a bitrotted test and moves the tests into the
tests-for-qgen directory. The moves were performed thusly:

 $ git mv conftest.py tests/
 $ git mv cluster_tests.py tests/test_cluster.py

Change-Id: I3e855e265ae245ebe3691d077284ac5761909e00
Reviewed-on: http://gerrit.cloudera.org:8080/4404
Reviewed-by: Michael Brown <mi...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0c874189
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0c874189
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0c874189

Branch: refs/heads/master
Commit: 0c874189ed902d566b3df37959c937ff6a7bfee8
Parents: d379875
Author: Michael Brown <mi...@cloudera.com>
Authored: Tue Sep 13 12:30:26 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 14 00:24:06 2016 +0000

----------------------------------------------------------------------
 tests/comparison/cluster_tests.py      | 94 -----------------------------
 tests/comparison/conftest.py           | 46 --------------
 tests/comparison/tests/conftest.py     | 62 +++++++++++++++++++
 tests/comparison/tests/test_cluster.py | 94 +++++++++++++++++++++++++++++
 4 files changed, 156 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c874189/tests/comparison/cluster_tests.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cluster_tests.py b/tests/comparison/cluster_tests.py
deleted file mode 100644
index 976f423..0000000
--- a/tests/comparison/cluster_tests.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# These are unit tests for cluster.py.
-
-from time import time
-
-from tests.common.errors import Timeout
-
-from common import Column, Table
-from db_types import BigInt, String
-
-class TestCluster(object):
-
-  def test_cmd(self, cluster):
-    host = cluster.impala.impalads[0].host_name
-    assert cluster.shell("echo -n HI", host) == "HI"
-
-    try:
-      cluster.shell("bad", host)
-      assert False
-    except Exception as e:
-      assert "command not found" in str(e)
-
-    start = time()
-    try:
-      cluster.shell("echo HI; sleep 60", host, timeout_secs=3)
-      assert False
-    except Timeout as e:
-      assert "HI" in str(e)
-      assert 3 <= time() - start <= 6
-
-
-class TestHdfs(object):
-
-  def test_ls(self, cluster):
-    ls = cluster.hdfs.create_client().list("/")
-    assert "tmp" in ls
-    assert "etc" not in ls
-
-
-class TestHive(object):
-
-  def test_list_databases(self, hive_cursor):
-    assert "default" in hive_cursor.list_db_names()
-
-  def test_non_mr_exec(self, hive_cursor):
-    hive_cursor.execute("SELECT 1")
-    rows = hive_cursor.fetchall()
-    assert rows
-    assert rows[0][0] == 1
-
-
-class TestImpala(object):
-
-  def test_list_databases(self, cursor):
-    assert "default" in cursor.list_db_names()
-
-  def test_exec(self, cursor):
-    cursor.execute("SELECT 1")
-    rows = cursor.fetchall()
-    assert rows
-    assert rows[0][0] == 1
-
-
-class TestModel(object):
-
-  def test_table_model(self, cursor, hive_cursor):
-    table = Table("some_test_table")
-    cursor.drop_table(table.name, if_exists=True)
-    table.storage_format = 'textfile'
-    table.cols.append(Column(table, "bigint_col", BigInt))
-    table.cols.append(Column(table, "string_col", String))
-    cursor.create_table(table)
-    try:
-      other = hive_cursor.describe_table(table.name)
-      assert other.name == table.name
-      assert other.cols == table.cols
-    finally:
-      cursor.drop_table(table.name)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c874189/tests/comparison/conftest.py
----------------------------------------------------------------------
diff --git a/tests/comparison/conftest.py b/tests/comparison/conftest.py
deleted file mode 100644
index 8126a84..0000000
--- a/tests/comparison/conftest.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import pytest
-
-import cli_options
-from cluster import CmCluster, MiniCluster
-
-"""This module provides pytest 'fixtures'. See cluster_tests.py for usage."""
-
-__cluster = None
-
-def pytest_addoption(parser):
-  if not hasattr(parser, "add_argument"):
-    parser.add_argument = parser.addoption
-  cli_options.add_cm_options(parser)
-
-
-@pytest.fixture
-def cluster(request):
-  global __cluster
-  if not __cluster:
-    cm_host = get_option_value(request, "cm_host")
-    if cm_host:
-      __cluster = CmCluster(cm_host, port=get_option_value(request, "cm_port"),
-          user=get_option_value(request, "cm_user"),
-          password=get_option_value(request, "cm_password"),
-          cluster_name=get_option_value(request, "cm_cluster_name"))
-    else:
-      __cluster = MiniCluster()
-  return __cluster
-
-
-@pytest.yield_fixture
-def hive_cursor(request):
-  with cluster(request).hive.connect() as conn:
-    with conn.cursor() as cur:
-      yield cur
-
-
-@pytest.yield_fixture
-def cursor(request):
-  with cluster(request).impala.connect() as conn:
-    with conn.cursor() as cur:
-      yield cur
-
-
-def get_option_value(request, dest_var_name):
-  return request.config.getoption("--" + dest_var_name.replace("_", "-"))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c874189/tests/comparison/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/comparison/tests/conftest.py b/tests/comparison/tests/conftest.py
new file mode 100644
index 0000000..dd39fde
--- /dev/null
+++ b/tests/comparison/tests/conftest.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.comparison import cli_options
+from tests.comparison.cluster import CmCluster, MiniCluster
+
+
+__cluster = None
+
+def pytest_addoption(parser):
+  if not hasattr(parser, "add_argument"):
+    parser.add_argument = parser.addoption
+  cli_options.add_cm_options(parser)
+
+
+@pytest.fixture
+def cluster(request):
+  global __cluster
+  if not __cluster:
+    cm_host = get_option_value(request, "cm_host")
+    if cm_host:
+      __cluster = CmCluster(cm_host, port=get_option_value(request, "cm_port"),
+          user=get_option_value(request, "cm_user"),
+          password=get_option_value(request, "cm_password"),
+          cluster_name=get_option_value(request, "cm_cluster_name"))
+    else:
+      __cluster = MiniCluster()
+  return __cluster
+
+
+@pytest.yield_fixture
+def hive_cursor(request):
+  with cluster(request).hive.connect() as conn:
+    with conn.cursor() as cur:
+      yield cur
+
+
+@pytest.yield_fixture
+def cursor(request):
+  with cluster(request).impala.connect() as conn:
+    with conn.cursor() as cur:
+      yield cur
+
+
+def get_option_value(request, dest_var_name):
+  return request.config.getoption("--" + dest_var_name.replace("_", "-"))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c874189/tests/comparison/tests/test_cluster.py
----------------------------------------------------------------------
diff --git a/tests/comparison/tests/test_cluster.py b/tests/comparison/tests/test_cluster.py
new file mode 100644
index 0000000..2e3bc54
--- /dev/null
+++ b/tests/comparison/tests/test_cluster.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# These are unit tests for cluster.py.
+
+from time import time
+
+from tests.common.errors import Timeout
+
+from tests.comparison.common import Column, Table
+from tests.comparison.db_types import BigInt, String
+
+class TestCluster(object):
+
+  def test_cmd(self, cluster):
+    host = cluster.impala.impalads[0].host_name
+    assert cluster.shell("echo -n HI", host) == "HI"
+
+    try:
+      cluster.shell("bad", host)
+      assert False
+    except Exception as e:
+      assert "command not found" in str(e)
+
+    start = time()
+    try:
+      cluster.shell("echo HI; sleep 60", host, timeout_secs=3)
+      assert False
+    except Timeout as e:
+      assert "HI" in str(e)
+      assert 3 <= time() - start <= 6
+
+
+class TestHdfs(object):
+
+  def test_ls(self, cluster):
+    ls = cluster.hdfs.create_client().list("/")
+    assert "tmp" in ls
+    assert "etc" not in ls
+
+
+class TestHive(object):
+
+  def test_list_databases(self, hive_cursor):
+    assert "default" in hive_cursor.list_db_names()
+
+  def test_non_mr_exec(self, hive_cursor):
+    hive_cursor.execute("SELECT 1")
+    rows = hive_cursor.fetchall()
+    assert rows
+    assert rows[0][0] == 1
+
+
+class TestImpala(object):
+
+  def test_list_databases(self, cursor):
+    assert "default" in cursor.list_db_names()
+
+  def test_exec(self, cursor):
+    cursor.execute("SELECT 1")
+    rows = cursor.fetchall()
+    assert rows
+    assert rows[0][0] == 1
+
+
+class TestModel(object):
+
+  def test_table_model(self, cursor, hive_cursor):
+    table = Table("some_test_table")
+    cursor.drop_table(table.name, if_exists=True)
+    table.storage_format = 'textfile'
+    table.add_col(Column(table, "bigint_col", BigInt))
+    table.add_col(Column(table, "string_col", String))
+    cursor.create_table(table)
+    try:
+      other = hive_cursor.describe_table(table.name)
+      assert other.name == table.name
+      assert other.cols == table.cols
+    finally:
+      cursor.drop_table(table.name)