You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by pr...@apache.org on 2015/04/18 19:05:52 UTC
incubator-sentry git commit: SENTRY-696: Improve Metastoreplugin
Cache Initialization time (Arun Suresh via Prasad Mujumdar)
Repository: incubator-sentry
Updated Branches:
refs/heads/master 8e16e87ce -> fd31d2cd4
SENTRY-696: Improve Metastoreplugin Cache Initialization time (Arun Suresh via Prasad Mujumdar)
Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/fd31d2cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/fd31d2cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/fd31d2cd
Branch: refs/heads/master
Commit: fd31d2cd4b7d0d63dc6f8a61d2cf6b3cf4d4d72e
Parents: 8e16e87
Author: Prasad Mujumdar <pr...@cloudera.com>
Authored: Sat Apr 18 10:05:39 2015 -0700
Committer: Prasad Mujumdar <pr...@cloudera.com>
Committed: Sat Apr 18 10:05:39 2015 -0700
----------------------------------------------------------------------
.../apache/sentry/hdfs/ServiceConstants.java | 8 +
sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml | 4 +
sentry-hdfs/sentry-hdfs-service/pom.xml | 5 +
.../sentry/hdfs/MetastoreCacheInitializer.java | 252 +++++++++++++++++++
.../org/apache/sentry/hdfs/MetastorePlugin.java | 150 +++++++----
.../sentry/hdfs/MetastorePluginWithHA.java | 2 +-
.../hdfs/TestMetastoreCacheInitializer.java | 133 ++++++++++
7 files changed, 498 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 489d165..19b0b49 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -51,7 +51,15 @@ public class ServiceConstants {
public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_hdfs";
public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE = "sentry.metastore.ha.zookeeper.namespace";
public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_metastore";
+    // Size of the worker pool used to build the metastore path cache.
+    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS = "sentry.hdfs.sync.metastore.cache.init.threads";
+    public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT = 10;
+    // When true, the metastore path cache is built on a background thread.
+    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE = "sentry.hdfs.sync.metastore.cache.async-init.enable";
+    public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT = false;
+    // Maximum number of partition names fetched per metastore call during cache init.
+    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-partitions-per-rpc";
+    public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT = 100;
+    // Maximum number of table names fetched per metastore call during cache init.
+    public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc";
+    public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT = 100;
}
public static class ClientConfig {
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml b/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
index f35baf4..04b79d8 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/pom.xml
@@ -54,6 +54,10 @@ limitations under the License.
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml
index 4d65edf..5d5d525 100644
--- a/sentry-hdfs/sentry-hdfs-service/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-service/pom.xml
@@ -33,6 +33,11 @@ limitations under the License.
<artifactId>sentry-binding-hive</artifactId>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java
new file mode 100644
index 0000000..093d21a
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastoreCacheInitializer.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Builds the initial HDFS path cache for the Sentry metastore plugin by
+ * walking every database, table and partition through an {@link IHMSHandler}.
+ * The walk is fanned out over a fixed-size thread pool: one task per
+ * database, one per batch of tables and one per batch of partitions.
+ * Callers must {@link #close()} the instance to stop the worker threads.
+ */
+class MetastoreCacheInitializer implements Closeable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger
+      (MetastoreCacheInitializer.class);
+
+  /**
+   * Outcome of one fetch task. {@code failure} is null when the task
+   * succeeded or only failed because the requested object no longer exists.
+   */
+  static class CallResult {
+    final Exception failure;
+
+    CallResult(Exception ex) {
+      // Keep the exception so createInitialUpdate() can surface it.
+      failure = ex;
+    }
+  }
+
+  /**
+   * Base for all fetch tasks: runs {@link #doTask()} and converts any
+   * exception into a {@link CallResult} rather than failing the Future.
+   */
+  abstract class BaseTask implements Callable<CallResult> {
+
+    @Override
+    public CallResult call() throws Exception {
+      try {
+        doTask();
+      } catch (Exception ex) {
+        // The object may have been dropped between being listed and being
+        // fetched; that is not an initialization failure.
+        return new CallResult(
+            (ex instanceof NoSuchObjectException) ? null : ex);
+      }
+      return new CallResult(null);
+    }
+
+    abstract void doTask() throws Exception;
+  }
+
+  /** Fetches one batch of a table's partitions and records their paths. */
+  class PartitionTask extends BaseTask {
+    private final String dbName;
+    private final String tblName;
+    private final List<String> partNames;
+    private final TPathChanges tblPathChange;
+
+    PartitionTask(String dbName, String tblName, List<String> partNames,
+        TPathChanges tblPathChange) {
+      super();
+      this.dbName = dbName;
+      this.tblName = tblName;
+      this.partNames = partNames;
+      this.tblPathChange = tblPathChange;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      LOGGER.debug("#### Fetching partitions [{}.{}][{}]", dbName, tblName,
+          partNames);
+      List<Partition> tblParts =
+          hmsHandler.get_partitions_by_names(dbName, tblName, partNames);
+      for (Partition part : tblParts) {
+        List<String> partPath = PathsUpdate.parsePath(part.getSd()
+            .getLocation());
+        if (partPath != null) {
+          // Several partition batches of one table share tblPathChange.
+          synchronized (tblPathChange) {
+            tblPathChange.addToAddPaths(partPath);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches one batch of tables of a database, records each table's path and
+   * schedules {@link PartitionTask}s for its partitions.
+   */
+  class TableTask extends BaseTask {
+    private final Database db;
+    private final List<String> tableNames;
+    private final PathsUpdate update;
+
+    TableTask(Database db, List<String> tableNames, PathsUpdate update) {
+      super();
+      this.db = db;
+      this.tableNames = tableNames;
+      this.update = update;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      LOGGER.debug("#### Fetching tables [{}][{}]", db.getName(), tableNames);
+      List<Table> tables =
+          hmsHandler.get_table_objects_by_name(db.getName(), tableNames);
+      for (Table tbl : tables) {
+        TPathChanges tblPathChange;
+        // The PathsUpdate is shared by every task of this initialization.
+        synchronized (update) {
+          tblPathChange = update.newPathChange(tbl.getDbName() + "." + tbl
+              .getTableName());
+        }
+        String tblLocation = tbl.getSd().getLocation();
+        if (tblLocation == null) {
+          // No storage location: the table is registered without paths and
+          // its partitions are not fetched.
+          continue;
+        }
+        List<String> tblPath = PathsUpdate.parsePath(tblLocation);
+        if (tblPath != null) {
+          synchronized (tblPathChange) {
+            tblPathChange.addToAddPaths(tblPath);
+          }
+        }
+        List<String> tblPartNames =
+            hmsHandler.get_partition_names(db.getName(), tbl
+                .getTableName(), (short) -1);
+        for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+          List<String> partsToFetch = tblPartNames.subList(i,
+              Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
+          submit(new PartitionTask(db.getName(), tbl.getTableName(),
+              partsToFetch, tblPathChange));
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches one database, records its path and schedules {@link TableTask}s
+   * for its tables.
+   */
+  class DbTask extends BaseTask {
+
+    private final PathsUpdate update;
+    private final String dbName;
+
+    DbTask(PathsUpdate update, String dbName) {
+      super();
+      this.update = update;
+      this.dbName = dbName;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      Database db = hmsHandler.get_database(dbName);
+      List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri());
+      if (dbPath != null) {
+        synchronized (update) {
+          update.newPathChange(db.getName()).addToAddPaths(dbPath);
+        }
+      }
+      List<String> allTblStr = hmsHandler.get_all_tables(db.getName());
+      for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+        List<String> tablesToFetch = allTblStr.subList(i,
+            Math.min(i + maxTablesPerCall, allTblStr.size()));
+        submit(new TableTask(db, tablesToFetch, update));
+      }
+    }
+  }
+
+  private final ExecutorService threadPool;
+  private final IHMSHandler hmsHandler;
+  private final int maxPartitionsPerCall;
+  private final int maxTablesPerCall;
+  // Futures of every submitted task, including tasks spawned by other
+  // tasks. Guarded by its own monitor.
+  private final List<Future<CallResult>> results =
+      new ArrayList<Future<CallResult>>();
+
+  MetastoreCacheInitializer(IHMSHandler hmsHandler, Configuration conf) {
+    this.hmsHandler = hmsHandler;
+    // A batch size below 1 would make the batching loops never advance.
+    this.maxPartitionsPerCall = Math.max(1, conf.getInt(
+        ServiceConstants.ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+        ServiceConstants.ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT));
+    this.maxTablesPerCall = Math.max(1, conf.getInt(
+        ServiceConstants.ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+        ServiceConstants.ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT));
+    threadPool = Executors.newFixedThreadPool(conf.getInt(
+        ServiceConstants.ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+        ServiceConstants.ServerConfig
+            .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
+  }
+
+  /** Submits a task to the pool and records its Future for checking. */
+  private void submit(BaseTask task) {
+    synchronized (results) {
+      results.add(threadPool.submit(task));
+    }
+  }
+
+  /**
+   * Walks all databases, tables and partitions and returns a path cache
+   * rooted at "/" containing their locations.
+   *
+   * @return the populated path cache
+   * @throws RuntimeException wrapping the first failure (other than
+   *         NoSuchObjectException) reported by any fetch task
+   * @throws Exception if listing the databases or waiting on a task fails
+   */
+  UpdateableAuthzPaths createInitialUpdate() throws
+      Exception {
+    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new
+        String[]{"/"});
+    PathsUpdate tempUpdate = new PathsUpdate(-1, false);
+    List<String> allDbStr = hmsHandler.get_all_databases();
+    for (String dbName : allDbStr) {
+      submit(new DbTask(tempUpdate, dbName));
+    }
+
+    // Check every task in submission order. A task adds its child tasks to
+    // results before its own Future completes, so once we have waited on
+    // every entry up to the current end of the list, nothing is outstanding.
+    int next = 0;
+    while (true) {
+      Future<CallResult> result;
+      synchronized (results) {
+        if (next >= results.size()) {
+          break;
+        }
+        result = results.get(next++);
+      }
+      CallResult callResult = result.get();
+      if (callResult.failure != null) {
+        throw new RuntimeException(callResult.failure);
+      }
+    }
+    authzPaths.updatePartial(Lists.newArrayList(tempUpdate),
+        new ReentrantReadWriteLock());
+    return authzPaths;
+  }
+
+  /** Stops the worker pool, interrupting any task still running. */
+  @Override
+  public void close() throws IOException {
+    if (threadPool != null) {
+      threadPool.shutdownNow();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index 7106e74..d7b5d5a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -17,8 +17,11 @@
*/
package org.apache.sentry.hdfs;
-import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -31,14 +34,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.hdfs.service.thrift.TPathChanges;
import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +59,11 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
// No need to sync.. as metastore is in the process of pushing an update..
return;
}
+ if (MetastorePlugin.this.authzPaths == null) {
+ LOGGER.info("#### Metastore Plugin cache has not finished" +
+ "initialization.");
+ return;
+ }
try {
long lastSeenBySentry =
MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
@@ -85,7 +88,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
private final Configuration conf;
private SentryHDFSServiceClient sentryClient;
- private UpdateableAuthzPaths authzPaths;
+ private volatile UpdateableAuthzPaths authzPaths;
private Lock notificiationLock;
// Initialized to some value > 1.
@@ -94,6 +97,11 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
// Has to match the value of seqNum
protected static volatile long lastSentSeqNum = seqNum.get();
private volatile boolean syncSent = false;
+ private volatile boolean initComplete = false;
+ private volatile boolean queueFlushComplete = false;
+ private volatile Throwable initError = null;
+ private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>();
+
private final ExecutorService threadPool;
private final Configuration sentryConf;
@@ -111,11 +119,53 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
- try {
- this.authzPaths = createInitialUpdate(new ProxyHMSHandler("sentry.hdfs", (HiveConf)this.conf));
- } catch (Exception e1) {
- LOGGER.error("Could not create Initial AuthzPaths or HMSHandler !!", e1);
- throw new RuntimeException(e1);
+ Thread initUpdater = new Thread() {
+ @Override
+ public void run() {
+ MetastoreCacheInitializer cacheInitializer = null;
+ try {
+ cacheInitializer =
+ new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs",
+ (HiveConf) MetastorePlugin.this.conf),
+ MetastorePlugin.this.conf);
+ MetastorePlugin.this.authzPaths =
+ cacheInitializer.createInitialUpdate();
+ LOGGER.info("#### Metastore Plugin initialization complete !!");
+ synchronized (updateQueue) {
+ while (!updateQueue.isEmpty()) {
+ PathsUpdate update = updateQueue.poll();
+ if (update != null) {
+ processUpdate(update);
+ }
+ }
+ queueFlushComplete = true;
+ }
+ LOGGER.info("#### Finished flushing queued updates to Sentry !!");
+ } catch (Exception e) {
+ LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e);
+ initError = e;
+ } finally {
+ if (cacheInitializer != null) {
+ try {
+ cacheInitializer.close();
+ } catch (Exception e) {
+ LOGGER.info("#### Exception while closing cacheInitializer !!", e);
+ }
+ }
+ initComplete = true;
+ }
+ }
+ };
+ if (this.conf.getBoolean(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE,
+ ServerConfig
+ .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) {
+ LOGGER.warn("#### Metastore Cache initialization is set to aync..." +
+ "HDFS ACL synchronization will not happen until metastore" +
+ "cache initialization is completed !!");
+ initUpdater.start();
+ } else {
+ initUpdater.run();
}
try {
sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
@@ -125,49 +175,15 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
}
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleWithFixedDelay(new SyncTask(),
- this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
- this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
- ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
- TimeUnit.MILLISECONDS);
+ this.conf.getLong(ServerConfig
+ .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+ ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
+ this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
+ ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
+ TimeUnit.MILLISECONDS);
this.threadPool = threadPool;
}
- private UpdateableAuthzPaths createInitialUpdate(IHMSHandler hmsHandler) throws Exception {
- UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[] {"/"});
- PathsUpdate tempUpdate = new PathsUpdate(-1, false);
- List<String> allDbStr = hmsHandler.get_all_databases();
- for (String dbName : allDbStr) {
- Database db = hmsHandler.get_database(dbName);
- List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri());
- if(dbPath != null) {
- tempUpdate.newPathChange(db.getName()).addToAddPaths(dbPath);
- }
- List<String> allTblStr = hmsHandler.get_all_tables(db.getName());
- for (String tblName : allTblStr) {
- Table tbl = hmsHandler.get_table(db.getName(), tblName);
- TPathChanges tblPathChange = tempUpdate.newPathChange(tbl
- .getDbName() + "." + tbl.getTableName());
- List<Partition> tblParts =
- hmsHandler.get_partitions(db.getName(), tbl.getTableName(), (short) -1);
- List<String> tb1Path = PathsUpdate.parsePath(tbl.getSd().getLocation() == null ?
- db.getLocationUri() : tbl.getSd().getLocation());
- if(tb1Path != null) {
- tblPathChange.addToAddPaths(tb1Path);
- }
- for (Partition part : tblParts) {
- List<String> partPath = PathsUpdate.parsePath(part.getSd().getLocation());
- if(partPath != null) {
- tblPathChange.addToAddPaths(partPath);
- }
- }
- }
- }
- authzPaths.updatePartial(Lists.newArrayList(tempUpdate),
- new ReentrantReadWriteLock());
- return authzPaths;
- }
-
@Override
public void addPath(String authzObj, String path) {
List<String> pathTree = PathsUpdate.parsePath(path);
@@ -197,7 +213,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
}
}
update.newPathChange(authzObj).addToDelPaths(
- Lists.newArrayList(PathsUpdate.ALL_PATHS));
+ Lists.newArrayList(PathsUpdate.ALL_PATHS));
notifySentryAndApplyLocal(update);
}
@@ -247,7 +263,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
} catch (Exception e) {
sentryClient = null;
- LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+ LOGGER.error("#### Could not connect to Sentry HDFS Service !!", e);
}
}
return sentryClient;
@@ -285,7 +301,31 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
}
- protected void notifySentryAndApplyLocal(PathsUpdate update) {
+  /**
+   * Applies a path update locally and forwards it to Sentry via
+   * processUpdate(), or queues it while the initial metastore cache is
+   * still being built.
+   *
+   * @throws RuntimeException wrapping the initialization error if building
+   *         the metastore cache failed
+   */
+  private void notifySentryAndApplyLocal(PathsUpdate update) {
+    // Check for a failed initialization first: initComplete is also set when
+    // initialization fails, and processing an update then could run against
+    // a null authzPaths.
+    if (initError != null) {
+      LOGGER.error("#### Error initializing Metastore Plugin !!", initError);
+      throw new RuntimeException(initError);
+    }
+    if (initComplete) {
+      processUpdate(update);
+      return;
+    }
+    synchronized (updateQueue) {
+      if (queueFlushComplete) {
+        // The initializer already drained the queue; apply directly.
+        processUpdate(update);
+      } else {
+        // Replayed in order by the initializer once the cache is built.
+        updateQueue.add(update);
+        LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." +
+            "Metastore hasn't been initialized yet !!");
+      }
+    }
+  }
+
+  /**
+   * Applies the update to the local path cache (applyLocal), then calls
+   * notifySentry (presumably pushing it to the Sentry HDFS service).
+   * MetastorePluginWithHA overrides this hook.
+   */
+ protected void processUpdate(PathsUpdate update) {
applyLocal(update);
notifySentry(update);
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
index ee5e0f9..4f6d7ca 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
@@ -79,7 +79,7 @@ public class MetastorePluginWithHA extends MetastorePlugin {
}
@Override
- protected void notifySentryAndApplyLocal(PathsUpdate update) {
+ protected void processUpdate(PathsUpdate update) {
try {
// push to ZK in order to keep the metastore local cache in sync
pluginCacheSync.handleCacheUpdate(update);
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/fd31d2cd/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java
new file mode 100644
index 0000000..a5a165a
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestMetastoreCacheInitializer.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+
+public class TestMetastoreCacheInitializer {
+
+  /**
+   * Mocks a metastore with three databases: db1 with no tables, db2 with an
+   * unpartitioned table and db3 with a two-partition table. Verifies that
+   * every database, table and partition path resolves to the expected
+   * authz object. Batch sizes and the pool size are forced to 1 so that
+   * every table and partition is fetched by a separate task.
+   */
+  @Test
+  public void testInitializer() throws Exception {
+
+    Database db1 = Mockito.mock(Database.class);
+    Mockito.when(db1.getName()).thenReturn("db1");
+    Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1");
+    Database db2 = Mockito.mock(Database.class);
+    Mockito.when(db2.getName()).thenReturn("db2");
+    Mockito.when(db2.getLocationUri()).thenReturn("hdfs:///db2");
+    Database db3 = Mockito.mock(Database.class);
+    Mockito.when(db3.getName()).thenReturn("db3");
+    Mockito.when(db3.getLocationUri()).thenReturn("hdfs:///db3");
+
+    Table tab21 = Mockito.mock(Table.class);
+    Mockito.when(tab21.getDbName()).thenReturn("db2");
+    Mockito.when(tab21.getTableName()).thenReturn("tab21");
+    StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db2/tab21");
+    Mockito.when(tab21.getSd()).thenReturn(sd21);
+
+    Table tab31 = Mockito.mock(Table.class);
+    Mockito.when(tab31.getDbName()).thenReturn("db3");
+    Mockito.when(tab31.getTableName()).thenReturn("tab31");
+    StorageDescriptor sd31 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd31.getLocation()).thenReturn("hdfs:///db3/tab31");
+    Mockito.when(tab31.getSd()).thenReturn(sd31);
+
+    Partition part311 = Mockito.mock(Partition.class);
+    StorageDescriptor sd311 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd311.getLocation()).thenReturn("hdfs:///db3/tab31/part311");
+    Mockito.when(part311.getSd()).thenReturn(sd311);
+
+    Partition part312 = Mockito.mock(Partition.class);
+    StorageDescriptor sd312 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd312.getLocation()).thenReturn("hdfs:///db3/tab31/part312");
+    Mockito.when(part312.getSd()).thenReturn(sd312);
+
+    IHMSHandler hmsHandler = Mockito.mock(IHMSHandler.class);
+    Mockito.when(hmsHandler.get_all_databases()).thenReturn(Lists
+        .newArrayList("db1", "db2", "db3"));
+    Mockito.when(hmsHandler.get_database("db1")).thenReturn(db1);
+    Mockito.when(hmsHandler.get_all_tables("db1")).thenReturn(new
+        ArrayList<String>());
+
+    Mockito.when(hmsHandler.get_database("db2")).thenReturn(db2);
+    Mockito.when(hmsHandler.get_all_tables("db2")).thenReturn(Lists
+        .newArrayList("tab21"));
+    Mockito.when(hmsHandler.get_table_objects_by_name("db2",
+        Lists.newArrayList("tab21")))
+        .thenReturn(Lists.newArrayList(tab21));
+    Mockito.when(hmsHandler.get_partition_names("db2", "tab21", (short) -1))
+        .thenReturn(new ArrayList<String>());
+
+    Mockito.when(hmsHandler.get_database("db3")).thenReturn(db3);
+    Mockito.when(hmsHandler.get_all_tables("db3")).thenReturn(Lists
+        .newArrayList("tab31"));
+    Mockito.when(hmsHandler.get_table_objects_by_name("db3",
+        Lists.newArrayList("tab31")))
+        .thenReturn(Lists.newArrayList(tab31));
+    Mockito.when(hmsHandler.get_partition_names("db3", "tab31", (short) -1))
+        .thenReturn(Lists.newArrayList("part311", "part312"));
+
+    Mockito.when(hmsHandler.get_partitions_by_names("db3", "tab31",
+        Lists.newArrayList("part311")))
+        .thenReturn(Lists.newArrayList(part311));
+    Mockito.when(hmsHandler.get_partitions_by_names("db3", "tab31",
+        Lists.newArrayList("part312")))
+        .thenReturn(Lists.newArrayList(part312));
+
+    Configuration conf = new Configuration();
+    conf.setInt(ServiceConstants.ServerConfig
+        .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+        .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+        .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1);
+
+    MetastoreCacheInitializer cacheInitializer = new
+        MetastoreCacheInitializer(hmsHandler, conf);
+    try {
+      UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate();
+
+      Assert.assertEquals("db1", update.findAuthzObjectExactMatch(new
+          String[]{"db1"}));
+      Assert.assertEquals("db2", update.findAuthzObjectExactMatch(new
+          String[]{"db2"}));
+      Assert.assertEquals("db2.tab21", update.findAuthzObjectExactMatch(new
+          String[]{"db2", "tab21"}));
+      Assert.assertEquals("db3", update.findAuthzObjectExactMatch(new
+          String[]{"db3"}));
+      Assert.assertEquals("db3.tab31", update.findAuthzObjectExactMatch(new
+          String[]{"db3", "tab31"}));
+      Assert.assertEquals("db3.tab31", update.findAuthzObjectExactMatch(new
+          String[]{"db3", "tab31", "part311"}));
+      Assert.assertEquals("db3.tab31", update.findAuthzObjectExactMatch(new
+          String[]{"db3", "tab31", "part312"}));
+    } finally {
+      // Always stop the worker pool, even when an assertion fails.
+      cacheInitializer.close();
+    }
+  }
+}