Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/27 05:18:59 UTC

[1/5] impala git commit: IMPALA-7760: Privilege version inconsistency causes a hang when running invalidate metadata

Repository: impala
Updated Branches:
  refs/heads/master de0c6bd6b -> 449fe73d2


IMPALA-7760: Privilege version inconsistency causes a hang when running invalidate metadata

Before this patch, a bug in SentryProxy caused a hang when running
invalidate metadata due to privilege version inconsistency. I was able
to manually reproduce the issue by doing the following steps:

1. Get all Sentry role privileges for role a: [x, y] --> in SentryProxy
2. Add a sleep statement before getting all Sentry roles to simulate the
   timing issue --> in SentryProxy
3. Remove role a --> Externally via Sentry CLI
4. Privileges x and y from step 1 do not get removed from the catalog
   even though they were removed in step 3, which causes the catalog
   version inconsistency
5. Run invalidate metadata; it hangs due to the catalog version
   inconsistency

The fix is to remove all privileges in the catalog if there are no
privileges (null or empty) returned by Sentry.
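
To illustrate why falling through with an empty set matters, here is a
minimal sketch of the reconciliation idea; the class and method names
below are hypothetical stand-ins, not the actual SentryProxy code:

import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the reconciliation idea behind the fix. When
// Sentry reports no privileges for a principal (null or empty), the
// removal pass must still run so stale catalog entries go away.
class SentryPrivilegeSyncSketch {
  static void reconcile(Map<String, Boolean> catalogPrivileges,
      Set<String> sentryPrivileges) {
    // Before the fix: a null set caused an early return here, skipping
    // the removal pass below and leaving stale privileges in the catalog.
    if (sentryPrivileges == null) sentryPrivileges = Collections.emptySet();
    // Add/refresh the privileges Sentry currently reports.
    for (String priv : sentryPrivileges) catalogPrivileges.put(priv, true);
    // Remove catalog privileges Sentry no longer reports; with an empty
    // set this drops everything, restoring version consistency.
    catalogPrivileges.keySet().retainAll(sentryPrivileges);
  }
}

With the old early return, the removal pass never ran for a principal
whose role had just been deleted, so its privileges lingered in the
catalog at a stale version.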

Testing:
- Manually tested the patch by following the above steps and did not
  encounter the hang when issuing invalidate metadata.

Change-Id: Ib1e0db2b1f727476f489c732c4f4e5bc1582429f
Reviewed-on: http://gerrit.cloudera.org:8080/11794
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f8b7f257
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f8b7f257
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f8b7f257

Branch: refs/heads/master
Commit: f8b7f257fb676d96fbde0ce9b078bbe4af3d4a4f
Parents: de0c6bd
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Thu Oct 25 13:33:29 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 26 20:35:17 2018 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/util/SentryProxy.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f8b7f257/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index acad161..be2f7f3 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -257,7 +257,9 @@ public class SentryProxy {
       // in allPrincipalPrivileges. See IMPALA-7729 for more information.
       Set<TSentryPrivilege> sentryPrivileges = allPrincipalPrivileges.get(
           sentryPrincipalName);
-      if (sentryPrivileges == null) return;
+      if (sentryPrivileges == null) {
+        sentryPrivileges = Sets.newHashSet();
+      }
       // Check all the privileges that are part of this principal.
       for (TSentryPrivilege sentryPriv: sentryPrivileges) {
         TPrivilege thriftPriv =


[3/5] impala git commit: Update .gitignore

Posted by ta...@apache.org.
Update .gitignore

A few unversioned artifacts crept in over time without corresponding
.gitignore entries. These updates are based on the git status output
in my dev environment.

Change-Id: I281ab3b5c98ac32e5d60663562628ffda6606a6a
Reviewed-on: http://gerrit.cloudera.org:8080/11787
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/aa654d4b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/aa654d4b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/aa654d4b

Branch: refs/heads/master
Commit: aa654d4b87dd2f2b590e8a300f686bd2154e63d2
Parents: 8d628d7
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Oct 25 10:55:47 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 26 22:19:35 2018 +0000

----------------------------------------------------------------------
 .gitignore                       | 17 +++++++++++++++++
 common/yarn-extras/.gitignore    | 10 ++++++++++
 fe/.gitignore                    |  1 +
 fe/src/test/resources/.gitignore |  1 +
 lib/python/.gitignore            |  1 +
 testdata/.gitignore              |  3 +++
 testdata/cluster/.gitignore      |  2 ++
 7 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a39b39e..7963ccc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@ org.eclipse.jdt.ui.prefs
 load-*-generated.sql
 bin/version.info
 bin/impala-config-local.sh
+.cache
 .cdh
 
 # distcc options
@@ -56,9 +57,25 @@ hs_err_pid*.log
 
 # Binaries disallowed by ASF rules
 *.jar
+*.so
 
 # Vim related files
+.swp
 .*.swn
+.*.swm
 .*.swo
 .*.swp
 .vimrc
+
+# GDB default log file
+gdb.txt
+.gdbinit
+
+# IntelliJ artifacts
+.idea
+*.iml
+*.ipr
+*.iws
+
+# OS X Artifacts
+.DS_Store

http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/common/yarn-extras/.gitignore
----------------------------------------------------------------------
diff --git a/common/yarn-extras/.gitignore b/common/yarn-extras/.gitignore
new file mode 100644
index 0000000..952ea70
--- /dev/null
+++ b/common/yarn-extras/.gitignore
@@ -0,0 +1,10 @@
+target
+
+# Eclipse artifacts
+.classpath
+.settings
+.project
+*.launch
+
+# emacs backup files
+*~

http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/fe/.gitignore
----------------------------------------------------------------------
diff --git a/fe/.gitignore b/fe/.gitignore
index 11a5e9d..08d2c7f 100644
--- a/fe/.gitignore
+++ b/fe/.gitignore
@@ -1,3 +1,4 @@
+bin
 target
 
 # Eclipse artifacts

http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/fe/src/test/resources/.gitignore
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/.gitignore b/fe/src/test/resources/.gitignore
index f7224b1..78d55cf 100644
--- a/fe/src/test/resources/.gitignore
+++ b/fe/src/test/resources/.gitignore
@@ -4,3 +4,4 @@ hive-log4j.properties
 log4j.properties
 sentry-site*.xml
 yarn-site.xml
+copy-mem-limit-test-*.xml

http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/lib/python/.gitignore
----------------------------------------------------------------------
diff --git a/lib/python/.gitignore b/lib/python/.gitignore
new file mode 100644
index 0000000..05e70ca
--- /dev/null
+++ b/lib/python/.gitignore
@@ -0,0 +1 @@
+impala_py_lib.egg-info

http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/testdata/.gitignore
----------------------------------------------------------------------
diff --git a/testdata/.gitignore b/testdata/.gitignore
index e05079c..62dcf9a 100644
--- a/testdata/.gitignore
+++ b/testdata/.gitignore
@@ -31,3 +31,6 @@ load-*-hbase.create
 
 # Temporary data generated by tests.
 tmp-scanner-fuzz*
+
+# Test cluster data.
+data

http://git-wip-us.apache.org/repos/asf/impala/blob/aa654d4b/testdata/cluster/.gitignore
----------------------------------------------------------------------
diff --git a/testdata/cluster/.gitignore b/testdata/cluster/.gitignore
index a8aeb3e..ea2f594 100644
--- a/testdata/cluster/.gitignore
+++ b/testdata/cluster/.gitignore
@@ -1 +1,3 @@
 /kudu
+cdh5
+cdh6


[4/5] impala git commit: IMPALA-7758: Fix LOCATION clause when creating chars_formats_*

Posted by ta...@apache.org.
IMPALA-7758: Fix LOCATION clause when creating chars_formats_*

The current location resolves to /user/hive/warehouse/chars_formats_*.

Impala's test data actually lives at /test-warehouse/chars_formats_*.

Tested this by reloading data from scratch and running the core tests.

Change-Id: I781b484e7a15ccaa5de590563d68b3dca6a658e5
Reviewed-on: http://gerrit.cloudera.org:8080/11789
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2e5d6581
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2e5d6581
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2e5d6581

Branch: refs/heads/master
Commit: 2e5d65819aaa52e1a89bc5cc212bba3b1b404339
Parents: aa654d4
Author: David Knupp <dk...@cloudera.com>
Authored: Wed Oct 24 13:38:14 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 27 00:56:54 2018 +0000

----------------------------------------------------------------------
 testdata/bin/load-dependent-tables.sql | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2e5d6581/testdata/bin/load-dependent-tables.sql
----------------------------------------------------------------------
diff --git a/testdata/bin/load-dependent-tables.sql b/testdata/bin/load-dependent-tables.sql
index e94def7..d4ff102 100644
--- a/testdata/bin/load-dependent-tables.sql
+++ b/testdata/bin/load-dependent-tables.sql
@@ -39,7 +39,7 @@ CREATE EXTERNAL TABLE alltypesmixedformat (
 partitioned by (year int, month int)
 row format delimited fields terminated by ','  escaped by '\\'
 stored as TEXTFILE
-LOCATION '${hiveconf:hive.metastore.warehouse.dir}/alltypesmixedformat';
+LOCATION '/test-warehouse/alltypesmixedformat';
 
 INSERT OVERWRITE TABLE alltypesmixedformat PARTITION (year=2009, month=1)
 SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
@@ -76,26 +76,26 @@ DROP TABLE IF EXISTS functional_parquet.chars_formats;
 CREATE EXTERNAL TABLE functional_parquet.chars_formats
 (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
 STORED AS PARQUET
-LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_parquet';
+LOCATION '/test-warehouse/chars_formats_parquet';
 
 DROP TABLE IF EXISTS functional_orc_def.chars_formats;
 CREATE EXTERNAL TABLE functional_orc_def.chars_formats
 (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
 STORED AS ORC
-LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_orc_def';
+LOCATION '/test-warehouse/chars_formats_orc_def';
 
 DROP TABLE IF EXISTS functional.chars_formats;
 CREATE EXTERNAL TABLE functional.chars_formats
 (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
 ROW FORMAT delimited fields terminated by ','  escaped by '\\'
 STORED AS TEXTFILE
-LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_text';
+LOCATION '/test-warehouse/chars_formats_text';
 
 DROP TABLE IF EXISTS functional_avro_snap.chars_formats;
 CREATE EXTERNAL TABLE functional_avro_snap.chars_formats
 (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
 STORED AS AVRO
-LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_avro_snap'
+LOCATION '/test-warehouse/chars_formats_avro_snap'
 TBLPROPERTIES ('avro.schema.literal'='{"type":"record",
 "name":"CharTypesTest","doc":"Schema generated by Kite",
 "fields":[


[2/5] impala git commit: IMPALA-7710: test_owner_privileges_with_grant failed with AuthorizationException

Posted by ta...@apache.org.
IMPALA-7710: test_owner_privileges_with_grant failed with AuthorizationException

The problem was a cache consistency issue between impalad and catalogd.
Because a Sentry refresh was occurring during the privilege update from
the alter table set owner, impalad had the correct privileges, which
allowed the "show grant role" to succeed, but the privileges in
catalogd were being overwritten by the Sentry refresh. Added a delay
in the drop call to ensure privileges are updated. This is a
workaround to get the tests to pass with the existing behaviour and
should be reassessed if IMPALA-7763 is implemented; that change would
add a lock that could prevent this race, but it will need its own
assessment.
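
The workaround amounts to waiting out one Sentry polling cycle before
any statement whose privileges may still be converging. A rough sketch
of the timing, using the constants from the test (Java here purely for
illustration; the actual tests are Python and pass delay_s to
user_query):

// Illustrative sketch of the wait-out-the-refresh workaround.
class SentryRefreshDelaySketch {
  // Mirrors the test constants: poll every 1s, wait two periods so at
  // least one full Sentry refresh completes between operations.
  static final int SENTRY_POLLING_FREQUENCY_S = 1;
  static final int SENTRY_REFRESH_TIMEOUT_S = SENTRY_POLLING_FREQUENCY_S * 2;

  static void runAfterRefresh(Runnable stmt) throws InterruptedException {
    // Sleeping past one refresh cycle lets catalogd converge with Sentry
    // before the next privileged statement runs.
    Thread.sleep(SENTRY_REFRESH_TIMEOUT_S * 1000L);
    stmt.run();
  }
}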

Testing:
- Ran custom cluster tests 50 times

Change-Id: I5a1babd3dcbb94ffaa1f3e6ef2cebf1a1d391219
Reviewed-on: http://gerrit.cloudera.org:8080/11786
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8d628d7b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8d628d7b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8d628d7b

Branch: refs/heads/master
Commit: 8d628d7b62c4903bfd0236d2de150dae0752f0fe
Parents: f8b7f25
Author: Adam Holley <gi...@holleyism.com>
Authored: Thu Oct 18 06:43:29 2018 -0500
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 26 20:54:45 2018 +0000

----------------------------------------------------------------------
 tests/authorization/test_owner_privileges.py | 37 +++++++++++++----------
 1 file changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8d628d7b/tests/authorization/test_owner_privileges.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_owner_privileges.py b/tests/authorization/test_owner_privileges.py
index 4cc2193..6520cd0 100644
--- a/tests/authorization/test_owner_privileges.py
+++ b/tests/authorization/test_owner_privileges.py
@@ -38,8 +38,6 @@ SENTRY_LONG_POLLING_FREQUENCY_S = 60
 SENTRY_POLLING_FREQUENCY_S = 1
 # The timeout, in seconds, when waiting for a refresh of Sentry privileges.
 SENTRY_REFRESH_TIMEOUT_S = SENTRY_POLLING_FREQUENCY_S * 2
-# The timeout needed because of statestore refresh.
-STATESTORE_TIMEOUT_S = 3
 
 SENTRY_CONFIG_DIR = getenv('IMPALA_HOME') + '/fe/src/test/resources/'
 SENTRY_BASE_LOG_DIR = getenv('IMPALA_CLUSTER_LOGS_DIR') + "/sentry"
@@ -93,13 +91,8 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
     return total
 
   def _validate_user_privilege_count(self, client, query, user, delay_s, count):
-    start_time = time()
-    while time() - start_time < STATESTORE_TIMEOUT_S:
-      result = self.user_query(client, query, user=user, delay_s=delay_s)
-      if self.count_user_privileges(result) == count:
-        return True
-      sleep(1)
-    return False
+    result = self.user_query(client, query, user=user, delay_s=delay_s)
+    return self.count_user_privileges(result) == count
 
   def _test_cleanup(self):
     # Admin for manipulation and cleaning up.
@@ -191,14 +184,17 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         % (test_obj.grant_name, test_obj.obj_name), user="oo_user1")
 
     # Change the database owner and ensure oo_user1 does not have owner privileges.
+    # Use a delay to avoid cache consistency issue that could occur after create.
     self.user_query(self.oo_user1_impalad_client, "alter %s %s set owner user oo_user2"
-        % (test_obj.obj_type, test_obj.obj_name), user="oo_user1")
+        % (test_obj.obj_type, test_obj.obj_name), user="oo_user1",
+        delay_s=sentry_refresh_timeout_s)
     assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
         "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
     # Ensure oo_user1 cannot drop database after owner change.
+    # Use a delay to avoid cache consistency issue that could occur after alter.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s" % (test_obj.obj_type,
-        test_obj.obj_name), user="oo_user1",
+        test_obj.obj_name), user="oo_user1", delay_s=sentry_refresh_timeout_s,
         error_msg="does not have privileges to execute 'DROP'")
 
     # oo_user2 should have privileges for object now.
@@ -212,9 +208,11 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         test_obj.obj_name))
     assert self._validate_user_privilege_count(self.oo_user2_impalad_client,
         "show grant user oo_user2", "oo_user2", sentry_refresh_timeout_s, 0)
+    # Use a delay to avoid cache consistency issue that could occur after alter.
     self.user_query(self.oo_user1_impalad_client,
         "alter %s %s set owner role owner_priv_test_owner_role"
-        % (test_obj.obj_type, test_obj.obj_name), user="oo_user1")
+        % (test_obj.obj_type, test_obj.obj_name), user="oo_user1",
+        delay_s=sentry_refresh_timeout_s)
     # Ensure oo_user1 does not have user privileges.
     assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
         "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
@@ -225,17 +223,20 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         "oo_user1")
 
     # Drop the object and ensure no role privileges.
+    # Use a delay to avoid cache consistency issue that could occur after alter.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s " % (test_obj.obj_type,
-        test_obj.obj_name), user="oo_user1")
+        test_obj.obj_name), user="oo_user1", delay_s=sentry_refresh_timeout_s)
     assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
         "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
     # Ensure user privileges are gone after drop.
+    # Use a delay to avoid cache consistency issue that could occur after drop.
     self.user_query(self.oo_user1_impalad_client, "create %s if not exists %s %s %s"
         % (test_obj.obj_type, test_obj.obj_name, test_obj.table_def,
-        test_obj.view_select), user="oo_user1")
+        test_obj.view_select), user="oo_user1", delay_s=sentry_refresh_timeout_s)
+    # Use a delay to avoid cache consistency issue that could occur after create.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s " % (test_obj.obj_type,
-        test_obj.obj_name), user="oo_user1")
+        test_obj.obj_name), user="oo_user1", delay_s=sentry_refresh_timeout_s)
     assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
         "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
@@ -312,6 +313,7 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         error_msg="does not have privileges with 'GRANT OPTION'")
 
     # Ensure oo_user1 cannot drop database.
+    # Use a delay to avoid cache consistency issue that could occur after alter.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s" % (test_obj.obj_type,
         test_obj.obj_name), user="oo_user1",
         error_msg="does not have privileges to execute 'DROP'",
@@ -388,11 +390,14 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         % (test_obj.grant_name, test_obj.obj_name), user="oo_user1",
         error_msg="does not have privileges to execute: REVOKE_PRIVILEGE")
 
+    # Use a delay to avoid cache consistency issue that could occur after create.
     self.user_query(self.oo_user1_impalad_client, "alter %s %s set owner user oo_user2"
         % (test_obj.obj_type, test_obj.obj_name), user="oo_user1",
+        delay_s=sentry_refresh_timeout_s,
         error_msg="does not have privileges with 'GRANT OPTION'")
 
+    # Use a delay to avoid cache consistency issue that could occur after alter.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s " % (test_obj.obj_type,
-        test_obj.obj_name), user="oo_user1")
+        test_obj.obj_name), user="oo_user1", delay_s=sentry_refresh_timeout_s)
     assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
         "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)


[5/5] impala git commit: IMPALA-7662: fix error race when scanner open fails

Posted by ta...@apache.org.
IMPALA-7662: fix error race when scanner open fails

This is very similar to IMPALA-7335, except it happens
when 'progress_' is incremented in the call chain
HdfsScanNode::ProcessSplit
-> HdfsScanNodeBase::CreateAndOpenScanner()
-> HdfsScanner::Close()

The fix restructures the code so that
SetDoneInternal() is called with the error *before*
HdfsScanner::Close(). This required a refactoring because
HdfsScanNodeBase doesn't actually know about SetDoneInternal().

My fix is to put the common logic between HdfsScanNode and
HdfsScanNodeMt into a helper in HdfsScanNodeBase, then in
HdfsScanNode, make sure to call SetDoneInternal() before
closing the scanner.

I also reworked HdfsScanNode::ProcessSplit() to handle error propagation
internally. I think the joint responsibility between ProcessSplit() and
its caller for handling errors made things harder than necessary.
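
The heart of the fix is an ordering invariant: publish the error (and
the done flag) under the lock before the scanner's Close() marks the
range complete. A sketch of that invariant follows, written in Java for
illustration rather than the actual C++, with simplified names:

// Illustrative sketch of the SetError()-before-Close() ordering (the
// real code is C++ in hdfs-scan-node.cc).
class ErrorBeforeCompleteSketch {
  private final Object lock = new Object();
  private String status;        // first error wins, like status_
  private boolean done;         // like done_
  private int remainingRanges;  // like progress_

  ErrorBeforeCompleteSketch(int ranges) { remainingRanges = ranges; }

  // Analogous to HdfsScanNode::SetError(): publish the error first...
  void setError(String error) {
    synchronized (lock) {
      if (status == null) status = error;  // keep the first non-ok status
      done = true;
    }
  }

  // ...then close the scanner, which marks the scan range complete.
  // Reversed, another thread could observe remainingRanges == 0 with a
  // still-OK status and report success before the error propagates --
  // the race this commit fixes.
  void closeScanner() {
    synchronized (lock) { remainingRanges--; }
  }
}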

Testing:
Added a debug action and test that reproduced the race before the fix.

Change-Id: I45a61210ca7d057b048c77d9f2f2695ec450f19b
Reviewed-on: http://gerrit.cloudera.org:8080/11596
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/449fe73d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/449fe73d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/449fe73d

Branch: refs/heads/master
Commit: 449fe73d2145bd22f0f857623c3652a097f06d73
Parents: 2e5d658
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Oct 5 10:13:36 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 27 04:07:51 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                        |  7 ++--
 be/src/exec/hdfs-scan-node-base.cc              | 22 ++++--------
 be/src/exec/hdfs-scan-node-base.h               | 11 ++++--
 be/src/exec/hdfs-scan-node-mt.cc                | 13 ++++++-
 be/src/exec/hdfs-scan-node-mt.h                 |  6 ++++
 be/src/exec/hdfs-scan-node.cc                   | 36 ++++++++++++--------
 be/src/exec/hdfs-scan-node.h                    | 12 +++++--
 be/src/exec/hdfs-scanner.h                      |  2 ++
 common/thrift/PlanNodes.thrift                  |  5 +++
 .../parquet-error-propagation-race.test         | 13 +++++++
 tests/failure/test_failpoints.py                |  5 ++-
 tests/query_test/test_scanners.py               | 15 ++++++++
 12 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index b9406dd..3a606f5 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -392,8 +392,7 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
     return Status::OK();
   } else if (debug_options_.action == TDebugAction::MEM_LIMIT_EXCEEDED) {
     return mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
-  } else {
-    DCHECK_EQ(debug_options_.action, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+  } else if (debug_options_.action == TDebugAction::SET_DENY_RESERVATION_PROBABILITY) {
     // We can only enable the debug action if the buffer pool client is registered.
     // If the buffer client is not registered at this point (e.g. if phase is PREPARE or
     // OPEN), then we will enable the debug action at the time when the client is
@@ -401,6 +400,10 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
     if (reservation_manager_.buffer_pool_client()->is_registered()) {
       RETURN_IF_ERROR(reservation_manager_.EnableDenyReservationDebugAction());
     }
+  } else {
+    DCHECK_EQ(debug_options_.action, TDebugAction::DELAY);
+    VLOG(1) << "DEBUG_ACTION: Sleeping";
+    SleepForMs(100);
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b11dcdc..4a39124 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -624,9 +624,10 @@ void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
   return it->second;
 }
 
-Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+Status HdfsScanNodeBase::CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
     ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
-  DCHECK(context != NULL);
+  DCHECK(context != nullptr);
+  DCHECK(scanner->get() == nullptr);
   THdfsCompression::type compression =
       context->GetStream()->file_desc()->file_compression;
 
@@ -663,19 +664,10 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
       return Status(Substitute("Unknown Hdfs file format type: $0",
           partition->file_format()));
   }
-  DCHECK(scanner->get() != NULL);
-  Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
-  if (status.ok()) {
-    status = scanner->get()->Open(context);
-    if (!status.ok()) {
-      scanner->get()->Close(nullptr);
-      scanner->reset();
-    }
-  } else {
-    context->ClearStreams();
-    scanner->reset();
-  }
-  return status;
+  DCHECK(scanner->get() != nullptr);
+  RETURN_IF_ERROR(scanner->get()->Open(context));
+  // Inject the error after the scanner is opened, to test the scanner close path.
+  return ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
 }
 
 Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals,

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index b63d9c5..0a5a328 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -556,9 +556,14 @@ class HdfsScanNodeBase : public ScanNode {
   /// buffers.
   Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
 
-  /// Create and open new scanner for this partition type.
-  /// If the scanner is successfully created and opened, it is returned in 'scanner'.
-  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+  /// Helper for the CreateAndOpenScanner() implementations in the subclass. Creates and
+  /// opens a new scanner for this partition type. Depending on the outcome, the
+  /// behaviour differs:
+  /// - If the scanner is successfully created and opened, returns OK and sets *scanner.
+  /// - If the scanner cannot be created, returns an error and does not set *scanner.
+  /// - If the scanner is created but opening fails, returns an error and sets *scanner.
+  ///   The caller is then responsible for closing the scanner.
+  Status CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
       ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
       WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index f3a2253..4e59e0a 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -27,8 +27,9 @@
 
 #include "gen-cpp/PlanNodes_types.h"
 
+#include "common/names.h"
+
 using namespace impala::io;
-using std::stringstream;
 
 namespace impala {
 
@@ -126,6 +127,16 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   return Status::OK();
 }
 
+Status HdfsScanNodeMt::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+    ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
+  Status status = CreateAndOpenScannerHelper(partition, context, scanner);
+  if (!status.ok() && scanner->get() != nullptr) {
+    scanner->get()->Close(nullptr);
+    scanner->reset();
+  }
+  return status;
+}
+
 void HdfsScanNodeMt::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (scanner_.get() != nullptr) scanner_->Close(nullptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 78d7718..22c3d46 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -49,6 +49,12 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
   virtual bool HasRowBatchQueue() const override { return false; }
 
  private:
+  /// Create and open new scanner for this partition type.
+  /// If the scanner is successfully created and opened, it is returned in 'scanner'.
+  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
+      WARN_UNUSED_RESULT;
+
   /// Current scan range and corresponding scanner.
   io::ScanRange* scan_range_;
   boost::scoped_ptr<ScannerContext> scanner_ctx_;

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 6c01bf4..04b6589 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -338,6 +338,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
       // Abort the query. This is still holding the lock_, so done_ is known to be
       // false and status_ must be ok.
+      discard_result(ExecDebugAction(TExecNodePhase::SCANNER_ERROR, runtime_state_));
       SetDoneInternal(status);
       break;
     }
@@ -385,12 +386,6 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     int num_unqueued_files = num_unqueued_files_.Load();
     ScanRange* scan_range;
     Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
-    if (status.ok() && scan_range != nullptr) {
-      // Got a scan range. Process the range end to end (in this thread).
-      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-          &expr_results_pool, scan_range, &scanner_thread_reservation);
-    }
-
     if (!status.ok()) {
       unique_lock<mutex> l(lock_);
       // If there was already an error, the main thread will do the cleanup
@@ -401,6 +396,11 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       SetDoneInternal(status);
       break;
     }
+    if (scan_range != nullptr) {
+      // Got a scan range. Process the range end to end (in this thread).
+      ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+          &expr_results_pool, scan_range, &scanner_thread_reservation);
+    }
 
     // Done with range and it completed successfully
     if (progress_.done()) {
@@ -444,7 +444,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   thread_state_.DecrementNumActive();
 }
 
-Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
+void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool, ScanRange* scan_range,
     int64_t* scanner_thread_reservation) {
   DCHECK(scan_range != nullptr);
@@ -467,14 +467,14 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
       HdfsScanNodeBase::RangeComplete(partition->file_format(), desc->file_compression,
           true);
     }
-    return Status::OK();
+    return;
   }
 
   ScannerContext context(runtime_state_, this, buffer_pool_client(),
       *scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
   context.AddStream(scan_range, *scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
-  Status status = CreateAndOpenScanner(partition, &context, &scanner);
+  Status status = CreateAndOpenScannerHelper(partition, &context, &scanner);
   if (!status.ok()) {
     // If preparation fails, avoid leaking unread buffers in the scan_range.
     scan_range->Cancel(status);
@@ -486,7 +486,12 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
       ss << endl << runtime_state_->ErrorLog();
       VLOG_QUERY << ss.str();
     }
-    return status;
+
+    // Ensure that the error is propagated before marking a range as complete (The
+    // scanner->Close() call marks a scan range as complete).
+    SetError(status);
+    if (scanner != nullptr) scanner->Close();
+    return;
   }
 
   status = scanner->ProcessSplit();
@@ -511,9 +516,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     // HdfsScanNodeBase::status_ variable and notify other scanner threads. Ensure that
     // status_ is updated before marking a range as complete (The scanner->Close() call
     // marks a scan range as complete).
-    unique_lock<mutex> l(lock_);
-    // Update the status_ and set the done_ flag if this is the first non-ok status.
-    SetDoneInternal(status);
+    SetError(status);
   }
   // Transfer remaining resources to a final batch and add it to the row batch queue and
   // decrement progress_ to indicate that the scan range is complete.
@@ -521,7 +524,6 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   // Reservation may have been increased by the scanner, e.g. Parquet may allocate
   // additional reservation to scan columns.
   *scanner_thread_reservation = context.total_reservation();
-  return status;
 }
 
 void HdfsScanNode::SetDoneInternal(const Status& status) {
@@ -538,3 +540,9 @@ void HdfsScanNode::SetDone() {
   unique_lock<mutex> l(lock_);
   SetDoneInternal(status_);
 }
+
+void HdfsScanNode::SetError(const Status& status) {
+  discard_result(ExecDebugAction(TExecNodePhase::SCANNER_ERROR, runtime_state_));
+  unique_lock<mutex> l(lock_);
+  SetDoneInternal(status);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 0e0f67c..853da31 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -162,10 +162,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split. 'scanner_thread_reservation' is an in/out argument that tracks the
   /// total reservation from 'buffer_pool_client_' that is allotted for this thread's
-  /// use.
-  Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
+  /// use. If an error is encountered, calls SetDoneInternal() with the error to
+  /// initiate shutdown of the scan.
+  void ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool, io::ScanRange* scan_range,
-      int64_t* scanner_thread_reservation) WARN_UNUSED_RESULT;
+      int64_t* scanner_thread_reservation);
 
   /// Called by scanner thread to return some or all of its reservation that is not
   /// needed. Always holds onto at least the minimum reservation to avoid violating the
@@ -185,6 +186,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// Gets lock_ and calls SetDoneInternal(status_). Usually used after the scan node
   /// completes execution successfully.
   void SetDone();
+
+  /// Gets lock_ and calls SetDoneInternal(status). Called after a scanner hits an
+  /// error. Must be called before HdfsScanner::Close() to ensure that 'status'
+  /// is propagated before the scan range is marked as complete by HdfsScanner::Close().
+  void SetError(const Status& status);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index d642313..9cbc99b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -148,6 +148,8 @@ class HdfsScanner {
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
+  /// Also marks any associated scan ranges as complete by calling RangeComplete() on the
+  /// scan node.
   /// This function is not idempotent and must only be called once.
   virtual void Close(RowBatch* row_batch) = 0;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 5d245fb..331e432 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -60,6 +60,9 @@ enum TExecNodePhase {
   GETNEXT,
   GETNEXT_SCANNER,
   CLOSE,
+  // After a scanner thread completes a range with an error but before it propagates the
+  // error.
+  SCANNER_ERROR,
   INVALID
 }
 
@@ -72,6 +75,8 @@ enum TDebugAction {
   // A floating point number in range [0.0, 1.0] that gives the probability of denying
   // each reservation increase request after the initial reservation.
   SET_DENY_RESERVATION_PROBABILITY,
+  // Delay for a short amount of time: 100ms
+  DELAY,
 }
 
 // Preference for replica selection

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
new file mode 100644
index 0000000..4104595
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
@@ -0,0 +1,13 @@
+====
+---- QUERY
+# Add a valid file with a single row to the table.
+INSERT INTO bad_magic_number SELECT 'good';
+---- RESULTS
+: 1
+====
+---- QUERY
+set debug_action="0:SCANNER_ERROR:DELAY";
+SELECT * FROM bad_magic_number
+---- CATCH
+invalid version number
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index d6cc1da..4e879a7 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -32,7 +32,10 @@ from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 
 FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
+# Not included:
+# - SCANNER_ERROR, because it only fires if the query already hit an error.
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER',
+                       'CLOSE']
 # Map debug actions to their corresponding query options' values.
 FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
                         'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index b75c3cc..e8a24b8 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -722,6 +722,21 @@ class TestParquet(ImpalaTestSuite):
 
     self.run_test_case("QueryTest/parquet-type-widening", vector, unique_database)
 
+  def test_error_propagation_race(self, vector, unique_database):
+    """IMPALA-7662: failed scan signals completion before error is propagated. To
+    reproduce, we construct a table with two Parquet files, one valid and another
+    invalid. The scanner thread for the invalid file must propagate the error
+    before we mark the whole scan complete."""
+    if vector.get_value('exec_option')['debug_action'] is not None:
+      pytest.skip(".test file needs to override debug action")
+    create_table_and_copy_files(self.client,
+        "CREATE TABLE {db}.{tbl} (s STRING) STORED AS PARQUET",
+        unique_database, "bad_magic_number", ["testdata/data/bad_magic_number.parquet"])
+    # We need the ranges to all be scheduled on the same impalad.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case("QueryTest/parquet-error-propagation-race", vector,
+                       unique_database)
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range