You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/03/02 02:03:09 UTC

sentry git commit: SENTRY-1612: HMSFollower should persist full HMS snapshot into SentryDB if there is not one. (Hao Hao, Reviewed by: Alexander Kolbasov)

Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 7aac09ec6 -> 1c6ba5ebe


SENTRY-1612: HMSFollower should persist full HMS snapshot into SentryDB if there is not one. (Hao Hao, Reviewed by: Alexander Kolbasov)

Change-Id: I375ea19277fa3092f1825476f1670652e4c981c4


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/1c6ba5eb
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/1c6ba5eb
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/1c6ba5eb

Branch: refs/heads/sentry-ha-redesign
Commit: 1c6ba5ebe4a4374b2472633dd21435eec9933d6f
Parents: 7aac09e
Author: hahao <ha...@cloudera.com>
Authored: Wed Mar 1 18:01:50 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Wed Mar 1 18:01:50 2017 -0800

----------------------------------------------------------------------
 .../sentry/hdfs/FullUpdateInitializer.java      | 44 ++++++++++++++++----
 .../org/apache/sentry/hdfs/PathsUpdate.java     | 24 +++++++++++
 .../sentry/hdfs/TestFullUpdateInitializer.java  | 31 ++++++--------
 .../db/service/model/MSentryPathChange.java     |  4 ++
 .../db/service/persistent/SentryStore.java      | 44 +++++++++++++++++++-
 .../sentry/service/thrift/HMSFollower.java      | 40 +++++++++++++-----
 .../db/service/persistent/TestSentryStore.java  | 14 ++++---
 7 files changed, 157 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index f95dd94..146cea2 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -18,7 +18,7 @@
 package org.apache.sentry.hdfs;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -33,9 +33,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * FullUpdateInitializer is for fetching hive full update,
@@ -297,9 +299,8 @@ public class FullUpdateInitializer implements Closeable {
         ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
   }
 
-  public UpdateableAuthzPaths createInitialUpdate() throws ExecutionException, InterruptedException, TException {
-    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new
-    String[]{"/"});
+  public Map<String, Set<String>> createInitialUpdate() throws ExecutionException,
+        InterruptedException, TException {
     PathsUpdate tempUpdate = new PathsUpdate(-1, false);
     List<String> allDbStr = client.getAllDatabases();
     for (String dbName : allDbStr) {
@@ -322,15 +323,42 @@ public class FullUpdateInitializer implements Closeable {
       }
     }
 
-    authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock());
-    return authzPaths;
+    return getAuthzObjToPathMapping(tempUpdate);
   }
 
 
+  /**
+   * Parses a pathsUpdate to get the mapping of hiveObj -> [Paths].
+   * It only processes {@link TPathChanges}.addPaths, since
+   * {@link FullUpdateInitializer} only adds paths when fetching
+   * a full HMS paths snapshot. Each path is represented as a path tree
+   * concatenated by "/", e.g. 'usr/hive/warehouse'.
+   *
+   * @return mapping of hiveObj -> [Paths].
+   */
+  private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) {
+    Map<String, Set<String>> authzObjToPath = new HashMap<>();
+    List<TPathChanges> tPathChanges = pathsUpdate.getPathChanges();
+
+    if (!tPathChanges.isEmpty()) {
+      for (TPathChanges pathChanges : tPathChanges) {
+        // Only processes TPathChanges.addPaths
+        List<List<String>> addPaths = pathChanges.getAddPaths();
+        Set<String> paths = Sets.newHashSet();
+        for (List<String> addPath : addPaths) {
+          paths.add(PathsUpdate.cancatePath(addPath));
+        }
+        authzObjToPath.put(pathChanges.getAuthzObj(), paths);
+      }
+    }
+
+    return authzObjToPath;
+  }
+
   @Override
   public void close() throws IOException {
     if (threadPool != null) {
       threadPool.shutdownNow();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
index ffb0756..992c8b7 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
@@ -24,6 +24,8 @@ import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import org.apache.sentry.hdfs.service.thrift.TPathChanges;
 import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
 import org.apache.commons.httpclient.util.URIUtil;
@@ -153,6 +155,28 @@ public class PathsUpdate implements Updateable.Update {
     }
   }
 
+  /**
+   * Given a path tree in a list, return a string concatenated by "/".
+   * e.g. {@code <usr, hive, warehouse> -> 'usr/hive/warehouse'}.
+   *
+   * @param paths
+   * @return a path string concatenated by "/".
+   */
+  public static String cancatePath(Iterable<String> paths) {
+    return Joiner.on("/").join(paths);
+  }
+
+  /**
+   * Splits a path concatenated by "/" into a path tree represented
+   * as a list.
+   *
+   * @param path
+   * @return a path tree represented as a list.
+   */
+  public static List<String> splitPath(String path) {
+    return Lists.newArrayList(Splitter.on("/").split(path));
+  }
+
   @Override
   public byte[] serialize() throws IOException {
     return ThriftSerializer.serialize(tPathsUpdate);

http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
index 0bb6f66..f338ce8 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
@@ -18,6 +18,7 @@
 package org.apache.sentry.hdfs;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -29,8 +30,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 public class TestFullUpdateInitializer {
 
@@ -107,22 +108,14 @@ public class TestFullUpdateInitializer {
 
     FullUpdateInitializer cacheInitializer = new
     FullUpdateInitializer(client, conf);
-    UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate();
-
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new
-    String[]{"db1"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new
-    String[]{"db2"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new
-    String[]{"db2", "tab21"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new
-    String[]{"db3"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
-    String[]{"db3", "tab31"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
-    String[]{"db3", "tab31", "part311"}));
-    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
-    String[]{"db3", "tab31", "part312"}));
+    Map<String, Set<String>> update = cacheInitializer.createInitialUpdate();
+
+    Assert.assertEquals(update.get("db1"), Sets.newHashSet("db1"));
+    Assert.assertEquals(update.get("db2"), Sets.newHashSet("db2"));
+    Assert.assertEquals(update.get("db2.tab21"), Sets.newHashSet("db2/tab21"));
+    Assert.assertEquals(update.get("db3.tab31"), Sets.newHashSet("db3/tab31",
+        "db3/tab31/part311", "db3/tab31/part312"));
+
     cacheInitializer.close();
 
   }
@@ -167,4 +160,4 @@ public class TestFullUpdateInitializer {
       Assert.assertTrue(e instanceof RuntimeException);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
index 0ca7fe2..a0d3445 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
@@ -89,6 +89,10 @@ public class MSentryPathChange implements MSentryChange {
     return changeID;
   }
 
+  public long getNotificationID() {
+    return notificationID;
+  }
+
   @Override
   public String toString() {
     return "MSentryChange [changeID=" + changeID + " , notificationID= "

http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index b9272bc..c1186ba 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -121,7 +121,7 @@ public class SentryStore {
   // is starting from 1.
   public static final long INIT_CHANGE_ID = 1L;
 
-  private static final long EMPTY_CHANGE_ID = 0L;
+  public static final long EMPTY_CHANGE_ID = 0L;
 
   // For counters, representation of the "unknown value"
   private static final long COUNT_VALUE_UNKNOWN = -1;
@@ -2329,6 +2329,24 @@ public class SentryStore {
     return result;
   }
 
+  /**
+   * Persist a full hive snapshot into Sentry DB in a single transaction.
+   *
+   * @param authzPaths Mapping of hiveObj -> [Paths]
+   * @throws Exception
+   */
+  public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception {
+    tm.executeTransactionWithRetry(
+      new TransactionBlock() {
+        public Object execute(PersistenceManager pm) throws Exception {
+          for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) {
+            createAuthzPathsMappingCore(pm, authzPath.getKey(), authzPath.getValue());
+          }
+          return null;
+        }
+      });
+  }
+
   public void createAuthzPathsMapping(final String hiveObj,
       final Set<String> paths) throws Exception {
     tm.executeTransactionWithRetry(
@@ -3063,7 +3081,8 @@ public class SentryStore {
    * @param pm the PersistenceManager
    * @param changeCls the class of a delta c
    *
-   * @return the last processed changedID for the delta changes.
+   * @return the last processed changeID for the delta changes. If no
+   *         change is found, returns 0.
    */
   private <T extends MSentryChange> long getLastProcessedChangeIDCore(
       PersistenceManager pm, Class<T> changeCls) {
@@ -3091,6 +3110,27 @@ public class SentryStore {
   }
 
   /**
+   * Get the notification ID of last processed path delta change.
+   *
+   * @return the notification ID of the latest path change. If no change
+   *         is found, returns 0.
+   */
+  public long getLastProcessedNotificationID() throws Exception {
+    return tm.executeTransaction(
+    new TransactionBlock<Long>() {
+      public Long execute(PersistenceManager pm) throws Exception {
+        long changeID = getLastProcessedChangeIDCore(pm, MSentryPathChange.class);
+        if (changeID == EMPTY_CHANGE_ID) {
+          return EMPTY_CHANGE_ID;
+        } else {
+          MSentryPathChange mSentryPathChange = getMSentryPathChangeByID(changeID);
+          return mSentryPathChange.getNotificationID();
+        }
+      }
+    });
+  }
+
+  /**
    * Get the MSentryPermChange object by ChangeID.
    *
    * @param changeID the given changeID.

http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index bdbb0cc..8b07f5b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.core.common.exception.*;
+import org.apache.sentry.hdfs.PathsUpdate;
 import org.apache.sentry.hdfs.PermissionsUpdate;
 import org.apache.sentry.hdfs.UpdateableAuthzPaths;
 import org.apache.sentry.hdfs.FullUpdateInitializer;
@@ -52,6 +53,8 @@ import java.net.SocketException;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
@@ -77,7 +80,6 @@ public class HMSFollower implements Runnable {
   private final SentryStore sentryStore;
   private String hiveInstance;
 
-  private volatile UpdateableAuthzPaths authzPaths;
   private boolean needHiveSnapshot = true;
   private final LeaderStatusMonitor leaderMonitor;
 
@@ -87,8 +89,11 @@ public class HMSFollower implements Runnable {
     authzConf = conf;
     this.leaderMonitor = leaderMonitor;
     sentryStore = store;
-    //TODO: Initialize currentEventID from Sentry db
-    currentEventID = 0;
+
+    // Initialize currentEventID based on the latest persisted notification ID.
+    // If no notification ID has been persisted yet, a full Hive snapshot is needed.
+    currentEventID = getLastProcessedNotificationID();
+    needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID);
   }
 
   @VisibleForTesting
@@ -222,13 +227,14 @@ public class HMSFollower implements Runnable {
         // will be dropped. A new attempts will be made after 500 milliseconds when
         // HMSFollower run again.
 
+        Map<String, Set<String>> pathsFullSnapshot;
         CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId();
         LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore));
 
         try {
-          fetchFullUpdate();
+          pathsFullSnapshot = fetchFullUpdate();
         } catch (ExecutionException | InterruptedException ex) {
-          LOGGER.error("#### Encountered failure during fetching one hive full snapshot !!", ex);
+          LOGGER.error("#### Encountered failure during fetching hive full snapshot !!", ex);
           return;
         }
 
@@ -245,6 +251,7 @@ public class HMSFollower implements Runnable {
             eventIDAfter));
         needHiveSnapshot = false;
         currentEventID = eventIDAfter.getEventId();
+        sentryStore.persistFullPathsImage(pathsFullSnapshot);
       }
 
       NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
@@ -292,17 +299,20 @@ public class HMSFollower implements Runnable {
   }
 
   /**
-   * Retrieve HMS full snapshot.
+   * Retrieve a Hive full snapshot from HMS.
+   *
+   * @return mapping of hiveObj -> [Paths].
+   * @throws ExecutionException, InterruptedException, TException
    */
-  private void fetchFullUpdate() throws ExecutionException, InterruptedException, TException {
+  private Map<String, Set<String>> fetchFullUpdate()
+        throws ExecutionException, InterruptedException, TException {
     FullUpdateInitializer updateInitializer = null;
 
     try {
       updateInitializer = new FullUpdateInitializer(client, authzConf);
-      // TODO - do we need to save returned authz path?
-      updateInitializer.createInitialUpdate();
-      // TODO: notify HDFS plugin
+      Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate();
       LOGGER.info("#### Hive full update initialization complete !!");
+      return pathsUpdate;
     } finally {
       if (updateInitializer != null) {
         try {
@@ -314,6 +324,16 @@ public class HMSFollower implements Runnable {
     }
   }
 
+  /**
+   * Get the last processed eventID from Sentry DB.
+   *
+   * @return the last persisted notification ID, or 0 if none is found
+   * @throws Exception
+   */
+  private long getLastProcessedNotificationID() throws Exception {
+    return sentryStore.getLastProcessedNotificationID();
+  }
+
   private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
     return "true"
         .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), "true")));

http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 0e22755..91f15c0 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -19,11 +19,7 @@
 package org.apache.sentry.provider.db.service.persistent;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -2199,6 +2195,14 @@ public class TestSentryStore extends org.junit.Assert {
     Map<String, Set<String>> pathsImage = sentryStore.retrieveFullPathsImage();
     assertEquals(2, pathsImage.size());
     assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table1"), pathsImage.get("db1.table1"));
+
+    Map<String, Set<String>> authzPaths = new HashMap<>();
+    authzPaths.put("db2.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1"));
+    authzPaths.put("db2.table2", Sets.newHashSet("/user/hive/warehouse/db2.db/table2"));
+    sentryStore.persistFullPathsImage(authzPaths);
+    pathsImage = sentryStore.retrieveFullPathsImage();
+    assertEquals(4, pathsImage.size());
+    assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table1"), pathsImage.get("db2.table1"));
   }
 
   public void testQueryParamBuilder() {