You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2023/03/09 17:22:55 UTC

[impala] 06/06: IMPALA-11977: Fix Python 3 broken imports and object model differences

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0c7c6a335ef06dcd123804803f4d34fafcf4c9a8
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Sat Mar 4 15:17:46 2023 -0800

    IMPALA-11977: Fix Python 3 broken imports and object model differences
    
    Python 3 changed some object model methods:
     - __nonzero__ was removed in favor of __bool__
     - func_dict / func_name were removed in favor of __dict__ / __name__
     - The iterator next() method was removed in favor of __next__
       (Code locations should use the builtin next(iter) rather than
       iter.next())
     - Metaclasses are specified a different way (the __metaclass__
       class attribute is ignored; this uses future.utils.with_metaclass)
     - Locations that specify __eq__ should also specify __hash__
       (a class that defines __eq__ without __hash__ is unhashable)
    
    Python 3 also moved some packages around (urllib2, Queue, httplib,
    etc.), and this adapts the code to use the new locations (usually
    handled on Python 2 via future). This also fixes the code to
    avoid referencing exception variables outside the except block
    and comprehension loop variables outside of the comprehension.
    Several of these seem like false positives, but it is better to
    avoid the warning.
    
    This fixes these pylint warnings:
    bad-python3-import
    eq-without-hash
    metaclass-assignment
    next-method-called
    nonzero-method
    exception-escape
    comprehension-escape
    
    Testing:
     - Ran core tests
     - Ran release exhaustive tests
    
    Change-Id: I988ae6c139142678b0d40f1f4170b892eabf25ee
    Reviewed-on: http://gerrit.cloudera.org:8080/19592
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/banned_py3k_warnings.txt                      |  7 ++
 testdata/bin/generate-schema-statements.py        |  3 +-
 testdata/common/widetable.py                      |  8 +--
 tests/benchmark/report_benchmark_results.py       | 13 ++--
 tests/common/custom_cluster_test_suite.py         | 80 +++++++++++------------
 tests/common/impala_connection.py                 |  4 +-
 tests/common/test_result_verifier.py              | 15 +++++
 tests/comparison/cluster.py                       | 18 ++---
 tests/comparison/common.py                        |  6 ++
 tests/comparison/data_generator_mapred_common.py  |  4 +-
 tests/comparison/db_connection.py                 |  5 +-
 tests/comparison/db_types.py                      |  5 +-
 tests/comparison/query.py                         | 15 ++---
 tests/conftest.py                                 |  2 +-
 tests/custom_cluster/test_admission_controller.py | 20 +++---
 tests/custom_cluster/test_executor_groups.py      |  8 +--
 tests/custom_cluster/test_local_catalog.py        |  6 +-
 tests/custom_cluster/test_saml2_sso.py            | 32 +++++----
 tests/custom_cluster/test_udf_concurrency.py      |  4 +-
 tests/hs2/test_hs2.py                             |  5 +-
 tests/hs2/test_json_endpoints.py                  |  5 +-
 tests/performance/query.py                        |  3 +
 tests/query_test/test_insert_parquet.py           |  9 +++
 tests/shell/test_shell_commandline.py             |  2 +-
 tests/shell/test_shell_interactive.py             | 32 +++++----
 tests/statestore/test_statestore.py               |  8 ++-
 tests/stress/concurrent_select.py                 | 11 ++--
 tests/stress/query_retries_stress_runner.py       |  4 +-
 tests/util/concurrent_workload.py                 | 10 +--
 tests/util/filesystem_base.py                     |  5 +-
 tests/util/hdfs_util.py                           | 10 +--
 tests/util/ssh_util.py                            |  6 +-
 32 files changed, 211 insertions(+), 154 deletions(-)

diff --git a/bin/banned_py3k_warnings.txt b/bin/banned_py3k_warnings.txt
index 652d91075..ae4bc7c77 100644
--- a/bin/banned_py3k_warnings.txt
+++ b/bin/banned_py3k_warnings.txt
@@ -17,3 +17,10 @@ round-builtin
 deprecated-string-function
 sys-max-int
 exception-message-attribute
+bad-python3-import
+eq-without-hash
+metaclass-assignment
+next-method-called
+nonzero-method
+comprehension-escape
+exception-escape
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 232db516b..750fb3a8d 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -95,6 +95,7 @@
 # serially.
 #
 from __future__ import absolute_import, division, print_function
+from builtins import object
 import collections
 import csv
 import glob
@@ -592,7 +593,7 @@ class Statements(object):
     with open(filename, 'w') as f:
       f.write('\n\n'.join(output))
 
-  def __nonzero__(self):
+  def __bool__(self):
     return bool(self.create or self.load or self.load_base)
 
 def eval_section(section_str):
diff --git a/testdata/common/widetable.py b/testdata/common/widetable.py
index 405bdedf3..6bb8cd354 100755
--- a/testdata/common/widetable.py
+++ b/testdata/common/widetable.py
@@ -52,7 +52,7 @@ def get_columns(num_cols):
   iter = itertools.cycle(templates)
   # Produces [bool_col1, tinyint_col1, ..., bool_col2, tinyint_col2, ...]
   # The final list has 'num_cols' elements.
-  return [iter.next() % (i // len(templates) + 1) for i in range(num_cols)]
+  return [next(iter) % (i // len(templates) + 1) for i in range(num_cols)]
 
 # Data generators for different types. Each generator yields an infinite number of
 # value strings suitable for writing to a CSV file.
@@ -83,7 +83,7 @@ def quote(iter_fn):
   def new_iter_fn():
     iter = iter_fn()
     while True:
-      yield "'%s'" % iter.next()
+      yield "'%s'" % next(iter)
   return new_iter_fn
 
 def get_data(num_cols, num_rows, delimiter=',', quote_strings=False):
@@ -101,12 +101,12 @@ def get_data(num_cols, num_rows, delimiter=',', quote_strings=False):
     ]
   # Create a generator instance for each column, cycling through the different types
   iter = itertools.cycle(generators)
-  column_generators = [iter.next()() for i in range(num_cols)]
+  column_generators = [next(iter)() for i in range(num_cols)]
 
   # Populate each row using column_generators
   rows = []
   for i in range(num_rows):
-    vals = [gen.next() for gen in column_generators]
+    vals = [next(gen) for gen in column_generators]
     rows.append(delimiter.join(vals))
   return rows
 
diff --git a/tests/benchmark/report_benchmark_results.py b/tests/benchmark/report_benchmark_results.py
index b7b1c50a5..6a447d5ed 100755
--- a/tests/benchmark/report_benchmark_results.py
+++ b/tests/benchmark/report_benchmark_results.py
@@ -238,12 +238,15 @@ def all_query_results(grouped):
 
 
 def get_commit_date(commit_sha):
-  import urllib2
+  try:
+    from urllib.request import Request, urlopen
+  except ImportError:
+    from urllib2 import Request, urlopen
 
   url = 'https://api.github.com/repos/apache/impala/commits/' + commit_sha
   try:
-    request = urllib2.Request(url)
-    response = urllib2.urlopen(request).read()
+    request = Request(url)
+    response = urlopen(request).read()
     data = json.loads(response.decode('utf8'))
     return data['commit']['committer']['date'][:10]
   except:
@@ -251,7 +254,7 @@ def get_commit_date(commit_sha):
 
 def get_impala_version(grouped):
   """Figure out Impala version by looking at query profile."""
-  first_result = all_query_results(grouped).next()
+  first_result = next(all_query_results(grouped))
   profile = first_result['result_list'][0]['runtime_profile']
   match = re.search('Impala Version:\s(.*)\s\(build\s(.*)\)', profile)
   version = match.group(1)
@@ -915,7 +918,7 @@ class ExecSummaryComparison(object):
     table.align = 'l'
     table.float_format = '.2'
     table_contains_at_least_one_row = False
-    for row in [row for row in self.rows if is_significant(row)]:
+    for row in [r for r in self.rows if is_significant(r)]:
       table_row = [row[OPERATOR],
           '{0:.2%}'.format(row[PERCENT_OF_QUERY]),
           '{0:.2%}'.format(row[RSTD]),
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index fb951ff58..e56f59124 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -115,67 +115,67 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     method's func_dict"""
     def decorate(func):
       if impalad_args is not None:
-        func.func_dict[IMPALAD_ARGS] = impalad_args
-      func.func_dict[STATESTORED_ARGS] = statestored_args
+        func.__dict__[IMPALAD_ARGS] = impalad_args
+      func.__dict__[STATESTORED_ARGS] = statestored_args
       if catalogd_args is not None:
-        func.func_dict[CATALOGD_ARGS] = catalogd_args
+        func.__dict__[CATALOGD_ARGS] = catalogd_args
       if start_args is not None:
-        func.func_dict[START_ARGS] = start_args.split()
+        func.__dict__[START_ARGS] = start_args.split()
       if jvm_args is not None:
-        func.func_dict[JVM_ARGS] = jvm_args
+        func.__dict__[JVM_ARGS] = jvm_args
       if hive_conf_dir is not None:
-        func.func_dict[HIVE_CONF_DIR] = hive_conf_dir
+        func.__dict__[HIVE_CONF_DIR] = hive_conf_dir
       if kudu_args is not None:
-        func.func_dict[KUDU_ARGS] = kudu_args
+        func.__dict__[KUDU_ARGS] = kudu_args
       if default_query_options is not None:
-        func.func_dict[DEFAULT_QUERY_OPTIONS] = default_query_options
+        func.__dict__[DEFAULT_QUERY_OPTIONS] = default_query_options
       if impala_log_dir is not None:
-        func.func_dict[IMPALA_LOG_DIR] = impala_log_dir
+        func.__dict__[IMPALA_LOG_DIR] = impala_log_dir
       if cluster_size is not None:
-        func.func_dict[CLUSTER_SIZE] = cluster_size
+        func.__dict__[CLUSTER_SIZE] = cluster_size
       if num_exclusive_coordinators is not None:
-        func.func_dict[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
+        func.__dict__[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
       if statestored_timeout_s is not None:
-        func.func_dict[STATESTORED_TIMEOUT_S] = statestored_timeout_s
+        func.__dict__[STATESTORED_TIMEOUT_S] = statestored_timeout_s
       if impalad_timeout_s is not None:
-        func.func_dict[IMPALAD_TIMEOUT_S] = impalad_timeout_s
+        func.__dict__[IMPALAD_TIMEOUT_S] = impalad_timeout_s
       if expect_cores is not None:
-        func.func_dict[EXPECT_CORES] = expect_cores
+        func.__dict__[EXPECT_CORES] = expect_cores
       if reset_ranger is not False:
-        func.func_dict[RESET_RANGER] = True
+        func.__dict__[RESET_RANGER] = True
       return func
     return decorate
 
   def setup_method(self, method):
     cluster_args = list()
     for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, ADMISSIOND_ARGS, JVM_ARGS]:
-      if arg in method.func_dict:
-        cluster_args.append("--%s=%s " % (arg, method.func_dict[arg]))
-    if START_ARGS in method.func_dict:
-      cluster_args.extend(method.func_dict[START_ARGS])
+      if arg in method.__dict__:
+        cluster_args.append("--%s=%s " % (arg, method.__dict__[arg]))
+    if START_ARGS in method.__dict__:
+      cluster_args.extend(method.__dict__[START_ARGS])
 
-    if HIVE_CONF_DIR in method.func_dict:
-      self._start_hive_service(method.func_dict[HIVE_CONF_DIR])
+    if HIVE_CONF_DIR in method.__dict__:
+      self._start_hive_service(method.__dict__[HIVE_CONF_DIR])
       # Should let Impala adopt the same hive-site.xml. The only way is to add it in the
       # beginning of the CLASSPATH. Because there's already a hive-site.xml in the
       # default CLASSPATH (see bin/set-classpath.sh).
       cluster_args.append(
-        '--env_vars=CUSTOM_CLASSPATH=%s ' % method.func_dict[HIVE_CONF_DIR])
+        '--env_vars=CUSTOM_CLASSPATH=%s ' % method.__dict__[HIVE_CONF_DIR])
 
-    if KUDU_ARGS in method.func_dict:
-      self._restart_kudu_service(method.func_dict[KUDU_ARGS])
+    if KUDU_ARGS in method.__dict__:
+      self._restart_kudu_service(method.__dict__[KUDU_ARGS])
 
-    if RESET_RANGER in method.func_dict:
+    if RESET_RANGER in method.__dict__:
       self._reset_ranger_policy_repository()
 
     cluster_size = DEFAULT_CLUSTER_SIZE
-    if CLUSTER_SIZE in method.func_dict:
-      cluster_size = method.func_dict[CLUSTER_SIZE]
+    if CLUSTER_SIZE in method.__dict__:
+      cluster_size = method.__dict__[CLUSTER_SIZE]
 
     use_exclusive_coordinators = False
     num_coordinators = cluster_size
-    if NUM_EXCLUSIVE_COORDINATORS in method.func_dict:
-      num_coordinators = method.func_dict[NUM_EXCLUSIVE_COORDINATORS]
+    if NUM_EXCLUSIVE_COORDINATORS in method.__dict__:
+      num_coordinators = method.__dict__[NUM_EXCLUSIVE_COORDINATORS]
       use_exclusive_coordinators = True
 
     # Start a clean new cluster before each test
@@ -183,17 +183,17 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       "cluster_size": cluster_size,
       "num_coordinators": num_coordinators,
       "expected_num_impalads": cluster_size,
-      "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
+      "default_query_options": method.__dict__.get(DEFAULT_QUERY_OPTIONS),
       "use_exclusive_coordinators": use_exclusive_coordinators
     }
-    if IMPALA_LOG_DIR in method.func_dict:
-      kwargs["impala_log_dir"] = method.func_dict[IMPALA_LOG_DIR]
-    if STATESTORED_TIMEOUT_S in method.func_dict:
-      kwargs["statestored_timeout_s"] = method.func_dict[STATESTORED_TIMEOUT_S]
-    if IMPALAD_TIMEOUT_S in method.func_dict:
-      kwargs["impalad_timeout_s"] = method.func_dict[IMPALAD_TIMEOUT_S]
-
-    if method.func_dict.get(EXPECT_CORES, False):
+    if IMPALA_LOG_DIR in method.__dict__:
+      kwargs["impala_log_dir"] = method.__dict__[IMPALA_LOG_DIR]
+    if STATESTORED_TIMEOUT_S in method.__dict__:
+      kwargs["statestored_timeout_s"] = method.__dict__[STATESTORED_TIMEOUT_S]
+    if IMPALAD_TIMEOUT_S in method.__dict__:
+      kwargs["impalad_timeout_s"] = method.__dict__[IMPALAD_TIMEOUT_S]
+
+    if method.__dict__.get(EXPECT_CORES, False):
       # Make a note of any core files that already exist
       possible_cores = find_all_files('*core*')
       self.pre_test_cores = set([f for f in possible_cores if is_core_dump(f)])
@@ -209,10 +209,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       super(CustomClusterTestSuite, self).setup_class()
 
   def teardown_method(self, method):
-    if HIVE_CONF_DIR in method.func_dict:
+    if HIVE_CONF_DIR in method.__dict__:
       self._start_hive_service(None)  # Restart Hive Service using default configs
 
-    if method.func_dict.get(EXPECT_CORES, False):
+    if method.__dict__.get(EXPECT_CORES, False):
       # The core dumps expected to be generated by this test should be cleaned up
       possible_cores = find_all_files('*core*')
       post_test_cores = set([f for f in possible_cores if is_core_dump(f)])
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 54c706d2b..f3832beda 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -22,6 +22,7 @@
 from __future__ import absolute_import, division, print_function
 import abc
 import codecs
+from future.utils import with_metaclass
 import logging
 import re
 
@@ -77,8 +78,7 @@ class OperationHandle(object):
 
 
 # Represents an Impala connection.
-class ImpalaConnection(object):
-  __metaclass__ = abc.ABCMeta
+class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
 
   def __enter__(self):
     return self
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index e3b90aad9..d71fe6349 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -65,6 +65,11 @@ class QueryTestResult(object):
       return False
     return self.column_types == other.column_types and self.rows == other.rows
 
+  def __hash__(self):
+    # This is not intended to be hashed. If that is happening, then something is wrong.
+    # The regexes in ResultRow make it difficult to implement this correctly.
+    assert False
+
   def __ne__(self, other):
     return not self.__eq__(other)
 
@@ -158,6 +163,11 @@ class ResultRow(object):
       return other.regex.match(self.row_string)
     return self.columns == other.columns
 
+  def __hash__(self):
+    # This is not intended to be hashed. If that is happening, then something is wrong.
+    # The regexes make it difficult to implement this correctly.
+    assert False
+
   def __ne__(self, other):
     return not self.__eq__(other)
 
@@ -225,6 +235,11 @@ class ResultColumn(object):
     else:
       return self.value == other.value
 
+  def __hash__(self):
+    # This is not intended to be hashed. If that is happening, then something is wrong.
+    # The regexes make it difficult to implement this correctly.
+    assert False
+
   def __ne__(self, other):
     return not self.__eq__(other)
 
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index 517df3f94..dc1daf8d6 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -22,6 +22,7 @@
 
 from __future__ import absolute_import, division, print_function
 from builtins import int, range, zip
+from future.utils import with_metaclass
 import hdfs
 import logging
 import os
@@ -34,17 +35,20 @@ from collections import defaultdict
 from collections import OrderedDict
 from contextlib import contextmanager
 from getpass import getuser
+from io import BytesIO
 from multiprocessing.pool import ThreadPool
 from random import choice
-from StringIO import StringIO
 from sys import maxsize
 from tempfile import mkdtemp
 from threading import Lock
 from time import mktime, strptime
-from urlparse import urlparse
 from xml.etree.ElementTree import parse as parse_xml
 from zipfile import ZipFile
 
+try:
+  from urllib.parse import urlparse
+except ImportError:
+  from urlparse import urlparse
 
 from tests.comparison.db_connection import HiveConnection, ImpalaConnection
 from tests.common.environ import HIVE_MAJOR_VERSION
@@ -65,14 +69,12 @@ CM_CLEAR_PORT = 7180
 CM_TLS_PORT = 7183
 
 
-class Cluster(object):
+class Cluster(with_metaclass(ABCMeta, object)):
   """This is a base class for clusters. Cluster classes provide various methods for
      interacting with a cluster. Ideally the various cluster implementations provide
      the same set of methods so any cluster implementation can be chosen at runtime.
   """
 
-  __metaclass__ = ABCMeta
-
   def __init__(self):
     self._hadoop_configs = None
     self._local_hadoop_conf_dir = None
@@ -323,7 +325,7 @@ class CmCluster(Cluster):
 
   def _init_local_hadoop_conf_dir(self):
     self._local_hadoop_conf_dir = mkdtemp()
-    data = StringIO(self.cm.get("/clusters/%s/services/%s/clientConfig"
+    data = BytesIO(self.cm.get("/clusters/%s/services/%s/clientConfig"
       % (self.cm_cluster.name, self._find_service("HIVE").name)))
     zip_file = ZipFile(data)
     for name in zip_file.namelist():
@@ -655,9 +657,7 @@ class CmImpala(Impala):
       raise Exception("Failed to restart Impala: %s" % command.resultMessage)
 
 
-class Impalad(object):
-
-  __metaclass__ = ABCMeta
+class Impalad(with_metaclass(ABCMeta, object)):
 
   def __init__(self):
     self.impala = None
diff --git a/tests/comparison/common.py b/tests/comparison/common.py
index 5b2e71dcd..fb768f310 100644
--- a/tests/comparison/common.py
+++ b/tests/comparison/common.py
@@ -451,6 +451,9 @@ class ArrayColumn(CollectionColumn):
       return True
     return self.name == other.name and self.owner.identifier == other.owner.identifier
 
+  def __hash__(self):
+    return hash((self.name, self.owner.identifier))
+
   def __deepcopy__(self, memo):
     other = ArrayColumn(
         owner=self.owner,
@@ -480,6 +483,9 @@ class MapColumn(CollectionColumn):
       return True
     return self.name == other.name and self.owner.identifier == other.owner.identifier
 
+  def __hash__(self):
+    return hash((self.name, self.owner.identifier))
+
   def __deepcopy__(self, memo):
     other = MapColumn(
         owner=self.owner,
diff --git a/tests/comparison/data_generator_mapred_common.py b/tests/comparison/data_generator_mapred_common.py
index 78d7d8d38..2367a0a3f 100644
--- a/tests/comparison/data_generator_mapred_common.py
+++ b/tests/comparison/data_generator_mapred_common.py
@@ -27,7 +27,7 @@ from __future__ import absolute_import, division, print_function
 from base import range
 import base64
 import pickle
-import StringIO
+from io import BytesIO
 
 from tests.comparison.db_types import Decimal
 from tests.comparison.random_val_generator import RandomValGenerator
@@ -94,7 +94,7 @@ def estimate_bytes_per_row(table_data_generator, row_count):
   original_row_count = table_data_generator.row_count
   original_output_file = table_data_generator.output_file
   table_data_generator.row_count = row_count
-  table_data_generator.output_file = StringIO.StringIO()
+  table_data_generator.output_file = BytesIO()
   table_data_generator.populate_output_file()
   table_data_generator.output_file.flush()
   bytes_per_row = len(table_data_generator.output_file.getvalue()) / float(row_count)
diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py
index 0177deb6a..8705024eb 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -23,6 +23,7 @@
 '''
 from __future__ import absolute_import, division, print_function
 from builtins import filter, map, range, zip
+from future.utils import with_metaclass
 import hashlib
 import impala.dbapi
 import re
@@ -572,9 +573,7 @@ class DbCursor(object):
     return ()
 
 
-class DbConnection(object):
-
-  __metaclass__ = ABCMeta
+class DbConnection(with_metaclass(ABCMeta, object)):
 
   LOCK = Lock()
 
diff --git a/tests/comparison/db_types.py b/tests/comparison/db_types.py
index 5f78d4e84..27e2a6025 100644
--- a/tests/comparison/db_types.py
+++ b/tests/comparison/db_types.py
@@ -16,6 +16,7 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
+from future.utils import with_metaclass
 import re
 import sys
 
@@ -43,15 +44,13 @@ class DataTypeMetaclass(type):
         getattr(other, 'CMP_VALUE', other.__name__))
 
 
-class DataType(ValExpr):
+class DataType(with_metaclass(DataTypeMetaclass, ValExpr)):
   '''Base class for data types.
 
      Data types are represented as classes so inheritance can be used.
 
   '''
 
-  __metaclass__ = DataTypeMetaclass
-
   @staticmethod
   def group_by_type(vals):
     '''Group cols by their data type and return a dict of the results.'''
diff --git a/tests/comparison/query.py b/tests/comparison/query.py
index dd73f895e..1d8004abd 100644
--- a/tests/comparison/query.py
+++ b/tests/comparison/query.py
@@ -16,7 +16,8 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
-from builtins import range
+from builtins import object, range
+from future.utils import with_metaclass
 from abc import ABCMeta, abstractproperty
 from copy import deepcopy
 from logging import getLogger
@@ -47,13 +48,11 @@ class StatementExecutionMode(object):
   ) = range(5)
 
 
-class AbstractStatement(object):
+class AbstractStatement(with_metaclass(ABCMeta, object)):
   """
   Abstract query representation
   """
 
-  __metaclass__ = ABCMeta
-
   def __init__(self):
     # reference to statement's parent. For example the right side of a UNION clause
     # SELECT will have a parent as the SELECT on the left, which for the query
@@ -236,9 +235,9 @@ class SelectItemSubList(object):
   def __len__(self):
     return sum(1 for _ in self)
 
-  def __nonzero__(self):
+  def __bool__(self):
     try:
-      iter(self).next()
+      next(iter(self))
       return True
     except StopIteration:
       return False
@@ -260,7 +259,7 @@ class SelectItemSubList(object):
       items = list()
       while start < stop:
         try:
-          idx, item = self_iter.next()
+          idx, item = next(self_iter)
         except StopIteration:
           break
         if idx < start:
@@ -298,7 +297,7 @@ class SelectItemSubList(object):
       filtered_idx = 0
       while start < stop:
         try:
-          idx, item = self_iter.next()
+          idx, item = next(self_iter)
         except StopIteration:
           break
         if not self.filter(item):
diff --git a/tests/conftest.py b/tests/conftest.py
index d7bfd4372..eca1471c2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -230,7 +230,7 @@ def pytest_generate_tests(metafunc):
 
     if len(vectors) == 0:
       LOG.warning("No test vectors generated for test '%s'. Check constraints and "
-          "input vectors" % metafunc.function.func_name)
+          "input vectors" % metafunc.function.__name__)
 
     vector_names = list(map(str, vectors))
     # In the case this is a test result update or sanity run, select a single test vector
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index c25719248..ab8068412 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1538,11 +1538,11 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
 
-    if 'start_args' not in method.func_dict:
-      method.func_dict['start_args'] = list()
-    method.func_dict["start_args"].append("--enable_admission_service")
-    if "impalad_args" in method.func_dict:
-      method.func_dict["admissiond_args"] = method.func_dict["impalad_args"]
+    if 'start_args' not in method.__dict__:
+      method.__dict__['start_args'] = list()
+    method.__dict__["start_args"].append("--enable_admission_service")
+    if "impalad_args" in method.__dict__:
+      method.__dict__["admissiond_args"] = method.__dict__["impalad_args"]
     super(TestAdmissionController, self).setup_method(method)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
@@ -2284,9 +2284,9 @@ class TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
   def setup_method(self, method):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
-    if 'start_args' not in method.func_dict:
-      method.func_dict['start_args'] = list()
-    method.func_dict["start_args"].append("--enable_admission_service")
-    if "impalad_args" in method.func_dict:
-      method.func_dict["admissiond_args"] = method.func_dict["impalad_args"]
+    if 'start_args' not in method.__dict__:
+      method.__dict__['start_args'] = list()
+    method.__dict__["start_args"].append("--enable_admission_service")
+    if "impalad_args" in method.__dict__:
+      method.__dict__["admissiond_args"] = method.__dict__["impalad_args"]
     super(TestAdmissionControllerStress, self).setup_method(method)
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 5d624d969..79d6d5c6d 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -44,10 +44,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
   def setup_method(self, method):
     # Always start the base cluster with the coordinator in its own executor group.
-    existing_args = method.func_dict.get("impalad_args", "")
-    method.func_dict["impalad_args"] = "%s -executor_groups=coordinator" % existing_args
-    method.func_dict["cluster_size"] = 1
-    method.func_dict["num_exclusive_coordinators"] = 1
+    existing_args = method.__dict__.get("impalad_args", "")
+    method.__dict__["impalad_args"] = "%s -executor_groups=coordinator" % existing_args
+    method.__dict__["cluster_size"] = 1
+    method.__dict__["num_exclusive_coordinators"] = 1
     self.num_groups = 1
     self.num_impalads = 1
     super(TestExecutorGroups, self).setup_method(method)
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 25e0c9f62..89f19e075 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -20,7 +20,7 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import pytest
-import Queue
+import queue
 import random
 import re
 import threading
@@ -261,7 +261,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     inconsistent_seen = [0]
     inconsistent_seen_lock = threading.Lock()
     # Tracks query failures for all other reasons.
-    failed_queries = Queue.Queue()
+    failed_queries = queue.Queue()
     try:
       client1 = self.cluster.impalads[0].service.create_beeswax_client()
       client2 = self.cluster.impalads[1].service.create_beeswax_client()
@@ -378,7 +378,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
       replans_seen_lock = threading.Lock()
 
       # Queue to propagate exceptions from failed queries, if any.
-      failed_queries = Queue.Queue()
+      failed_queries = queue.Queue()
 
       def stress_thread(client):
         while replans_seen[0] == 0:
diff --git a/tests/custom_cluster/test_saml2_sso.py b/tests/custom_cluster/test_saml2_sso.py
index 2cf4babce..3df48b183 100644
--- a/tests/custom_cluster/test_saml2_sso.py
+++ b/tests/custom_cluster/test_saml2_sso.py
@@ -22,17 +22,23 @@ import datetime
 import os
 import pytest
 import uuid
-import urllib2
-import urlparse
 import xml.etree.ElementTree as ET
 import zlib
 
+try:
+  from urllib.parse import parse_qs, urlparse
+  from urllib.request import HTTPErrorProcessor, build_opener, Request
+except ImportError:
+  from urllib2 import HTTPErrorProcessor, build_opener, Request
+  from urlparse import parse_qs, urlparse
+
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.test_vector import ImpalaTestVector
 from tests.common.test_dimensions import create_client_protocol_dimension
 from tests.shell.util import run_impala_shell_cmd
 
-class NoRedirection(urllib2.HTTPErrorProcessor):
+
+class NoRedirection(HTTPErrorProcessor):
   """Allows inspecting http redirection responses. """
   def http_response(self, request, response):
     return response
@@ -150,8 +156,8 @@ class TestClientSaml(CustomClusterTestSuite):
   def _request_resource(self):
     """ Initial POST request to hs2-http port, response should be redirected
         to IDP and contain the authnrequest. """
-    opener = urllib2.build_opener(NoRedirection)
-    req = urllib2.Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ")
+    opener = build_opener(NoRedirection)
+    req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ")
     req.add_header('X-Hive-Token-Response-Port', TestClientSaml.CLIENT_PORT)
     response = opener.open(req)
     relay_state, client_id, saml_req_xml = \
@@ -161,11 +167,11 @@ class TestClientSaml(CustomClusterTestSuite):
 
   def _parse_redirection_response(self, response):
     assert response.getcode() == 302
-    client_id = response.info().getheader("X-Hive-Client-Identifier")
+    client_id = response.info().get("X-Hive-Client-Identifier", None)
     assert client_id is not None
-    new_url = response.info().getheader("location")
+    new_url = response.info()["location"]
     assert new_url.startswith(TestClientSaml.IDP_URL)
-    query = urlparse.parse_qs(urlparse.urlparse(new_url).query.encode('ASCII'))
+    query = parse_qs(urlparse(new_url).query.encode('ASCII'))
     relay_state = query["RelayState"][0]
     assert relay_state is not None
     saml_req = query["SAMLRequest"][0]
@@ -181,15 +187,15 @@ class TestClientSaml(CustomClusterTestSuite):
   def _request_resource_with_bearer(self, client_id, bearer_token):
     """ Send POST request to hs2-http port again, this time with bearer tokan.
         The response should contain a security cookie if the validation succeeded """
-    req = urllib2.Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ")
+    req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ")
     req.add_header('X-Hive-Client-Identifier', client_id)
     req.add_header('Authorization', "Bearer " + bearer_token)
-    opener = urllib2.build_opener(NoRedirection)
+    opener = build_opener(NoRedirection)
     response = opener.open(req)
     # saml2_ee_test_mode=true leads to returning 401 unauthorized - otherwise the
     # call would hang if there is no Thrift message.
     assert response.getcode() == 401
-    cookies = response.info().getheader('Set-Cookie')
+    cookies = response.info()['Set-Cookie']
     assert cookies.startswith("impala.auth=")
 
   def _send_authn_response(self, request_id, relay_state,
@@ -201,8 +207,8 @@ class TestClientSaml(CustomClusterTestSuite):
     authn_resp = self._generate_authn_response(request_id, attributes_xml)
     encoded_authn_resp = base64.urlsafe_b64encode(authn_resp)
     body = "SAMLResponse=%s&RelayState=%s" % (encoded_authn_resp, relay_state)
-    opener = urllib2.build_opener(NoRedirection)
-    req = urllib2.Request(TestClientSaml.SP_CALLBACK_URL, body)
+    opener = build_opener(NoRedirection)
+    req = Request(TestClientSaml.SP_CALLBACK_URL, body)
     response = opener.open(req)
     bearer_token = self._parse_xhtml_form(response, expect_success)
     return bearer_token
diff --git a/tests/custom_cluster/test_udf_concurrency.py b/tests/custom_cluster/test_udf_concurrency.py
index aa35eac39..3fd9aa837 100644
--- a/tests/custom_cluster/test_udf_concurrency.py
+++ b/tests/custom_cluster/test_udf_concurrency.py
@@ -128,7 +128,7 @@ class TestUdfConcurrency(CustomClusterTestSuite):
     # join all threads.
     for t in runner_threads: t.join()
 
-    for e in errors: print(e)
+    for err in errors: print(err)
 
     # Checks that no impalad has crashed.
     assert cluster.num_responsive_coordinators() == exp_num_coordinators
@@ -210,5 +210,5 @@ class TestUdfConcurrency(CustomClusterTestSuite):
     for t in runner_threads: t.join()
 
     # Check for any errors.
-    for e in errors: print(e)
+    for err in errors: print(err)
     assert len(errors) == 0
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 6353fbf95..70191f032 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -29,7 +29,10 @@ import threading
 import time
 import uuid
 
-from urllib2 import urlopen
+try:
+  from urllib.request import urlopen
+except ImportError:
+  from urllib2 import urlopen
 
 from ImpalaService import ImpalaHiveServer2Service
 from tests.common.environ import ImpalaTestClusterProperties
diff --git a/tests/hs2/test_json_endpoints.py b/tests/hs2/test_json_endpoints.py
index 64bce618c..b531fcc45 100644
--- a/tests/hs2/test_json_endpoints.py
+++ b/tests/hs2/test_json_endpoints.py
@@ -22,7 +22,10 @@ import json
 import pytest
 from time import time
 
-from urllib2 import urlopen
+try:
+  from urllib.request import urlopen
+except ImportError:
+  from urllib2 import urlopen
 
 from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
 from tests.common.impala_cluster import ImpalaCluster
diff --git a/tests/performance/query.py b/tests/performance/query.py
index 5832e5dfa..23e92686a 100644
--- a/tests/performance/query.py
+++ b/tests/performance/query.py
@@ -54,6 +54,9 @@ class Query(object):
             self.workload_name == other.workload_name and
             self.db == other.db)
 
+  def __hash__(self):
+    return hash((self.query_str, self.name, self.scale_factor, self.test_vector, self.db))
+
   def _build_query(self):
     """Populates db, query_str, table_format_str"""
     self.db = QueryTestSectionReader.get_db_name(self.test_vector, self.scale_factor)
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 9264643ab..ed9d819eb 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -54,6 +54,9 @@ class RoundFloat():
     """Compares this objects's value to a numeral after rounding it."""
     return round(self.value, self.num_digits) == round(numeral, self.num_digits)
 
+  def __hash__(self):
+    return hash(round(self.value, self.num_digits))
+
 
 class TimeStamp():
   """Class to construct timestamps with a default format specifier."""
@@ -68,6 +71,9 @@ class TimeStamp():
     """Compares this objects's value to another timetuple."""
     return self.timetuple == other_timetuple
 
+  def __hash__(self):
+    return hash(self.timetuple)
+
 
 class Date():
   """Class to compare dates specified as year-month-day to dates specified as days since
@@ -79,6 +85,9 @@ class Date():
   def __eq__(self, other_days_since_eopch):
     return self.days_since_epoch == other_days_since_eopch
 
+  def __hash__(self):
+    return hash(self.days_since_epoch)
+
 
 ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max', 'null_count'])
 
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index f02adba18..f469d2be3 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -80,7 +80,7 @@ def empty_table(unique_database, request):
   Returns:
     fq_table_name (str): the fully qualified name of the table: : dbname.table_name
   """
-  table_name = request.node.function.func_name
+  table_name = request.node.function.__name__
   fq_table_name = '.'.join([unique_database, table_name])
   stmt = "CREATE TABLE %s (i integer, s string)" % fq_table_name
   request.instance.execute_query_expect_success(request.instance.client, stmt,
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 101793613..d0920cdcf 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -19,7 +19,8 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
-import httplib
+import http.client
+import http.server
 import logging
 import os
 import pexpect
@@ -27,6 +28,7 @@ import pytest
 import re
 import signal
 import socket
+import socketserver
 import sys
 import threading
 from time import sleep
@@ -47,8 +49,6 @@ from tests.common.test_dimensions import (
 from tests.shell.util import (assert_var_substitution, ImpalaShell, get_impalad_port,
   get_shell_cmd, get_open_sessions_metric, spawn_shell, get_unused_port,
   create_impala_shell_executable_dimension, get_impala_shell_executable)
-import SimpleHTTPServer
-import SocketServer
 
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
@@ -79,32 +79,30 @@ def tmp_history_file(request):
   return tmp.name
 
 
-class RequestHandler503(SimpleHTTPServer.SimpleHTTPRequestHandler):
+class RequestHandler503(http.server.SimpleHTTPRequestHandler):
   """A custom http handler that checks for duplicate 'Host' headers from the most
   recent http request, and always returns a 503 http code."""
 
   def __init__(self, request, client_address, server):
-    SimpleHTTPServer.SimpleHTTPRequestHandler.__init__(self, request, client_address,
-                                                       server)
+    http.server.SimpleHTTPRequestHandler.__init__(self, request, client_address,
+                                                   server)
 
   def should_send_body_text(self):
     # in RequestHandler503 we do not send any body text
     return False
 
   def do_POST(self):
-    # The unfortunately named self.headers here is an instance of mimetools.Message that
-    # contains the request headers.
-    request_headers = self.headers.headers
-
-    # Ensure that only one 'Host' header is contained in the request before responding.
-    host_hdr_count = sum([header.startswith('Host:') for header in request_headers])
-    assert host_hdr_count == 1, "duplicate 'Host:' headers in %s" % request_headers
+    # Ensure that a 'Host' header is contained in the request before responding.
+    assert "Host" in self.headers
 
     # Respond with 503.
-    self.send_response(code=httplib.SERVICE_UNAVAILABLE, message="Service Unavailable")
+    self.send_response(code=http.client.SERVICE_UNAVAILABLE,
+                       message="Service Unavailable")
+    # The Python 3 version of SimpleHTTPRequestHandler requires end_headers() to be
+    # called explicitly.
+    self.end_headers()
     if self.should_send_body_text():
-      # Optionally send ody text with 503 message.
-      self.end_headers()
+      # Optionally send body text with 503 message.
       self.wfile.write("EXTRA")
 
 
@@ -123,7 +121,7 @@ class TestHTTPServer503(object):
   def __init__(self, clazz):
     self.HOST = "localhost"
     self.PORT = get_unused_port()
-    self.httpd = SocketServer.TCPServer((self.HOST, self.PORT), clazz)
+    self.httpd = socketserver.TCPServer((self.HOST, self.PORT), clazz)
 
     self.http_server_thread = threading.Thread(target=self.httpd.serve_forever)
     self.http_server_thread.start()
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 7ca3d713c..ba7605ae4 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -25,9 +25,13 @@ import socket
 import threading
 import traceback
 import time
-import urllib2
 import uuid
 
+try:
+  from urllib.request import urlopen
+except ImportError:
+  from urllib2 import urlopen
+
 from Types.ttypes import TNetworkAddress
 from thrift.protocol import TBinaryProtocol
 from thrift.server.TServer import TServer
@@ -63,7 +67,7 @@ LOG = logging.getLogger('test_statestore')
 #    Test that topic deletions take effect correctly.
 
 def get_statestore_subscribers(host='localhost', port=25010):
-  response = urllib2.urlopen("http://{0}:{1}/subscribers?json".format(host, port))
+  response = urlopen("http://{0}:{1}/subscribers?json".format(host, port))
   page = response.read()
   return json.loads(page)
 
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 96b8978f7..9503e530c 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -64,7 +64,7 @@ import re
 import signal
 import sys
 import threading
-from Queue import Empty   # Must be before Queue below
+from queue import Empty   # Must be before Queue below
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace, SUPPRESS
 from collections import defaultdict
 from copy import copy
@@ -196,10 +196,11 @@ def print_crash_info_if_exists(impala, start_time):
       LOG.info(
           "Timeout checking if impalads crashed: %s."
           % e + (" Will retry." if remaining_attempts else ""))
-  else:
-    LOG.error(
-        "Aborting after %s failed attempts to check if impalads crashed", max_attempts)
-    raise e
+      if not remaining_attempts:
+        LOG.error(
+            "Aborting after %s failed attempts to check if impalads crashed",
+            max_attempts)
+        raise e
   for message in crashed_impalads.values():
     print(message, file=sys.stderr)
   return crashed_impalads
diff --git a/tests/stress/query_retries_stress_runner.py b/tests/stress/query_retries_stress_runner.py
index 87560d3e2..cd221a8d1 100755
--- a/tests/stress/query_retries_stress_runner.py
+++ b/tests/stress/query_retries_stress_runner.py
@@ -34,7 +34,7 @@ import subprocess
 import sys
 import threading
 import traceback
-import Queue
+import queue
 
 from argparse import ArgumentParser
 from argparse import RawDescriptionHelpFormatter
@@ -113,7 +113,7 @@ def run_concurrent_workloads(concurrency, coordinator, database, queries):
 
   # The exception queue is used to pass errors from the workload threads back to the main
   # thread.
-  exception_queue = Queue.Queue()
+  exception_queue = queue.Queue()
 
   # The main method for the workload runner threads.
   def __run_workload(stream_id):
diff --git a/tests/util/concurrent_workload.py b/tests/util/concurrent_workload.py
index 2ceb81574..d44ebcd1b 100755
--- a/tests/util/concurrent_workload.py
+++ b/tests/util/concurrent_workload.py
@@ -28,7 +28,7 @@ import logging
 import _strptime  # noqa: F401
 import sys
 import time
-from Queue import Queue
+from queue import Queue
 from threading import current_thread, Event, Thread
 
 from tests.common.impala_cluster import ImpalaCluster
@@ -78,17 +78,17 @@ class ConcurrentWorkload(object):
           logging.exception("Caught error, stopping")
     logging.info("%s exiting" % current_thread().name)
 
-  def compute_query_rate(self, queue, stop_ev):
+  def compute_query_rate(self, queue_obj, stop_ev):
     """Computes the query throughput rate in queries per second averaged over the last 5
     seconds. This method only returns when 'stop_ev' is set by the caller."""
     AVG_WINDOW_S = 5
     times = []
     while not stop_ev.is_set():
       # Don't block to check for stop_ev
-      if queue.empty():
+      if queue_obj.empty():
         time.sleep(0.1)
         continue
-      queue.get()
+      queue_obj.get()
       now = time.time()
       times.append(now)
       # Keep only timestamps within the averaging window
@@ -118,7 +118,7 @@ class ConcurrentWorkload(object):
       self.stop()
     assert self.stop_ev.is_set(), "Stop event expected to be set but it isn't"
 
-  def _print_query_rate(self, queue, stop_ev):
+  def _print_query_rate(self, queue_obj, stop_ev):
     """Prints the query throughput rate until 'stop_ev' is set by the caller."""
     PERIOD_S = 1
 
diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py
index 8f479bcb6..2f304bbaa 100644
--- a/tests/util/filesystem_base.py
+++ b/tests/util/filesystem_base.py
@@ -19,9 +19,10 @@
 
 from __future__ import absolute_import, division, print_function
 from abc import ABCMeta, abstractmethod
+from future.utils import with_metaclass
 
-class BaseFilesystem(object):
-  __metaclass__ = ABCMeta
+
+class BaseFilesystem(with_metaclass(ABCMeta, object)):
 
   @abstractmethod
   def create_file(self, path, file_data, overwrite):
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index ef5a0c38c..2a3cec858 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -19,7 +19,7 @@
 
 from __future__ import absolute_import, division, print_function
 import getpass
-import httplib
+import http.client
 import os.path
 import re
 import requests
@@ -130,7 +130,7 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient):
     '775'"""
     uri = self._create_uri(path, "SETPERMISSION", permission=permission)
     response = requests.put(uri, allow_redirects=True)
-    if not response.status_code == httplib.OK:
+    if not response.status_code == http.client.OK:
       _raise_pywebhdfs_exception(response.status_code, response.text)
     return True
 
@@ -138,21 +138,21 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient):
     """Sets the owner and the group of 'path to 'user' / 'group'"""
     uri = self._create_uri(path, "SETOWNER", owner=user, group=group)
     response = requests.put(uri, allow_redirects=True)
-    if not response.status_code == httplib.OK:
+    if not response.status_code == http.client.OK:
       _raise_pywebhdfs_exception(response.status_code, response.text)
     return True
 
   def setacl(self, path, acls):
     uri = self._create_uri(path, "SETACL", aclspec=acls)
     response = requests.put(uri, allow_redirects=True)
-    if not response.status_code == httplib.OK:
+    if not response.status_code == http.client.OK:
       _raise_pywebhdfs_exception(response.status_code, response.text)
     return True
 
   def getacl(self, path):
     uri = self._create_uri(path, "GETACLSTATUS")
     response = requests.get(uri, allow_redirects=True)
-    if not response.status_code == httplib.OK:
+    if not response.status_code == http.client.OK:
       _raise_pywebhdfs_exception(response.status_code, response.text)
     return response.json()
 
diff --git a/tests/util/ssh_util.py b/tests/util/ssh_util.py
index 61b249bf8..69aa13f97 100644
--- a/tests/util/ssh_util.py
+++ b/tests/util/ssh_util.py
@@ -72,9 +72,9 @@ class SshClient(paramiko.SSHClient):
         raise
       except Exception as e:
         LOG.warn("Error connecting to %s" % host_name, exc_info=True)
-    else:
-      LOG.error("Failed to ssh to %s" % host_name)
-      raise e
+        if retry >= retries - 1:
+          LOG.error("Failed to ssh to %s" % host_name)
+          raise e
 
     self.get_transport().set_keepalive(10)