You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2018/11/19 10:45:37 UTC

[30/33] impala git commit: IMPALA-7824: INVALIDATE METADATA should not hang when Sentry is unavailable

IMPALA-7824: INVALIDATE METADATA should not hang when Sentry is unavailable

Before this patch, running INVALIDATE METADATA when Sentry is
unavailable could cause an Impala query to hang. The PolicyReader in
SentryProxy serves two use cases: a background thread that
periodically refreshes the Sentry policy, and a synchronous
operation for INVALIDATE METADATA. For the background
thread, we need to swallow any exception thrown while refreshing the
Sentry policy in order to not kill the background thread. For a
synchronous reset operation, such as INVALIDATE METADATA, swallowing
an exception causes the Impala catalog to wait indefinitely for
authorization catalog objects that never get processed due to Sentry
being unavailable. This patch updates the code so that INVALIDATE
METADATA no longer swallows exceptions and instead returns them to
the caller.

Testing:
- Ran all FE tests
- Added a new E2E test
- Ran all E2E authorization tests

Change-Id: Icff987a6184f62a338faadfdc1a0d349d912fc37
Reviewed-on: http://gerrit.cloudera.org:8080/11897
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/59f3f6b9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/59f3f6b9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/59f3f6b9

Branch: refs/heads/branch-3.1.0
Commit: 59f3f6b901581d2f6b5084fa739db2b5870f2af0
Parents: d07bc81
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Tue Nov 6 19:59:09 2018 -0800
Committer: Zoltan Borok-Nagy <bo...@cloudera.com>
Committed: Tue Nov 13 12:52:36 2018 +0100

----------------------------------------------------------------------
 .../common/SentryPolicyReaderException.java     | 35 ++++++++++++++++
 .../common/SentryUnavailableException.java      | 35 ++++++++++++++++
 .../org/apache/impala/util/SentryProxy.java     | 44 +++++++++++++++-----
 .../impala/testutil/ImpaladTestCatalog.java     |  5 ++-
 tests/authorization/test_authorization.py       | 32 ++++++++++++++
 tests/common/custom_cluster_test_suite.py       | 10 ++++-
 6 files changed, 147 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/59f3f6b9/fe/src/main/java/org/apache/impala/common/SentryPolicyReaderException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/SentryPolicyReaderException.java b/fe/src/main/java/org/apache/impala/common/SentryPolicyReaderException.java
new file mode 100644
index 0000000..b3c27f8
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/SentryPolicyReaderException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+/**
+ * Thrown as a generic exception when processing Sentry policy.
+ */
+public class SentryPolicyReaderException extends RuntimeException {
+  public SentryPolicyReaderException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public SentryPolicyReaderException(String msg) {
+    super(msg);
+  }
+
+  public SentryPolicyReaderException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/59f3f6b9/fe/src/main/java/org/apache/impala/common/SentryUnavailableException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/SentryUnavailableException.java b/fe/src/main/java/org/apache/impala/common/SentryUnavailableException.java
new file mode 100644
index 0000000..2c32a44
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/SentryUnavailableException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+/**
+ * Thrown when Sentry is not available.
+ */
+public class SentryUnavailableException extends RuntimeException {
+  public SentryUnavailableException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public SentryUnavailableException(String msg) {
+    super(msg);
+  }
+
+  public SentryUnavailableException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/59f3f6b9/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index be2f7f3..2470bc3 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -32,6 +32,8 @@ import org.apache.impala.catalog.Principal;
 import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.catalog.Role;
 import org.apache.impala.common.Reference;
+import org.apache.impala.common.SentryPolicyReaderException;
+import org.apache.impala.common.SentryUnavailableException;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.log4j.Logger;
 import org.apache.sentry.api.service.thrift.TSentryGroup;
@@ -48,8 +50,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.service.common.SentryOwnerPrivilegeType;
 import org.apache.sentry.service.common.ServiceConstants;
+import org.apache.thrift.transport.TTransportException;
 
 /**
  * Thread safe class that acts as a link between the Sentry Service and the Catalog
@@ -105,9 +109,11 @@ public class SentryProxy {
     } else {
       objectOwnershipConfigValue_ = SentryOwnerPrivilegeType.NONE.toString();
     }
-    policyReader_.scheduleAtFixedRate(new PolicyReader(false), 0,
-        BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(),
-        TimeUnit.SECONDS);
+    // We configure the PolicyReader to swallow any exception because we do not want to
+    // kill the PolicyReader background thread on exception.
+    policyReader_.scheduleAtFixedRate(new PolicyReader(/*reset versions*/ false,
+        /*swallow exception*/ true), 0,
+        BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(), TimeUnit.SECONDS);
   }
 
   /**
@@ -125,10 +131,13 @@ public class SentryProxy {
    * atomically.
    */
   private class PolicyReader implements Runnable {
-    private boolean resetVersions_;
+    private final boolean resetVersions_;
+    // A flag to indicate whether or not to swallow any exception thrown.
+    private final boolean swallowException_;
 
-    public PolicyReader(boolean resetVersions) {
+    public PolicyReader(boolean resetVersions, boolean swallowException) {
       resetVersions_ = resetVersions;
+      swallowException_ = swallowException;
     }
 
     public void run() {
@@ -141,7 +150,17 @@ public class SentryProxy {
           usersToRemove = refreshUserPrivileges();
         } catch (Exception e) {
           LOG.error("Error refreshing Sentry policy: ", e);
-          return;
+          if (swallowException_) return;
+          // We need to differentiate between Sentry not available exception and
+          // any other exceptions.
+          if (e.getCause() != null && e.getCause() instanceof SentryUserException) {
+            Throwable sentryException = e.getCause();
+            if (sentryException.getCause() != null &&
+                sentryException.getCause() instanceof TTransportException) {
+              throw new SentryUnavailableException(e);
+            }
+          }
+          throw new SentryPolicyReaderException(e);
         } finally {
           LOG.debug("Refreshing Sentry policy took " +
               (System.currentTimeMillis() - startTime) + "ms");
@@ -527,16 +546,21 @@ public class SentryProxy {
   }
 
   /**
-   * Perfoms a synchronous refresh of all authorization policy metadata and updates
+   * Performs a synchronous refresh of all authorization policy metadata and updates
    * the Catalog with any changes. Throws an ImpalaRuntimeException if there are any
    * errors executing the refresh job.
    */
   public void refresh(boolean resetVersions) throws ImpalaRuntimeException {
     try {
-      policyReader_.submit(new PolicyReader(resetVersions)).get();
+      // Since this is a synchronous refresh, any exception thrown while running the
+      // refresh should be thrown here instead of silently swallowing it.
+      policyReader_.submit(new PolicyReader(resetVersions, /*swallow exception*/ false))
+          .get();
     } catch (Exception e) {
-      // We shouldn't make it here. It means an exception leaked from the
-      // AuthorizationPolicyReader.
+      if (e.getCause() != null && e.getCause() instanceof SentryUnavailableException) {
+        throw new ImpalaRuntimeException("Error refreshing authorization policy. " +
+            "Sentry is unavailable. Ensure Sentry is up: ", e);
+      }
       throw new ImpalaRuntimeException("Error refreshing authorization policy, " +
           "current policy state may be inconsistent. Running 'invalidate metadata' " +
           "may resolve this problem: ", e);

http://git-wip-us.apache.org/repos/asf/impala/blob/59f3f6b9/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index 3186113..0734ee0 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -55,8 +55,9 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
    */
   public ImpaladTestCatalog(AuthorizationConfig authzConfig) {
     super("127.0.0.1");
-    CatalogServiceCatalog catalogServerCatalog =
-        CatalogServiceTestCatalog.createWithAuth(authzConfig.getSentryConfig());
+    CatalogServiceCatalog catalogServerCatalog = authzConfig.isEnabled() ?
+        CatalogServiceTestCatalog.createWithAuth(authzConfig.getSentryConfig()) :
+        CatalogServiceTestCatalog.create();
     // Bootstrap the catalog by adding all dbs, tables, and functions.
     for (FeDb db: catalogServerCatalog.getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
       // Adding DB should include all tables/fns in that database.

http://git-wip-us.apache.org/repos/asf/impala/blob/59f3f6b9/tests/authorization/test_authorization.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 9da43d3..a036680 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -478,3 +478,35 @@ class TestAuthorization(CustomClusterTestSuite):
         assert "CatalogException" in obj_dump
     finally:
       self.role_cleanup(unique_role)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--server_name=server1 --sentry_config=%s" % SENTRY_CONFIG_FILE,
+    catalogd_args="--sentry_config=%s --sentry_catalog_polling_frequency_s=3600" %
+                  SENTRY_CONFIG_FILE,
+    impala_log_dir=tempfile.mkdtemp(prefix="test_invalidate_metadata_sentry_unavailable_",
+                                    dir=os.getenv("LOG_DIR")))
+  def test_invalidate_metadata_sentry_unavailable(self, unique_role):
+    """IMPALA-7824: Tests that running INVALIDATE METADATA when Sentry is unavailable
+    should not cause Impala to hang."""
+    self.role_cleanup(unique_role)
+    try:
+      group_name = grp.getgrnam(getuser()).gr_name
+      self.client.execute("create role %s" % unique_role)
+      self.client.execute("grant all on server to role %s" % unique_role)
+      self.client.execute("grant role %s to group `%s`" % (unique_role, group_name))
+
+      self._stop_sentry_service()
+      # Calling INVALIDATE METADATA when Sentry is unavailable should return an error.
+      result = self.execute_query_expect_failure(self.client, "invalidate metadata")
+      result_str = str(result)
+      assert "MESSAGE: CatalogException: Error updating authorization policy:" \
+             in result_str
+      assert "CAUSED BY: ImpalaRuntimeException: Error refreshing authorization policy." \
+             " Sentry is unavailable. Ensure Sentry is up:" in result_str
+
+      self._start_sentry_service(SENTRY_CONFIG_FILE)
+      # Calling INVALIDATE METADATA after Sentry is up should not return an error.
+      self.execute_query_expect_success(self.client, "invalidate metadata")
+    finally:
+      self.role_cleanup(unique_role)

http://git-wip-us.apache.org/repos/asf/impala/blob/59f3f6b9/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 0140303..90cd4e9 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -164,11 +164,17 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     sentry_env['SENTRY_SERVICE_CONFIG'] = sentry_service_config
     call = subprocess.Popen(
         ['/bin/bash', '-c', os.path.join(IMPALA_HOME,
-        'testdata/bin/run-sentry-service.sh')],
+                                         'testdata/bin/run-sentry-service.sh')],
         env=sentry_env)
     call.wait()
     if call.returncode != 0:
-      raise RuntimeError("unable to start sentry")
+      raise RuntimeError("Unable to start Sentry")
+
+  @classmethod
+  def _stop_sentry_service(cls):
+    subprocess.check_call([os.path.join(os.environ["IMPALA_HOME"],
+                                        "testdata/bin/kill-sentry-service.sh")],
+                          close_fds=True)
 
   @classmethod
   def _start_impala_cluster(cls, options, impala_log_dir=os.getenv('LOG_DIR', "/tmp/"),