You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/01/27 23:20:52 UTC

[5/7] sentry git commit: SENTRY-1612

SENTRY-1612

Change-Id: If429dee0836cc3ceff9ff276aec670c631fc27d1


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/813d10e8
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/813d10e8
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/813d10e8

Branch: refs/heads/sentry-ha-redesign-1
Commit: 813d10e86aeb35fb6c89c983761c194cb6b0ac7b
Parents: dbd5870
Author: hahao <ha...@cloudera.com>
Authored: Thu Jan 26 17:54:19 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Thu Jan 26 17:54:19 2017 -0800

----------------------------------------------------------------------
 .../sentry/hdfs/FullUpdateInitializer.java      | 33 +++++++++---
 .../sentry/hdfs/TestFullUpdateInitializer.java  | 29 ++++------
 .../db/service/persistent/SentryStore.java      | 57 ++++++++++++++++++--
 .../sentry/service/thrift/HMSFollower.java      | 42 +++++++++++----
 4 files changed, 121 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index a1f970b..c990c53 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -19,6 +19,7 @@ package org.apache.sentry.hdfs;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -30,8 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -296,9 +296,7 @@ public class FullUpdateInitializer implements Closeable {
         ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
   }
 
-  public UpdateableAuthzPaths createInitialUpdate() throws Exception {
-    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new
-    String[]{"/"});
+  public Map<String, Set<String>> createInitialUpdate() throws Exception {
     PathsUpdate tempUpdate = new PathsUpdate(-1, false);
     List<String> allDbStr = client.getAllDatabases();
     for (String dbName : allDbStr) {
@@ -321,11 +319,32 @@ public class FullUpdateInitializer implements Closeable {
       }
     }
 
-    authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock());
-    return authzPaths;
+    return getAuthzObjToPathMapping(tempUpdate);
   }
 
 
+  /**
+   * Builds the hiveObj -> [Paths] mapping from the AddPaths of a full-snapshot
+   * {@link PathsUpdate}; entries sharing an authzObj are merged, not replaced.
+   *
+   * @param pathsUpdate the update whose path changes are read
+   * @return mapping of hiveObj -> [Paths].
+   */
+  private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) {
+    Map<String, Set<String>> authzObjToPath = new HashMap<>();
+    for (TPathChanges pathChanges : pathsUpdate.getPathChanges()) {
+      Set<String> paths = authzObjToPath.get(pathChanges.getAuthzObj());
+      if (paths == null) {
+        paths = Sets.newHashSet();
+        authzObjToPath.put(pathChanges.getAuthzObj(), paths);
+      }
+      for (List<String> addPath : pathChanges.getAddPaths()) {
+        paths.add(PathsUpdate.getPath(addPath));
+      }
+    }
+    return authzObjToPath;
+  }
+
   @Override
   public void close() throws IOException {
     if (threadPool != null) {

http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
index 0bb6f66..792b847 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
@@ -18,6 +18,7 @@
 package org.apache.sentry.hdfs;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -28,9 +29,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.*;
 
 public class TestFullUpdateInitializer {
 
@@ -107,22 +106,14 @@ public class TestFullUpdateInitializer {
 
     FullUpdateInitializer cacheInitializer = new
     FullUpdateInitializer(client, conf);
-    UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate();
-
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new
-    String[]{"db1"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new
-    String[]{"db2"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new
-    String[]{"db2", "tab21"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new
-    String[]{"db3"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
-    String[]{"db3", "tab31"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
-    String[]{"db3", "tab31", "part311"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
-    String[]{"db3", "tab31", "part312"}));
+    Map<String, Set<String>> update = cacheInitializer.createInitialUpdate();
+
+    Assert.assertEquals(update.get("db1"), Sets.newHashSet("db1"));
+    Assert.assertEquals(update.get("db2"), Sets.newHashSet("db2"));
+    Assert.assertEquals(update.get("db2.tab21"), Sets.newHashSet("db2/tab21"));
+    Assert.assertEquals(update.get("db3.tab31"), Sets.newHashSet("db3/tab31",
+        "db3/tab31/part311", "db3/tab31/part312"));
+
     cacheInitializer.close();
 
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 46e23c6..3536579 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -49,7 +49,6 @@ import org.apache.sentry.core.common.exception.*;
 import org.apache.sentry.core.common.utils.SentryConstants;
 import org.apache.sentry.core.model.db.AccessConstants;
 import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
-import org.apache.sentry.hdfs.Updateable;
 import org.apache.sentry.provider.db.service.model.*;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor;
 import org.apache.sentry.provider.db.service.thrift.TSentryActiveRoleSet;
@@ -105,7 +104,7 @@ public class SentryStore {
   // is starting from 1.
   public static final long INIT_CHANGE_ID = 1L;
 
-  private static final long EMPTY_CHANGE_ID = 0L;
+  public static final long EMPTY_CHANGE_ID = 0L;
 
   // For counters, representation of the "unknown value"
   private static final long COUNT_VALUE_UNKNOWN = -1;
@@ -2300,6 +2299,24 @@ public class SentryStore {
     return retVal;
   }
 
+  /**
+   * Persist a full hive snapshot into Sentry DB in a single transaction.
+   *
+   * @param authzPaths Mapping of hiveObj -> [Paths]
+   * @throws Exception propagated from the transaction manager
+   */
+  public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception {
+    tm.executeTransactionWithRetry(new TransactionBlock<Object>() {
+      @Override
+      public Object execute(PersistenceManager pm) throws Exception {
+        for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) {
+          createAuthzPathsMappingCore(pm, authzPath.getKey(), authzPath.getValue());
+        }
+        return null;
+      }
+    });
+  }
+
   public void createAuthzPathsMapping(final String hiveObj,
       final Set<String> paths) throws Exception {
     tm.executeTransactionWithRetry(
@@ -3339,10 +3356,10 @@ public class SentryStore {
   }
 
   /**
-   * Get the MSentryPermChange object by ChangeID. Internally invoke
+   * Get the last processed perm change ID. Internally invoke
    * getLastProcessedPermChangeIDCore().
    *
-   * @return MSentryPermChange
+   * @return the change id of last processed MSentryPermChange.
    */
   @VisibleForTesting
   long getLastProcessedPermChangeID() throws Exception {
@@ -3355,6 +3372,38 @@ public class SentryStore {
   }
 
   /**
+   * Get the last processed path change ID, computed as the maximum
+   * changeID over the MSentryPathChange entries.
+   *
+   * @param pm the PersistenceManager
+   * @return the last processed path change ID, or EMPTY_CHANGE_ID when the
+   *         query produces no value
+   */
+  private long getLastProcessedPathChangeIDCore(PersistenceManager pm) {
+    Query maxIdQuery = pm.newQuery(MSentryPathChange.class);
+    maxIdQuery.setResult("max(this.changeID)");
+    Long lastChangeID = (Long) maxIdQuery.execute();
+    // A null aggregate result means there is nothing to report yet; callers
+    // get EMPTY_CHANGE_ID instead of a null.
+    return (lastChangeID != null) ? lastChangeID : EMPTY_CHANGE_ID;
+  }
+
+  /**
+   * Get the last processed path change ID. The lookup itself is done by
+   * getLastProcessedPathChangeIDCore() inside a transaction.
+   *
+   * @return the change id of last processed MSentryPathChange.
+   */
+  public long getLastProcessedPathChangeID() throws Exception {
+    TransactionBlock<Long> lastPathChangeLookup = new TransactionBlock<Long>() {
+      public Long execute(PersistenceManager pm) throws Exception {
+        return getLastProcessedPathChangeIDCore(pm);
+      }
+    };
+    return tm.executeTransaction(lastPathChangeLookup);
+  }
+
+  /**
    * Get the MSentryPermChange object by ChangeID.
    *
    * @param changeID the given changeID.

http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 59646b6..ad6bdda 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.core.common.exception.*;
+import org.apache.sentry.hdfs.PathsUpdate;
 import org.apache.sentry.hdfs.PermissionsUpdate;
 import org.apache.sentry.hdfs.UpdateableAuthzPaths;
 import org.apache.sentry.hdfs.FullUpdateInitializer;
@@ -49,6 +50,8 @@ import java.io.File;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
@@ -74,7 +77,6 @@ public class HMSFollower implements Runnable {
   private static final int maxRetriesForLogin = 3;
   private static final int maxRetriesForConnection = 3;
 
-  private volatile UpdateableAuthzPaths authzPaths;
   private boolean needHiveSnapshot = true;
   private final LeaderStatusMonitor leaderMonitor;
 
@@ -84,8 +86,11 @@ public class HMSFollower implements Runnable {
     authzConf = conf;
     this.leaderMonitor = leaderMonitor;
     sentryStore = new SentryStore(authzConf);
-    //TODO: Initialize currentEventID from Sentry db
-    currentEventID = 0;
+
+    // Initialize currentEventID from Sentry db. If currentEventID
+    // is empty, need to retrieve a hive snapshot, otherwise not.
+    currentEventID = getStoredCurrentID();
+    needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID);
   }
 
   @VisibleForTesting
@@ -227,12 +232,13 @@ public class HMSFollower implements Runnable {
 
         CurrentNotificationEventId eventIDBefore = null;
         CurrentNotificationEventId eventIDAfter = null;
+        Map<String, Set<String>> pathsFullSnapshot = null;
 
         try {
           eventIDBefore = client.getCurrentNotificationEventId();
           LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.",
               eventIDBefore));
-          fetchFullUpdate();
+          pathsFullSnapshot = fetchFullUpdate();
           eventIDAfter = client.getCurrentNotificationEventId();
           LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.",
               eventIDAfter));
@@ -252,6 +258,7 @@ public class HMSFollower implements Runnable {
             eventIDAfter));
         needHiveSnapshot = false;
         currentEventID = eventIDAfter.getEventId();
+        sentryStore.persistFullPathsImage(pathsFullSnapshot);
       }
 
       NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
@@ -261,25 +268,30 @@ public class HMSFollower implements Runnable {
         processNotificationEvents(response.getEvents());
       }
     } catch (TException e) {
-      LOGGER.error("ThriftException occured fetching Notification entries, will try");
+      LOGGER.error("ThriftException occurred fetching Notification entries, will try");
       e.printStackTrace();
     } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) {
       throw new RuntimeException(e);
+    } catch (Exception e) {
+      LOGGER.error("Exception occurred persisting Hive full snapshot into DB");
+      e.printStackTrace();
     }
   }
 
   /**
-   * Retrieve HMS full snapshot.
+   * Retrieve a HMS full snapshot.
+   *
+   * @return mapping of hiveObj -> [Paths] for the fetched snapshot
+   * @throws Exception
    */
-  private void fetchFullUpdate() throws Exception {
+  private  Map<String, Set<String>> fetchFullUpdate() throws Exception {
     FullUpdateInitializer updateInitializer = null;
 
     try {
       updateInitializer = new FullUpdateInitializer(client, authzConf);
-      // TODO - do we need to save returned authz path?
-      updateInitializer.createInitialUpdate();
-      // TODO: notify HDFS plugin
+      Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate();
       LOGGER.info("#### Hive full update initialization complete !!");
+      return pathsUpdate;
     } finally {
       if (updateInitializer != null) {
         try {
@@ -291,6 +303,16 @@ public class HMSFollower implements Runnable {
     }
   }
 
+  /**
+   * Get current eventID from Sentry DB, i.e. the last processed path change ID.
+   *
+   * @return the value of SentryStore#getLastProcessedPathChangeID()
+   * @throws Exception propagated from the SentryStore lookup
+   */
+  private long getStoredCurrentID() throws Exception {
+    return sentryStore.getLastProcessedPathChangeID();
+  }
+
   private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
     return "true"
         .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), "true")));