You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/01/27 23:20:52 UTC
[5/7] sentry git commit: SENTRY-1612
SENTRY-1612
Change-Id: If429dee0836cc3ceff9ff276aec670c631fc27d1
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/813d10e8
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/813d10e8
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/813d10e8
Branch: refs/heads/sentry-ha-redesign-1
Commit: 813d10e86aeb35fb6c89c983761c194cb6b0ac7b
Parents: dbd5870
Author: hahao <ha...@cloudera.com>
Authored: Thu Jan 26 17:54:19 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Thu Jan 26 17:54:19 2017 -0800
----------------------------------------------------------------------
.../sentry/hdfs/FullUpdateInitializer.java | 33 +++++++++---
.../sentry/hdfs/TestFullUpdateInitializer.java | 29 ++++------
.../db/service/persistent/SentryStore.java | 57 ++++++++++++++++++--
.../sentry/service/thrift/HMSFollower.java | 42 +++++++++++----
4 files changed, 121 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index a1f970b..c990c53 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -19,6 +19,7 @@ package org.apache.sentry.hdfs;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -30,8 +31,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -296,9 +296,7 @@ public class FullUpdateInitializer implements Closeable {
ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
}
- public UpdateableAuthzPaths createInitialUpdate() throws Exception {
- UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new
- String[]{"/"});
+ public Map<String, Set<String>> createInitialUpdate() throws Exception {
PathsUpdate tempUpdate = new PathsUpdate(-1, false);
List<String> allDbStr = client.getAllDatabases();
for (String dbName : allDbStr) {
@@ -321,11 +319,32 @@ public class FullUpdateInitializer implements Closeable {
}
}
- authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock());
- return authzPaths;
+ return getAuthzObjToPathMapping(tempUpdate);
}
+ /**
+ * Parses pathsUpdate to get the mapping of hiveObj -> [Paths].
+ * Only AddPaths are processed, since {@link FullUpdateInitializer} only
+ * adds paths when fetching the full HMS paths snapshot.
+ *
+ * @return mapping of hiveObj -> [Paths].
+ */
+ private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) {
+ Map<String, Set<String>> authzObjToPath = new HashMap<>();
+ for (TPathChanges pathChanges : pathsUpdate.getPathChanges()) {
+ // Only processing AddPaths
+ List<List<String>> addPaths = pathChanges.getAddPaths();
+ Set<String> paths = Sets.newHashSet();
+ for (List<String> addPath : addPaths) {
+ paths.add(PathsUpdate.getPath(addPath));
+ }
+ authzObjToPath.put(pathChanges.getAuthzObj(), paths);
+ }
+
+ return authzObjToPath;
+ }
+
@Override
public void close() throws IOException {
if (threadPool != null) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
index 0bb6f66..792b847 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
@@ -18,6 +18,7 @@
package org.apache.sentry.hdfs;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -28,9 +29,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.*;
public class TestFullUpdateInitializer {
@@ -107,22 +106,14 @@ public class TestFullUpdateInitializer {
FullUpdateInitializer cacheInitializer = new
FullUpdateInitializer(client, conf);
- UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate();
-
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new
- String[]{"db1"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new
- String[]{"db2"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new
- String[]{"db2", "tab21"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new
- String[]{"db3"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
- String[]{"db3", "tab31"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
- String[]{"db3", "tab31", "part311"}));
- Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new
- String[]{"db3", "tab31", "part312"}));
+ Map<String, Set<String>> update = cacheInitializer.createInitialUpdate();
+
+ Assert.assertEquals(update.get("db1"), Sets.newHashSet("db1"));
+ Assert.assertEquals(update.get("db2"), Sets.newHashSet("db2"));
+ Assert.assertEquals(update.get("db2.tab21"), Sets.newHashSet("db2/tab21"));
+ Assert.assertEquals(update.get("db3.tab31"), Sets.newHashSet("db3/tab31",
+ "db3/tab31/part311", "db3/tab31/part312"));
+
cacheInitializer.close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 46e23c6..3536579 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -49,7 +49,6 @@ import org.apache.sentry.core.common.exception.*;
import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
-import org.apache.sentry.hdfs.Updateable;
import org.apache.sentry.provider.db.service.model.*;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor;
import org.apache.sentry.provider.db.service.thrift.TSentryActiveRoleSet;
@@ -105,7 +104,7 @@ public class SentryStore {
// is starting from 1.
public static final long INIT_CHANGE_ID = 1L;
- private static final long EMPTY_CHANGE_ID = 0L;
+ public static final long EMPTY_CHANGE_ID = 0L;
// For counters, representation of the "unknown value"
private static final long COUNT_VALUE_UNKNOWN = -1;
@@ -2300,6 +2299,24 @@ public class SentryStore {
return retVal;
}
+ /**
+ * Persist a full hive snapshot into Sentry DB in a single transaction.
+ *
+ * @param authzPaths Mapping of hiveObj -> [Paths]
+ * @throws Exception
+ */
+ public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception {
+ tm.executeTransactionWithRetry(
+ new TransactionBlock() {
+ public Object execute(PersistenceManager pm) throws Exception {
+ for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) {
+ createAuthzPathsMappingCore(pm, authzPath.getKey(), authzPath.getValue());
+ }
+ return null;
+ }
+ });
+ }
+
public void createAuthzPathsMapping(final String hiveObj,
final Set<String> paths) throws Exception {
tm.executeTransactionWithRetry(
@@ -3339,10 +3356,10 @@ public class SentryStore {
}
/**
- * Get the MSentryPermChange object by ChangeID. Internally invoke
+ * Get the last processed perm change ID. Internally invokes
* getLastProcessedPermChangeIDCore().
*
- * @return MSentryPermChange
+ * @return the change id of last processed MSentryPermChange.
*/
@VisibleForTesting
long getLastProcessedPermChangeID() throws Exception {
@@ -3355,6 +3372,38 @@ public class SentryStore {
}
/**
+ * Get the last processed path change ID.
+ *
+ * @param pm the PersistenceManager
+ * @return the last processed path change ID, or EMPTY_CHANGE_ID when the query yields no value
+ */
+ private long getLastProcessedPathChangeIDCore(PersistenceManager pm) {
+ Query query = pm.newQuery(MSentryPathChange.class);
+ query.setResult("max(this.changeID)");
+ Long changeID = (Long) query.execute();
+ if (changeID == null) {
+ return EMPTY_CHANGE_ID;
+ } else {
+ return changeID;
+ }
+ }
+
+ /**
+ * Get the last processed path change ID. Internally invokes
+ * getLastProcessedPathChangeIDCore().
+ *
+ * @return the change id of last processed MSentryPathChange.
+ */
+ public long getLastProcessedPathChangeID() throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Long>() {
+ public Long execute(PersistenceManager pm) throws Exception {
+ return getLastProcessedPathChangeIDCore(pm);
+ }
+ });
+ }
+
+ /**
* Get the MSentryPermChange object by ChangeID.
*
* @param changeID the given changeID.
http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 59646b6..ad6bdda 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.core.common.exception.*;
+import org.apache.sentry.hdfs.PathsUpdate;
import org.apache.sentry.hdfs.PermissionsUpdate;
import org.apache.sentry.hdfs.UpdateableAuthzPaths;
import org.apache.sentry.hdfs.FullUpdateInitializer;
@@ -49,6 +50,8 @@ import java.io.File;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
@@ -74,7 +77,6 @@ public class HMSFollower implements Runnable {
private static final int maxRetriesForLogin = 3;
private static final int maxRetriesForConnection = 3;
- private volatile UpdateableAuthzPaths authzPaths;
private boolean needHiveSnapshot = true;
private final LeaderStatusMonitor leaderMonitor;
@@ -84,8 +86,11 @@ public class HMSFollower implements Runnable {
authzConf = conf;
this.leaderMonitor = leaderMonitor;
sentryStore = new SentryStore(authzConf);
- //TODO: Initialize currentEventID from Sentry db
- currentEventID = 0;
+
+ // Initialize currentEventID from Sentry db. A hive snapshot needs to be
+ // retrieved only if the stored ID equals SentryStore.EMPTY_CHANGE_ID.
+ currentEventID = getStoredCurrentID();
+ needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID);
}
@VisibleForTesting
@@ -227,12 +232,13 @@ public class HMSFollower implements Runnable {
CurrentNotificationEventId eventIDBefore = null;
CurrentNotificationEventId eventIDAfter = null;
+ Map<String, Set<String>> pathsFullSnapshot = null;
try {
eventIDBefore = client.getCurrentNotificationEventId();
LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.",
eventIDBefore));
- fetchFullUpdate();
+ pathsFullSnapshot = fetchFullUpdate();
eventIDAfter = client.getCurrentNotificationEventId();
LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.",
eventIDAfter));
@@ -252,6 +258,7 @@ public class HMSFollower implements Runnable {
eventIDAfter));
needHiveSnapshot = false;
currentEventID = eventIDAfter.getEventId();
+ sentryStore.persistFullPathsImage(pathsFullSnapshot);
}
NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
@@ -261,25 +268,30 @@ public class HMSFollower implements Runnable {
processNotificationEvents(response.getEvents());
}
} catch (TException e) {
- LOGGER.error("ThriftException occured fetching Notification entries, will try");
+ LOGGER.error("ThriftException occurred fetching Notification entries, will try");
e.printStackTrace();
} catch (SentryInvalidInputException|SentryInvalidHMSEventException e) {
throw new RuntimeException(e);
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred persisting Hive full snapshot into DB");
+ e.printStackTrace();
}
}
/**
- * Retrieve HMS full snapshot.
+ * Retrieve a HMS full snapshot.
+ *
+ * @return mapping of hiveObj -> [Paths] built from the HMS full snapshot
+ * @throws Exception
*/
- private void fetchFullUpdate() throws Exception {
+ private Map<String, Set<String>> fetchFullUpdate() throws Exception {
FullUpdateInitializer updateInitializer = null;
try {
updateInitializer = new FullUpdateInitializer(client, authzConf);
- // TODO - do we need to save returned authz path?
- updateInitializer.createInitialUpdate();
- // TODO: notify HDFS plugin
+ Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate();
LOGGER.info("#### Hive full update initialization complete !!");
+ return pathsUpdate;
} finally {
if (updateInitializer != null) {
try {
@@ -291,6 +303,16 @@ public class HMSFollower implements Runnable {
}
}
+ /**
+ * Get current eventID from Sentry DB.
+ *
+ * @return the stored currentID
+ * @throws Exception
+ */
+ private long getStoredCurrentID() throws Exception {
+ return sentryStore.getLastProcessedPathChangeID();
+ }
+
private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
return "true"
.equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), "true")));