You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/03/20 05:26:26 UTC

[impala] branch master updated (9cb6dab -> 030c12a)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 9cb6dab  IMPALA-8361: Propagate predicates of outer-joined InlineView
     new 1d27b91  IMPALA-9357: Fix race condition in alter_database event
     new ed15c2c  IMPALA-3343: Part 1 -- Fix simple python 2->3 syntax errors
     new 030c12a  IMPALA-9492: Fix test_unescaped_string_partition failing on S3

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impala/catalog/CatalogServiceCatalog.java      | 138 ++++----
 fe/src/main/java/org/apache/impala/catalog/Db.java |  21 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  20 +-
 .../main/java/org/apache/impala/catalog/Table.java |  14 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 148 ++++++---
 shell/TSSLSocketWithWildcardSAN.py                 |   2 +-
 shell/impala_client.py                             |  55 ++--
 shell/impala_shell.py                              | 355 +++++++++++----------
 shell/option_parser.py                             |  15 +-
 shell/pkg_resources.py                             |   2 +-
 shell/shell_output.py                              |  17 +-
 tests/custom_cluster/test_concurrent_ddls.py       |  34 +-
 tests/custom_cluster/test_event_processing.py      |  11 +-
 tests/metadata/test_recover_partitions.py          |  15 +-
 14 files changed, 493 insertions(+), 354 deletions(-)


[impala] 03/03: IMPALA-9492: Fix test_unescaped_string_partition failing on S3

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 030c12ab2c995a8618dcf601296e310203f70f8c
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Mar 18 20:23:33 2020 +0800

    IMPALA-9492: Fix test_unescaped_string_partition failing on S3
    
    test_unescaped_string_partition in metadata/test_recover_partitions.py
    uses hdfs clients to create four partition directories with special
    characters, i.e. single quote, double quote and backslash. It aims to
    test whether ALTER TABLE RECOVER PARTITIONS can recognize those
    directories correctly. However, when running against s3, only two of
    the directories are created as expected, which causes the failure.
    
    The reason is that when running against s3, we use the hadoop cli for
    operations. A shell command is launched for each operation. Passing
    arguments through the shell results in duplicate unescaping. So the 4
    dirs, [p=', p=", p=\', p=\"] finally became [p=', p=", p=', p="],
    resulting in two distinct directories. When the test runs against hdfs,
    we use webhdfs_client, so we don't have this issue.
    
    Actually, we shouldn't use special characters in partition paths. Hive
    converts them to their ascii hex values when creating partition
    directories. E.g. partition paths of [p=', p=", p=\', p=\"] are
    [p=%27, p=%22, p=%5C%27, p=%5C%22]. We should follow this rule when
    creating directories in the test. This also avoids the above shell
    issue on s3.
    
    Tests:
     - Added two more special partitions in test_unescaped_string_partition.
     - Ran test_unescaped_string_partition in S3.
    
    Change-Id: I63d149c9bdec52c2e1c0b25c8c3f0448cf7bdadb
    Reviewed-on: http://gerrit.cloudera.org:8080/15475
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/metadata/test_recover_partitions.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/tests/metadata/test_recover_partitions.py b/tests/metadata/test_recover_partitions.py
index dcd393e..5d8d736 100644
--- a/tests/metadata/test_recover_partitions.py
+++ b/tests/metadata/test_recover_partitions.py
@@ -18,6 +18,7 @@
 # Impala tests for ALTER TABLE RECOVER PARTITIONS statement
 
 import os
+from six.moves import urllib
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal, SkipIfS3, SkipIfCatalogV2
 from tests.common.test_dimensions import ALL_NODES_ONLY
@@ -363,16 +364,18 @@ class TestRecoverPartitions(ImpalaTestSuite):
 
     self.execute_query_expect_success(
         self.client, "CREATE TABLE %s (i int) PARTITIONED BY (p string)" % fq_tbl_name)
-    self.create_fs_partition(tbl_location, 'p=\"', "file_000", "1")
-    self.create_fs_partition(tbl_location, 'p=\'', "file_000", "2")
-    self.create_fs_partition(tbl_location, 'p=\\\"', "file_000", "3")
-    self.create_fs_partition(tbl_location, 'p=\\\'', "file_000", "4")
+    parts = ["\'", "\"", "\\\'", "\\\"", "\\\\\'", "\\\\\""]
+    for i in range(len(parts)):
+      # When creating partition directories, Hive replaces special characters in
+      # partition value string using the %xx escape. e.g. p=' will become p=%27.
+      hex_part = urllib.parse.quote(parts[i])
+      self.create_fs_partition(tbl_location, "p=%s" % hex_part, "file_%d" % i, str(i))
+
     self.execute_query_expect_success(
         self.client, "ALTER TABLE %s RECOVER PARTITIONS" % fq_tbl_name)
     result = self.execute_query_expect_success(
         self.client, "SHOW PARTITIONS %s" % fq_tbl_name)
-    assert self.count_partition(result.data) == 4
-    self.verify_partitions(['\"', '\'', '\\\"', '\\\''], result.data)
+    self.verify_partitions(parts, result.data)
 
   @SkipIfLocal.hdfs_client
   @SkipIfS3.empty_directory


[impala] 01/03: IMPALA-9357: Fix race condition in alter_database event

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1d27b91a36f687f958b02738ba7899652b2cfec7
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Thu Feb 6 13:43:17 2020 -0800

    IMPALA-9357: Fix race condition in alter_database event
    
    The race condition is exposed intermittently on certain builds, which
    causes the test_event_processing::test_self_events test to fail. This
    happens because we are checking for self-event identifiers in the Db
    object without taking a lock. When a DDL like "comment on
    database is 'test'" is executed, it is possible that the event
    processor thread is triggered as soon as the ALTER_DATABASE event is
    generated. This may cause the event processor to fail the self-event
    detection since the self-event identifiers have not yet been added to
    the Db object.
    
    The fix adds a Db lock similar to the Table lock. Alter db operations
    in CatalogOpExecutor now take db locks instead of metastoreDdlLock_,
    which makes them consistent with the table locking protocol.
    
    Testing:
    1. Ran existing tests for events processor.
    2. This test was failing on centos6 frequently (failed in 1/3 times).
    After the fix I ran the test in a loop for 24 hrs (197 iterations) and
    the test didn't fail.
    3. Ran core tests with CDP and CDH builds.
    
    Change-Id: I472fd8a55740769ee5cdb84e48422a4ab39a8d1e
    Reviewed-on: http://gerrit.cloudera.org:8080/15260
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 138 +++++++++++--------
 fe/src/main/java/org/apache/impala/catalog/Db.java |  21 +--
 .../org/apache/impala/catalog/HdfsPartition.java   |  20 +--
 .../main/java/org/apache/impala/catalog/Table.java |  14 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 148 ++++++++++++++-------
 tests/custom_cluster/test_concurrent_ddls.py       |  34 +++--
 tests/custom_cluster/test_event_processing.py      |  11 +-
 7 files changed, 241 insertions(+), 145 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b66d5ee..fb7ef4d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -192,9 +191,9 @@ public class CatalogServiceCatalog extends Catalog {
   private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
   // Timeout for acquiring a table lock
   // TODO: Make this configurable
-  private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
+  private static final long LOCK_RETRY_TIMEOUT_MS = 7200000;
   // Time to sleep before retrying to acquire a table lock
-  private static final int TBL_LOCK_RETRY_MS = 10;
+  private static final int LOCK_RETRY_DELAY_MS = 10;
 
   private final TUniqueId catalogServiceId_;
 
@@ -402,8 +401,9 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
-   * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
-   * when the function returns. Returns false otherwise and no lock is held in this case.
+   * successfully acquires both within LOCK_RETRY_TIMEOUT_MS millisecs; both locks are
+   * held when the function returns. Returns false otherwise and no lock is held in
+   * this case.
    */
   public boolean tryLockTable(Table tbl) {
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
@@ -423,12 +423,45 @@ public class CatalogServiceCatalog extends Catalog {
         versionLock_.writeLock().unlock();
         try {
           // Sleep to avoid spinning and allow other operations to make progress.
-          Thread.sleep(TBL_LOCK_RETRY_MS);
+          Thread.sleep(LOCK_RETRY_DELAY_MS);
         } catch (InterruptedException e) {
           // ignore
         }
         end = System.currentTimeMillis();
-      } while (end - begin < TBL_LOCK_TIMEOUT_MS);
+      } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
+      return false;
+    }
+  }
+
+  /**
+   * Similar to tryLock on a table, but works on a database object instead of Table.
+   * TODO: Refactor the code so that both table and db can be "lockable" using a single
+   * method.
+   */
+  public boolean tryLockDb(Db db) {
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "Attempting to lock database " + db.getName())) {
+      long begin = System.currentTimeMillis();
+      long end;
+      do {
+        versionLock_.writeLock().lock();
+        if (db.getLock().tryLock()) {
+          if (LOG.isTraceEnabled()) {
+            end = System.currentTimeMillis();
+            LOG.trace(String.format("Lock for db %s was acquired in %d msec",
+                db.getName(), end - begin));
+          }
+          return true;
+        }
+        versionLock_.writeLock().unlock();
+        try {
+          // Sleep to avoid spinning and allow other operations to make progress.
+          Thread.sleep(LOCK_RETRY_DELAY_MS);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        end = System.currentTimeMillis();
+      } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
       return false;
     }
   }
@@ -781,42 +814,6 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Gets the list of versions for in-flight events for the given table. Applicable
-   * only when external event processing is enabled.
-   * @param dbName database name
-   * @param tblName table name
-   * @return List of previous version numbers for in-flight events on this table.
-   * If table is not laoded returns a empty list. If event processing is disabled,
-   * returns a empty list
-   */
-  public List<Long> getInFlightVersionsForEvents(String dbName, String tblName)
-      throws DatabaseNotFoundException, TableNotFoundException {
-    Preconditions.checkState(isEventProcessingActive(),
-        "Event processing should be enabled before calling this method");
-    List<Long> result = Collections.EMPTY_LIST;
-    versionLock_.readLock().lock();
-    try {
-      Db db = getDb(dbName);
-      if (db == null) {
-        throw new DatabaseNotFoundException(
-            String.format("Database %s not found", dbName));
-      }
-      if (tblName == null) {
-        return db.getVersionsForInflightEvents();
-      }
-      Table tbl = getTable(dbName, tblName);
-      if (tbl == null) {
-        throw new TableNotFoundException(
-            String.format("Table %s not found", new TableName(dbName, tblName)));
-      }
-      if (tbl instanceof IncompleteTable) return result;
-      return tbl.getVersionsForInflightEvents();
-    } finally {
-      versionLock_.readLock().unlock();
-    }
-  }
-
-  /**
    * Evaluates if the information from an event (serviceId and versionNumber) matches to
    * the catalog object. If there is match, the in-flight version for that object is
    * removed and method returns true. If it does not match, returns false
@@ -830,20 +827,45 @@ public class CatalogServiceCatalog extends Catalog {
         "Event processing should be enabled when calling this method");
     long versionNumber = ctx.getVersionNumberFromEvent();
     String serviceIdFromEvent = ctx.getServiceIdFromEvent();
+    LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber,
+        serviceIdFromEvent);
     // no version info or service id in the event
-    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) return false;
+    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
+      LOG.info("Not a self-event since the given version is {} and service id is {}",
+          versionNumber, serviceIdFromEvent);
+      return false;
+    }
     // if the service id from event doesn't match with our service id this is not a
     // self-event
-    if (!getCatalogServiceId().equals(serviceIdFromEvent)) return false;
+    if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
+      LOG.info("Not a self-event because service id of this catalog {} does not match "
+          + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+      return false;
+    }
     Db db = getDb(ctx.getDbName());
     if (db == null) {
       throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " not found");
     }
     // if the given tblName is null we look db's in-flight events
     if (ctx.getTblName() == null) {
-      return db.removeFromVersionsForInflightEvents(versionNumber);
+      //TODO use read/write locks for both table and db
+      if (!tryLockDb(db)) {
+        throw new CatalogException("Could not acquire lock on database object " +
+            db.getName());
+      }
+      versionLock_.writeLock().unlock();
+      try {
+        boolean removed = db.removeFromVersionsForInflightEvents(versionNumber);
+        if (!removed) {
+          LOG.info("Could not find version {} in the in-flight event list of database "
+              + "{}", versionNumber, db.getName());
+        }
+        return removed;
+      } finally {
+        db.getLock().unlock();
+      }
     }
-    Table tbl = getTable(ctx.getDbName(), ctx.getTblName());
+    Table tbl = db.getTable(ctx.getTblName());
     if (tbl == null) {
       throw new TableNotFoundException(
           String.format("Table %s.%s not found", ctx.getDbName(), ctx.getTblName()));
@@ -859,7 +881,12 @@ public class CatalogServiceCatalog extends Catalog {
       List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
       // if the partitionKeyValues is null, we look for tbl's in-flight events
       if (partitionKeyValues == null) {
-        return tbl.removeFromVersionsForInflightEvents(versionNumber);
+        boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber);
+        if (!removed) {
+          LOG.info("Could not find version {} in in-flight event list of table {}",
+              versionNumber, tbl.getFullName());
+        }
+        return removed;
       }
       if (tbl instanceof HdfsTable) {
         List<String> failingPartitions = new ArrayList<>();
@@ -872,8 +899,11 @@ public class CatalogServiceCatalog extends Catalog {
             // should clean up the self-event state on the rest of the partitions
             String partName = HdfsTable.constructPartitionName(partitionKeyValue);
             if (hdfsPartition == null) {
-              LOG.warn(String.format("Partition %s not found during self-event "
-                + "evaluation for the table %s", partName, tbl.getFullName()));
+              LOG.info("Partition {} not found during self-event "
+                + "evaluation for the table {}", partName, tbl.getFullName());
+            } else {
+              LOG.info("Could not find {} in in-flight event list of the partition {} "
+                  + "of table {}", versionNumber, partName, tbl.getFullName());
             }
             failingPartitions.add(partName);
           }
@@ -895,11 +925,9 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
     if (!isEventProcessingActive()) return;
-    // we generally don't take locks on Incomplete tables since they are atomically
-    // replaced during load
-    Preconditions.checkState(
-        tbl instanceof IncompleteTable || tbl.getLock().isHeldByCurrentThread());
     tbl.addToVersionsForInflightEvents(versionNumber);
+    LOG.info("Added catalog version {} in table's {} in-flight events",
+        versionNumber, tbl.getFullName());
   }
 
   /**
@@ -911,6 +939,8 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public void addVersionsForInflightEvents(Db db, long versionNumber) {
     if (!isEventProcessingActive()) return;
+    LOG.info("Added catalog version {} in database's {} in-flight events",
+        versionNumber, db.getName());
     db.addToVersionsForInflightEvents(versionNumber);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 4bfd070..330227c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -21,10 +21,10 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.impala.analysis.ColumnDef;
@@ -100,6 +100,10 @@ public class Db extends CatalogObjectImpl implements FeDb {
   // tracks the in-flight metastore events for this db
   private final InFlightEvents inFlightEvents_ = new InFlightEvents();
 
+  // lock to make sure modifications to the Db object are atomically done along with
+  // its associated HMS operation (eg. alterDbOwner or commentOnDb)
+  private final ReentrantLock dbLock_ = new ReentrantLock();
+
   public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
     setMetastoreDb(name, msDb);
     tableCache_ = new CatalogObjectCache<>();
@@ -139,6 +143,8 @@ public class Db extends CatalogObjectImpl implements FeDb {
     return msDb.getParameters().remove(k) != null;
   }
 
+  public ReentrantLock getLock() { return dbLock_; }
+
   @Override // FeDb
   public boolean isSystemDb() { return isSystemDb_; }
   @Override // FeDb
@@ -497,18 +503,14 @@ public class Db extends CatalogObjectImpl implements FeDb {
   }
 
   /**
-   * Gets the current list of versions for in-flight events for this database
-   */
-  public List<Long> getVersionsForInflightEvents() {
-    return inFlightEvents_.getAll();
-  }
-
-  /**
    * Removes a given version from the collection of version numbers for in-flight events
    * @param versionNumber version number to remove from the collection
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
+        "removeFromVersionsForInflightEvents called without getting the db lock for "
+            + getName() + " database.");
     return inFlightEvents_.remove(versionNumber);
   }
 
@@ -520,6 +522,9 @@ public class Db extends CatalogObjectImpl implements FeDb {
    * @param versionNumber version number to add
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
+        "addToVersionsForInFlightEvents called without getting the db lock for "
+            + getName() + " database.");
     if (!inFlightEvents_.add(versionNumber)) {
       LOG.warn(String.format("Could not add version %s to the list of in-flight "
           + "events. This could cause unnecessary database %s invalidation when the "
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index d0664df..45a0a83 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -851,18 +851,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   }
 
   /**
-   * Gets the current list of versions for in-flight events for this partition
-   */
-  public List<Long> getVersionsForInflightEvents() {
-    return inFlightEvents_.getAll();
-  }
-
-  /**
    * Removes a given version from the in-flight events
    * @param versionNumber version number to remove
    * @return true if the versionNumber was removed, false if it didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+        "removeFromVersionsForInflightEvents called without holding the table lock on "
+            + "partition " + getPartitionName() + " of table " + table_.getFullName());
     return inFlightEvents_.remove(versionNumber);
   }
 
@@ -871,6 +867,9 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    * @param versionNumber version number to add
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+        "addToVersionsForInflightEvents called without holding the table lock on "
+            + "partition " + getPartitionName() + " of table " + table_.getFullName());
     if (!inFlightEvents_.add(versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
           + "This could cause unnecessary refresh of the partition when the event is"
@@ -881,17 +880,22 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
   /**
    * Adds the version from the given Partition parameters. No-op if the parameters does
-   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>
+   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. This is
+   * done to detect add partition events from this catalog which are generated when
+   * partitions are added or recovered.
    */
   private void addInflightVersionsFromParameters() {
     Preconditions.checkNotNull(hmsParameters_);
     Preconditions.checkState(inFlightEvents_.size() == 0);
+    // we should not check for table lock being held here since there are certain code
+    // paths which call this method without holding the table lock (eg. getOrLoadTable())
     if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
       return;
     }
     inFlightEvents_.add(Long.parseLong(
             hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
   }
+
   /**
    * Marks this partition's metadata as "dirty" indicating that changes have been
    * made and this partition's metadata should not be reused during the next
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 6107a12..db9fe3e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -794,18 +794,14 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   }
 
   /**
-   * Gets the current list of versions for in-flight events for this table
-   */
-  public List<Long> getVersionsForInflightEvents() {
-    return inFlightEvents.getAll();
-  }
-
-  /**
    * Removes a given version from the collection of version numbers for in-flight events
    * @param versionNumber version number to remove from the collection
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
+        "removeFromVersionsForInFlightEvents called without taking the table lock on "
+            + getFullName());
     return inFlightEvents.remove(versionNumber);
   }
 
@@ -819,6 +815,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * capacity
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    // we generally don't take locks on Incomplete tables since they are atomically
+    // replaced during load
+    Preconditions.checkState(
+        this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
     if (!inFlightEvents.add(versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the table %s. This could "
           + "cause unnecessary refresh of the table when the event is received by the "
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 92d8820..098f2e9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -216,21 +216,27 @@ import org.slf4j.LoggerFactory;
  * updates, DDL operations should not directly modify the HMS objects of the catalog
  * objects but should operate on copies instead.
  *
- * The CatalogOpExecutor uses table-level locking to protect table metadata during
- * concurrent modifications and is responsible for assigning a new catalog version when
- * a table is modified (e.g. alterTable()).
+ * The CatalogOpExecutor uses table-level or Db object level locking to protect table
+ * metadata or database metadata respectively during concurrent modifications and is
+ * responsible for assigning a new catalog version when a table/Db is modified
+ * (e.g. alterTable() or alterDb()).
  *
  * The following locking protocol is employed to ensure that modifying
- * the table metadata and assigning a new catalog version is performed atomically and
+ * the table/Db metadata and assigning a new catalog version is performed atomically and
  * consistently in the presence of concurrent DDL operations. The following pattern
  * ensures that the catalog lock is never held for a long period of time, preventing
- * other DDL operations from making progress. This pattern only applies to single-table
+ * other DDL operations from making progress. This pattern only applies to single-table/Db
  * update operations and requires the use of fair table locks to prevent starvation.
+ * Additionally, this locking protocol is also followed in case of CREATE/DROP
+ * FUNCTION. In case of CREATE/DROP FUNCTION, we take the Db object lock since
+ * certain FUNCTION are stored in the HMS database parameters. Using this approach
+ * also makes sure that adding or removing functions from different databases do not
+ * block each other.
  *
  *   DO {
  *     Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
- *     Try to acquire a table lock
- *     IF the table lock acquisition fails {
+ *     Try to acquire a table/Db lock
+ *     IF the table/Db lock acquisition fails {
  *       Release the catalog lock
  *       YIELD()
  *     ELSE
@@ -241,15 +247,15 @@ import org.slf4j.LoggerFactory;
  *
  *   Increment and get a new catalog version
  *   Release the catalog lock
- *   Modify table metadata
- *   Release table lock
+ *   Modify table/Db metadata
+ *   Release table/Db lock
  *
  * Note: The getCatalogObjects() function is the only case where this locking pattern is
  * not used since it accesses multiple catalog entities in order to compute a snapshot
  * of catalog metadata.
  *
- * Operations that CREATE/DROP catalog objects such as tables and databases employ the
- * following locking protocol:
+ * Operations that CREATE/DROP catalog objects such as tables and databases
+ * (except for functions, see above) employ the following locking protocol:
  * 1. Acquire the metastoreDdlLock_
  * 2. Update the Hive Metastore
  * 3. Increment and get a new catalog version
@@ -257,6 +263,7 @@ import org.slf4j.LoggerFactory;
  * 5. Grant/revoke owner privilege if authorization with ownership is enabled.
  * 6. Release the metastoreDdlLock_
  *
+ *
  * It is imperative that other operations that need to hold both the catalog lock and
  * table locks at the same time follow the same locking protocol and acquire these
  * locks in that particular order. Also, operations that modify table metadata
@@ -1356,21 +1363,23 @@ public class CatalogOpExecutor {
     }
     boolean isPersistentJavaFn =
         (fn.getBinaryType() == TFunctionBinaryType.JAVA) && fn.isPersistent();
-    synchronized (metastoreDdlLock_) {
-      Db db = catalog_.getDb(fn.dbName());
-      if (db == null) {
-        throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
-      }
-      // Get a new catalog version to assign to the database being altered. This is
-      // needed for events processor as this method creates alter database events.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+    Db db = catalog_.getDb(fn.dbName());
+    if (db == null) {
+      throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
+    }
+
+    tryLock(db, "creating function " + fn.getClass().getSimpleName());
+    // Get a new catalog version to assign to the database being altered. This is
+    // needed for events processor as this method creates alter database events.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       // Search for existing functions with the same name or signature that would
       // conflict with the function being added.
-      for (Function function: db.getFunctions(fn.functionName())) {
+      for (Function function : db.getFunctions(fn.functionName())) {
         if (isPersistentJavaFn || (function.isPersistent() &&
             (function.getBinaryType() == TFunctionBinaryType.JAVA)) ||
-                function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
+            function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
           if (!params.if_not_exists) {
             throw new CatalogException("Function " + fn.functionName() +
                 " already exists.");
@@ -1386,15 +1395,16 @@ public class CatalogOpExecutor {
         // the corresponding Jar and add each signature to the catalog.
         Preconditions.checkState(fn instanceof ScalarFunction);
         org.apache.hadoop.hive.metastore.api.Function hiveFn =
-            ((ScalarFunction)fn).toHiveFunction();
+            ((ScalarFunction) fn).toHiveFunction();
         List<Function> funcs = FunctionUtils.extractFunctions(fn.dbName(), hiveFn,
             BackendConfig.INSTANCE.getBackendCfg().local_library_path);
         if (funcs.isEmpty()) {
           throw new CatalogException(
-            "No compatible function signatures found in class: " + hiveFn.getClassName());
+              "No compatible function signatures found in class: " + hiveFn
+                  .getClassName());
         }
         if (addJavaFunctionToHms(fn.dbName(), hiveFn, params.if_not_exists)) {
-          for (Function addedFn: funcs) {
+          for (Function addedFn : funcs) {
             if (LOG.isTraceEnabled()) {
               LOG.trace(String.format("Adding function: %s.%s", addedFn.dbName(),
                   addedFn.signatureString()));
@@ -1404,7 +1414,14 @@ public class CatalogOpExecutor {
           }
         }
       } else {
+        //TODO(Vihang): addFunction method below directly updates the database
+        // parameters. If the applyAlterDatabase method below throws an exception,
+        // catalog might end up in an inconsistent state. Ideally, we should make a copy
+        // of hms Database object and then update the Db once the HMS operation succeeds
+        // similar to what happens in alterDatabaseSetOwner method.
         if (catalog_.addFunction(fn)) {
+          addCatalogServiceIdentifiers(db.getMetaStoreDb(),
+              catalog_.getCatalogServiceId(), newCatalogVersion);
           // Flush DB changes to metastore
           applyAlterDatabase(db.getMetaStoreDb());
           addedFunctions.add(fn.toTCatalogObject());
@@ -1421,6 +1438,8 @@ public class CatalogOpExecutor {
       } else {
         addSummary(resp, "Function already exists.");
       }
+    } finally {
+      db.getLock().unlock();
     }
   }
 
@@ -2098,24 +2117,26 @@ public class CatalogOpExecutor {
   private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
       throws ImpalaException {
     FunctionName fName = FunctionName.fromThrift(params.fn_name);
-    synchronized (metastoreDdlLock_) {
-      Db db = catalog_.getDb(fName.getDb());
-      if (db == null) {
-        if (!params.if_exists) {
-            throw new CatalogException("Database: " + fName.getDb()
-                + " does not exist.");
-        }
-        addSummary(resp, "Database does not exist.");
-        return;
+    Db db = catalog_.getDb(fName.getDb());
+    if (db == null) {
+      if (!params.if_exists) {
+        throw new CatalogException("Database: " + fName.getDb()
+            + " does not exist.");
       }
-      // Get a new catalog version to assign to the database being altered. This is
-      // needed for events processor as this method creates alter database events.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+      addSummary(resp, "Database does not exist.");
+      return;
+    }
+
+    tryLock(db, "dropping function " + fName);
+    // Get a new catalog version to assign to the database being altered. This is
+    // needed for events processor as this method creates alter database events.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       List<TCatalogObject> removedFunctions = Lists.newArrayList();
       if (!params.isSetSignature()) {
         dropJavaFunctionFromHms(fName.getDb(), fName.getFunction(), params.if_exists);
-        for (Function fn: db.getFunctions(fName.getFunction())) {
+        for (Function fn : db.getFunctions(fName.getFunction())) {
           if (fn.getBinaryType() != TFunctionBinaryType.JAVA
               || !fn.isPersistent()) {
             continue;
@@ -2125,7 +2146,7 @@ public class CatalogOpExecutor {
         }
       } else {
         ArrayList<Type> argTypes = Lists.newArrayList();
-        for (TColumnType t: params.arg_types) {
+        for (TColumnType t : params.arg_types) {
           argTypes.add(Type.fromThrift(t));
         }
         Function desc = new Function(fName, argTypes, Type.INVALID, false);
@@ -2136,6 +2157,8 @@ public class CatalogOpExecutor {
                 "Function: " + desc.signatureString() + " does not exist.");
           }
         } else {
+          addCatalogServiceIdentifiers(db.getMetaStoreDb(),
+              catalog_.getCatalogServiceId(), newCatalogVersion);
           // Flush DB changes to metastore
           applyAlterDatabase(db.getMetaStoreDb());
           removedFunctions.add(fn.toTCatalogObject());
@@ -2152,6 +2175,8 @@ public class CatalogOpExecutor {
         addSummary(resp, "Function does not exist.");
       }
       resp.result.setVersion(catalog_.getCatalogVersion());
+    } finally {
+      db.getLock().unlock();
     }
   }
 
@@ -4576,16 +4601,19 @@ public class CatalogOpExecutor {
   }
 
   private void alterCommentOnDb(String dbName, String comment, TDdlExecResponse response)
-      throws ImpalaRuntimeException, CatalogException {
+      throws ImpalaRuntimeException, CatalogException, InternalException {
     Db db = catalog_.getDb(dbName);
     if (db == null) {
       throw new CatalogException("Database: " + dbName + " does not exist.");
     }
-    synchronized (metastoreDdlLock_) {
-      // Get a new catalog version to assign to the database being altered.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+    tryLock(db, "altering the comment");
+    // Get a new catalog version to assign to the database being altered.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       Database msDb = db.getMetaStoreDb().deepCopy();
+      addCatalogServiceIdentifiers(msDb, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       msDb.setDescription(comment);
       try {
         applyAlterDatabase(msDb);
@@ -4597,6 +4625,8 @@ public class CatalogOpExecutor {
       // now that HMS alter operation has succeeded, add this version to list of inflight
       // events in catalog database if event processing is enabled
       catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
+    } finally {
+      db.getLock().unlock();
     }
     addSummary(response, "Updated database.");
   }
@@ -4623,11 +4653,14 @@ public class CatalogOpExecutor {
       TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkNotNull(params.owner_name);
     Preconditions.checkNotNull(params.owner_type);
-    synchronized (metastoreDdlLock_) {
-      // Get a new catalog version to assign to the database being altered.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+    tryLock(db, "altering the owner");
+    // Get a new catalog version to assign to the database being altered.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       Database msDb = db.getMetaStoreDb().deepCopy();
+      addCatalogServiceIdentifiers(msDb, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       String originalOwnerName = msDb.getOwnerName();
       PrincipalType originalOwnerType = msDb.getOwnerType();
       msDb.setOwnerName(params.owner_name);
@@ -4647,6 +4680,8 @@ public class CatalogOpExecutor {
       // now that HMS alter operation has succeeded, add this version to list of inflight
       // events in catalog database if event processing is enabled
       catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
+    } finally {
+      db.getLock().unlock();
     }
     addSummary(response, "Updated database.");
   }
@@ -4656,9 +4691,9 @@ public class CatalogOpExecutor {
    * No-op if event processing is disabled
    */
   private void addCatalogServiceIdentifiers(
-      Db db, String catalogServiceId, long newCatalogVersion) {
+      Database msDb, String catalogServiceId, long newCatalogVersion) {
     if (!catalog_.isEventProcessingActive()) return;
-    org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
+    Preconditions.checkNotNull(msDb);
     msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
         catalogServiceId);
     msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
@@ -4775,6 +4810,17 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Try to lock the given Db in the catalog for the given operation. Throws
+   * InternalException if catalog is unable to lock the database.
+   */
+  private void tryLock(Db db, String operation) throws InternalException {
+    if (!catalog_.tryLockDb(db)) {
+      throw new InternalException(String.format("Error %s of database %s due to lock "
+          + "contention.", operation, db.getName()));
+    }
+  }
+
+  /**
    * Commits ACID transaction with given transaction id.
    * @param transactionId is the id of the transaction.
    * @throws TransactionException
diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py
index 37a498a..695045b 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -97,22 +97,32 @@ class TestConcurrentDdls(CustomClusterTestSuite):
 
     def run_ddls(i):
       tbl_name = db + ".test_" + str(i)
-      for query_tmpl in [
+      # func_name = "f_" + str(i)
+      for query in [
+        # alter database operations
+        # TODO (IMPALA-9532): Uncomment the alter database operations
+        # "comment on database %s is 'test-concurrent-ddls'" % db,
+        # "alter database %s set owner user `test-user`" % db,
+        # "create function %s.%s() returns int location '%s/libTestUdfs.so' \
+        #    symbol='NoArgs'" % (db, func_name, WAREHOUSE),
+        # "drop function if exists %s.%s()" % (db, func_name),
         # Create a partitioned and unpartitioned table
-        "create table %s (i int)",
-        "create table %s_part (i int) partitioned by (j int)",
+        "create table %s (i int)" % tbl_name,
+        "create table %s_part (i int) partitioned by (j int)" % tbl_name,
         # Below queries could fail if running with invalidate metadata concurrently
-        "alter table %s_part add partition (j=1)",
-        "alter table %s_part add partition (j=2)",
-        "invalidate metadata %s_part",
-        "refresh %s",
-        "refresh %s_part",
-        "insert overwrite table %s select int_col from functional.alltypestiny",
-        "insert overwrite table %s_part partition(j=1) values (1), (2), (3), (4), (5)",
-        "insert overwrite table %s_part partition(j=2) values (1), (2), (3), (4), (5)"
+        "alter table %s_part add partition (j=1)" % tbl_name,
+        "alter table %s_part add partition (j=2)" % tbl_name,
+        "invalidate metadata %s_part" % tbl_name,
+        "refresh %s" % tbl_name,
+        "refresh %s_part" % tbl_name,
+        "insert overwrite table %s select int_col from "
+        "functional.alltypestiny" % tbl_name,
+        "insert overwrite table %s_part partition(j=1) "
+        "values (1), (2), (3), (4), (5)" % tbl_name,
+        "insert overwrite table %s_part partition(j=2) "
+        "values (1), (2), (3), (4), (5)" % tbl_name
       ]:
         try:
-          query = query_tmpl % tbl_name
           # TODO(IMPALA-9123): Timeout logic here does not work for DDLs since they are
           #  usually stuck in CREATED state and execute_async() won't return. We finally
           #  use timeout in pytest.mark.timeout() but it's not precise. We should find a
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index e9d5c4f..f39f55f 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -18,16 +18,13 @@
 import random
 import string
 import pytest
-import json
-import time
-import requests
 
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
-    SkipIfLocal, SkipIfHive2
+from tests.common.skip import SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
+from tests.util.filesystem_utils import WAREHOUSE
 
 
 @SkipIfS3.hive
@@ -242,8 +239,12 @@ class TestEventProcessing(CustomClusterTestSuite):
     self_event_test_queries = {
       # Queries which will increment the self-events-skipped counter
       True: [
+          # ALTER_DATABASE case
           "comment on database {0} is 'self-event test database'".format(db_name),
           "alter database {0} set owner user `test-user`".format(db_name),
+          "create function {0}.f() returns int location '{1}/libTestUdfs.so' "
+          "symbol='NoArgs'".format(db_name, WAREHOUSE),
+          "drop function {0}.f()".format(db_name),
           # ALTER_TABLE case
           "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
           "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),


[impala] 02/03: IMPALA-3343: Part 1 -- Fix simple python 2->3 syntax errors

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ed15c2c58f75c47a1111690a30e81af3202e7f82
Author: David Knupp <dk...@cloudera.com>
AuthorDate: Wed Mar 18 15:59:43 2020 -0700

    IMPALA-3343: Part 1 -- Fix simple python 2->3 syntax errors
    
    In an effort to keep the work of reviewing the changes more manageable
    with regard to making the impala-shell python3 compatible, I'm trying
    to break the patches up into smaller chunks.
    
    The first patch is the easiest one -- simply addressing the handful of
    syntax issues that aren't python 3 compatible, namely changing the
    print statements to function calls, changing the way we catch exceptions,
    and adding a few simple branches to work around the removal of such
    things as dict.iteritems().
    
    We needed the print function imported from __future__ because it allows
    us to pass in a target file object, e.g., file=sys.stderr.
    
    Notably, there's nothing in this patch related to string/bytes/unicode
    changes from python 2 to 3.
    
    Change-Id: I9a515da01ef03d5936cb1a4d9e4bc6d105386b1d
    Reviewed-on: http://gerrit.cloudera.org:8080/15487
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 shell/TSSLSocketWithWildcardSAN.py |   2 +-
 shell/impala_client.py             |  55 +++---
 shell/impala_shell.py              | 355 +++++++++++++++++++------------------
 shell/option_parser.py             |  15 +-
 shell/pkg_resources.py             |   2 +-
 shell/shell_output.py              |  17 +-
 6 files changed, 243 insertions(+), 203 deletions(-)

diff --git a/shell/TSSLSocketWithWildcardSAN.py b/shell/TSSLSocketWithWildcardSAN.py
index 88fc119..c6ff1ca 100755
--- a/shell/TSSLSocketWithWildcardSAN.py
+++ b/shell/TSSLSocketWithWildcardSAN.py
@@ -65,7 +65,7 @@ class TSSLSocketWithWildcardSAN(TSSLSocket.TSSLSocket):
       self._match_hostname(cert, self.host)
       self.is_valid = True
       return
-    except CertificateError, ce:
+    except CertificateError as ce:
       raise TTransportException(
         type=TTransportException.UNKNOWN,
         message='Certificate error with remote host: %s' % (ce))
diff --git a/shell/impala_client.py b/shell/impala_client.py
index fbfca32..e74cfa4 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -16,6 +16,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import print_function
 
 from bitarray import bitarray
 import base64
@@ -101,8 +102,6 @@ class QueryOptionLevels:
     """Return the integral value based on the string. Defaults to DEVELOPMENT."""
     return cls.NAME_TO_VALUES.get(string.upper(), cls.DEVELOPMENT)
 
-def print_to_stderr(message):
-  print >> sys.stderr, message
 
 class ImpalaClient(object):
   """Base class for shared functionality between HS2 and Beeswax. Includes stub methods
@@ -149,8 +148,8 @@ class ImpalaClient(object):
     assert self.transport and self.transport.isOpen()
 
     if self.verbose:
-      print_to_stderr('Opened TCP connection to %s:%s' % (self.impalad_host,
-          self.impalad_port))
+      msg = 'Opened TCP connection to %s:%s' % (self.impalad_host, self.impalad_port)
+      print(msg, file=sys.stderr)
     protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
     self.imp_service = self._get_thrift_client(protocol)
     self.connected = True
@@ -336,7 +335,7 @@ class ImpalaClient(object):
     # context in IMPALA-8864. CentOs 6 ships such an incompatible python version
     # out of the box.
     if not hasattr(ssl, "create_default_context"):
-      print_to_stderr("Python version too old. SSLContext not supported.")
+      print("Python version too old. SSLContext not supported.", file=sys.stderr)
       raise NotImplementedError()
     # Current implementation of ImpalaHttpClient does a close() and open() of the
     # underlying http connection on every flush() (THRIFT-4600). Due to this, setting a
@@ -344,15 +343,15 @@ class ImpalaClient(object):
     # block similary in case of problematic remote end points.
     # TODO: Investigate connection reuse in ImpalaHttpClient and revisit this.
     if connect_timeout_ms > 0:
-      print_to_stderr("Warning: --connect_timeout_ms is currently ignored with" +
-          " HTTP transport.")
+      print("Warning: --connect_timeout_ms is currently ignored with HTTP transport.",
+            file=sys.stderr)
 
     # HTTP server implemententations do not support SPNEGO yet.
     # TODO: when we add support for Kerberos+HTTP, we need to re-enable the automatic
     # kerberos retry logic in impala_shell.py that was disabled for HTTP because of
     # IMPALA-8932.
     if self.use_kerberos or self.kerberos_host_fqdn:
-      print_to_stderr("Kerberos not supported with HTTP endpoints.")
+      print("Kerberos not supported with HTTP endpoints.", file=sys.stderr)
       raise NotImplementedError()
 
     host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port)
@@ -624,8 +623,8 @@ class ImpalaHS2Client(ImpalaClient):
         self.query_option_levels[name.upper()] = QueryOptionLevels.from_string(level)
     try:
       self.close_query(set_all_handle)
-    except Exception, e:
-      print str(e), type(e)
+    except Exception as e:
+      print('{0} {1}'.format(str(e), type(e)), file=sys.stderr)
       raise
 
   def close_connection(self):
@@ -637,7 +636,7 @@ class ImpalaHS2Client(ImpalaClient):
         req = TCloseSessionReq(self.session_handle)
         resp = self._do_hs2_rpc(lambda: self.imp_service.CloseSession(req))
         self._check_hs2_rpc_status(resp.status)
-      except Exception, e:
+      except Exception as e:
         print("Warning: close session RPC failed: {0}, {1}".format(str(e), type(e)))
       self.session_handle = None
     self._close_transport()
@@ -646,7 +645,7 @@ class ImpalaHS2Client(ImpalaClient):
     req = TPingImpalaHS2ServiceReq(self.session_handle)
     try:
       resp = self.imp_service.PingImpalaHS2Service(req)
-    except TApplicationException, t:
+    except TApplicationException as t:
       if t.type == TApplicationException.UNKNOWN_METHOD:
         raise MissingThriftMethodException(t.message)
       raise
@@ -685,8 +684,17 @@ class ImpalaHS2Client(ImpalaClient):
     # as a result prints the hex representation in the reverse order to how
     # bytes are laid out in guid.
     guid_bytes = last_query_handle.operationId.guid
-    return "{0}:{1}".format(guid_bytes[7::-1].encode('hex_codec'),
-                            guid_bytes[16:7:-1].encode('hex_codec'))
+    low_bytes_reversed = guid_bytes[7::-1]
+    high_bytes_reversed = guid_bytes[16:7:-1]
+
+    if sys.version_info.major < 3:
+      low_hex = low_bytes_reversed.encode('hex_codec')
+      high_hex = high_bytes_reversed.encode('hex_codec')
+    else:
+      low_hex = low_bytes_reversed.hex()
+      high_hex = high_bytes_reversed.hex()
+
+    return "{low}:{high}".format(low=low_hex, high=high_hex)
 
   def _fetch_one_batch(self, query_handle):
     assert query_handle.hasResultSet
@@ -817,10 +825,10 @@ class ImpalaHS2Client(ImpalaClient):
     self._check_connected()
     try:
       return rpc()
-    except TTransportException, e:
+    except TTransportException as e:
       # issue with the connection with the impalad
       raise DisconnectedException("Error communicating with impalad: %s" % e)
-    except TApplicationException, t:
+    except TApplicationException as t:
       # Suppress the errors from cancelling a query that is in waiting_to_finish state
       if suppress_error_on_cancel and self.is_query_cancelled:
         raise QueryCancelledByShellException()
@@ -875,7 +883,12 @@ class ImpalaBeeswaxClient(ImpalaClient):
     return ImpalaService.Client(protocol)
 
   def _options_to_string_list(self, set_query_options):
-    return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()]
+      if sys.version_info.major < 3:
+        key_value_pairs = set_query_options.iteritems()
+      else:
+        key_value_pairs = set_query_options.items()
+
+      return ["%s=%s" % (k, v) for (k, v) in key_value_pairs]
 
   def _open_session(self):
     # Beeswax doesn't have a "session" concept independent of connections, so
@@ -910,7 +923,7 @@ class ImpalaBeeswaxClient(ImpalaClient):
   def _ping_impala_service(self):
     try:
       resp = self.imp_service.PingImpalaService()
-    except TApplicationException, t:
+    except TApplicationException as t:
       if t.type == TApplicationException.UNKNOWN_METHOD:
         raise MissingThriftMethodException(t.message)
       raise
@@ -1056,15 +1069,15 @@ class ImpalaBeeswaxClient(ImpalaClient):
       raise QueryStateException('Error: Stale query handle')
     # beeswaxException prints out the entire object, printing
     # just the message is far more readable/helpful.
-    except BeeswaxService.BeeswaxException, b:
+    except BeeswaxService.BeeswaxException as b:
       # Suppress the errors from cancelling a query that is in fetch state
       if suppress_error_on_cancel and self.is_query_cancelled:
         raise QueryCancelledByShellException()
       raise RPCException("ERROR: %s" % b.message)
-    except TTransportException, e:
+    except TTransportException as e:
       # issue with the connection with the impalad
       raise DisconnectedException("Error communicating with impalad: %s" % e)
-    except TApplicationException, t:
+    except TApplicationException as t:
       # Suppress the errors from cancelling a query that is in waiting_to_finish state
       if suppress_error_on_cancel and self.is_query_cancelled:
         raise QueryCancelledByShellException()
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index e05a76f..87600d9 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -16,9 +16,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 #
 # Impala's shell
+from __future__ import print_function
+
 import cmd
 import errno
 import getpass
@@ -234,8 +235,8 @@ class ImpalaShell(object, cmd.Cmd):
         try:
           self.readline.set_history_length(int(options.history_max))
         except ValueError:
-          print_to_stderr("WARNING: history_max option malformed %s\n"
-            % options.history_max)
+          warning = "WARNING: history_max option malformed %s\n" % options.history_max
+          print(warning, file=sys.stderr)
           self.readline.set_history_length(1000)
       except ImportError:
         self._disable_readline()
@@ -277,9 +278,9 @@ class ImpalaShell(object, cmd.Cmd):
     The options are displayed in groups based on option levels received in parameter.
     Input parameter decides whether all groups or just the 'Regular' and 'Advanced'
     options are displayed."""
-    print "Query options (defaults shown in []):"
+    print("Query options (defaults shown in []):")
     if not self.imp_client.default_query_options and not self.set_query_options:
-      print '\tNo options available.'
+      print('\tNo options available.')
     else:
       (regular_options, advanced_options, development_options, deprecated_options,
           removed_options) = self._get_query_option_grouping()
@@ -288,14 +289,14 @@ class ImpalaShell(object, cmd.Cmd):
       # the advanced_options would be empty and only the regular options would
       # be displayed.
       if advanced_options:
-        print '\nAdvanced Query Options:'
+        print('\nAdvanced Query Options:')
         self._print_option_group(advanced_options)
       if print_mode == QueryOptionDisplayModes.ALL_OPTIONS:
         if development_options:
-          print '\nDevelopment Query Options:'
+          print('\nDevelopment Query Options:')
           self._print_option_group(development_options)
         if deprecated_options:
-          print '\nDeprecated Query Options:'
+          print('\nDeprecated Query Options:')
           self._print_option_group(deprecated_options)
     self._print_shell_options()
 
@@ -306,7 +307,11 @@ class ImpalaShell(object, cmd.Cmd):
     If the option level can't be determined then it defaults to 'REGULAR'"""
     (regular_options, advanced_options, development_options, deprecated_options,
         removed_options) = {}, {}, {}, {}, {}
-    for option_name, option_value in self.imp_client.default_query_options.iteritems():
+    if sys.version_info.major < 3:
+      client_default_query_opts = self.imp_client.default_query_options.iteritems()
+    else:
+      client_default_query_opts = self.imp_client.default_query_options.items()
+    for option_name, option_value in client_default_query_opts:
       level = self.imp_client.query_option_levels.get(option_name,
                                                       QueryOptionLevels.REGULAR)
       if level == QueryOptionLevels.REGULAR:
@@ -320,32 +325,32 @@ class ImpalaShell(object, cmd.Cmd):
       else:
         advanced_options[option_name] = option_value
     return (regular_options, advanced_options, development_options, deprecated_options,
-        removed_options)
+            removed_options)
 
   def _print_option_group(self, query_options):
     """Gets query options and prints them. Value is inside [] for the ones having
     default values.
     query_options parameter is a subset of the default_query_options map"""
-    for option_name in sorted(query_options):
-      if (option_name in self.set_query_options and
-          self.set_query_options[option_name] != query_options[option_name]):
-        print '\n'.join(["\t%s: %s" % (option_name, self.set_query_options[option_name])])
+    for option in sorted(query_options):
+      if (option in self.set_query_options and
+          self.set_query_options[option] != query_options[option]):  # noqa
+        print('\n'.join(["\t%s: %s" % (option, self.set_query_options[option])]))
       else:
-        print '\n'.join(["\t%s: [%s]" % (option_name, query_options[option_name])])
+        print('\n'.join(["\t%s: [%s]" % (option, query_options[option])]))
 
   def _print_variables(self):
     # Prints the currently defined variables.
     if not self.set_variables:
-      print '\tNo variables defined.'
+      print('\tNo variables defined.')
     else:
       for k in sorted(self.set_variables):
-        print '\n'.join(["\t%s: %s" % (k, self.set_variables[k])])
+        print('\n'.join(["\t%s: %s" % (k, self.set_variables[k])]))
 
   def _print_shell_options(self):
     """Prints shell options, which are local and independent of query options."""
-    print "\nShell Options"
+    print("\nShell Options")
     for x in self.VALID_SHELL_OPTIONS:
-      print "\t%s: %s" % (x, self.__dict__[self.VALID_SHELL_OPTIONS[x][1]])
+      print("\t%s: %s" % (x, self.__dict__[self.VALID_SHELL_OPTIONS[x][1]]))
 
   def _build_query_string(self, leading_comment, cmd, args):
     """Called to build a query string based on the parts output by parseline():
@@ -365,8 +370,8 @@ class ImpalaShell(object, cmd.Cmd):
       start_time = time.time()
       os.system(args)
       self._print_if_verbose("--------\nExecuted in %2.2fs" % (time.time() - start_time))
-    except Exception, e:
-      print_to_stderr('Error running command : %s' % e)
+    except Exception as e:
+      print('Error running command : %s' % e, file=sys.stderr)
       return CmdStatus.ERROR
 
   def _remove_comments_before_set(self, line):
@@ -400,7 +405,7 @@ class ImpalaShell(object, cmd.Cmd):
         # The print statement makes the new prompt appear in a new line.
         # Also print an extra newline to indicate that the current command has
         # been cancelled.
-        print '\n'
+        print('\n')
         return str()
     args = self._check_for_command_completion(args)
     args = self._remove_comments_before_set(args)
@@ -544,8 +549,8 @@ class ImpalaShell(object, cmd.Cmd):
                           self.ca_cert, self.user, self.ldap_password,
                           self.use_ldap, self.client_connect_timeout_ms, self.verbose)
     else:
-      print_to_stderr("Invalid --protocol value {0}, must be beeswax, hs2 or hs2-http."
-              .format(protocol))
+      err_msg = "Invalid --protocol value {0}, must be beeswax, hs2 or hs2-http."
+      print(err_msg.format(protocol), file=sys.stderr)
       raise FatalShellException()
 
   def close_connection(self):
@@ -569,7 +574,7 @@ class ImpalaShell(object, cmd.Cmd):
     for cancel_try in xrange(ImpalaShell.CANCELLATION_TRIES):
       try:
         self.imp_client.is_query_cancelled = True
-        print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE)
+        print(ImpalaShell.CANCELLATION_MESSAGE, file=sys.stderr)
         new_imp_client = self._new_impala_client()
         new_imp_client.connect()
         try:
@@ -578,13 +583,14 @@ class ImpalaShell(object, cmd.Cmd):
         finally:
           new_imp_client.close_connection()
         break
-      except Exception, e:
+      except Exception as e:
         # Suppress harmless errors.
         err_msg = str(e).strip()
         if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle']:
           break
-        print_to_stderr("Failed to reconnect and close (try %i/%i): %s" % (
-            cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg))
+        err_details = "Failed to reconnect and close (try %i/%i): %s"
+        print(err_details % (cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg),
+              file=sys.stderr)
 
   def _is_quit_command(self, command):
     # Do a case insensitive check
@@ -609,7 +615,7 @@ class ImpalaShell(object, cmd.Cmd):
       return str()
     # There is no need to reconnect if we are quitting.
     if not self.imp_client.is_connected() and not self._is_quit_command(args):
-      print_to_stderr("Connection lost, reconnecting...")
+      print("Connection lost, reconnecting...", file=sys.stderr)
       self._connect()
       self._validate_database(immediately=True)
     return args.encode('utf-8')
@@ -660,22 +666,22 @@ class ImpalaShell(object, cmd.Cmd):
 
   def do_summary(self, args):
     if not self.last_query_handle:
-      print_to_stderr("Could not retrieve summary: no previous query.")
+      print("Could not retrieve summary: no previous query.", file=sys.stderr)
       return CmdStatus.ERROR
 
     summary = None
     try:
       summary = self.imp_client.get_summary(self.last_query_handle)
-    except RPCException, e:
+    except RPCException as e:
       import re
       error_pattern = re.compile("ERROR: Query id \d+:\d+ not found.")
       if error_pattern.match(e.value):
-        print_to_stderr("Could not retrieve summary for query.")
+        print("Could not retrieve summary for query.", file=sys.stderr)
       else:
-        print_to_stderr(e)
+        print(e, file=sys.stderr)
       return CmdStatus.ERROR
     if summary.nodes is None:
-      print_to_stderr("Summary not available")
+      print("Summary not available", file=sys.stderr)
       return CmdStatus.SUCCESS
     output = []
     table = self._default_summary_table()
@@ -702,7 +708,7 @@ class ImpalaShell(object, cmd.Cmd):
 
   def _print_with_set(self, print_level):
     self._print_options(print_level)
-    print "\nVariables:"
+    print("\nVariables:")
     self._print_variables()
 
   def do_set(self, args):
@@ -730,9 +736,10 @@ class ImpalaShell(object, cmd.Cmd):
         self._print_with_set(QueryOptionDisplayModes.ALL_OPTIONS)
         return CmdStatus.SUCCESS
       else:
-        print_to_stderr("Error: SET <option>=<value>")
-        print_to_stderr("       OR")
-        print_to_stderr("       SET VAR:<variable>=<value>")
+        set_err_msg = ("Error: SET <option>=<value>\n"
+                       "       OR\n"
+                       "       SET VAR:<variable>=<value>")
+        print(set_err_msg, file=sys.stderr)
         return CmdStatus.ERROR
     option_upper = tokens[0].upper()
     # Check if it's a variable
@@ -743,8 +750,8 @@ class ImpalaShell(object, cmd.Cmd):
       self._print_if_verbose('Variable %s set to %s' % (var_name, tokens[1]))
     elif not self._handle_shell_options(option_upper, tokens[1]):
       if option_upper not in self.imp_client.default_query_options.keys():
-        print "Unknown query option: %s" % (tokens[0])
-        print "Available query options, with their values (defaults shown in []):"
+        print("Unknown query option: %s" % (tokens[0]))
+        print("Available query options, with their values (defaults shown in []):")
         self._print_options(QueryOptionDisplayModes.REGULAR_OPTIONS_ONLY)
         return CmdStatus.ERROR
       if self.imp_client.query_option_levels[option_upper] == QueryOptionLevels.REMOVED:
@@ -756,24 +763,24 @@ class ImpalaShell(object, cmd.Cmd):
   def do_unset(self, args):
     """Unset a query option"""
     if len(args.split()) != 1:
-      print 'Usage: unset <option>'
+      print('Usage: unset <option>')
       return CmdStatus.ERROR
     option = args.upper()
     # Check if it's a variable
     var_name = get_var_name(option)
     if var_name is not None:
       if self.set_variables.get(var_name):
-        print 'Unsetting variable %s' % var_name
+        print('Unsetting variable %s' % var_name)
         del self.set_variables[var_name]
       else:
-        print "No variable called %s is set" % var_name
+        print("No variable called %s is set" % var_name)
     elif self.set_query_options.get(option):
-      print 'Unsetting option %s' % option
+      print('Unsetting option %s' % option)
       del self.set_query_options[option]
     elif self._handle_unset_shell_options(option):
-      print 'Unsetting shell option %s' % option
+      print('Unsetting shell option %s' % option)
     else:
-      print "No option called %s is set" % option
+      print("No option called %s is set" % option)
 
   def do_quit(self, args):
     """Quit the Impala shell"""
@@ -803,8 +810,8 @@ class ImpalaShell(object, cmd.Cmd):
     host_port = [val for val in tokens[0].split(':') if val.strip()]
     protocol = options.protocol.lower()
     if (':' in tokens[0] and len(host_port) != 2):
-      print_to_stderr("Connection string must either be empty, or of the form "
-                      "<hostname[:port]>")
+      print("Connection string must either be empty, or of the form "
+            "<hostname[:port]>", file=sys.stderr)
       return CmdStatus.ERROR
     elif len(host_port) == 1:
       if protocol == 'hs2':
@@ -814,7 +821,7 @@ class ImpalaShell(object, cmd.Cmd):
       elif protocol == 'beeswax':
         port = str(DEFAULT_BEESWAX_PORT)
       else:
-        print_to_stderr("Invalid protocol specified: %s" % protocol)
+        print("Invalid protocol specified: %s" % protocol, file=sys.stderr)
         raise FatalShellException()
       host_port.append(port)
     self.impalad = tuple(host_port)
@@ -828,14 +835,14 @@ class ImpalaShell(object, cmd.Cmd):
     if not self.imp_client.connected and not self.use_kerberos and protocol != 'hs2-http':
       try:
         if call(["klist", "-s"]) == 0:
-          print_to_stderr("Kerberos ticket found in the credentials cache, retrying "
-                          "the connection with a secure transport.")
+          print("Kerberos ticket found in the credentials cache, retrying "
+                "the connection with a secure transport.", file=sys.stderr)
           self.use_kerberos = True
           self.use_ldap = False
           self.ldap_password = None
           self.imp_client = self._new_impala_client()
           self._connect()
-      except OSError, e:
+      except OSError:
         pass
 
     if self.imp_client.connected:
@@ -852,10 +859,15 @@ class ImpalaShell(object, cmd.Cmd):
     # Use a temporary to avoid changing set_query_options during iteration.
     new_query_options = {}
     default_query_option_keys = set(self.imp_client.default_query_options)
-    for set_option, value in self.set_query_options.iteritems():
+    if sys.version_info.major < 3:
+      query_options_to_set = self.set_query_options.iteritems()
+    else:
+      query_options_to_set = self.set_query_options.items()
+
+    for set_option, value in query_options_to_set:
       if set_option not in default_query_option_keys:
-        print ('%s is not supported for the impalad being '
-               'connected to, ignoring.' % set_option)
+        print('%s is not supported for the impalad being '
+              'connected to, ignoring.' % set_option)
       else:
         new_query_options[set_option] = value
 
@@ -870,7 +882,7 @@ class ImpalaShell(object, cmd.Cmd):
   def _connect(self):
     try:
       self.server_version, self.webserver_address = self.imp_client.connect()
-    except MissingThriftMethodException, e:
+    except MissingThriftMethodException as e:
       if options.protocol.lower() == 'beeswax':
         port_flag = "-beeswax_port"
         addtl_suggestion = ""
@@ -884,38 +896,38 @@ class ImpalaShell(object, cmd.Cmd):
                             "protocol with the --protocol=beeswax.")
       # We get a TApplicationException if the transport is valid,
       # but the RPC does not exist.
-      print_to_stderr("\n".join(textwrap.wrap(
+      print("\n".join(textwrap.wrap(
         "Error: Unable to communicate with impalad service because of the error "
         "reported below. The service does not implement a required Thrift method. "
         "The service may not be an Impala Daemon or you may have specified the wrong "
         "port to connect to. Check host:port to ensure that the port matches the "
         "{port_flag} flag on the Impala Daemon and try again.{addtl_suggestion}".format(
-          port_flag=port_flag, addtl_suggestion=addtl_suggestion))))
-      print_to_stderr(str(e))
+          port_flag=port_flag, addtl_suggestion=addtl_suggestion))), file=sys.stderr)
+      print(str(e), file=sys.stderr)
       self.close_connection()
       raise
     except ImportError:
-      print_to_stderr("Unable to import the python 'ssl' module. It is"
-      " required for an SSL-secured connection.")
+      print("Unable to import the python 'ssl' module. It is"
+            " required for an SSL-secured connection.", file=sys.stderr)
       raise FatalShellException()
-    except socket.error, e:
+    except socket.error as e:
       # if the socket was interrupted, reconnect the connection with the client
       if e.errno == errno.EINTR:
         self._reconnect_cancellation()
       else:
-        print_to_stderr("Socket error %s: %s" % (e.errno, e))
+        print("Socket error %s: %s" % (e.errno, e), file=sys.stderr)
         self.close_connection()
         self.prompt = self.DISCONNECTED_PROMPT
-    except Exception, e:
+    except Exception as e:
       if self.ldap_password_cmd and \
           self.ldap_password and \
           self.ldap_password.endswith('\n'):
-        print_to_stderr("Warning: LDAP password contains a trailing newline. "
-                      "Did you use 'echo' instead of 'echo -n'?")
+        print("Warning: LDAP password contains a trailing newline. "
+              "Did you use 'echo' instead of 'echo -n'?", file=sys.stderr)
       if self.use_ssl and sys.version_info < (2,7,9) \
           and "EOF occurred in violation of protocol" in str(e):
-        print_to_stderr("Warning: TLSv1.2 is not supported for Python < 2.7.9")
-      print_to_stderr("Error connecting: %s, %s" % (type(e).__name__, e))
+        print("Warning: TLSv1.2 is not supported for Python < 2.7.9", file=sys.stderr)
+      print("Error connecting: %s, %s" % (type(e).__name__, e), file=sys.stderr)
       # A secure connection may still be open. So we explicitly close it.
       self.close_connection()
       # If a connection to another impalad failed while already connected
@@ -945,14 +957,19 @@ class ImpalaShell(object, cmd.Cmd):
       else:
         self.cmdqueue.append(use_current_db + ImpalaShell.CMD_DELIM)
 
-  def _print_if_verbose(self, message):
+  def _print_if_verbose(self, message, file_descriptor=sys.stderr, flush=True):
     if self.verbose:
-      print_to_stderr(message)
+      print(message, file=file_descriptor)
+
+    # print() takes a flush keyword argument in python3, but not in python2,
+    # so we do it as a second step
+    if flush:
+      file_descriptor.flush()
 
   def print_runtime_profile(self, profile, status=False):
     if self.show_profiles or status:
       if profile is not None:
-        print "Query Runtime Profile:\n" + profile
+        print("Query Runtime Profile:\n" + profile)
 
   def _parse_table_name_arg(self, arg):
     """ Parses an argument string and returns the result as a db name, table name combo.
@@ -996,10 +1013,10 @@ class ImpalaShell(object, cmd.Cmd):
   def do_profile(self, args):
     """Prints the runtime profile of the last DML statement or SELECT query executed."""
     if len(args) > 0:
-      print_to_stderr("'profile' does not accept any arguments")
+      print("'profile' does not accept any arguments", file=sys.stderr)
       return CmdStatus.ERROR
     elif self.last_query_handle is None:
-      print_to_stderr('No previous query available to profile')
+      print('No previous query available to profile', file=sys.stderr)
       return CmdStatus.ERROR
     profile = self.imp_client.get_runtime_profile(self.last_query_handle)
     return self.print_runtime_profile(profile, True)
@@ -1092,12 +1109,10 @@ class ImpalaShell(object, cmd.Cmd):
     other output that we want to display to users printed to the output.
     Results for queries are streamed from the server and displayed as they become
     available.
-
     is_dml: True iff the caller detects that 'query_str' is a DML statement, false
             otherwise.
     print_web_link: if True, a link to the query on the Impala debug webserver is printed.
     """
-
     self._print_if_verbose("Query: %s" % query_str)
     # TODO: Clean up this try block and refactor it (IMPALA-3814)
     try:
@@ -1173,41 +1188,42 @@ class ImpalaShell(object, cmd.Cmd):
       try:
         profile = self.imp_client.get_runtime_profile(self.last_query_handle)
         self.print_runtime_profile(profile)
-      except RPCException, e:
+      except RPCException as e:
         if self.show_profiles: raise e
       return CmdStatus.SUCCESS
-    except QueryCancelledByShellException, e:
+    except QueryCancelledByShellException as e:
       return CmdStatus.SUCCESS
-    except RPCException, e:
+    except RPCException as e:
       # could not complete the rpc successfully
-      print_to_stderr(e)
-    except QueryStateException, e:
+      print(e, file=sys.stderr)
+    except QueryStateException as e:
       # an exception occurred while executing the query
       if self.last_query_handle is not None:
         self.imp_client.close_query(self.last_query_handle)
-      print_to_stderr(e)
-    except DisconnectedException, e:
+      print(e, file=sys.stderr)
+    except DisconnectedException as e:
       # the client has lost the connection
-      print_to_stderr(e)
+      print(e, file=sys.stderr)
       self.imp_client.connected = False
       self.prompt = ImpalaShell.DISCONNECTED_PROMPT
-    except socket.error, (code, e):
+    except socket.error as e:
       # if the socket was interrupted, reconnect the connection with the client
-      if code == errno.EINTR:
-        print ImpalaShell.CANCELLATION_MESSAGE
+      if e.errno == errno.EINTR:
+        print(ImpalaShell.CANCELLATION_MESSAGE)
         self._reconnect_cancellation()
       else:
-        print_to_stderr("Socket error %s: %s" % (code, e))
+        print("Socket error %s: %s" % (e.errno, e), file=sys.stderr)
         self.prompt = self.DISCONNECTED_PROMPT
         self.imp_client.connected = False
-    except Exception, u:
+    except Exception as e:
       # if the exception is unknown, there was possibly an issue with the connection
       # set the shell as disconnected
-      print_to_stderr('Unknown Exception : %s' % (u,))
+      print('Unknown Exception : %s' % (e,), file=sys.stderr)
       self.close_connection()
       self.prompt = ImpalaShell.DISCONNECTED_PROMPT
     return CmdStatus.ERROR
 
+
   def construct_table_with_header(self, column_names):
     """ Constructs the table header for a given query handle.
 
@@ -1306,9 +1322,9 @@ class ImpalaShell(object, cmd.Cmd):
     if self.readline and self.readline.get_current_history_length() > 0:
       for index in xrange(1, self.readline.get_current_history_length() + 1):
         cmd = self.readline.get_history_item(index)
-        print_to_stderr('[%d]: %s' % (index, cmd))
+        print('[%d]: %s' % (index, cmd), file=sys.stderr)
     else:
-      print_to_stderr(READLINE_UNAVAILABLE_ERROR)
+      print(READLINE_UNAVAILABLE_ERROR, file=sys.stderr)
 
   def do_rerun(self, args):
     """Rerun a command with an command index in history
@@ -1319,26 +1335,26 @@ class ImpalaShell(object, cmd.Cmd):
     self.readline.remove_history_item(history_len - 1)
     history_len -= 1
     if not self.readline:
-      print_to_stderr(READLINE_UNAVAILABLE_ERROR)
+      print(READLINE_UNAVAILABLE_ERROR, file=sys.stderr)
       return CmdStatus.ERROR
     try:
       cmd_idx = int(args)
     except ValueError:
-      print_to_stderr("Command index to be rerun must be an integer.")
+      print("Command index to be rerun must be an integer.", file=sys.stderr)
       return CmdStatus.ERROR
     if not (0 < cmd_idx <= history_len or -history_len <= cmd_idx < 0):
-      print_to_stderr("Command index out of range. Valid range: [1, {0}] and [-{0}, -1]"
-                      .format(history_len))
+      print("Command index out of range. Valid range: [1, {0}] and [-{0}, -1]"
+                      .format(history_len), file=sys.stderr)
       return CmdStatus.ERROR
     if cmd_idx < 0:
       cmd_idx += history_len + 1
     cmd = self.readline.get_history_item(cmd_idx)
-    print_to_stderr("Rerunning " + cmd)
+    print("Rerunning " + cmd, file=sys.stderr)
     return self.onecmd(cmd.rstrip(";"))
 
   def do_tip(self, args):
     """Print a random tip"""
-    print_to_stderr(random.choice(TIPS))
+    print(random.choice(TIPS), file=sys.stderr)
 
   def do_src(self, args):
     return self.do_source(args)
@@ -1346,8 +1362,8 @@ class ImpalaShell(object, cmd.Cmd):
   def do_source(self, args):
     try:
       cmd_file = open(args, "r")
-    except Exception, e:
-      print_to_stderr("Error opening file '%s': %s" % (args, e))
+    except Exception as e:
+      print("Error opening file '%s': %s" % (args, e), file=sys.stderr)
       return CmdStatus.ERROR
     if self.execute_query_list(parse_query_text(cmd_file.read())):
       return CmdStatus.SUCCESS
@@ -1364,9 +1380,9 @@ class ImpalaShell(object, cmd.Cmd):
       try:
         self.readline.read_history_file(self.history_file)
         self._replace_history_delimiters(ImpalaShell.HISTORY_FILE_QUERY_DELIM, '\n')
-      except IOError, i:
+      except IOError as i:
         msg = "Unable to load command history (disabling history collection): %s" % i
-        print_to_stderr(msg)
+        print(msg, file=sys.stderr)
         # This history file exists but is not readable, disable readline.
         self._disable_readline()
 
@@ -1376,9 +1392,9 @@ class ImpalaShell(object, cmd.Cmd):
       try:
         self._replace_history_delimiters('\n', ImpalaShell.HISTORY_FILE_QUERY_DELIM)
         self.readline.write_history_file(self.history_file)
-      except IOError, i:
+      except IOError as i:
         msg = "Unable to save command history (disabling history collection): %s" % i
-        print_to_stderr(msg)
+        print(msg, file=sys.stderr)
         # The history file is not writable, disable readline.
         self._disable_readline()
 
@@ -1491,8 +1507,8 @@ class ImpalaShell(object, cmd.Cmd):
 
   def do_version(self, args):
     """Prints the Impala build version"""
-    print_to_stderr("Shell version: %s" % VERSION_STRING)
-    print_to_stderr("Server version: %s" % self.server_version)
+    print("Shell version: %s" % VERSION_STRING, file=sys.stderr)
+    print("Server version: %s" % self.server_version, file=sys.stderr)
 
   def completenames(self, text, *ignored):
     """Make tab completion of commands case agnostic
@@ -1508,12 +1524,12 @@ class ImpalaShell(object, cmd.Cmd):
 
   def execute_query_list(self, queries):
     if not self.imp_client.connected:
-      print_to_stderr('Not connected to Impala, could not execute queries.')
+      print('Not connected to Impala, could not execute queries.', file=sys.stderr)
       return False
     queries = [self.sanitise_input(q) for q in queries]
     for q in queries:
       if self.onecmd(q) is CmdStatus.ERROR:
-        print_to_stderr('Could not execute command: %s' % q)
+        print('Could not execute command: %s' % q, file=sys.stderr)
         if not self.ignore_query_failure: return False
     return True
 
@@ -1564,8 +1580,6 @@ Welcome to the Impala shell.
 """ \
   % (VERSION_STRING, _format_tip(random.choice(TIPS)))
 
-def print_to_stderr(message):
-  print >> sys.stderr, message
 
 def parse_query_text(query_text, utf8_encode_policy='strict'):
   """Parse query file text to extract queries and encode into utf-8"""
@@ -1595,8 +1609,8 @@ def parse_variables(keyvals):
     for keyval in keyvals:
       match = re.match(kv_pattern, keyval)
       if not match:
-        print_to_stderr('Error: Could not parse key-value "%s". ' % (keyval,) +
-                        'It must follow the pattern "KEY=VALUE".')
+        print('Error: Could not parse key-value "%s". ' % (keyval,) +
+              'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
         parser.print_help()
         raise FatalShellException()
       else:
@@ -1604,18 +1618,18 @@ def parse_variables(keyvals):
   return vars
 
 
-def replace_variables(set_variables, string):
+def replace_variables(set_variables, input_string):
   """Replaces variable within the string with their corresponding values using the
      given set_variables."""
   errors = False
-  matches = set(map(lambda v: v.upper(), re.findall(r'(?<!\\)\${([^}]+)}', string)))
+  matches = set([v.upper() for v in re.findall(r'(?<!\\)\${([^}]+)}', input_string)])
   for name in matches:
     value = None
     # Check if syntax is correct
     var_name = get_var_name(name)
     if var_name is None:
-      print_to_stderr('Error: Unknown substitution syntax (%s). ' % (name,) +
-                      'Use ${VAR:var_name}.')
+      print('Error: Unknown substitution syntax (%s). ' % (name,) +
+            'Use ${VAR:var_name}.', file=sys.stderr)
       errors = True
     else:
       # Replaces variable value
@@ -1625,14 +1639,14 @@ def replace_variables(set_variables, string):
           errors = True
         else:
           regexp = re.compile(r'(?<!\\)\${%s}' % (name,), re.IGNORECASE)
-          string = regexp.sub(value, string)
+          input_string = regexp.sub(value, input_string)
       else:
-        print_to_stderr('Error: Unknown variable %s' % (var_name))
+        print('Error: Unknown variable %s' % (var_name), file=sys.stderr)
         errors = True
   if errors:
     return None
   else:
-    return string
+    return input_string
 
 
 def get_var_name(name):
@@ -1658,7 +1672,7 @@ def execute_queries_non_interactive_mode(options, query_options):
       else:
         query_file_handle = open(options.query_file, 'r')
     except Exception, e:
-      print_to_stderr("Could not open file '%s': %s" % (options.query_file, e))
+      print("Could not open file '%s': %s" % (options.query_file, e), file=sys.stderr)
       return False
 
     query_text = query_file_handle.read()
@@ -1709,10 +1723,11 @@ def impala_shell_main():
   if os.path.isfile(global_config):
     # Always output the source of the global config if verbose
     if options.verbose:
-      print_to_stderr(
-        "Loading in options from global config file: %s \n" % global_config)
+      print(
+        "Loading in options from global config file: %s \n" % global_config,
+        file=sys.stderr)
   elif global_config != impala_shell_defaults['global_config_default_path']:
-    print_to_stderr('%s not found.\n' % global_config)
+    print('%s not found.\n' % global_config, file=sys.stderr)
     raise FatalShellException()
   # Override the default user config by a custom config if necessary
   user_config = impala_shell_defaults.get("config_file")
@@ -1721,11 +1736,12 @@ def impala_shell_main():
   if input_config != user_config:
     if os.path.isfile(input_config):
       if options.verbose:
-        print_to_stderr("Loading in options from config file: %s \n" % input_config)
+        print("Loading in options from config file: %s \n" % input_config,
+              file=sys.stderr)
       # command line overrides loading ~/.impalarc
       user_config = input_config
     else:
-      print_to_stderr('%s not found.\n' % input_config)
+      print('%s not found.\n' % input_config, file=sys.stderr)
       raise FatalShellException()
   configs_to_load = [global_config, user_config]
 
@@ -1741,8 +1757,8 @@ def impala_shell_main():
       query_options.update(q_options)
 
     impala_shell_defaults.update(loaded_shell_options)
-  except Exception, e:
-    print_to_stderr(e)
+  except Exception as e:
+    print(e, file=sys.stderr)
     raise FatalShellException()
 
   parser = get_option_parser(impala_shell_defaults)
@@ -1750,54 +1766,56 @@ def impala_shell_main():
 
   # Arguments that could not be parsed are stored in args. Print an error and exit.
   if len(args) > 0:
-    print_to_stderr('Error, could not parse arguments "%s"' % (' ').join(args))
+    print('Error, could not parse arguments "%s"' % (' ').join(args), file=sys.stderr)
     parser.print_help()
     raise FatalShellException()
 
   if options.version:
-    print VERSION_STRING
+    print(VERSION_STRING)
     return
 
   if options.write_delimited:
     delim = options.output_delimiter.decode('string-escape')
     if len(delim) != 1:
-      print_to_stderr("Illegal delimiter %s, the delimiter "
-                      "must be a 1-character string." % delim)
+      print("Illegal delimiter %s, the delimiter "
+            "must be a 1-character string." % delim, file=sys.stderr)
       raise FatalShellException()
 
   if options.use_kerberos and options.use_ldap:
-    print_to_stderr("Please specify at most one authentication mechanism (-k or -l)")
+    print("Please specify at most one authentication mechanism (-k or -l)",
+          file=sys.stderr)
     raise FatalShellException()
 
   if not options.ssl and not options.creds_ok_in_clear and options.use_ldap:
-    print_to_stderr("LDAP credentials may not be sent over insecure " +
-                    "connections. Enable SSL or set --auth_creds_ok_in_clear")
+    print("LDAP credentials may not be sent over insecure " +
+          "connections. Enable SSL or set --auth_creds_ok_in_clear",
+          file=sys.stderr)
     raise FatalShellException()
 
   if not options.use_ldap and options.ldap_password_cmd:
-    print_to_stderr("Option --ldap_password_cmd requires using LDAP authentication " +
-                    "mechanism (-l)")
+    print("Option --ldap_password_cmd requires using LDAP authentication " +
+          "mechanism (-l)", file=sys.stderr)
     raise FatalShellException()
 
   if options.use_kerberos:
     if options.verbose:
-      print_to_stderr("Starting Impala Shell using Kerberos authentication")
-      print_to_stderr("Using service name '%s'" % options.kerberos_service_name)
+      print("Starting Impala Shell using Kerberos authentication", file=sys.stderr)
+      print("Using service name '%s'" % options.kerberos_service_name, file=sys.stderr)
     # Check if the user has a ticket in the credentials cache
     try:
       if call(['klist', '-s']) != 0:
-        print_to_stderr(("-k requires a valid kerberos ticket but no valid kerberos "
-                         "ticket found."))
+        print(("-k requires a valid kerberos ticket but no valid kerberos "
+               "ticket found."), file=sys.stderr)
         raise FatalShellException()
-    except OSError, e:
-      print_to_stderr('klist not found on the system, install kerberos clients')
+    except OSError as e:
+      print('klist not found on the system, install kerberos clients', file=sys.stderr)
       raise FatalShellException()
   elif options.use_ldap:
     if options.verbose:
-      print_to_stderr("Starting Impala Shell using LDAP-based authentication")
+      print("Starting Impala Shell using LDAP-based authentication", file=sys.stderr)
   else:
     if options.verbose:
-      print_to_stderr("Starting Impala Shell without Kerberos authentication")
+      print("Starting Impala Shell without Kerberos authentication", file=sys.stderr)
 
   options.ldap_password = None
   if options.use_ldap and options.ldap_password_cmd:
@@ -1806,30 +1824,30 @@ def impala_shell_main():
                            stderr=subprocess.PIPE)
       options.ldap_password, stderr = p.communicate()
       if p.returncode != 0:
-        print_to_stderr("Error retrieving LDAP password (command was '%s', error was: "
-                        "'%s')" % (options.ldap_password_cmd, stderr.strip()))
+        print("Error retrieving LDAP password (command was '%s', error was: "
+              "'%s')" % (options.ldap_password_cmd, stderr.strip()), file=sys.stderr)
         raise FatalShellException()
-    except Exception, e:
-      print_to_stderr("Error retrieving LDAP password (command was: '%s', exception "
-                      "was: '%s')" % (options.ldap_password_cmd, e))
+    except Exception as e:
+      print("Error retrieving LDAP password (command was: '%s', exception "
+            "was: '%s')" % (options.ldap_password_cmd, e), file=sys.stderr)
       raise FatalShellException()
 
   if options.ssl:
     if options.ca_cert is None:
       if options.verbose:
-        print_to_stderr("SSL is enabled. Impala server certificates will NOT be verified"\
-                        " (set --ca_cert to change)")
+        print("SSL is enabled. Impala server certificates will NOT be verified "
+              "(set --ca_cert to change)", file=sys.stderr)
     else:
       if options.verbose:
-        print_to_stderr("SSL is enabled")
+        print("SSL is enabled", file=sys.stderr)
 
   if options.output_file:
     try:
       # Make sure the given file can be opened for writing. This will also clear the file
       # if successful.
       open(options.output_file, 'wb')
-    except IOError, e:
-      print_to_stderr('Error opening output file for writing: %s' % e)
+    except IOError as e:
+      print('Error opening output file for writing: %s' % e, file=sys.stderr)
       raise FatalShellException()
 
   options.variables = parse_variables(options.keyval)
@@ -1842,10 +1860,11 @@ def impala_shell_main():
   if options.query or options.query_file:
     # Impala shell will disable live_progress if non-interactive mode is detected.
     options.live_progress = False
-    print_to_stderr("Warning: live_progress only applies to interactive shell sessions, "
-                    "and is being skipped for now.")
+    print("Warning: live_progress only applies to interactive shell sessions, "
+          "and is being skipped for now.", file=sys.stderr)
     if options.live_summary:
-      print_to_stderr("Error: live_summary is available for interactive mode only.")
+      print("Error: live_summary is available for interactive mode only.",
+            file=sys.stderr)
       raise FatalShellException()
 
     if execute_queries_non_interactive_mode(options, query_options):
@@ -1861,31 +1880,31 @@ def impala_shell_main():
         try:
           shell.cmdloop(intro)
         except KeyboardInterrupt:
-          print_to_stderr('^C')
+          print('^C', file=sys.stderr)
         # A last measure against any exceptions thrown by an rpc
         # not caught in the shell
-        except socket.error, (code, e):
+        except socket.error as e:
           # if the socket was interrupted, reconnect the connection with the client
-          if code == errno.EINTR:
-            print shell.CANCELLATION_MESSAGE
+          if e.errno == errno.EINTR:
+            print(shell.CANCELLATION_MESSAGE)
             shell._reconnect_cancellation()
           else:
-            print_to_stderr("Socket error %s: %s" % (code, e))
+            print("Socket error %s: %s" % (e.errno, e), file=sys.stderr)
             shell.imp_client.connected = False
             shell.prompt = shell.DISCONNECTED_PROMPT
-        except DisconnectedException, e:
+        except DisconnectedException as e:
           # the client has lost the connection
-          print_to_stderr(e)
+          print(e, file=sys.stderr)
           shell.imp_client.connected = False
           shell.prompt = shell.DISCONNECTED_PROMPT
-        except QueryStateException, e:
+        except QueryStateException as e:
           # an exception occurred while executing the query
           shell.imp_client.close_query(shell.last_query_handle)
-          print_to_stderr(e)
-        except RPCException, e:
+          print(e, file=sys.stderr)
+        except RPCException as e:
           # could not complete the rpc successfully
-          print_to_stderr(e)
-        except IOError, e:
+          print(e, file=sys.stderr)
+        except IOError as e:
           # Interrupted system calls (e.g. because of cancellation) should be ignored.
           if e.errno != errno.EINTR: raise
       finally:
diff --git a/shell/option_parser.py b/shell/option_parser.py
index 3573993..808c97a 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -26,6 +26,7 @@
 # [impala.query_options]
 # EXPLAIN_LEVEL=2
 # MT_DOP=2
+from __future__ import print_function
 
 import ConfigParser
 import sys
@@ -78,8 +79,11 @@ def parse_shell_options(options, defaults, option_list):
   for option, value in options:
     opt = option_dests.get(option)
     if opt is None:
-        print >> sys.stderr, "WARNING: Unable to read configuration file correctly. " \
-          "Ignoring unrecognized config option: '%s'\n" % option
+      warn_msg = (
+        "WARNING: Unable to read configuration file correctly. "
+        "Ignoring unrecognized config option: '%s'" % option
+      )
+      print('\n{0}'.format(warn_msg), file=sys.stderr)
     elif isinstance(defaults.get(option), bool) or \
         opt.action == "store_true" or opt.action == "store_false":
       result[option] = parse_bool_option(value)
@@ -120,7 +124,7 @@ def get_config_from_file(config_filename, option_list):
   config.optionxform = str
   try:
     config.read(config_filename)
-  except Exception, e:
+  except Exception as e:
     raise ConfigFileFormatError(
       "Unable to read configuration file correctly. Check formatting: %s" % e)
 
@@ -129,13 +133,14 @@ def get_config_from_file(config_filename, option_list):
     shell_options = parse_shell_options(config.items("impala"), impala_shell_defaults,
                                         option_list)
     if "config_file" in shell_options:
-      print >> sys.stderr, "WARNING: Option 'config_file' can be only set from shell."
+      warn_msg = "WARNING: Option 'config_file' can be only set from shell."
+      print('\n{0}'.format(warn_msg), file=sys.stderr)
       shell_options["config_file"] = config_filename
 
   config = ConfigParser.ConfigParser()
   try:
     config.read(config_filename)
-  except Exception, e:
+  except Exception as e:
     raise ConfigFileFormatError(
       "Unable to read configuration file correctly. Check formatting: %s" % e)
 
diff --git a/shell/pkg_resources.py b/shell/pkg_resources.py
index aca743a..977de19 100644
--- a/shell/pkg_resources.py
+++ b/shell/pkg_resources.py
@@ -623,7 +623,7 @@ class WorkingSet(object):
                 try:
                     resolvees = shadow_set.resolve(req, env, installer)
 
-                except ResolutionError,v:
+                except ResolutionError as v:
                     error_info[dist] = v    # save error info
                     if fallback:
                         continue    # try the next older version of project
diff --git a/shell/shell_output.py b/shell/shell_output.py
index 190e8bc..75ead3b 100644
--- a/shell/shell_output.py
+++ b/shell/shell_output.py
@@ -16,6 +16,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import print_function
 
 import csv
 import re
@@ -38,13 +39,14 @@ class PrettyOutputFormatter(object):
       # output, since Python won't do the encoding automatically when outputting to a
       # non-terminal (see IMPALA-2717).
       return self.prettytable.get_string().encode('utf-8')
-    except Exception, e:
+    except Exception as e:
       # beeswax returns each row as a tab separated string. If a string column
       # value in a row has tabs, it will break the row split. Default to displaying
       # raw results. This will change with a move to hiveserver2. Reference: IMPALA-116
       error_msg = ("Prettytable cannot resolve string columns values that have "
-                   " embedded tabs. Reverting to tab delimited text output")
-      print >>sys.stderr, error_msg
+                   "embedded tabs. Reverting to tab delimited text output")
+      print(error_msg, file=sys.stderr)
+      print('{0}: {1}'.format(type(e), str(e)), file=sys.stderr)
       return '\n'.join(['\t'.join(row) for row in rows])
 
 
@@ -81,12 +83,13 @@ class OutputStream(object):
     if self.filename:
       try:
         self.handle = open(self.filename, 'ab')
-      except IOError, err:
-        print >>sys.stderr, "Error opening file %s: %s" % (self.filename, str(err))
-        print >>sys.stderr, "Writing to stdout"
+      except IOError as err:
+        print("Error opening file %s: %s" % (self.filename, str(err)),
+              file=sys.stderr)
+        print("Writing to stdout", file=sys.stderr)
 
   def write(self, data):
-    print >>self.handle, self.formatter.format(data)
+    print(self.formatter.format(data), file=self.handle)
     self.handle.flush()
 
   def __del__(self):