Posted to commits@impala.apache.org by kw...@apache.org on 2018/08/08 07:26:02 UTC

[1/4] impala git commit: IMPALA-7311. Allow INSERT on writable partitions even if some other partition is READ_ONLY

Repository: impala
Updated Branches:
  refs/heads/master 96e552eb2 -> bf24a814c


IMPALA-7311. Allow INSERT on writable partitions even if some other partition is READ_ONLY

This changes the permissions-checking of INSERT so that, if a partition
is specified, we only verify writability of the specific explicit
partition. This allows insertion into a table even if it contains one or
more read-only partitions. This matches the existing behavior of LOAD
DATA.

New regression tests, which failed prior to the fix, are added.
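
As a rough sketch of the resulting semantics (all names below are
hypothetical; the authoritative coverage is the new tests in
test_insert_behaviour.py):

  # `client` is an assumed Impala client handle; `t` is a table partitioned
  # by `p`, where only the directory of partition p='ro' is read-only.
  client.execute("INSERT INTO t PARTITION (p='rw') VALUES (1)")   # now allowed
  client.execute("INSERT INTO t PARTITION (p='ro') VALUES (1)")   # still rejected
  # Dynamic partition targets still require write access to all partitions:
  client.execute("INSERT INTO t PARTITION (p) SELECT col, p FROM t")  # rejected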

Change-Id: I1dd81100ae73fcabdbfaf679c20cea7dc102cd13
Reviewed-on: http://gerrit.cloudera.org:8080/10974
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tianyi Wang <tw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/35bce6b1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/35bce6b1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/35bce6b1

Branch: refs/heads/master
Commit: 35bce6b12bc3dc972b7b2a718e66895425a34e1f
Parents: 96e552e
Author: Todd Lipcon <to...@cloudera.com>
Authored: Tue Jul 17 17:51:14 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 23:40:11 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/InsertStmt.java  |  28 ++++-
 .../apache/impala/analysis/LoadDataStmt.java    |  26 +----
 .../org/apache/impala/catalog/FeFsTable.java    |  55 ++++++++-
 .../org/apache/impala/catalog/HdfsTable.java    |  43 ++++---
 .../impala/catalog/local/LocalFsTable.java      |   2 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   4 +-
 tests/metadata/test_hdfs_permissions.py         |  12 +-
 tests/query_test/test_insert_behaviour.py       | 112 +++++++++++++++++++
 8 files changed, 221 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 9998b0a..368ab2f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -378,6 +378,10 @@ public class InsertStmt extends StatementBase {
       }
     }
 
+    // Check that we can write to the target table/partition. This must be
+    // done after the partition expression has been analyzed above.
+    analyzeWriteAccess();
+
     // Populate partitionKeyExprs from partitionKeyValues and selectExprTargetColumns
     prepareExpressions(selectExprTargetColumns, selectListExprs, table_, analyzer);
 
@@ -476,11 +480,6 @@ public class InsertStmt extends StatementBase {
 
     if (table_ instanceof FeFsTable) {
       FeFsTable fsTable = (FeFsTable) table_;
-      if (!fsTable.hasWriteAccess()) {
-        throw new AnalysisException(String.format("Unable to INSERT into target table " +
-            "(%s) because Impala does not have WRITE access to at least one HDFS path" +
-            ": %s", targetTableName_, fsTable.getFirstLocationWithoutWriteAccess()));
-      }
       StringBuilder error = new StringBuilder();
       fsTable.parseSkipHeaderLineCount(error);
       if (error.length() > 0) throw new AnalysisException(error.toString());
@@ -523,6 +522,25 @@ public class InsertStmt extends StatementBase {
     }
   }
 
+  private void analyzeWriteAccess() throws AnalysisException {
+    if (!(table_ instanceof FeFsTable)) return;
+    FeFsTable fsTable = (FeFsTable) table_;
+
+    FeFsTable.Utils.checkWriteAccess(fsTable,
+        hasStaticPartitionTarget() ? partitionKeyValues_ : null, "INSERT");
+  }
+
+  private boolean hasStaticPartitionTarget() {
+    if (partitionKeyValues_ == null) return false;
+
+    // If the partition target is fully static, then check for write access against
+    // the specific partition. Otherwise, check the whole table.
+    for (PartitionKeyValue pkv : partitionKeyValues_) {
+      if (pkv.isDynamic()) return false;
+    }
+    return true;
+  }
+
   /**
    * Checks that the column permutation + select list + static partition exprs + dynamic
    * partition exprs collectively cover exactly all required columns in the target table,

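For context, a hypothetical Python paraphrase of the
hasStaticPartitionTarget() check above (a sketch, not the actual Java):

  def has_static_partition_target(partition_key_values):
      # No PARTITION clause at all: nothing static to target.
      if partition_key_values is None:
          return False
      # Any dynamic key (e.g. PARTITION(p) rather than PARTITION(p='x'))
      # makes the whole target dynamic.
      return all(not pkv.is_dynamic for pkv in partition_key_values)
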
http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 6732f1f..128973f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -28,17 +28,13 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.FeCatalogUtils;
-import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.FsPermissionChecker;
-import org.apache.impala.util.TAccessLevelUtil;
 
 import com.google.common.base.Preconditions;
 
@@ -200,25 +196,9 @@ public class LoadDataStmt extends StatementBase {
         }
       }
 
-      String noWriteAccessErrorMsg = String.format("Unable to LOAD DATA into " +
-          "target table (%s) because Impala does not have WRITE access to HDFS " +
-          "location: ", table.getFullName());
-
-      if (partitionSpec_ != null) {
-        long partId = HdfsTable.getPartition(table,
-            partitionSpec_.getPartitionSpecKeyValues()).getId();
-        FeFsPartition partition = FeCatalogUtils.loadPartition(table, partId);
-        String location = partition.getLocation();
-        if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
-          throw new AnalysisException(noWriteAccessErrorMsg + location);
-        }
-      } else {
-        // No specific partition specified, so we need to check write access
-        // on the table as a whole.
-        if (!table.hasWriteAccess()) {
-          throw new AnalysisException(noWriteAccessErrorMsg + table.getLocation());
-        }
-      }
+      FeFsTable.Utils.checkWriteAccess(table,
+          partitionSpec_ != null ? partitionSpec_.getPartitionSpecKeyValues() : null,
+          "LOAD DATA");
     } catch (FileNotFoundException e) {
       throw new AnalysisException("File not found: " + e.getMessage(), e);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index fbb5267..e9ef929 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -28,7 +28,9 @@ import java.util.TreeMap;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
+import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TColumn;
@@ -39,6 +41,7 @@ import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TAccessLevelUtil;
 import org.apache.impala.util.TResultRowBuilder;
 
 import com.google.common.base.Preconditions;
@@ -111,9 +114,11 @@ public interface FeFsTable extends FeTable {
   public Set<HdfsFileFormat> getFileFormats();
 
   /**
-   * Return true if the table may be written to.
+   * Return true if the table's base directory may be written to, in order to
+   * create new partitions, or insert into the default partition in the case of
+   * an unpartitioned table.
    */
-  public boolean hasWriteAccess();
+  public boolean hasWriteAccessToBaseDir();
 
   /**
    * Return some location found without write access for this table, useful
@@ -423,5 +428,51 @@ public interface FeFsTable extends FeTable {
       }
       return null;
     }
+
+    /**
+     * Check that the Impala user has write access to the given target table.
+     * If 'partitionKeyValues' is null, the user should have write access to all
+     * partitions (or to the table directory itself in the case of unpartitioned
+     * tables). Otherwise, the user only needs write access to the specific partition.
+     *
+     * @throws AnalysisException if write access is not available
+     */
+    public static void checkWriteAccess(FeFsTable table,
+        List<PartitionKeyValue> partitionKeyValues,
+        String operationType) throws AnalysisException {
+      String noWriteAccessErrorMsg = String.format("Unable to %s into " +
+          "target table (%s) because Impala does not have WRITE access to HDFS " +
+          "location: ", operationType, table.getFullName());
+
+      PrunablePartition existingTargetPartition = null;
+      if (partitionKeyValues != null) {
+        existingTargetPartition = HdfsTable.getPartition(table, partitionKeyValues);
+        // This could be null in the case that we are writing to a specific partition that
+        // has not been created yet.
+      }
+
+      if (existingTargetPartition != null) {
+        FeFsPartition partition = FeCatalogUtils.loadPartition(table,
+            existingTargetPartition.getId());
+        String location = partition.getLocation();
+        if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
+          throw new AnalysisException(noWriteAccessErrorMsg + location);
+        }
+      } else if (partitionKeyValues != null) {
+        // Writing into a table with a specific partition specified which doesn't
+        // exist yet. In this case, we need write access to the top-level
+        // table location in order to create a new partition.
+        if (!table.hasWriteAccessToBaseDir()) {
+          throw new AnalysisException(noWriteAccessErrorMsg + table.getHdfsBaseDir());
+        }
+      } else {
+        // No explicit partition was specified. Need to ensure that write access is
+        // available to all partitions as well as the base dir.
+        String badPath = table.getFirstLocationWithoutWriteAccess();
+        if (badPath != null) {
+          throw new AnalysisException(noWriteAccessErrorMsg + badPath);
+        }
+      }
+    }
   }
 }
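
Restated as a minimal Python sketch of the three cases (every name here is
a hypothetical stand-in for the Java above, not a real Impala API):

  class AnalysisException(Exception):
      pass

  def check_write_access(table, partition_key_values, op):
      err = ("Unable to %s into target table (%s) because Impala does not "
             "have WRITE access to HDFS location: " % (op, table.full_name))
      existing = None
      if partition_key_values is not None:
          # May be None if the target partition hasn't been created yet.
          existing = table.get_partition(partition_key_values)
      if existing is not None:
          # Case 1: explicit, existing partition: only its directory matters.
          if not existing.writable:
              raise AnalysisException(err + existing.location)
      elif partition_key_values is not None:
          # Case 2: explicit but not-yet-created partition: the base dir must
          # be writable so the new partition directory can be created.
          if not table.base_dir_writable:
              raise AnalysisException(err + table.base_dir)
      else:
          # Case 3: no partition specified: the base dir and every existing
          # partition directory must be writable.
          bad = table.first_location_without_write_access()
          if bad is not None:
              raise AnalysisException(err + bad)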

http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2b7967f..38d1695 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -623,32 +623,26 @@ public class HdfsTable extends Table implements FeFsTable {
 
   List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; }
 
-  // True if Impala has HDFS write permissions on the hdfsBaseDir (for an unpartitioned
-  // table) or if Impala has write permissions on all partition directories (for
-  // a partitioned table).
+  // True if Impala has HDFS write permissions on the hdfsBaseDir
   @Override
-  public boolean hasWriteAccess() {
+  public boolean hasWriteAccessToBaseDir() {
     return TAccessLevelUtil.impliesWriteAccess(accessLevel_);
   }
 
   /**
    * Returns the first location (HDFS path) that Impala does not have WRITE access
   * to, or a null if none is found. For an unpartitioned table, this just
-   * checks the hdfsBaseDir. For a partitioned table it checks all partition directories.
+   * checks the hdfsBaseDir. For a partitioned table it checks the base directory
+   * as well as all partition directories.
    */
   @Override
   public String getFirstLocationWithoutWriteAccess() {
-    if (getMetaStoreTable() == null) return null;
-
-    if (getMetaStoreTable().getPartitionKeysSize() == 0) {
-      if (!TAccessLevelUtil.impliesWriteAccess(accessLevel_)) {
-        return hdfsBaseDir_;
-      }
-    } else {
-      for (HdfsPartition partition: partitionMap_.values()) {
-        if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
-          return partition.getLocation();
-        }
+    if (!hasWriteAccessToBaseDir()) {
+      return hdfsBaseDir_;
+    }
+    for (HdfsPartition partition: partitionMap_.values()) {
+      if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
+        return partition.getLocation();
       }
     }
     return null;
@@ -666,6 +660,8 @@ public class HdfsTable extends Table implements FeFsTable {
       List<PartitionKeyValue> partitionSpec) {
     List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList();
     for (PartitionKeyValue kv: partitionSpec) {
+      Preconditions.checkArgument(kv.isStatic(), "unexpected dynamic partition: %s",
+          kv);
       String value = PartitionKeyValue.getPartitionKeyValueString(
           kv.getLiteralValue(), table.getNullPartitionKeyValue());
       partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
@@ -770,19 +766,20 @@ public class HdfsTable extends Table implements FeFsTable {
     // partitions.
     HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
 
+    Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
+    FileSystem fs = tblLocation.getFileSystem(CONF);
+    accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
+
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
       // This table has no partition key, which means it has no declared partitions.
       // We model partitions slightly differently to Hive - every file must exist in a
       // partition, so add a single partition with no keys which will get all the
       // files in the table's root directory.
-      Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
       HdfsPartition part = createPartition(msTbl.getSd(), null);
       partsByPath.put(tblLocation, Lists.newArrayList(part));
       if (isMarkedCached_) part.markCached();
       addPartition(part);
-      FileSystem fs = tblLocation.getFileSystem(CONF);
-      accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
     } else {
       for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
         HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
@@ -1275,11 +1272,9 @@ public class HdfsTable extends Table implements FeFsTable {
     Preconditions.checkNotNull(msTbl);
     hdfsBaseDir_ = msTbl.getSd().getLocation();
     isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
-    if (msTbl.getPartitionKeysSize() == 0) {
-      Path location = new Path(hdfsBaseDir_);
-      FileSystem fs = location.getFileSystem(CONF);
-      accessLevel_ = getAvailableAccessLevel(fs, location);
-    }
+    Path location = new Path(hdfsBaseDir_);
+    FileSystem fs = location.getFileSystem(CONF);
+    accessLevel_ = getAvailableAccessLevel(fs, location);
     setMetaStoreTable(msTbl);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 86dce6b..7743949 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -249,7 +249,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public boolean hasWriteAccess() {
+  public boolean hasWriteAccessToBaseDir() {
     // TODO(todd): implement me properly
     return true;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 6d2b100..5249826 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -3200,8 +3200,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalysisError("insert into functional_seq.alltypes partition(year, month)" +
         "select * from functional.alltypes",
         "Unable to INSERT into target table (functional_seq.alltypes) because Impala " +
-        "does not have WRITE access to at least one HDFS path: " +
-        "hdfs://localhost:20500/test-warehouse/alltypes_seq/year=2009/month=");
+        "does not have WRITE access to HDFS location: " +
+        "hdfs://localhost:20500/test-warehouse/alltypes_seq");
 
     // Insert with a correlated inline view.
     AnalyzesOk("insert into table functional.alltypessmall " +

http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/tests/metadata/test_hdfs_permissions.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hdfs_permissions.py b/tests/metadata/test_hdfs_permissions.py
index af5a41c..a49b56e 100644
--- a/tests/metadata/test_hdfs_permissions.py
+++ b/tests/metadata/test_hdfs_permissions.py
@@ -21,6 +21,7 @@ from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
 from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE
+import re
 
 TEST_TBL = 'read_only_tbl'
 TBL_LOC = '%s/%s' % (WAREHOUSE, TEST_TBL)
@@ -64,8 +65,8 @@ class TestHdfsPermissions(ImpalaTestSuite):
       self.client.execute('insert into table %s select 1' % TEST_TBL)
       assert False, 'Expected INSERT INTO read-only table to fail'
     except Exception, e:
-      assert 'does not have WRITE access to at least one HDFS path: hdfs:' in str(e)
-
+      assert re.search('does not have WRITE access to HDFS location: .*/read_only_tbl',
+                       str(e))
     # Should still be able to query this table without any errors.
     assert self.execute_scalar('select count(*) from %s' % TEST_TBL) == "0"
 
@@ -80,8 +81,11 @@ class TestHdfsPermissions(ImpalaTestSuite):
 
     # Verify with a partitioned table
     try:
-      self.client.execute('insert into table functional_seq.alltypes '\
+      self.client.execute(
+          'insert into table functional_seq.alltypes '
           'partition(year, month) select * from functional.alltypes limit 0')
       assert False, 'Expected INSERT INTO read-only partition to fail'
     except Exception, e:
-      assert 'does not have WRITE access to at least one HDFS path: hdfs:' in str(e)
+      assert re.search(
+          'does not have WRITE access to HDFS location: .*/alltypes_seq',
+          str(e))

http://git-wip-us.apache.org/repos/asf/impala/blob/35bce6b1/tests/query_test/test_insert_behaviour.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_behaviour.py b/tests/query_test/test_insert_behaviour.py
index e70fdee..e42d33a 100644
--- a/tests/query_test/test_insert_behaviour.py
+++ b/tests/query_test/test_insert_behaviour.py
@@ -19,6 +19,7 @@ import getpass
 import grp
 import pwd
 import pytest
+import re
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
@@ -245,6 +246,117 @@ class TestInsertBehaviour(ImpalaTestSuite):
   @SkipIfS3.hdfs_acls
   @SkipIfADLS.hdfs_acls
   @SkipIfIsilon.hdfs_acls
+  def test_mixed_partition_permissions(self, unique_database):
+    """
+    Test that INSERT and LOAD DATA into explicit partitions are allowed even
+    if some partitions exist without appropriate permissions.
+    """
+    table = "`{0}`.`insert_partition_perms`".format(unique_database)
+    table_path = "test-warehouse/{0}.db/insert_partition_perms".format(unique_database)
+
+    def partition_path(partition):
+      return "%s/p=%s" % (table_path, partition)
+
+    def insert_query(partition):
+      return "INSERT INTO {0} PARTITION (p='{1}') VALUES(1)".format(table, partition)
+
+    def load_data(query_fn, partition):
+      path = "tmp/{0}/data_file".format(unique_database)
+      self.hdfs_client.delete_file_dir(path)
+      self.hdfs_client.create_file(path, "1")
+      q = "LOAD DATA INPATH '/{0}' INTO TABLE {1} PARTITION (p='{2}')".format(
+          path, table, partition)
+      return query_fn(self.client, q)
+
+    insert_dynamic_partition = \
+        "INSERT INTO {0} (col, p) SELECT col, p from {0}".format(table)
+
+    insert_dynamic_partition_2 = \
+        "INSERT INTO {0} PARTITION(p) SELECT col, p from {0}".format(table)
+
+    # TODO(todd): REFRESH does not seem to refresh the permissions of existing
+    # partitions -- need to fully reload the metadata to do so.
+    invalidate_query = "INVALIDATE METADATA {0}".format(table)
+
+    self.execute_query_expect_success(self.client,
+                                      "DROP TABLE IF EXISTS {0}".format(table))
+    self.execute_query_expect_success(
+        self.client, "CREATE TABLE {0} (col int) PARTITIONED BY (p string)".format(table))
+
+    # Check that a simple insert works
+    self.execute_query_expect_success(self.client, insert_query("initial_part"))
+
+    # Add a partition with no write permissions
+    self.execute_query_expect_success(
+        self.client,
+        "ALTER TABLE {0} ADD PARTITION (p=\"no_write\")".format(table))
+    self.hdfs_client.chown(partition_path("no_write"), "another_user", "another_group")
+    self.execute_query_expect_success(self.client, invalidate_query)
+
+    # Check that inserts into the new partition do not work.
+    err = self.execute_query_expect_failure(self.client, insert_query("no_write"))
+    assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err))
+
+    err = load_data(self.execute_query_expect_failure, "no_write")
+    assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err))
+
+    # Inserts into the existing partition should still work
+    self.execute_query_expect_success(self.client, insert_query("initial_part"))
+    load_data(self.execute_query_expect_success, "initial_part")
+
+    # Inserts into newly created partitions should work.
+    self.execute_query_expect_success(self.client, insert_query("added_part"))
+
+    # TODO(IMPALA-7313): loading data cannot currently add partitions, so this
+    # is commented out. If that behavior is changed, uncomment this.
+    # load_data(self.execute_query_expect_success, "added_part_load")
+
+    # Inserts into dynamic partitions should fail.
+    err = self.execute_query_expect_failure(self.client, insert_dynamic_partition)
+    assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err))
+    err = self.execute_query_expect_failure(self.client, insert_dynamic_partition_2)
+    assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err))
+
+    # If we make the top-level table directory read-only, we should still be able to
+    # insert into existing writable partitions.
+    self.hdfs_client.chown(table_path, "another_user", "another_group")
+    self.execute_query_expect_success(self.client, insert_query("added_part"))
+    load_data(self.execute_query_expect_success, "added_part")
+
+  @SkipIfS3.hdfs_acls
+  @SkipIfADLS.hdfs_acls
+  @SkipIfIsilon.hdfs_acls
+  def test_readonly_table_dir(self, unique_database):
+    """
+    Test that, if a partitioned table has a read-only base directory,
+    the frontend rejects queries that may attempt to create new
+    partitions.
+    """
+    table = "`{0}`.`insert_partition_perms`".format(unique_database)
+    table_path = "test-warehouse/{0}.db/insert_partition_perms".format(unique_database)
+    insert_dynamic_partition = \
+        "INSERT INTO {0} (col, p) SELECT col, p from {0}".format(table)
+
+    def insert_query(partition):
+      return "INSERT INTO {0} PARTITION (p='{1}') VALUES(1)".format(table, partition)
+
+    self.execute_query_expect_success(self.client,
+                                      "DROP TABLE IF EXISTS {0}".format(table))
+    self.execute_query_expect_success(
+        self.client, "CREATE TABLE {0} (col int) PARTITIONED BY (p string)".format(table))
+
+    # If we make the top-level table directory read-only, we should not be able
+    # to create new partitions, either explicitly, or dynamically.
+    self.hdfs_client.chown(table_path, "another_user", "another_group")
+    self.execute_query_expect_success(self.client, "INVALIDATE METADATA %s" % table)
+    err = self.execute_query_expect_failure(self.client, insert_dynamic_partition)
+    assert re.search(r'Impala does not have WRITE access.*' + table_path, str(err))
+    err = self.execute_query_expect_failure(self.client, insert_query("some_new_part"))
+    assert re.search(r'Impala does not have WRITE access.*' + table_path, str(err))
+
+  @SkipIfS3.hdfs_acls
+  @SkipIfADLS.hdfs_acls
+  @SkipIfIsilon.hdfs_acls
   def test_insert_acl_permissions(self, unique_database):
     """Test that INSERT correctly respects ACLs"""
     table = "`{0}`.`insert_acl_permissions`".format(unique_database)


[4/4] impala git commit: IMPALA-6335. Allow most shell tests to run in parallel

Posted by kw...@apache.org.
IMPALA-6335. Allow most shell tests to run in parallel

This adds an IMPALA_HISTFILE environment variable (and --history_file
argument) to the shell, which overrides the default location of
~/.impalahistory for the shell history. The shell tests now override
this variable to /dev/null so they don't store history. The tests that
need history use a pytest fixture that points it at a temporary file,
letting them run in parallel without stomping on each other's history.

This also fixes a couple of flaky tests that were previously missing the
"execute_serially" annotation -- with this fix, that annotation is no
longer needed.

A couple of the tests still need to be executed serially because they
look at metrics such as the number of executed or running queries, and
those metrics are unstable if other tests run in parallel.

I tested this by running:

  ./bin/impala-py.test tests/shell/test_shell_interactive.py \
    -m 'not execute_serially' \
    -n 80 \
    --random

... several times in a row on an 88-core box. Prior to the change,
several would fail each time. Now they pass.
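
For reference, the two new override mechanisms (a sketch; the paths and the
query are examples only):

  import os
  import subprocess

  # The environment variable becomes the default history location:
  env = dict(os.environ, IMPALA_HISTFILE='/dev/null')  # keep no history
  subprocess.check_call(['impala-shell', '-q', 'select 1'], env=env)

  # The command-line flag overrides the parsed default, and therefore the
  # environment variable as well:
  subprocess.check_call(
      ['impala-shell', '-q', 'select 1', '--history_file=/tmp/my_history'])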

Change-Id: I1da5739276e63a50590dfcb2b050703f8e35fec7
Reviewed-on: http://gerrit.cloudera.org:8080/11045
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bf24a814
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bf24a814
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bf24a814

Branch: refs/heads/master
Commit: bf24a814ccf568fd0f165f05c90488ebc4c2db47
Parents: 2d6a459
Author: Todd Lipcon <to...@cloudera.com>
Authored: Tue Jul 24 18:14:43 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 8 03:39:39 2018 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                           |  2 +-
 shell/impala_shell_config_defaults.py           |  4 +
 shell/option_parser.py                          |  3 +
 tests/common/impala_test_suite.py               |  4 +
 .../test_shell_interactive_reconnect.py         | 15 +---
 tests/shell/test_shell_interactive.py           | 88 ++++++++++----------
 tests/shell/util.py                             | 11 ---
 7 files changed, 57 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 4348a4e..268f229 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -175,7 +175,7 @@ class ImpalaShell(object, cmd.Cmd):
     self.webserver_address = ImpalaShell.UNKNOWN_WEBSERVER
 
     self.current_db = options.default_db
-    self.history_file = os.path.expanduser("~/.impalahistory")
+    self.history_file = os.path.expanduser(options.history_file)
     # Stores the state of user input until a delimiter is seen.
     self.partial_cmd = str()
     # Stores the old prompt while the user input is incomplete.

http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/shell/impala_shell_config_defaults.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index 6abf95e..260e93e 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -23,10 +23,14 @@ import getpass
 import os
 import socket
 
+_histfile_from_env = os.environ.get(
+    'IMPALA_HISTFILE', '~/.impalahistory')
+
 impala_shell_defaults = {
             'ca_cert': None,
             'config_file': os.path.expanduser("~/.impalarc"),
             'default_db': None,
+            'history_file': _histfile_from_env,
             'history_max': 1000,
             'ignore_query_failure': False,
             'impalad': socket.getfqdn() + ':21000',

http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/shell/option_parser.py
----------------------------------------------------------------------
diff --git a/shell/option_parser.py b/shell/option_parser.py
index 3c8ae08..000e319 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -195,6 +195,9 @@ def get_option_parser(defaults):
                           "Specifying this option within a config file will have "
                           "no effect. Only specify this as an option in the commandline."
                           ))
+  parser.add_option("--history_file", dest="history_file",
+                    help=("The file in which to store shell history. This may also be "
+                          "configured using the IMPALA_HISTFILE environment variable."))
   parser.add_option("--live_summary", dest="print_summary", action="store_true",
                     help="Print a query summary every 1s while the query is running.")
   parser.add_option("--live_progress", dest="print_progress", action="store_true",

http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 4a3f3c7..e7ac7b5 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -156,6 +156,10 @@ class ImpalaTestSuite(BaseTestSuite):
     elif IS_ADLS:
       cls.filesystem_client = ADLSClient(ADLS_STORE_NAME)
 
+    # Override the shell history path so that commands run by any tests
+    # don't write any history into the developer's file.
+    os.environ['IMPALA_HISTFILE'] = '/dev/null'
+
   @classmethod
   def teardown_class(cls):
     """Setup section that runs after each test suite"""

http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/tests/custom_cluster/test_shell_interactive_reconnect.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index c747139..b5a9065 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -23,8 +23,7 @@ import os
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
-from tests.common.skip import SkipIfBuildType
-from tests.shell.util import ImpalaShell, move_shell_history, restore_shell_history
+from tests.shell.util import ImpalaShell
 # Follow tests/shell/test_shell_interactive.py naming.
 from shell.impala_shell import ImpalaShell as ImpalaShellClass
 
@@ -37,18 +36,6 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
-  @classmethod
-  def setup_class(cls):
-    super(TestShellInteractiveReconnect, cls).setup_class()
-
-    cls.tempfile_name = tempfile.mktemp()
-    move_shell_history(cls.tempfile_name)
-
-  @classmethod
-  def teardown_class(cls):
-    restore_shell_history(cls.tempfile_name)
-    super(TestShellInteractiveReconnect, cls).teardown_class()
-
   @pytest.mark.execute_serially
   def test_manual_reconnect(self):
     p = ImpalaShell()

http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index c481a21..bb8ff4b 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -25,7 +25,6 @@ import re
 import signal
 import socket
 import sys
-import tempfile
 from time import sleep
 
 # This import is the actual ImpalaShell class from impala_shell.py.
@@ -34,29 +33,40 @@ from time import sleep
 # to mask it.
 from shell.impala_shell import ImpalaShell as ImpalaShellClass
 
+from tempfile import NamedTemporaryFile
 from tests.common.impala_service import ImpaladService
 from tests.common.skip import SkipIfLocal
 from util import assert_var_substitution, ImpalaShell
-from util import move_shell_history, restore_shell_history
 
 SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
-SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory")
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
 # Regex to match the interactive shell prompt that is expected after each command.
 PROMPT_REGEX = r'\[[^:]+:2100[0-9]\]'
 
-class TestImpalaShellInteractive(object):
-  """Test the impala shell interactively"""
 
-  @classmethod
-  def setup_class(cls):
-    cls.tempfile_name = tempfile.mktemp()
-    move_shell_history(cls.tempfile_name)
+@pytest.fixture
+def tmp_history_file(request):
+  """
+  Test fixture which uses a temporary file as the path for the shell
+  history.
+  """
+  tmp = NamedTemporaryFile()
+  old_path = os.environ.get('IMPALA_HISTFILE')
+  os.environ['IMPALA_HISTFILE'] = tmp.name
+
+  def cleanup():
+    if old_path is not None:
+      os.environ['IMPALA_HISTFILE'] = old_path
+    else:
+      del os.environ['IMPALA_HISTFILE']
 
-  @classmethod
-  def teardown_class(cls):
-    restore_shell_history(cls.tempfile_name)
+  request.addfinalizer(cleanup)
+  return tmp.name
+
+
+class TestImpalaShellInteractive(object):
+  """Test the impala shell interactively"""
 
   def _expect_with_cmd(self, proc, cmd, expectations=(), db="default"):
     """Executes a command on the expect process instance and verifies a set of
@@ -67,7 +77,6 @@ class TestImpalaShellInteractive(object):
     for e in expectations:
       assert e in proc.before
 
-  @pytest.mark.execute_serially
   def test_local_shell_options(self):
     """Test that setting the local shell options works"""
     proc = pexpect.spawn(SHELL_CMD)
@@ -114,7 +123,7 @@ class TestImpalaShellInteractive(object):
     # test print to file
     p1 = ImpalaShell()
     p1.send_cmd("use tpch")
-    local_file = tempfile.NamedTemporaryFile(delete=True)
+    local_file = NamedTemporaryFile(delete=True)
     p1.send_cmd("set output_file=%s" % local_file.name)
     p1.send_cmd("select * from nation")
     result = p1.get_result()
@@ -132,7 +141,6 @@ class TestImpalaShellInteractive(object):
     result = p2.get_result()
     assert "VIETNAM" in result.stdout
 
-  @pytest.mark.execute_serially
   def test_compute_stats_with_live_progress_options(self):
     """Test that setting LIVE_PROGRESS options won't cause COMPUTE STATS query fail"""
     p = ImpalaShell()
@@ -146,7 +154,6 @@ class TestImpalaShellInteractive(object):
     result = p.get_result()
     assert "Updated 1 partition(s) and 1 column(s)" in result.stdout
 
-  @pytest.mark.execute_serially
   def test_escaped_quotes(self):
     """Test escaping quotes"""
     # test escaped quotes outside of quotes
@@ -173,7 +180,6 @@ class TestImpalaShellInteractive(object):
     assert "Cancelled" not in result.stderr
     assert impalad.wait_for_num_in_flight_queries(0)
 
-  @pytest.mark.execute_serially
   def test_unicode_input(self):
     "Test queries containing non-ascii input"
     # test a unicode query spanning multiple lines
@@ -182,7 +188,6 @@ class TestImpalaShellInteractive(object):
     result = run_impala_shell_interactive(args)
     assert "Fetched 1 row(s)" in result.stderr
 
-  @pytest.mark.execute_serially
   def test_welcome_string(self):
     """Test that the shell's welcome message is only printed once
     when the shell is started. Ensure it is not reprinted on errors.
@@ -193,14 +198,12 @@ class TestImpalaShellInteractive(object):
     result = run_impala_shell_interactive('select * from non_existent_table;')
     assert result.stdout.count("Welcome to the Impala shell") == 1
 
-  @pytest.mark.execute_serially
   def test_disconnected_shell(self):
     """Test that the shell presents a disconnected prompt if it can't connect
     """
     result = run_impala_shell_interactive('asdf;', shell_args='-i foo')
     assert ImpalaShellClass.DISCONNECTED_PROMPT in result.stdout
 
-  @pytest.mark.execute_serially
   def test_bash_cmd_timing(self):
     """Test existence of time output in bash commands run from shell"""
     args = "! ls;"
@@ -279,8 +282,7 @@ class TestImpalaShellInteractive(object):
       run_impala_shell_interactive("drop table if exists %s.%s;" % (TMP_DB, TMP_TBL))
       run_impala_shell_interactive("drop database if exists foo;")
 
-  @pytest.mark.execute_serially
-  def test_multiline_queries_in_history(self):
+  def test_multiline_queries_in_history(self, tmp_history_file):
     """Test to ensure that multiline queries with comments are preserved in history
 
     Ensure that multiline queries are preserved when they're read back from history.
@@ -309,13 +311,26 @@ class TestImpalaShellInteractive(object):
     for _, history_entry in queries:
       assert history_entry in result.stderr, "'%s' not in '%s'" % (history_entry, result.stderr)
 
-  @pytest.mark.execute_serially
-  def test_rerun(self):
+  def test_history_file_option(self, tmp_history_file):
+    """
+    Setting the 'tmp_history_file' fixture above means that the IMPALA_HISTFILE
+    environment variable will be overridden. Here we override it by passing
+    the --history_file command line option, ensuring that the history ends up
+    in the appropriate spot.
+    """
+    with NamedTemporaryFile() as new_hist:
+      child_proc = pexpect.spawn(
+          SHELL_CMD,
+          args=["--history_file=%s" % new_hist.name])
+      child_proc.expect(":21000] default>")
+      self._expect_with_cmd(child_proc, "select 'hi'", ('hi'))
+      child_proc.sendline('exit;')
+      child_proc.expect(pexpect.EOF)
+      history_contents = file(new_hist.name).read()
+      assert "select 'hi'" in history_contents
+
+  def test_rerun(self, tmp_history_file):
     """Smoke test for the 'rerun' command"""
-    # Clear history first.
-    if os.path.exists(SHELL_HISTORY_FILE):
-      os.remove(SHELL_HISTORY_FILE)
-    assert not os.path.exists(SHELL_HISTORY_FILE)
     child_proc = pexpect.spawn(SHELL_CMD)
     child_proc.expect(":21000] default>")
     self._expect_with_cmd(child_proc, "@1", ("Command index out of range"))
@@ -345,7 +360,6 @@ class TestImpalaShellInteractive(object):
     self._expect_with_cmd(child_proc, "rerun1", ("Syntax error"))
     child_proc.sendline('quit;')
 
-  @pytest.mark.execute_serially
   def test_tip(self):
     """Smoke test for the TIP command"""
     # Temporarily add impala_shell module to path to get at TIPS list for verification
@@ -359,14 +373,12 @@ class TestImpalaShellInteractive(object):
       if t in result.stderr: return
     assert False, "No tip found in output %s" % result.stderr
 
-  @pytest.mark.execute_serially
   def test_var_substitution(self):
     cmds = open(os.path.join(QUERY_FILE_PATH, 'test_var_substitution.sql')).read()
     args = '''--var=foo=123 --var=BAR=456 --delimited "--output_delimiter= " '''
     result = run_impala_shell_interactive(cmds, shell_args=args)
     assert_var_substitution(result)
 
-  @pytest.mark.execute_serially
   def test_query_option_configuration(self):
     rcfile_path = os.path.join(QUERY_FILE_PATH, 'impalarc_with_query_options')
     args = '-Q MT_dop=1 --query_option=MAX_ERRORS=200 --config_file="%s"' % rcfile_path
@@ -378,7 +390,6 @@ class TestImpalaShellInteractive(object):
     assert "INVALID_QUERY_OPTION is not supported for the impalad being "
     "connected to, ignoring." in result.stdout
 
-  @pytest.mark.execute_serially
   def test_source_file(self):
     cwd = os.getcwd()
     try:
@@ -399,7 +410,6 @@ class TestImpalaShellInteractive(object):
     finally:
       os.chdir(cwd)
 
-  @pytest.mark.execute_serially
   def test_source_file_with_errors(self):
     full_path = "%s/tests/shell/shell_error.cmds" % os.environ['IMPALA_HOME']
     result = run_impala_shell_interactive("source %s;" % full_path)
@@ -412,13 +422,11 @@ class TestImpalaShellInteractive(object):
     assert "Query: SHOW TABLES" in result.stderr
     assert "alltypes" in result.stdout
 
-  @pytest.mark.execute_serially
   def test_source_missing_file(self):
     full_path = "%s/tests/shell/doesntexist.cmds" % os.environ['IMPALA_HOME']
     result = run_impala_shell_interactive("source %s;" % full_path)
     assert "No such file or directory" in result.stderr
 
-  @pytest.mark.execute_serially
   def test_zero_row_fetch(self):
     # IMPALA-4418: DROP and USE are generally exceptional statements where
     # the client does not fetch. For statements returning 0 rows we do not
@@ -429,7 +437,6 @@ class TestImpalaShellInteractive(object):
     assert "Fetched 0 row(s)" in result.stderr
     assert re.search('> \[', result.stdout)
 
-  @pytest.mark.execute_serially
   def test_set_and_set_all(self):
     """IMPALA-2181. Tests the outputs of SET and SET ALL commands. SET should contain the
     REGULAR and ADVANCED options only. SET ALL should contain all the options grouped by
@@ -472,7 +479,6 @@ class TestImpalaShellInteractive(object):
     shell.send_cmd(command)
     assert expected in shell.get_result().stderr
 
-  @pytest.mark.execute_serially
   def test_unexpected_conversion_for_literal_string_to_lowercase(self):
     # IMPALA-4664: Impala shell can accidentally convert certain literal
     # strings to lowercase. Impala shell splits each command into tokens
@@ -487,7 +493,6 @@ class TestImpalaShellInteractive(object):
     result = run_impala_shell_interactive("select\n'MUST_HAVE_UPPER_STRING'")
     assert re.search('MUST_HAVE_UPPER_STRING', result.stdout)
 
-  @pytest.mark.execute_serially
   def test_case_sensitive_command(self):
     # IMPALA-2640: Make a given command case-sensitive
     cwd = os.getcwd()
@@ -511,7 +516,6 @@ class TestImpalaShellInteractive(object):
     finally:
       os.chdir(cwd)
 
-  @pytest.mark.execute_serially
   def test_line_with_leading_comment(self):
     # IMPALA-2195: A line with a comment produces incorrect command.
     try:
@@ -560,7 +564,6 @@ class TestImpalaShellInteractive(object):
     finally:
       run_impala_shell_interactive('drop table if exists leading_comment;')
 
-  @pytest.mark.execute_serially
   def test_line_ends_with_comment(self):
     # IMPALA-5269: Test lines that end with a comment.
     queries = ['select 1 + 1; --comment',
@@ -599,7 +602,6 @@ class TestImpalaShellInteractive(object):
     result = run_impala_shell_interactive(query)
     assert '| id   |' in result.stdout
 
-  @pytest.mark.execute_serially
   def test_fix_infinite_loop(self):
     # IMPALA-6337: Fix infinite loop.
     result = run_impala_shell_interactive("select 1 + 1; \"\n;\";")
@@ -613,7 +615,6 @@ class TestImpalaShellInteractive(object):
     result = run_impala_shell_interactive("select '1\"23\"4'\";\n;\n\";")
     assert '| 1"23"4 |' in result.stdout
 
-  @pytest.mark.execute_serially
   def test_comment_with_quotes(self):
     # IMPALA-2751: Comment does not need to have matching quotes
     queries = [
@@ -631,7 +632,6 @@ class TestImpalaShellInteractive(object):
       result = run_impala_shell_interactive(query)
       assert '| 1 |' in result.stdout
 
-  @pytest.mark.execute_serially
   def test_shell_prompt(self):
     proc = pexpect.spawn(SHELL_CMD)
     proc.expect(":21000] default>")
@@ -739,7 +739,7 @@ def run_impala_shell_interactive(input_lines, shell_args=None):
   # since piping defaults to ascii
   my_env = os.environ
   my_env['PYTHONIOENCODING'] = 'utf-8'
-  p = ImpalaShell(shell_args, env=my_env)
+  p = ImpalaShell(args=shell_args, env=my_env)
   for line in input_lines:
     p.send_cmd(line)
   return p.get_result()

http://git-wip-us.apache.org/repos/asf/impala/blob/bf24a814/tests/shell/util.py
----------------------------------------------------------------------
diff --git a/tests/shell/util.py b/tests/shell/util.py
index 22ffa1a..0c899ab 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -112,17 +112,6 @@ def run_impala_shell_cmd_no_expect(shell_args, stdin_input=None):
   cmd = "%s %s" % (SHELL_CMD, shell_args)
   return result
 
-def move_shell_history(filepath):
-  """ Moves history file to given filepath.
-      If there is no history file, this function has no effect. """
-  if os.path.exists(SHELL_HISTORY_FILE):
-    shutil.move(SHELL_HISTORY_FILE, filepath)
-
-def restore_shell_history(filepath):
-  """ Moves back history file from given filepath.
-      If 'filepath' doesn't exist in the filesystem, this function has no effect. """
-  if os.path.exists(filepath): shutil.move(filepath, SHELL_HISTORY_FILE)
-
 class ImpalaShellResult(object):
   def __init__(self):
     self.rc = 0


[3/4] impala git commit: IMPALA-7390: Configure /etc/hosts to avoid rpc-mgr-kerberized-test issues.

Posted by kw...@apache.org.
IMPALA-7390: Configure /etc/hosts to avoid rpc-mgr-kerberized-test issues.

In the test-with-docker context, rpc-mgr-kerberized-test was failing,
ultimately due to the fact that the hostname was resolving to 127.0.0.1
and then back to 'localhost'.

This commit applies a workaround of adding "127.0.0.1 $(hostname)"
to /etc/hosts, which allows the test to pass, and is what's done in
bootstrap_system.sh. In the Docker context, /etc/hosts needs to be
updated on every container start, because Docker tries to provide an
/etc/hosts for you. Previously, we were doing a different customization
(adding "hostname" to the existing "127.0.0.1" line), which wasn't good
enough for rpc-mgr-kerberized-test.
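
With the workaround applied, /etc/hosts ends up containing a line shaped
like the following (hostnames here are examples):

  127.0.0.1 myhost myhost.example.com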

The original workaround, which is in "bootstrap_system.sh", is still
in place, but I've added more documentation about reproduction there.
I've also filed HDFS-13797 to track the HDFS issue.

Change-Id: I91003cbc86177feb7f99563f61297f7da7fabab4
Reviewed-on: http://gerrit.cloudera.org:8080/11113
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2d6a459c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2d6a459c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2d6a459c

Branch: refs/heads/master
Commit: 2d6a459c76edbe619543fd50d68be72b79a63bc5
Parents: cbc8c63
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Fri Aug 3 10:38:58 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 8 00:26:34 2018 +0000

----------------------------------------------------------------------
 bin/bootstrap_system.sh | 21 ++++++++++++++++++++-
 docker/entrypoint.sh    |  9 ++++++---
 2 files changed, 26 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2d6a459c/bin/bootstrap_system.sh
----------------------------------------------------------------------
diff --git a/bin/bootstrap_system.sh b/bin/bootstrap_system.sh
index bce161b..e5f9fa9 100755
--- a/bin/bootstrap_system.sh
+++ b/bin/bootstrap_system.sh
@@ -184,8 +184,27 @@ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
 echo "NoHostAuthenticationForLocalhost yes" >> ~/.ssh/config
 ssh localhost whoami
 
-# Workarounds for HDFS networking issues
+# Workarounds for HDFS networking issues: On the minicluster, tests that rely
+# on WebHDFS may fail with "Connection refused" errors because the namenode
+# will return a "Location:" redirect to the hostname, but the datanode is only
+# listening on localhost. See also HDFS-13797. To reproduce this, the following
+# snippet may be useful:
+#
+#  $impala-python
+#  >>> import logging
+#  >>> logging.basicConfig(level=logging.DEBUG)
+#  >>> logging.getLogger("requests.packages.urllib3").setLevel(logging.DEBUG)
+#  >>> from pywebhdfs.webhdfs import PyWebHdfsClient
+#  >>> PyWebHdfsClient(host='localhost',port='5070', user_name='hdfs').read_file(
+#         "/test-warehouse/tpch.region/region.tbl")
+#  INFO:...:Starting new HTTP connection (1): localhost
+#  DEBUG:...:"GET /webhdfs/v1//t....tbl?op=OPEN&user.name=hdfs HTTP/1.1" 307 0
+#  INFO:...:Starting new HTTP connection (1): HOSTNAME.DOMAIN
+#  Traceback (most recent call last):
+#    ...
+#  ...ConnectionError: ('Connection aborted.', error(111, 'Connection refused'))
 echo "127.0.0.1 $(hostname -s) $(hostname)" | sudo tee -a /etc/hosts
+#
 # In Docker, one can change /etc/hosts as above but not with sed -i. The error message is
 # "sed: cannot rename /etc/sedc3gPj8: Device or resource busy". The following lines are
 # basically sed -i but with cp instead of mv for -i part.

http://git-wip-us.apache.org/repos/asf/impala/blob/2d6a459c/docker/entrypoint.sh
----------------------------------------------------------------------
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index 0e192c9..205bd31 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -95,10 +95,13 @@ function boot_container() {
   # Update /etc/hosts to remove the entry for the unique docker hostname,
   # and instead point it to 127.0.0.1. Otherwise, HttpFS returns Location:
   # redirects to said hostname, but the relevant datanode isn't listening
-  # on the wildcard address.
-  sed -e /$(hostname)/d /etc/hosts -e /127.0.0.1/s,localhost,"localhost $(hostname)," \
-    > /tmp/hosts
+  # on the wildcard address. bootstrap_system.sh does this as well, but
+  # Docker creates a new /etc/hosts every time a container is created, so
+  # this needs to be done here as well.
+  #
   # "sed -i" in place doesn't work on Docker, because /etc/hosts is a bind mount.
+  sed -e /$(hostname)/d /etc/hosts > /tmp/hosts
+  echo "127.0.0.1 $(hostname -s) $(hostname)" >> /tmp/hosts
   sudo cp /tmp/hosts /etc/hosts
 
   echo Hostname: $(hostname)


[2/4] impala git commit: IMPALA-7163: Implement a state machine for the QueryState class

Posted by kw...@apache.org.
IMPALA-7163: Implement a state machine for the QueryState class

This patch adds a state machine for the QueryState class. The motivation
is to make the query lifecycle, from the point of view of an executor,
much easier to reason about. This patch is also key for a follow-on patch
for IMPALA-2990, where the status reporting will be per-query rather than
per-fragment-instance. Currently, the state machine serves no other
purpose, and it will mostly be used for IMPALA-2990.

We introduce 5 possible states for the QueryState, which include 3
terminal states (FINISHED, CANCELLED and ERROR) and 2 non-terminal
states (PREPARING, EXECUTING). The transition from one state to the
next is always handled by a single thread, which is also the QueryState
thread. This thread will additionally be responsible for sending
periodic updates after IMPALA-4063, which is the primary reason for
having only this thread modify the state of the query.

Counting barriers are introduced to keep a count of how many fragment
instances have finished Preparing and Executing. These barriers also
block until all the fragment instances have finished the respective
state. A fragment instance updates the query-wide query status if an
error is hit, and unblocks the barrier if the query is in the EXECUTING
state. The PREPARING state blocks regardless of whether a fragment
instance hit an error or not, until all the fragment instances have
completed successfully or unsuccessfully, to maintain the invariant that
fragment instances cannot be cancelled until the entire QueryState has
finished PREPARING.

The status reporting protocol has not been changed and remains exactly
as it was.
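
A minimal Python analogue of the counting-barrier idea described above
(illustrative only; the real implementation is the C++ in query-state.h):

  import threading

  class CountingBarrier(object):
      def __init__(self, count):
          self._pending = count
          self._cond = threading.Condition()

      def notify(self):
          # Called by each fragment instance when it finishes a phase.
          with self._cond:
              self._pending -= 1
              if self._pending == 0:
                  self._cond.notify_all()

      def wait(self):
          # Called by the QueryState thread to wait out the phase.
          with self._cond:
              while self._pending > 0:
                  self._cond.wait()

  # One barrier per phase, e.g.:
  #   prepare_barrier = CountingBarrier(num_fragment_instances)
  # Each fragment instance calls prepare_barrier.notify() after Prepare();
  # the QueryState thread calls prepare_barrier.wait() before moving
  # PREPARING -> EXECUTING.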

Testing:
- Added 3 failure points in the query lifecycle using debug actions
  and added tests to validate the same (extension of IMPALA-7376).
- Ran 'core' and 'exhaustive' tests.

Future related work:
1) IMPALA-2990: Make status reporting per-query.
2) Try to logically align the FIS states with the QueryState states.
3) Consider mirroring the QueryState state machine to
CoordinatorBackendState

Change-Id: Iec5670a7db83ecae4656d7bb2ea372d3767ba7fe
Reviewed-on: http://gerrit.cloudera.org:8080/10813
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cbc8c63e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cbc8c63e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cbc8c63e

Branch: refs/heads/master
Commit: cbc8c63ef0446550bd080e226b38307c4de967eb
Parents: 35bce6b
Author: Sailesh Mukil <sa...@apache.org>
Authored: Wed Jun 20 20:15:30 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 8 00:16:18 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc              |   9 +-
 be/src/runtime/fragment-instance-state.cc  |  51 +++++----
 be/src/runtime/fragment-instance-state.h   |   8 --
 be/src/runtime/query-state.cc              | 134 ++++++++++++++++++-----
 be/src/runtime/query-state.h               | 136 ++++++++++++++++++++++--
 common/thrift/ImpalaInternalService.thrift |   4 +-
 tests/failure/test_failpoints.py           |  26 +++++
 7 files changed, 299 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1567c9a..4b87b69 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -144,14 +144,13 @@ Status Coordinator::Exec() {
     if (coord_instance_ == nullptr) {
       // at this point, the query is done with the Prepare phase, and we expect
       // to have a coordinator instance, but coord_instance_ == nullptr,
-      // which means we failed Prepare
-      Status prepare_status = query_state_->WaitForPrepare();
-      DCHECK(!prepare_status.ok());
-      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
+      // which means we failed before or during Prepare().
+      Status query_status = query_state_->WaitForPrepare();
+      DCHECK(!query_status.ok());
+      return UpdateExecState(query_status, nullptr, FLAGS_hostname);
     }
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
     // done and the FragmentInstanceState's root sink will be set up.
-    DCHECK(coord_instance_->IsPrepared() && coord_instance_->WaitForPrepare().ok());
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index cbae601..51ff13d 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -71,13 +71,18 @@ FragmentInstanceState::FragmentInstanceState(
 }
 
 Status FragmentInstanceState::Exec() {
+  bool is_prepared = false;
   Status status = Prepare();
   DCHECK(runtime_state_ != nullptr);  // we need to guarantee at least that
-  discard_result(prepared_promise_.Set(status));
+
   if (!status.ok()) {
     discard_result(opened_promise_.Set(status));
     goto done;
   }
+  // Tell the managing 'QueryState' that we're done with Prepare().
+  query_state_->DonePreparing();
+  is_prepared = true;
+
   status = Open();
   discard_result(opened_promise_.Set(status));
   if (!status.ok()) goto done;
@@ -90,7 +95,23 @@ Status FragmentInstanceState::Exec() {
     status = ExecInternal();
   }
 
+  if (!status.ok()) goto done;
+  // Tell the managing 'QueryState' that we're done with executing and that we've stopped
+  // the reporting thread.
+  query_state_->DoneExecuting();
+
 done:
+  if (!status.ok()) {
+    if (!is_prepared) {
+      DCHECK_LE(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+      // Tell the managing 'QueryState' that we hit an error during Prepare().
+      query_state_->ErrorDuringPrepare(status, instance_id());
+    } else {
+      DCHECK_GT(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+      // Tell the managing 'QueryState' that we hit an error during execution.
+      query_state_->ErrorDuringExecute(status, instance_id());
+    }
+  }
   UpdateState(StateEvent::EXEC_END);
   // call this before Close() to make sure the thread token got released
   Finalize(status);
@@ -99,10 +120,6 @@ done:
 }
 
 void FragmentInstanceState::Cancel() {
-  // Make sure Prepare() finished. We don't care about the status since the query is
-  // being cancelled.
-  discard_result(WaitForPrepare());
-
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
   if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
@@ -110,7 +127,7 @@ void FragmentInstanceState::Cancel() {
 }
 
 Status FragmentInstanceState::Prepare() {
-  DCHECK(!prepared_promise_.IsSet());
+  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_EXEC);
   VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);
 
   // Do not call RETURN_IF_ERROR or explicitly return before this line,
@@ -236,8 +253,8 @@ Status FragmentInstanceState::Prepare() {
 }
 
 Status FragmentInstanceState::Open() {
-  DCHECK(prepared_promise_.IsSet());
   DCHECK(!opened_promise_.IsSet());
+  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
   SCOPED_TIMER(profile()->total_time_counter());
   SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -266,6 +283,9 @@ Status FragmentInstanceState::Open() {
 
   {
     UpdateState(StateEvent::OPEN_START);
+    // Inject failure if debug actions are enabled.
+    RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_OPEN"));
+
     SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
     RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
   }
@@ -273,6 +293,10 @@ Status FragmentInstanceState::Open() {
 }
 
 Status FragmentInstanceState::ExecInternal() {
+  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_OPEN);
+  // Inject failure if debug actions are enabled.
+  RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_EXEC_INTERNAL"));
+
   RuntimeProfile::Counter* plan_exec_timer =
       ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -484,14 +508,6 @@ void FragmentInstanceState::ReleaseThreadToken() {
   }
 }
 
-Status FragmentInstanceState::WaitForPrepare() {
-  return prepared_promise_.Get();
-}
-
-bool FragmentInstanceState::IsPrepared() {
-  return prepared_promise_.IsSet();
-}
-
 Status FragmentInstanceState::WaitForOpen() {
   return opened_promise_.Get();
 }
@@ -499,8 +515,6 @@ Status FragmentInstanceState::WaitForOpen() {
 void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
             << " filter_id=" << params.filter_id;
-  // Wait until Prepare() is done, so we know that the filter bank is set up.
-  if (!WaitForPrepare().ok()) return;
   runtime_state_->filter_bank()->PublishGlobalFilter(params);
 }
 
@@ -508,14 +522,15 @@ string FragmentInstanceState::ExecStateToString(const TFInstanceExecState::type
   // Labels to send to the debug webpages to display the current state to the user.
   static const string finstance_state_labels[] = {
       "Waiting for Exec",         // WAITING_FOR_EXEC
-      "Waiting for Codegen",      // WAITING_FOR_CODEGEN
       "Waiting for Prepare",      // WAITING_FOR_PREPARE
+      "Waiting for Codegen",      // WAITING_FOR_CODEGEN
       "Waiting for First Batch",  // WAITING_FOR_OPEN
       "Waiting for First Batch",  // WAITING_FOR_FIRST_BATCH
       "First batch produced",     // FIRST_BATCH_PRODUCED
       "Producing Data",           // PRODUCING_DATA
       "Last batch sent",          // LAST_BATCH_SENT
       "Finished"                  // FINISHED
+
   };
   /// Make sure we have a label for every possible state.
   static_assert(

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index df27f9c..d1f21f5 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -76,8 +76,6 @@ class RuntimeState;
 ///
 /// TODO:
 /// - absorb RuntimeState?
-/// - should WaitForPrepare/Open() return the overall execution status, if there
-///   was a failure?
 class FragmentInstanceState {
  public:
   FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
@@ -91,12 +89,6 @@ class FragmentInstanceState {
   /// Cancels execution and sends a final status report. Idempotent.
   void Cancel();
 
-  /// Blocks until the Prepare phase of Exec() is finished and returns the status.
-  Status WaitForPrepare();
-
-  /// Returns true if the Prepare phase of Exec() is finished.
-  bool IsPrepared();
-
   /// Blocks until the Prepare phase of Exec() is finished and the exec tree is
   /// opened, and returns that status. If the preparation phase encountered an error,
   /// GetOpenStatus() will return that error without blocking.

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index c86095c..2ae4a27 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -138,6 +138,11 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
   rpc_params_.__isset.fragment_instance_ctxs = true;
 
+  instances_prepared_barrier_.reset(
+      new CountingBarrier(rpc_params_.fragment_instance_ctxs.size()));
+  instances_finished_barrier_.reset(
+      new CountingBarrier(rpc_params_.fragment_instance_ctxs.size()));
+
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
   initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
@@ -193,9 +198,49 @@ Status QueryState::InitBufferPoolState() {
   return Status::OK();
 }
 
+const char* QueryState::BackendExecStateToString(const BackendExecState& state) {
+  static const unordered_map<BackendExecState, const char*> exec_state_to_str{
+      {BackendExecState::PREPARING, "PREPARING"},
+      {BackendExecState::EXECUTING, "EXECUTING"},
+      {BackendExecState::FINISHED, "FINISHED"},
+      {BackendExecState::CANCELLED, "CANCELLED"},
+      {BackendExecState::ERROR, "ERROR"}};
+
+  return exec_state_to_str.at(state);
+}
+
+inline bool QueryState::IsTerminalState(const BackendExecState& state) {
+  return state == BackendExecState::FINISHED
+      || state == BackendExecState::CANCELLED
+      || state == BackendExecState::ERROR;
+}
+
+Status QueryState::UpdateBackendExecState() {
+  BackendExecState old_state = backend_exec_state_;
+
+  unique_lock<SpinLock> l(status_lock_);
+  // We shouldn't call this function if we're already in a terminal state.
+  DCHECK(!IsTerminalState(backend_exec_state_))
+      << " Current State: " << BackendExecStateToString(backend_exec_state_)
+      << " | Current Status: " << query_status_.GetDetail();
+
+  if (query_status_.IsCancelled()) {
+    // Received cancellation - go to CANCELLED state.
+    backend_exec_state_ = BackendExecState::CANCELLED;
+  } else if (!query_status_.ok()) {
+    // Error while executing - go to ERROR state.
+    backend_exec_state_ = BackendExecState::ERROR;
+  } else {
+    // Transition to the next state in the lifecycle.
+    backend_exec_state_ = old_state == BackendExecState::PREPARING ?
+        BackendExecState::EXECUTING : BackendExecState::FINISHED;
+  }
+  return query_status_;
+}
+
 FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
   VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
-  if (!instances_prepared_promise_.Get().ok()) return nullptr;
+  if (!WaitForPrepare().ok()) return nullptr;
   auto it = fis_map_.find(instance_id);
   return it != fis_map_.end() ? it->second : nullptr;
 }
@@ -211,7 +256,6 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   DCHECK(status.ok() || done);
   // if this is not for a specific fragment instance, we're reporting an error
   DCHECK(fis != nullptr || !status.ok());
-  DCHECK(fis == nullptr || fis->IsPrepared());
 
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
@@ -293,7 +337,17 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
 }
 
 Status QueryState::WaitForPrepare() {
-  return instances_prepared_promise_.Get();
+  instances_prepared_barrier_->Wait();
+
+  unique_lock<SpinLock> l(status_lock_);
+  return query_status_;
+}
+
+Status QueryState::WaitForFinish() {
+  instances_finished_barrier_->Wait();
+
+  unique_lock<SpinLock> l(status_lock_);
+  return query_status_;
 }
 
 void QueryState::StartFInstances() {
@@ -306,7 +360,12 @@ void QueryState::StartFInstances() {
   DCHECK(query_ctx().__isset.desc_tbl);
   Status status = DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, &desc_tbl_);
   if (!status.ok()) {
-    discard_result(instances_prepared_promise_.Set(status));
+    ErrorDuringPrepare(status, TUniqueId());
+    Status updated_query_status = UpdateBackendExecState();
+    instances_prepared_barrier_->NotifyRemaining();
+    DCHECK(!updated_query_status.ok());
+    // TODO (IMPALA-4063): This call to ReportExecStatusAux() should internally be handled
+    // by UpdateBackendExecState().
     ReportExecStatusAux(true, status, nullptr, false);
     return;
   }
@@ -317,6 +376,7 @@ void QueryState::StartFInstances() {
   DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
   TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
   int fragment_ctx_idx = 0;
+  int num_unstarted_instances = rpc_params_.fragment_instance_ctxs.size();
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
@@ -333,13 +393,23 @@ void QueryState::StartFInstances() {
     // start new thread to execute instance
     refcnt_.Add(1); // decremented in ExecFInstance()
     AcquireExecResourceRefcount(); // decremented in ExecFInstance()
+
+    // Add the fragment instance ID to the 'fis_map_'.
+    fis_map_.emplace(fis->instance_id(), fis);
+
     string thread_name = Substitute("$0 (finst:$1)",
         FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
         PrintId(instance_ctx.fragment_instance_id));
     unique_ptr<Thread> t;
-    thread_create_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
-        thread_name, [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+
+    // Inject thread creation failures through debug actions if enabled.
+    Status debug_action_status = DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
+    thread_create_status = debug_action_status.ok() ?
+        Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
+            [this, fis]() { this->ExecFInstance(fis); }, &t, true) :
+        debug_action_status;
     if (!thread_create_status.ok()) {
+      fis_map_.erase(fis->instance_id());
       // Undo refcnt increments done immediately prior to Thread::Create(). The
       // reference counts were both greater than zero before the increments, so
       // neither of these decrements will free any structures.
@@ -347,33 +417,45 @@ void QueryState::StartFInstances() {
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
       break;
     }
-    // Fragment instance successfully started
-    fis_map_.emplace(fis->instance_id(), fis);
     // update fragment_map_
     vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
     fis_list.push_back(fis);
     t->Detach();
+    --num_unstarted_instances;
   }
 
-  // don't return until every instance is prepared and record the first non-OK
-  // (non-CANCELLED if available) status (including any error from thread creation
-  // above).
-  Status prepare_status = thread_create_status;
-  for (auto entry: fis_map_) {
-    Status instance_status = entry.second->WaitForPrepare();
-    // don't wipe out an error in one instance with the resulting CANCELLED from
-    // the remaining instances
-    if (!instance_status.ok() && (prepare_status.ok() || prepare_status.IsCancelled())) {
-      prepare_status = instance_status;
-    }
-  }
-  discard_result(instances_prepared_promise_.Set(prepare_status));
-  // If this is aborting due to failure in thread creation, report status to the
-  // coordinator to start query cancellation. (Other errors are reported by the
-  // fragment instance itself.)
   if (!thread_create_status.ok()) {
+    // We failed to start 'num_unstarted_instances', so make sure to notify
+    // 'instances_prepared_barrier_' 'num_unstarted_instances - 1' times, to unblock
+    // WaitForPrepare(). The last remaining notification will be set by the call to
+    // ErrorDuringPrepare() below.
+    while (num_unstarted_instances > 1) {
+      DonePreparing();
+      --num_unstarted_instances;
+    }
+
+    // We prioritize thread creation failure as a query killing error, even over an error
+    // during Prepare() for a FIS.
+    // We have to notify anyone waiting on WaitForPrepare() that this query has failed.
+    ErrorDuringPrepare(thread_create_status, TUniqueId());
+    Status updated_query_status = UpdateBackendExecState();
+    DCHECK(!updated_query_status.ok());
+    // Block until all the already started fragment instances finish Prepare()-ing
+    // to report an error.
+    discard_result(WaitForPrepare());
     ReportExecStatusAux(true, thread_create_status, nullptr, true);
+    return;
   }
+
+  discard_result(WaitForPrepare());
+  if (!UpdateBackendExecState().ok()) return;
+  DCHECK(backend_exec_state_ == BackendExecState::EXECUTING)
+      << BackendExecStateToString(backend_exec_state_);
+
+  discard_result(WaitForFinish());
+  if (!UpdateBackendExecState().ok()) return;
+  DCHECK(backend_exec_state_ == BackendExecState::FINISHED)
+      << BackendExecStateToString(backend_exec_state_);
 }
 
 void QueryState::AcquireExecResourceRefcount() {
@@ -414,13 +496,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
 
 void QueryState::Cancel() {
   VLOG_QUERY << "Cancel: query_id=" << PrintId(query_id());
-  (void) instances_prepared_promise_.Get();
+  discard_result(WaitForPrepare());
   if (!is_cancelled_.CompareAndSwap(0, 1)) return;
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
 void QueryState::PublishFilter(const TPublishFilterParams& params) {
-  if (!instances_prepared_promise_.Get().ok()) return;
+  if (!WaitForPrepare().ok()) return;
   DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
   for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
     fis->PublishFilter(params);

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index ae3bdd5..607d82a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -19,6 +19,7 @@
 #define IMPALA_RUNTIME_QUERY_STATE_H
 
 #include <memory>
+#include <mutex>
 #include <unordered_map>
 #include <boost/scoped_ptr.hpp>
 
@@ -27,8 +28,9 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/uid-util.h"
+#include "util/counting-barrier.h"
 #include "util/promise.h"
+#include "util/uid-util.h"
 
 namespace impala {
 
@@ -61,6 +63,19 @@ class RuntimeState;
 /// When any fragment instance execution returns with an error status, all
 /// fragment instances are automatically cancelled.
 ///
+/// We maintain a state denoted by BackendExecState. We transition from one non-error
+/// state to the next only if *all* underlying fragment instances have done so.
+/// Eg: We transition from the PREPARING state to the EXECUTING state only if *all* the
+/// underlying fragment instances have finished Prepare().
+/// However, the behavior for transitioning from a non-error state to an error state is
+/// different for different states. If any fragment instance hits an error or cancellation
+/// during the EXECUTING state, then we immediately change the state of the query to the
+/// ERROR or CANCELLED state accordingly.
+/// However, if a fragment instance hits an error during Prepare(), we still wait for
+/// *all* fragment instances to complete preparing before transitioning to the ERROR
+/// state. This is to simplify the query lifecycle so that Prepare() is always completed
+/// before it can handle either a Cancel() RPC or a PublishFilter() RPC.
+///
 /// Status reporting: all instances currently report their status independently.
 /// Each instance sends at least one final status report with its overall execution
 /// status, so if any of the instances encountered an error, that error will be reported.
@@ -130,15 +145,10 @@ class QueryState {
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
-  /// thread. Blocks until all fragment instances have finished their Prepare phase.
-  /// Not idempotent, not thread-safe.
+  /// thread. Blocks until a terminal state has been reached.
+  /// Not idempotent, not thread-safe. Must only be called by the QueryState thread.
   void StartFInstances();
 
-  /// Return overall status of Prepare phases of fragment instances. A failure
-  /// in any instance's Prepare will cause this function to return an error status.
-  /// Blocks until all fragment instances have finished their Prepare phase.
-  Status WaitForPrepare();
-
   /// Blocks until all fragment instances have finished their Prepare phase.
   FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
 
@@ -176,6 +186,48 @@ class QueryState {
 
   ~QueryState();
 
+  /// Return overall status of Prepare() phases of fragment instances. A failure
+  /// in any instance's Prepare() will cause this function to return an error status.
+  /// Blocks until all fragment instances have finished their Prepare() phase.
+  Status WaitForPrepare();
+
+  /// Called by a FragmentInstanceState thread to notify that it's done preparing.
+  void DonePreparing() { discard_result(instances_prepared_barrier_->Notify()); }
+
+  /// Called by a FragmentInstanceState thread to notify that it's done executing.
+  void DoneExecuting() { discard_result(instances_finished_barrier_->Notify()); }
+
+  /// Called by a fragment instance thread to notify that it hit an error during Prepare()
+  /// Updates the query status and the failed instance ID if it's not set already.
+  /// Also notifies anyone waiting on WaitForPrepare() if this is called by the last
+  /// fragment instance to complete Prepare().
+  void ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
+    // Do a racy check to avoid getting the lock if an error is already set.
+    if (query_status_.ok()) {
+      std::unique_lock<SpinLock> l(status_lock_);
+      if (query_status_.ok()) {
+        query_status_ = status;
+        failed_finstance_id_ = finst_id;
+      }
+    }
+    discard_result(instances_prepared_barrier_->Notify());
+  }
+
+  /// Called by a fragment instance thread to notify that it hit an error during Execute()
+  /// Updates the query status and records the failed instance ID if they're not set
+  /// already. Also notifies anyone waiting on WaitForFinish().
+  void ErrorDuringExecute(const Status& status, const TUniqueId& finst_id) {
+    // Do a racy check to avoid getting the lock if an error is already set.
+    if (query_status_.ok()) {
+      std::unique_lock<SpinLock> l(status_lock_);
+      if (query_status_.ok()) {
+        query_status_ = status;
+        failed_finstance_id_ = finst_id;
+      }
+    }
+    instances_finished_barrier_->NotifyRemaining();
+  }
+
  private:
   friend class QueryExecMgr;
 
@@ -185,6 +237,60 @@ class QueryState {
 
   static const int DEFAULT_BATCH_SIZE = 1024;
 
+  /// Return overall status of all fragment instances during execution. A failure
+  /// in any instance's execution (after Prepare()) will cause this function
+  /// to return an error status. Blocks until all fragment instances have finished
+  /// executing or until one of them hits an error.
+  Status WaitForFinish();
+
+  /// States that a query goes through during its lifecycle.
+  enum class BackendExecState {
+    /// PREPARING: The initial state on receiving an ExecQueryFInstances() RPC from the
+    /// coordinator. Implies that the fragment instances are being started.
+    PREPARING,
+    /// EXECUTING: All fragment instances managed by this QueryState have successfully
+    /// completed Prepare(). Implies that the query is executing.
+    EXECUTING,
+    /// FINISHED: All fragment instances managed by this QueryState have successfully
+    /// completed executing.
+    FINISHED,
+    /// CANCELLED: This query received a CancelQueryFInstances() RPC or was directed by
+    /// the coordinator to cancel itself from a response to a ReportExecStatus() RPC.
+    /// Does not imply that all the fragment instances have realized cancellation, however.
+    CANCELLED,
+    /// ERROR: received an error from a fragment instance.
+    ERROR
+  };
+
+  /// Current state of this query in this executor.
+  /// Thread-safety: Only updated by the QueryState thread.
+  BackendExecState backend_exec_state_ = BackendExecState::PREPARING;
+
+  /// Updates the BackendExecState based on 'query_status_'. A state transition happens
+  /// if the current state is a non-terminal state; the transition can either be to the
+  /// next legal state or ERROR if 'query_status_' is an error. Thread safe. This is a
+  /// helper function to StartFInstances() which executes on the QueryState thread.
+  Status UpdateBackendExecState();
+
+  /// A string representation of 'state'.
+  const char* BackendExecStateToString(const BackendExecState& state);
+
+  /// Returns 'true' if 'state' is a terminal state (FINISHED, CANCELLED, ERROR).
+  inline bool IsTerminalState(const BackendExecState& state);
+
+  /// Protects 'query_status_' and 'failed_finstance_id_'.
+  SpinLock status_lock_;
+
+  /// The overall status of this QueryState.
+  /// Status::OK if all the fragment instances managed by this QS are also Status::OK;
+  /// Otherwise, it will reflect the first non-OK status of a FIS.
+  /// Protected by 'status_lock_'.
+  Status query_status_;
+
+  /// ID of first fragment instance to hit an error.
+  /// Protected by 'status_lock_'.
+  TUniqueId failed_finstance_id_;
+
   /// set in c'tor
   const TQueryCtx query_ctx_;
 
@@ -216,9 +322,17 @@ class QueryState {
   /// created in StartFInstances(), owned by obj_pool_
   DescriptorTbl* desc_tbl_ = nullptr;
 
-  /// Barrier for the completion of the Prepare phases of all fragment instances,
-  /// set in StartFInstances().
-  Promise<Status> instances_prepared_promise_;
+  /// Barrier for the completion of the Prepare() phases of all fragment instances. This
+  /// just blocks until ALL fragment instances have finished preparing, regardless of
+  /// whether they hit an error or not.
+  std::unique_ptr<CountingBarrier> instances_prepared_barrier_;
+
+  /// Barrier for the completion of all the fragment instances.
+  /// If the 'Status' is not OK due to an error during fragment instance execution, this
+  /// barrier is unblocked immediately.
+  /// 'query_status_' will be set once this is unblocked and so will 'failed_finstance_id_'
+  /// if an error is hit.
+  std::unique_ptr<CountingBarrier> instances_finished_barrier_;
 
   /// map from instance id to its state (owned by obj_pool_), populated in
   /// StartFInstances(); not valid to read from until instances_prepare_promise_

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index c22b662..80da2a9 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -624,10 +624,12 @@ struct TErrorLogEntry {
 // Represents the states that a fragment instance goes through during its execution. The
 // current state gets sent back to the coordinator and will be presented to users through
 // the debug webpages.
+// The states are listed in order and one state will only strictly be reached after all
+// the previous states.
 enum TFInstanceExecState {
   WAITING_FOR_EXEC,
-  WAITING_FOR_CODEGEN,
   WAITING_FOR_PREPARE,
+  WAITING_FOR_CODEGEN,
   WAITING_FOR_OPEN,
   WAITING_FOR_FIRST_BATCH,
   FIRST_BATCH_PRODUCED,

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 278c2f1..ef9ed07 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -146,6 +146,32 @@ class TestFailpoints(ImpalaTestSuite):
     self.execute_query_expect_failure(self.client, query,
         query_options={'debug_action':debug_action})
 
+    # Fail the Open() phase of all fragment instances.
+    debug_action = 'FIS_IN_OPEN:FAIL@1.0'
+    self.execute_query_expect_failure(self.client, query,
+        query_options={'debug_action': debug_action})
+
+    # Fail the ExecInternal() phase of all fragment instances.
+    debug_action = 'FIS_IN_EXEC_INTERNAL:FAIL@1.0'
+    self.execute_query_expect_failure(self.client, query,
+        query_options={'debug_action': debug_action})
+
+    # Fail the fragment instance thread creation with a 0.5 probability.
+    debug_action = 'FIS_FAIL_THREAD_CREATION:FAIL@0.5'
+
+    # We want to test the behavior when only some fragment instance threads fail to be
+    # created, so we set the probability of fragment instance thread creation failure to
+    # 0.5. Since there's only a 50% chance of fragment instance thread creation failure,
+    # we attempt to catch a query failure up to a very conservative maximum of 50 tries.
+    for i in range(50):
+      try:
+        self.execute_query(query,
+            query_options={'debug_action': debug_action})
+      except ImpalaBeeswaxException as e:
+        assert 'Query aborted:Debug Action: FIS_FAIL_THREAD_CREATION:FAIL@0.5' \
+            in str(e), str(e)
+        break
+
   def __execute_fail_action(self, query, vector):
     try:
       self.execute_query(query, vector.get_value('exec_option'),