You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/03/02 02:03:09 UTC
sentry git commit: SENTRY-1612: HMSFollower should persist full HMS
snapshot into SentryDB if there is not one. (Hao Hao,
Reviewed by: Alexander Kolbasov)
Repository: sentry
Updated Branches:
refs/heads/sentry-ha-redesign 7aac09ec6 -> 1c6ba5ebe
SENTRY-1612: HMSFollower should persist full HMS snapshot into SentryDB if there is not one. (Hao Hao, Reviewed by: Alexander Kolbasov)
Change-Id: I375ea19277fa3092f1825476f1670652e4c981c4
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/1c6ba5eb
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/1c6ba5eb
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/1c6ba5eb
Branch: refs/heads/sentry-ha-redesign
Commit: 1c6ba5ebe4a4374b2472633dd21435eec9933d6f
Parents: 7aac09e
Author: hahao <ha...@cloudera.com>
Authored: Wed Mar 1 18:01:50 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Wed Mar 1 18:01:50 2017 -0800
----------------------------------------------------------------------
.../sentry/hdfs/FullUpdateInitializer.java | 44 ++++++++++++++++----
.../org/apache/sentry/hdfs/PathsUpdate.java | 24 +++++++++++
.../sentry/hdfs/TestFullUpdateInitializer.java | 31 ++++++--------
.../db/service/model/MSentryPathChange.java | 4 ++
.../db/service/persistent/SentryStore.java | 44 +++++++++++++++++++-
.../sentry/service/thrift/HMSFollower.java | 40 +++++++++++++-----
.../db/service/persistent/TestSentryStore.java | 14 ++++---
7 files changed, 157 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index f95dd94..146cea2 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -18,7 +18,7 @@
package org.apache.sentry.hdfs;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -33,9 +33,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* FullUpdateInitializer is for fetching hive full update,
@@ -297,9 +299,8 @@ public class FullUpdateInitializer implements Closeable {
ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
}
- public UpdateableAuthzPaths createInitialUpdate() throws ExecutionException, InterruptedException, TException {
- UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new
- String[]{"/"});
+ public Map<String, Set<String>> createInitialUpdate() throws ExecutionException,
+ InterruptedException, TException {
PathsUpdate tempUpdate = new PathsUpdate(-1, false);
List<String> allDbStr = client.getAllDatabases();
for (String dbName : allDbStr) {
@@ -322,15 +323,49 @@ public class FullUpdateInitializer implements Closeable {
}
}
- authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock());
- return authzPaths;
+ return getAuthzObjToPathMapping(tempUpdate);
}
+ /**
+ * Parses a pathsUpdate to get the mapping of hiveObj -> [Paths].
+ * It only processes {@link TPathChanges}.addPaths, since
+ * {@link FullUpdateInitializer} only adds paths when fetching
+ * the full HMS paths snapshot. Each path is represented as its path
+ * tree concatenated by "/", e.g. 'usr/hive/warehouse'.
+ * If several {@link TPathChanges} carry the same hiveObj, their paths
+ * are merged rather than the later entry replacing the earlier one.
+ *
+ * @param pathsUpdate the update accumulated while fetching the snapshot.
+ * @return mapping of hiveObj -> [Paths].
+ */
+ private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) {
+ Map<String, Set<String>> authzObjToPath = new HashMap<>();
+
+ for (TPathChanges pathChanges : pathsUpdate.getPathChanges()) {
+ String authzObj = pathChanges.getAuthzObj();
+ Set<String> paths = authzObjToPath.get(authzObj);
+ if (paths == null) {
+ paths = Sets.newHashSet();
+ authzObjToPath.put(authzObj, paths);
+ }
+ // Only processes TPathChanges.addPaths. Guard against an unset list
+ // (assumes the Thrift getter returns null in that case).
+ List<List<String>> addPaths = pathChanges.getAddPaths();
+ if (addPaths != null) {
+ for (List<String> addPath : addPaths) {
+ paths.add(PathsUpdate.cancatePath(addPath));
+ }
+ }
+ }
+
+ return authzObjToPath;
+ }
+
@Override
public void close() throws IOException {
if (threadPool != null) {
threadPool.shutdownNow();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
index ffb0756..992c8b7 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
@@ -24,6 +24,8 @@ import java.util.LinkedList;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import org.apache.sentry.hdfs.service.thrift.TPathChanges;
import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
import org.apache.commons.httpclient.util.URIUtil;
@@ -153,6 +155,28 @@ public class PathsUpdate implements Updateable.Update {
}
}
+ /**
+ * Given a path tree in a list, returns a string concatenated by "/",
+ * e.g. < usr, hive, warehouse > -> 'usr/hive/warehouse'.
+ *
+ * @param paths the path components, in order; must not contain null elements.
+ * @return a path string concatenated by "/".
+ */
+ public static String cancatePath(Iterable<String> paths) {
+ return Joiner.on("/").join(paths);
+ }
+
+ /**
+ * Splits a path concatenated by "/" into a path tree represented as a
+ * list; the counterpart of {@link #cancatePath(Iterable)}.
+ *
+ * @param path a path concatenated by "/", e.g. 'usr/hive/warehouse'.
+ * @return a path tree represented as a mutable list; empty segments are kept.
+ */
+ public static List<String> splitPath(String path) {
+ return Lists.newArrayList(Splitter.on("/").split(path));
+ }
+
@Override
public byte[] serialize() throws IOException {
return ThriftSerializer.serialize(tPathsUpdate);
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
index 0bb6f66..f338ce8 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
@@ -18,6 +18,7 @@
package org.apache.sentry.hdfs;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -29,8 +30,8 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
public class TestFullUpdateInitializer {
@@ -107,22 +108,15 @@ public class TestFullUpdateInitializer {
FullUpdateInitializer cacheInitializer = new
FullUpdateInitializer(client, conf);
- UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate();
-
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new
- String[]{"db1"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new
- String[]{"db2"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new
- String[]{"db2", "tab21"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new
- String[]{"db3"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
- String[]{"db3", "tab31"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
- String[]{"db3", "tab31", "part311"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
- String[]{"db3", "tab31", "part312"}));
+ Map<String, Set<String>> update = cacheInitializer.createInitialUpdate();
+
+ Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1"));
+ Assert.assertEquals(Sets.newHashSet("db2"), update.get("db2"));
+ Assert.assertEquals(Sets.newHashSet("db2/tab21"), update.get("db2.tab21"));
+ Assert.assertEquals(Sets.newHashSet("db3"), update.get("db3"));
+ Assert.assertEquals(Sets.newHashSet("db3/tab31",
+ "db3/tab31/part311", "db3/tab31/part312"), update.get("db3.tab31"));
+
cacheInitializer.close();
}
@@ -167,4 +161,4 @@ public class TestFullUpdateInitializer {
Assert.assertTrue(e instanceof RuntimeException);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
index 0ca7fe2..a0d3445 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
@@ -89,6 +89,13 @@ public class MSentryPathChange implements MSentryChange {
return changeID;
}
+ /**
+ * @return the notification ID recorded for this path change.
+ */
+ public long getNotificationID() {
+ return this.notificationID;
+ }
+
@Override
public String toString() {
return "MSentryChange [changeID=" + changeID + " , notificationID= "
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index b9272bc..c1186ba 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -121,7 +121,7 @@ public class SentryStore {
// is starting from 1.
public static final long INIT_CHANGE_ID = 1L;
- private static final long EMPTY_CHANGE_ID = 0L;
+ public static final long EMPTY_CHANGE_ID = 0L;
// For counters, representation of the "unknown value"
private static final long COUNT_VALUE_UNKNOWN = -1;
@@ -2329,6 +2329,24 @@ public class SentryStore {
return result;
}
+ /**
+ * Persists a full Hive paths snapshot into Sentry DB in a single transaction,
+ * writing each entry through createAuthzPathsMappingCore.
+ * @param authzPaths mapping of hiveObj -> [Paths] to persist.
+ * @throws Exception if the underlying transaction fails.
+ */
+ public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception {
+ tm.executeTransactionWithRetry(
+ new TransactionBlock() {
+ public Object execute(PersistenceManager pm) throws Exception {
+ for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) {
+ createAuthzPathsMappingCore(pm, authzPath.getKey(), authzPath.getValue());
+ }
+ return null;
+ }
+ });
+ }
+
public void createAuthzPathsMapping(final String hiveObj,
final Set<String> paths) throws Exception {
tm.executeTransactionWithRetry(
@@ -3063,7 +3081,8 @@ public class SentryStore {
* @param pm the PersistenceManager
* @param changeCls the class of a delta c
*
- * @return the last processed changedID for the delta changes.
+ * @return the last processed changeID for the delta changes, or
+ * {@link #EMPTY_CHANGE_ID} (0) if no change is found.
*/
private <T extends MSentryChange> long getLastProcessedChangeIDCore(
PersistenceManager pm, Class<T> changeCls) {
@@ -3091,6 +3110,27 @@ public class SentryStore {
}
/**
+ * Get the notification ID of the last processed path delta change.
+ * NOTE(review): getMSentryPathChangeByID doesn't use this pm; confirm it neither opens a nested transaction nor returns null.
+ * @return the notification ID of the latest path change, or
+ * {@link #EMPTY_CHANGE_ID} (0) if no path change has been recorded.
+ */
+ public long getLastProcessedNotificationID() throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Long>() {
+ public Long execute(PersistenceManager pm) throws Exception {
+ long changeID = getLastProcessedChangeIDCore(pm, MSentryPathChange.class);
+ if (changeID == EMPTY_CHANGE_ID) {
+ return EMPTY_CHANGE_ID;
+ } else {
+ MSentryPathChange mSentryPathChange = getMSentryPathChangeByID(changeID);
+ return mSentryPathChange.getNotificationID();
+ }
+ }
+ });
+ }
+
+ /**
* Get the MSentryPermChange object by ChangeID.
*
* @param changeID the given changeID.
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index bdbb0cc..8b07f5b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.core.common.exception.*;
+import org.apache.sentry.hdfs.PathsUpdate;
import org.apache.sentry.hdfs.PermissionsUpdate;
import org.apache.sentry.hdfs.UpdateableAuthzPaths;
import org.apache.sentry.hdfs.FullUpdateInitializer;
@@ -52,6 +53,8 @@ import java.net.SocketException;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
@@ -77,7 +80,6 @@ public class HMSFollower implements Runnable {
private final SentryStore sentryStore;
private String hiveInstance;
- private volatile UpdateableAuthzPaths authzPaths;
private boolean needHiveSnapshot = true;
private final LeaderStatusMonitor leaderMonitor;
@@ -87,8 +89,11 @@ public class HMSFollower implements Runnable {
authzConf = conf;
this.leaderMonitor = leaderMonitor;
sentryStore = store;
- //TODO: Initialize currentEventID from Sentry db
- currentEventID = 0;
+
+ // Initialize currentEventID based on the latest persisted notification ID.
+ // If none has been persisted yet, a full Hive snapshot must be fetched.
+ currentEventID = getLastProcessedNotificationID();
+ needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID);
}
@VisibleForTesting
@@ -222,13 +227,14 @@ public class HMSFollower implements Runnable {
// will be dropped. A new attempts will be made after 500 milliseconds when
// HMSFollower run again.
+ Map<String, Set<String>> pathsFullSnapshot;
CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId();
LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore));
try {
- fetchFullUpdate();
+ pathsFullSnapshot = fetchFullUpdate();
} catch (ExecutionException | InterruptedException ex) {
- LOGGER.error("#### Encountered failure during fetching one hive full snapshot !!", ex);
+ LOGGER.error("#### Encountered failure during fetching hive full snapshot !!", ex);
return;
}
@@ -245,6 +251,8 @@ public class HMSFollower implements Runnable {
eventIDAfter));
+ // Persist first so a failed write leaves needHiveSnapshot set and is retried.
+ sentryStore.persistFullPathsImage(pathsFullSnapshot);
needHiveSnapshot = false;
currentEventID = eventIDAfter.getEventId();
}
NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
@@ -292,17 +300,22 @@ public class HMSFollower implements Runnable {
}
/**
- * Retrieve HMS full snapshot.
+ * Retrieve a Hive full snapshot from HMS.
+ *
+ * @return mapping of hiveObj -> [Paths].
+ * @throws ExecutionException propagated from FullUpdateInitializer#createInitialUpdate().
+ * @throws InterruptedException propagated from FullUpdateInitializer#createInitialUpdate().
+ * @throws TException propagated from FullUpdateInitializer#createInitialUpdate().
*/
- private void fetchFullUpdate() throws ExecutionException, InterruptedException, TException {
+ private Map<String, Set<String>> fetchFullUpdate()
+ throws ExecutionException, InterruptedException, TException {
FullUpdateInitializer updateInitializer = null;
try {
updateInitializer = new FullUpdateInitializer(client, authzConf);
- // TODO - do we need to save returned authz path?
- updateInitializer.createInitialUpdate();
- // TODO: notify HDFS plugin
+ Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate();
LOGGER.info("#### Hive full update initialization complete !!");
+ return pathsUpdate;
} finally {
if (updateInitializer != null) {
try {
@@ -314,6 +327,16 @@ public class HMSFollower implements Runnable {
}
}
+ /**
+ * Get the last processed notification ID from Sentry DB.
+ *
+ * @return the last persisted notification ID, or 0 if none has been stored.
+ * @throws Exception if reading from the Sentry store fails.
+ */
+ private long getLastProcessedNotificationID() throws Exception {
+ return sentryStore.getLastProcessedNotificationID();
+ }
+
private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
return "true"
.equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), "true")));
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 0e22755..91f15c0 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -19,11 +19,12 @@
package org.apache.sentry.provider.db.service.persistent;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -2199,6 +2200,15 @@ public class TestSentryStore extends org.junit.Assert {
Map<String, Set<String>> pathsImage = sentryStore.retrieveFullPathsImage();
assertEquals(2, pathsImage.size());
assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table1"), pathsImage.get("db1.table1"));
+
+ Map<String, Set<String>> authzPaths = new HashMap<>();
+ authzPaths.put("db2.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1"));
+ authzPaths.put("db2.table2", Sets.newHashSet("/user/hive/warehouse/db2.db/table2"));
+ sentryStore.persistFullPathsImage(authzPaths);
+ pathsImage = sentryStore.retrieveFullPathsImage();
+ assertEquals(4, pathsImage.size());
+ assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table1"), pathsImage.get("db2.table1"));
+ assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table2"), pathsImage.get("db2.table2"));
}
public void testQueryParamBuilder() {