You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2016/12/13 00:06:30 UTC
sentry git commit: SENTRY-1508: MetastorePlugin.java does not handle
properly initialization failure (Vadim Spector,
Reviewed by: Sravya Tirukkovalur, Alexander Kolbasov and Hao Hao)
Repository: sentry
Updated Branches:
refs/heads/master ff623a944 -> b479df4ba
SENTRY-1508: MetastorePlugin.java does not handle properly initialization failure (Vadim Spector, Reviewed by: Sravya Tirukkovalur, Alexander Kolbasov and Hao Hao)
Change-Id: I95c00a92257553da56ee1cae4ae5c8f8d04a2409
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b479df4b
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b479df4b
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b479df4b
Branch: refs/heads/master
Commit: b479df4ba383a8920661b1b20d086e40a8ac2e1c
Parents: ff623a9
Author: hahao <ha...@cloudera.com>
Authored: Mon Dec 12 16:05:11 2016 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Mon Dec 12 16:05:11 2016 -0800
----------------------------------------------------------------------
.../apache/sentry/hdfs/ServiceConstants.java | 2 -
.../org/apache/sentry/hdfs/MetastorePlugin.java | 634 +++++++++++++------
.../sentry/hdfs/MetastorePluginWithHA.java | 2 +-
3 files changed, 442 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 23552c2..cf94785 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -59,8 +59,6 @@ public class ServiceConstants {
public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT = 1000;
public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE = "sentry.hdfs.sync.metastore.cache.fail.on.partial.update";
public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT = true;
- public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE = "sentry.hdfs.sync.metastore.cache.async-init.enable";
- public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT = false;
public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-partitions-per-rpc";
public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT = 100;
http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index 085971b..f6661fd 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -17,17 +17,11 @@
*/
package org.apache.sentry.hdfs;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -42,87 +36,211 @@ import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
- * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks
- * into the sites in the {@link MetaStorePreEventListener} that deal with
- * creation/updation and deletion for paths.
+ * Plugin for the components that need to send path creation, update, and deletion
+ * notifications to the Sentry daemon.
+ *
+ * <p>
+ * Implements {@link SentryMetastoreListenerPlugin} that hooks
+ * into the sites in the {@link MetaStorePreEventListener}.
+ *
+ * <p>
+ * Implementation Notes:
+ *
+ * <ol>
+ * <li>MetastorePlugin performs the following functions:
+ *
+ * <ul>
+ * <li> At the construction time:
+ * <ul>
+ * <li> Initializes local HMS cache with HMS paths information.
+ * <li> Sends initial HMS paths information to the Sentry daemon.
+ * </ul>
+ * </li>
+ * <li> Upon receiving path update notification from the hosting client code, via addPath(),
+ * removePath(), removeAllPaths(), and renameAuthzObject() callback methods:
+ * <ul>
+ * <li> Updates local HMS cache accordingly.
+ * <li> Sends partial update with the assigned sequence number to the Sentry daemon.
+ * <li> Maintains the latest Sentry partial update sequence number, incrementing it by 1 on each update.
+ * </ul>
+ * </li>
+ * <li> Periodically, from the housekeeping thread:
+ * <ul>
+ * <li> Contacts the Sentry daemon to ask for the sequence number of the latest received update.
+ * <li> If the sequence number returned by the Sentry daemon does not match the sequence number of the
+ * latest update sent from MetastorePlugin, send the full HMS paths image to the Sentry daemon.
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * <li>MetastorePlugin must be a singleton.<br>
+ * Only a single instance of MetastorePlugin can be used. MetastorePlugin has HMS cache
+ * that is updated via calling addPath(), removePath(), removeAllPaths(), renameAuthzObject().
+ * This cache must represent full HMS state at any point, so that full updates, when they are
+ * needed, would be correct. Channelling different update requests through different MetastorePlugin
+ * instances would make those caches partial and mutually inconsistent.
+ *
+ * <p>
+ * <li>MetastorePlugin is always created, even though initialization may fail.<br>
+ * MetastorePlugin initialization (object construction) may fail for two reasons:
+ * <ul>
+ * <li> HMS cache cannot be initialized, usually due to some invalid HMS path entries.
+ * <li> Initial cache cannot be sent to Sentry, e.g. due to the communication problems.
+ * </ul>
+ *
+ * <p>
+ * In either case, MetastorePlugin is still constructed, in consideration of the design of
+ * the existing client code. However, such an instance is marked as invalid; all update APIs
+ * throw IllegalStateException with the appropriate error message and root cause exception.
+ * <br>TODO: failing to construct MetastorePlugin on initialization failure would be much cleaner,
+ * but it has to be done in coordination with the HMS client code.
+ *
+ * <p>
+ * <li>MetastorePlugin guarantees delivery of HMS paths updates to Sentry daemon in the right order.<br>
+ * Each invocation of addPath(), removePath(), removeAllPaths(), renameAuthzObject()
+ * triggers two actions:
+ * <ul>
+ * <li> increment update sequence number and update the local cache and
+ * <li> send partial update to the Sentry daemon.
+ * </ul>
+ *
+ * <p>
+ * Update sequence number is created at first step, and then it travels as part of the update information,
+ * to the Sentry daemon on the second step. Therefore, the sequence of both steps must be
+ * atomic, to guarantee that updates arrive to the Sentry daemon in the right order,
+ * with sequential update number. This is achieved by using notificationLock. The same lock is used
+ * inside the SyncTask during full Sentry update, when the local and Sentry-side update sequence
+ * numbers are out of sync.
+ *
+ * <p>
+ * <li>MetastorePlugin validates input paths.<br>
+ * Parsing malformed input paths generates SentryMalformedPathException. Since this is a checked
+ * exception, it is re-thrown wrapped into (un-checked) IllegalArgumentException, to preserve
+ * public APIs' signatures.
+ *
+ * </ol>
*/
+
public class MetastorePlugin extends SentryMetastoreListenerPlugin {
private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class);
- private static final String initializationFailureMsg = "Cache failed to initialize, cannot send path updates to Sentry." +
- " Please review HMS error logs during startup for additional information. If the initialization failure is due" +
- " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS";
+ /* MetastorePlugin initialization may fail for two different reasons:
+ * a) Failure to initialize HMS paths cache.
+ * b) Failure to send the initial HMS paths to the Sentry daemon.
+ * Each of the two messages below conveys the reason.
+ */
+ private static final String CACHE_INIT_FAILURE_MSG =
+ "Cache failed to initialize, cannot send path updates to Sentry." +
+ " Please review HMS error logs during startup for additional information. If the initialization failure is due" +
+ " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS";
+ private static final String SENTRY_INIT_UPDATE_FAILURE_MSG =
+ "Metastore Plugin failed to initialize - cannot send initial HMS updates to Sentry";
+
+ private static final String SENTRY_COMM_FAILURE_MSG = "Cannot Communicate with Sentry";
+ private final Configuration conf;
+ private final Configuration sentryConf;
+
+ // guard for all local+Sentry notifications
+ private final ReentrantLock notificationLock = new ReentrantLock();
+ // sentryClient may be re-instantiated in case of suspected communication failure
+ // This code ensures that access to sentryClient is protected by notificationLock
+ private SentryHDFSServiceClient sentryClient;
+ // Has to match the value of seqNum
+ // This code ensures that access to lastSentSeqNum is protected by notificationLock
+ protected long lastSentSeqNum;
+
+ // pathUpdateLock guards access to UpdateableAuthzPaths which is not thread-safe
+ private final ReentrantReadWriteLock pathUpdateLock = new ReentrantReadWriteLock();
+ // access to authzPaths must be protected by pathUpdateLock
+ private final UpdateableAuthzPaths authzPaths;
+
+ // Initialized to some value > 1.
+ protected final AtomicLong seqNum = new AtomicLong(5);
+ private final Throwable initError;
+ private final String initErrorMsg;
+ private final ScheduledExecutorService threadPool; //NOPMD
+
+ private static volatile ScheduledExecutorService lastThreadPool = null;
+
+ /*
+ * This task is scheduled to run periodically, to make sure Sentry has all updates
+ * -- only if MetastorePlugin has been successfully initialized.
+ */
class SyncTask implements Runnable {
@Override
public void run() {
- if (!notificiationLock.tryLock()) {
+ if (!notificationLock.tryLock()) {
// No need to sync.. as metastore is in the process of pushing an update..
return;
}
- if (MetastorePlugin.this.authzPaths == null) {
- LOGGER.warn(initializationFailureMsg);
- return;
- }
try {
- long lastSeenBySentry =
- MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
+ long lastSeenBySentry = getLastSeenHMSPathSeqNum();
long lastSent = lastSentSeqNum;
if (lastSeenBySentry != lastSent) {
LOGGER.warn("#### Sentry not in sync with HMS [" + lastSeenBySentry + ", "
+ lastSent + "]");
- PathsUpdate fullImageUpdate =
- MetastorePlugin.this.authzPaths.createFullImageUpdate(lastSent);
- notifySentryNoLock(fullImageUpdate);
- LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]");
+ notifySentryFullUpdate(lastSent);
}
- } catch (Exception e) {
- sentryClient = null;
- LOGGER.error("Error talking to Sentry HDFS Service !!", e);
+ } catch (Exception ignore) {
+ // all methods inside try {} log errors anyway
} finally {
- syncSent = true;
- notificiationLock.unlock();
+ notificationLock.unlock();
}
}
}
- private final Configuration conf;
- private SentryHDFSServiceClient sentryClient;
- private volatile UpdateableAuthzPaths authzPaths;
- private Lock notificiationLock;
-
- // Initialized to some value > 1.
- protected static final AtomicLong seqNum = new AtomicLong(5);
-
- // Has to match the value of seqNum
- protected static volatile long lastSentSeqNum = seqNum.get();
- private volatile boolean syncSent = false;
- private volatile boolean initComplete = false;
- private volatile boolean queueFlushComplete = false;
- private volatile Throwable initError = null;
- private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>();
-
- private final ExecutorService threadPool; //NOPMD
- private final Configuration sentryConf;
-
+ /*
+ * Proxy class for RPC calls to the Sentry daemon
+ */
static class ProxyHMSHandler extends HMSHandler {
public ProxyHMSHandler(String name, HiveConf conf) throws MetaException {
super(name, conf);
}
}
- public MetastorePlugin(Configuration conf, Configuration sentryConf) {
- this.notificiationLock = new ReentrantLock();
+ /*
+ * Test-only logic. Testing framework may create multiple MetastorePlugin
+ * instances in sequence, without explicitly shutting down the previous
+ * instance, which does not even have any shutdown API (obvious oversight).
+ * This results in multiple housekeeping thread pools, completely messing
+ * up HMS state on Sentry daemon.
+ * Previous thread pool must be shut down.
+ * In real deployments this code does nothing, because there is only one
+ * instance of MetastorePlugin.
+ */
+ private static synchronized void shutdownPreviousHousekeepingThreadPool() {
+ if (lastThreadPool != null) {
+ LOGGER.info("#### Metastore Plugin: shutting down previous housekeeping thread");
+ try {
+ lastThreadPool.shutdownNow();
+ } catch (Throwable t) {
+ LOGGER.error("#### Metastore Plugin: failure shutting down previous housekeeping thread", t);
+ }
+ lastThreadPool = null;
+ }
+ }
+ public MetastorePlugin(Configuration conf, Configuration sentryConf) {
+ Preconditions.checkNotNull(conf, "NULL Hive Configuration");
+ Preconditions.checkNotNull(sentryConf, "NULL Sentry Configuration");
if (!(conf instanceof HiveConf)) {
- String error = "Configuration is not an instanceof HiveConf";
+ String error = "Hive Configuration is not an instanceof HiveConf: " + conf.getClass().getName();
LOGGER.error(error);
- throw new RuntimeException(error);
+ throw new IllegalArgumentException(error);
}
+
+ /*
+ * Test-only logic. See javadoc for this method.
+ */
+ shutdownPreviousHousekeepingThreadPool();
+
this.conf = new HiveConf((HiveConf)conf);
this.sentryConf = new Configuration(sentryConf);
@@ -130,113 +248,156 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
- Thread initUpdater = new Thread() {
- @Override
- public void run() {
- MetastoreCacheInitializer cacheInitializer = null;
- try {
- cacheInitializer =
- new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs",
- (HiveConf) MetastorePlugin.this.conf),
- MetastorePlugin.this.conf);
- MetastorePlugin.this.authzPaths =
- cacheInitializer.createInitialUpdate();
- LOGGER.info("#### Metastore Plugin initialization complete !!");
- synchronized (updateQueue) {
- while (!updateQueue.isEmpty()) {
- PathsUpdate update = updateQueue.poll();
- if (update != null) {
- processUpdate(update);
- }
- }
- queueFlushComplete = true;
- }
- LOGGER.info("#### Finished flushing queued updates to Sentry !!");
- } catch (Exception e) {
- LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e);
- initError = e;
- } finally {
- if (cacheInitializer != null) {
- try {
- cacheInitializer.close();
- } catch (Exception e) {
- LOGGER.info("#### Exception while closing cacheInitializer !!", e);
- }
- }
- initComplete = true;
- }
+
+ Throwable tmpInitError = null;
+ String tmpInitErrorMsg = null;
+
+ /* Initialization Step #1: initialize local HMS state cache.
+ * To preserve the contract with the existing Hive client code,
+ * MetastorePlugin shall be constructed even if initialization fails,
+ * though it will be completely inoperable.
+ */
+ UpdateableAuthzPaths tmpAuthzPaths;
+ try (MetastoreCacheInitializer cacheInitializer = new MetastoreCacheInitializer(
+ new ProxyHMSHandler("sentry.hdfs", (HiveConf) this.conf),
+ this.conf))
+ {
+ // initialize HMS cache.
+ tmpAuthzPaths = cacheInitializer.createInitialUpdate();
+ LOGGER.info("#### Metastore Plugin HMS cache initialization complete");
+ } catch (Throwable e) {
+ tmpInitError = e;
+ tmpInitErrorMsg = CACHE_INIT_FAILURE_MSG;
+ tmpAuthzPaths = null;
+ LOGGER.error("#### " + tmpInitErrorMsg, e);
+ for (Throwable thr : e.getSuppressed()) {
+ LOGGER.warn("#### Exception while closing cacheInitializer", thr);
}
- };
- if (this.conf.getBoolean(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE,
- ServerConfig
- .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) {
- LOGGER.warn("#### Metastore Cache initialization is set to aync..." +
- "HDFS ACL synchronization will not happen until metastore" +
- "cache initialization is completed !!");
- initUpdater.start();
- } else {
- initUpdater.run(); //NOPMD
}
+ this.authzPaths = tmpAuthzPaths;
+
+ /* If HMS cache initialization failed, further initialization shall be skipped.
+ * MetastorePlugin is considered non-operational, and all of its public APIs
+ * shall be throwing an exception.
+ */
+ if (tmpInitError != null) {
+ this.threadPool = null;
+ this.initError = tmpInitError;
+ this.initErrorMsg = tmpInitErrorMsg;
+ return;
+ }
+
+ /* Initialization Step #2: push initial HMS state to Sentry.
+ * Synchronization by notificationLock is for visibility of changes to sentryClient.
+ */
+ notificationLock.lock();
try {
- sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
- } catch (Exception e) {
- sentryClient = null;
- LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+ this.lastSentSeqNum = seqNum.get();
+ notifySentryFullUpdate(lastSentSeqNum);
+ LOGGER.info("#### Metastore Plugin Sentry full initial update complete");
+ } catch (Throwable e) {
+ tmpInitError = e;
+ tmpInitErrorMsg = SENTRY_INIT_UPDATE_FAILURE_MSG;
+ LOGGER.error("#### " + tmpInitErrorMsg, e);
+ } finally {
+ notificationLock.unlock();
+ }
+
+ this.initError = tmpInitError;
+ this.initErrorMsg = tmpInitErrorMsg;
+
+ /* If sending HMS state to Sentry failed, further initialization shall be skipped.
+ * MetastorePlugin is considered non-operational, and all of its public APIs
+ * shall be throwing an exception.
+ */
+ if (this.initError != null) {
+ this.threadPool = null;
+ return;
}
- ScheduledExecutorService newThreadPool = Executors.newScheduledThreadPool(1);
- newThreadPool.scheduleWithFixedDelay(new SyncTask(),
- this.conf.getLong(ServerConfig
- .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
- this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
- ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
- TimeUnit.MILLISECONDS);
- this.threadPool = newThreadPool;
+
+ /* Initialization Step #3: schedule SyncTask to run periodically, to make
+ * sure Sentry has the current HMS state.
+ */
+ this.threadPool = Executors.newScheduledThreadPool(1);
+ this.threadPool.scheduleWithFixedDelay(new SyncTask(),
+ this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+ ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
+ this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
+ ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
+ TimeUnit.MILLISECONDS);
+ MetastorePlugin.lastThreadPool = this.threadPool;
+ LOGGER.info("#### Metastore Plugin Sentry initialization complete");
}
@Override
public void addPath(String authzObj, String path) {
+ assertInit();
+
+ // validate / parse inputs
List<String> pathTree = null;
try {
pathTree = PathsUpdate.parsePath(path);
} catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path);
- e.printStackTrace();
- return;
+ String err = "Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path;
+ LOGGER.error(err, e);
+ throw new IllegalArgumentException(err, e);
}
if(pathTree == null) {
+ LOGGER.debug("#### HMS Path Update ["
+ + "OP : addPath, "
+ + "authzObj : " + authzObj.toLowerCase() + ", "
+ + "path : " + path + "] - nothing to add");
return;
}
LOGGER.debug("#### HMS Path Update ["
+ "OP : addPath, "
+ "authzObj : " + authzObj.toLowerCase() + ", "
+ "path : " + path + "]");
- PathsUpdate update = createHMSUpdate();
- update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree);
- notifySentryAndApplyLocal(update);
+
+ // do local and remote updates
+ notificationLock.lock();
+ try {
+ PathsUpdate update = createHMSUpdate();
+ update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree);
+ updateLocalCacheAndNotifySentry(update);
+ } finally {
+ notificationLock.unlock();
+ }
}
@Override
public void removeAllPaths(String authzObj, List<String> childObjects) {
+ assertInit();
+
+ // validate / parse inputs
LOGGER.debug("#### HMS Path Update ["
+ "OP : removeAllPaths, "
+ "authzObj : " + authzObj.toLowerCase() + ", "
+ "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]");
- PathsUpdate update = createHMSUpdate();
- if (childObjects != null) {
- for (String childObj : childObjects) {
- update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths(
+
+ // do local and remote updates
+ notificationLock.lock();
+ try {
+ PathsUpdate update = createHMSUpdate();
+ if (childObjects != null) {
+ for (String childObj : childObjects) {
+ update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths(
Lists.newArrayList(PathsUpdate.ALL_PATHS));
+ }
}
- }
- update.newPathChange(authzObj.toLowerCase()).addToDelPaths(
+ update.newPathChange(authzObj.toLowerCase()).addToDelPaths(
Lists.newArrayList(PathsUpdate.ALL_PATHS));
- notifySentryAndApplyLocal(update);
+ updateLocalCacheAndNotifySentry(update);
+ } finally {
+ notificationLock.unlock();
+ }
}
@Override
public void removePath(String authzObj, String path) {
+ assertInit();
+
+ // validate / parse inputs
if ("*".equals(path)) {
removeAllPaths(authzObj.toLowerCase(), null);
} else {
@@ -244,154 +405,241 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
try {
pathTree = PathsUpdate.parsePath(path);
} catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path);
- e.printStackTrace();
- return;
+ String err = "Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path;
+ LOGGER.error(err, e);
+ throw new IllegalArgumentException(err, e);
}
if(pathTree == null) {
+ LOGGER.debug("#### HMS Path Update ["
+ + "OP : removePath, "
+ + "authzObj : " + authzObj.toLowerCase() + ", "
+ + "path : " + path + "] - nothing to remove");
return;
}
LOGGER.debug("#### HMS Path Update ["
+ "OP : removePath, "
+ "authzObj : " + authzObj.toLowerCase() + ", "
+ "path : " + path + "]");
- PathsUpdate update = createHMSUpdate();
- update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree);
- notifySentryAndApplyLocal(update);
+
+ // do local and remote updates
+ notificationLock.lock();
+ try {
+ PathsUpdate update = createHMSUpdate();
+ update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree);
+ updateLocalCacheAndNotifySentry(update);
+ } finally {
+ notificationLock.unlock();
+ }
}
}
@Override
public void renameAuthzObject(String oldName, String oldPath, String newName,
String newPath) {
+ assertInit();
+
+ // validate / parse inputs
String oldNameLC = oldName != null ? oldName.toLowerCase() : null;
String newNameLC = newName != null ? newName.toLowerCase() : null;
- PathsUpdate update = createHMSUpdate();
LOGGER.debug("#### HMS Path Update ["
+ "OP : renameAuthzObject, "
- + "oldName : " + oldNameLC + ","
- + "oldPath : " + oldPath + ","
- + "newName : " + newNameLC + ","
- + "newPath : " + newPath + "]");
+ + "oldName : " + oldNameLC + ", "
+ + "oldPath : " + oldPath + ", "
+ + "newName : " + newNameLC + ", "
+ + "newPath : " + newPath + "]");
List<String> newPathTree = null;
try {
newPathTree = PathsUpdate.parsePath(newPath);
} catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath +
- ", newName=" + newName + ", newPath=" + newPath);
- e.printStackTrace();
- return;
- }
-
- if( newPathTree != null ) {
- update.newPathChange(newNameLC).addToAddPaths(newPathTree);
+ String err = "Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath +
+ ", newName=" + newName + ", newPath=" + newPath;
+ LOGGER.error(err, e);
+ throw new IllegalArgumentException(err, e);
}
List<String> oldPathTree = null;
try {
oldPathTree = PathsUpdate.parsePath(oldPath);
} catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath +
- ", newName=" + newName + ", newPath=" + newPath);
- e.printStackTrace();
- return;
+ String err = "Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath +
+ ", newName=" + newName + ", newPath=" + newPath;
+ LOGGER.error(err, e);
+ throw new IllegalArgumentException(err, e);
}
- if( oldPathTree != null ) {
- update.newPathChange(oldNameLC).addToDelPaths(oldPathTree);
+ // do local and remote updates
+ notificationLock.lock();
+ try {
+ PathsUpdate update = createHMSUpdate();
+ if( newPathTree != null ) {
+ update.newPathChange(newNameLC).addToAddPaths(newPathTree);
+ }
+ if( oldPathTree != null ) {
+ update.newPathChange(oldNameLC).addToDelPaths(oldPathTree);
+ }
+ updateLocalCacheAndNotifySentry(update);
+ } finally {
+ notificationLock.unlock();
}
- notifySentryAndApplyLocal(update);
}
- private SentryHDFSServiceClient getClient() {
+ /*
+ * Instantiate client (unless it's already instantiated) to talk to Sentry service.
+ * Call must be protected by notificationLock.
+ */
+ private SentryHDFSServiceClient getClient() throws Exception {
+ assert notificationLock.isHeldByCurrentThread() : "Internal Faulure: access to Sentry client is nt protected by notificationLock";
if (sentryClient == null) {
try {
sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
} catch (Exception e) {
sentryClient = null;
- LOGGER.error("#### Could not connect to Sentry HDFS Service !!", e);
+ final String err = SENTRY_COMM_FAILURE_MSG;
+ LOGGER.error(err, e);
+ throw new Exception(err, e);
}
}
return sentryClient;
}
+ /*
+ * Initialize HMS update object and assign its sequence number.
+ * Call must be protected by notificationLock.
+ */
private PathsUpdate createHMSUpdate() {
PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false);
LOGGER.debug("#### Creating HMS Path Update SeqNum : [" + seqNum.get() + "]");
return update;
}
- protected void notifySentryNoLock(PathsUpdate update) {
+ /*
+ * Get the last seen HMS path update sequence number from Sentry service.
+ * Call must be protected by notificationLock.
+ */
+ private long getLastSeenHMSPathSeqNum() throws Exception {
+ try {
+ return getClient().getLastSeenHMSPathSeqNum();
+ } catch (Exception e) {
+ final String err = "Could not fetch the last seen HMS Path Sequence number from Sentry HDFS Service";
+ LOGGER.error(err, e);
+ resetClient();
+ throw e;
+ }
+ }
+
+ /*
+ * Send update to Sentry service.
+ * This method, when called from notifySentry(), is followed by updating lastSentSeqNum.
+ * When called directly, to send full updates (i.e. during initialization and from SyncTask),
+ * the update sequence number does not change.
+ * Call must be protected by notificationLock.
+ */
+ private void notifySentry_NoSeqNumIncr(PathsUpdate update) {
final Timer.Context timerContext =
SentryHdfsMetricsUtil.getNotifyHMSUpdateTimer.time();
try {
getClient().notifyHMSUpdate(update);
} catch (Exception e) {
- LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
+ final String err = "Could not send update to Sentry HDFS Service";
+ LOGGER.error(err, e);
+ resetClient();
SentryHdfsMetricsUtil.getFailedNotifyHMSUpdateCounter.inc();
+ throw new RuntimeException(err, e);
} finally {
timerContext.stop();
}
}
+ /**
+ * Send update to Sentry service and update last sent sequence number.
+ * Called only if MetastorePlugin has been successfully initialized.
+ * Call must be protected by notificationLock.
+ */
protected void notifySentry(PathsUpdate update) {
- notificiationLock.lock();
try {
- if (!syncSent) {
- new SyncTask().run();
- }
-
- notifySentryNoLock(update);
+ notifySentry_NoSeqNumIncr(update);
} finally {
lastSentSeqNum = update.getSeqNum();
- notificiationLock.unlock();
LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
}
}
+ /*
+ * Send full update to Sentry service.
+ * Called only if MetastorePlugin has been successfully initialized.
+ * Call must be protected by notificationLock.
+ */
+ private void notifySentryFullUpdate(long lastSent) {
+ PathsUpdate fullImageUpdate = null;
+ // access to authzPaths should be consistently protected by pathUpdateLock
+ pathUpdateLock.readLock().lock();
+ try {
+ fullImageUpdate = authzPaths.createFullImageUpdate(lastSent);
+ } finally {
+ pathUpdateLock.readLock().unlock();
+ }
+ notifySentry_NoSeqNumIncr(fullImageUpdate);
+ LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]");
+ }
+
+ /*
+ * When suspecting sentryClient comm error - reset the client
+ * Call must be protected by notificationLock.
+ */
+ private void resetClient() {
+ if (sentryClient != null) {
+ try {
+ sentryClient.close();
+ } catch (Exception ignore) {
+ }
+ sentryClient = null;
+ }
+ }
+
+ /**
+ * Apply paths update to local cache.
+ * Called only if MetastorePlugin has been successfully initialized.
+ * Call must be protected by notificationLock.
+ */
protected void applyLocal(PathsUpdate update) {
final Timer.Context timerContext =
SentryHdfsMetricsUtil.getApplyLocalUpdateTimer.time();
- if(authzPaths == null) {
- LOGGER.error(initializationFailureMsg);
- return;
+ try {
+ authzPaths.updatePartial(Lists.newArrayList(update), pathUpdateLock);
+ } finally {
+ timerContext.stop();
}
- authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
- timerContext.stop();
SentryHdfsMetricsUtil.getApplyLocalUpdateHistogram.update(
update.getPathChanges().size());
}
- private void notifySentryAndApplyLocal(PathsUpdate update) {
- if(authzPaths == null) {
- LOGGER.error(initializationFailureMsg);
- return;
- }
- if (initComplete) {
- processUpdate(update);
- } else {
- if (initError == null) {
- synchronized (updateQueue) {
- if (!queueFlushComplete) {
- updateQueue.add(update);
- } else {
- processUpdate(update);
- }
- }
- } else {
- StringWriter sw = new StringWriter();
- initError.printStackTrace(new PrintWriter(sw));
- LOGGER.error("#### Error initializing Metastore Plugin" +
- "[" + sw.toString() + "] !!");
- throw new RuntimeException(initError);
- }
- LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." +
- "Metastore hasn't been initialized yet !!");
- }
+ /*
+ * Apply paths update to local cache.
+ * Send partial update to Sentry service.
+ * Called only if MetastorePlugin has been successfully initialized.
+ * Call must be protected by notificationLock.
+ */
+ private void updateLocalCacheAndNotifySentry(PathsUpdate update) {
+ applyLocal(update);
+ notifySentry(update);
}
+ /**
+ * Apply paths update to local cache and send partial update to Sentry.
+ * Called only if MetastorePlugin has been successfully initialized.
+ * Call must be protected by notificationLock.
+ */
protected void processUpdate(PathsUpdate update) {
- applyLocal(update);
- notifySentry(update);
+ updateLocalCacheAndNotifySentry(update);
+ }
+
+ /*
+ * Check for successful initialization first, in each update callback method.
+ * Null initError guarantees successful initialization.
+ */
+ private void assertInit() {
+ if (initError != null) {
+ throw new IllegalStateException(initErrorMsg, initError);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
index 6476a01..32b635f 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
@@ -75,7 +75,7 @@ public class MetastorePluginWithHA extends MetastorePlugin {
new SentryMetastoreHACacheListener(this));
// start seq# from the last global seq
seqNum.set(pluginCacheSync.getUpdateCounter());
- MetastorePlugin.lastSentSeqNum = seqNum.get();
+ this.lastSentSeqNum = seqNum.get();
}
@Override