You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sp...@apache.org on 2018/01/18 15:59:26 UTC
[3/4] sentry git commit: SENTRY-1819: HMSFollower and friends do not
belong in sentry.service.thrift (Xinran Tinney, reviewed by Sergio Pena,
kalyan kumar kalvagadda, Na Li, Steve Moist)
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
deleted file mode 100644
index 558e695..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import org.apache.http.annotation.ThreadSafe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Waiting for counter to reach certain value.
- * The counter starts from zero and its value increases over time.
- * The class allows for multiple consumers waiting until the value of the
- * counter reaches some value interesting to them.
- * Consumers call {@link #waitFor(long)} which may either return
- * immediately if the counter reached the specified value, or block
- * until this value is reached. Consumers can also specify timeout for the
- * {@link #waitFor(long)} in which case it may throw {@link TimeoutException}
- * when the wait was not successful within the specified time limit.
- * <p>
- * All waiters should be woken up when the counter becomes equal to or higher
- * than the value they are waiting for.
- * <p>
- * The counter is updated by a single updater that should only increase the
- * counter value.
- * The updater calls the {@link #update(long)} method to update the counter
- * value and this should wake up all threads waiting for any value smaller or
- * equal to the new one.
- * <p>
- * The class is thread-safe.
- * It is designed for use by multiple waiter threads and a single
- * updater thread, but it will work correctly even in the presence of multiple
- * updater threads.
- */
-@ThreadSafe
-public final class CounterWait {
- // Implementation notes.
- //
- // The implementation is based on:
- //
- // 1) Using an atomic counter value which guarantees consistency.
- // Since everyone needs only to know when the counter value reached the
- // certain value and the counter may only increase its value,
- // it is safe to update the counter by another thread after its value
- // was read.
- //
- // 2) Priority queue of waiters, sorted by their expected values. The smallest
- // value is always at the top of the queue. The priority queue itself
- // is thread-safe, so no locks are needed to protect access to it.
- //
- // Each waiter is implemented using a binary semaphore.
- // This solves the problem of a wakeup that happens before the sleep -
- // in this case the acquire() doesn't block and returns immediately.
- //
- // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe,
- // we are not using its blocking queue semantics.
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CounterWait.class);
-
- /** Counter value. May only increase. */
- private final AtomicLong currentId = new AtomicLong(0);
-
- private final long waitTimeout;
- private final TimeUnit waitTimeUnit;
-
- /**
- * Waiters sorted by the value of the counter they are waiting for.
- * Note that {@link PriorityBlockingQueue} is thread-safe.
- * We are not using this as a blocking queue, but as a synchronized
- * PriorityQueue.
- */
- private final PriorityBlockingQueue<ValueEvent> waiters =
- new PriorityBlockingQueue<>();
-
- /**
- * Create an instance of CounterWait object that will not timeout during wait
- */
- public CounterWait() {
- this(0, TimeUnit.SECONDS);
- }
-
- /**
- * Create an instance of CounterWait object that will timeout during wait
- * @param waitTimeoutSec maximum time in seconds to wait for counter
- */
- public CounterWait(long waitTimeoutSec) {
- this(waitTimeoutSec, TimeUnit.SECONDS);
- }
-
- /**
- * Create an instance of CounterWait object that will timeout during wait
- * @param waitTimeout maximum time to wait for counter
- * @param waitTimeUnit time units for wait
- */
- public CounterWait(long waitTimeout, TimeUnit waitTimeUnit) {
- this.waitTimeout = waitTimeout;
- this.waitTimeUnit = waitTimeUnit;
- }
-
- /**
- * Update the counter value and wake up all threads waiting for this
- * value or any value below it.
- * <p>
- * The counter value should only increase.
- * An attempt to decrease the value is ignored.
- *
- * @param newValue the new counter value
- */
- public synchronized void update(long newValue) {
- // update() is synchronized so the value can't change.
- long oldValue = currentId.get();
- LOGGER.debug("CounterWait update: oldValue = {}, newValue = {}", oldValue, newValue);
- // Avoid doing extra work if not needed
- if (oldValue == newValue) {
- return; // no-op
- }
-
- // Make sure the counter is never decremented.
- if (newValue < oldValue) {
- LOGGER.error("new counter value {} is smaller then the previous one {}",
- newValue, oldValue);
- return; // no-op
- }
-
- currentId.set(newValue);
-
- // Wake up any threads waiting for a counter to reach this value.
- wakeup(newValue);
- }
-
- /**
- * Explicitly reset the counter value to a new value, but allow setting to a
- * smaller value.
- * This should be used when we have some external event that resets the counter
- * value space.
- * @param newValue New counter value. If this is greater than or equal to the current
- * value, this is equivalent to {@link #update(long)}. Otherwise
- * sets the counter to the new smaller value.
- */
- public synchronized void reset(long newValue) {
- long oldValue = currentId.get();
- LOGGER.debug("CounterWait reset: oldValue = {}, newValue = {}", oldValue, newValue);
-
- if (newValue > oldValue) {
- update(newValue);
- } else if (newValue < oldValue) {
- LOGGER.warn("resetting counter from {} to smaller value {}",
- oldValue, newValue);
- currentId.set(newValue);
- // No need to wakeup waiters since no one should wait on the smaller value
- }
- }
-
-
- /**
- * Wait for specified counter value.
- * Returns immediately if the value is reached or blocks until the value
- * is reached.
- * Multiple threads can call the method concurrently.
- *
- * @param value requested counter value
- * @return current counter value that should be no smaller then the requested
- * value
- * @throws InterruptedException if the wait was interrupted, TimeoutException if
- * wait was not successfull within the timeout value specified at the construction time.
- */
- public long waitFor(long value) throws InterruptedException, TimeoutException {
- // Fast path - counter value already reached, no need to block
- if (value <= currentId.get()) {
- return currentId.get();
- }
-
- // Enqueue the waiter for this value
- ValueEvent eid = new ValueEvent(value);
- waiters.put(eid);
-
- // It is possible that between the fast path check and the time the
- // value event is enqueued, the counter value already reached the requested
- // value. In this case we return immediately.
- if (value <= currentId.get()) {
- return currentId.get();
- }
-
- // At this point we may be sure that by the time the event was enqueued,
- // the counter was below the requested value. This means that update()
- // is guaranteed to wake us up when the counter reaches the requested value.
- // The wake up may actually happen before we start waiting, in this case
- // the event's blocking queue will be non-empty and the waitFor() below
- // will not block, so it is safe to wake up before the wait.
- // So sit tight and wait patiently.
- eid.waitFor();
- LOGGER.debug("CounterWait added new value to waitFor: value = {}, currentId = {}", value, currentId.get());
- return currentId.get();
- }
-
- /**
- * Wake up all threads waiting for a counter value that is less than or equal
- * to the specified value.
- * <p>
- * Repeatedly takes the head of the waiters queue (the waiter with the smallest
- * expected value). If the queue is empty we are done. If the head expects a
- * value greater than the specified one, it is put back into the queue and we
- * are done. Otherwise the waiter is signalled and the loop continues with the
- * next one.
- * <p>
- * A signalled waiter is not re-added, so it is removed from the queue for good.
- *
- * @param value current counter value
- */
- private void wakeup(long value) {
- while (true) {
- // Remove and return the head of the waiters queue, or null if it is empty
- ValueEvent e = waiters.poll();
- if (e == null) {
- // Queue is empty - return.
- return;
- }
- // The smallest waiter is not due yet, so nobody else is either:
- // return the event to the queue and exit
- if (e.getValue() > value) {
- waiters.add(e);
- return;
- }
- // Due for wake-up call (expected value is less than or equal to the counter)
- LOGGER.debug("CounterWait wakeup: event = {} is less than value = {}", e.getValue(), value);
- e.wakeup();
- }
- }
-
- // Useful for debugging
- @Override
- public String toString() {
- return "CounterWait{" + "currentId=" + currentId +
- ", waiters=" + waiters + "}";
- }
-
- /**
- * Return number of waiters. This is mostly useful for metrics/debugging
- *
- * @return number of sleeping waiters
- */
- public int waitersCount() {
- return waiters.size();
- }
-
- /**
- * Representation of the waiting event.
- * The waiting event consists of the expected value and a binary semaphore.
- * <p>
- * Each thread waiting for the given value, creates a ValueEvent and tries
- * to acquire a semaphore. This blocks until the semaphore is released.
- * <p>
- * ValueEvents are stored in priority queue sorted by value, so they should be
- * comparable by the value.
- */
- private class ValueEvent implements Comparable<ValueEvent> {
- /** Value waited for. */
- private final long value;
- /** Binary semaphore to synchronize waiters */
- private final Semaphore semaphore = new Semaphore(1);
-
- /**
- * Instantiates a new Value event.
- *
- * @param v the expected value
- */
- ValueEvent(long v) {
- this.value = v;
- // Acquire the semaphore. Subsequent calls to waitFor() will block until
- // wakeup() releases the semaphore.
- semaphore.acquireUninterruptibly(); // Will not block
- }
-
- /** Wait until signaled or interrupted. May return immediately if already signalled. */
- void waitFor() throws InterruptedException, TimeoutException {
- if (waitTimeout == 0) {
- semaphore.acquire();
- return;
- }
- if (!semaphore.tryAcquire(waitTimeout, waitTimeUnit)) {
- throw new TimeoutException();
- }
- }
-
- /** @return the value we are waiting for. */
- long getValue() {
- return value;
- }
-
- /** Wakeup the waiting thread. */
- void wakeup() {
- semaphore.release();
- }
-
- /**
- * Compare objects by value.
- */
- @Override
- public int compareTo(final ValueEvent o) {
- return value == o.value ? 0
- : value < o.value ? -1
- : 1;
- }
-
- /**
- * Use identity comparison of objects.
- */
- @Override
- public boolean equals(final Object o) {
- return (this == o);
- }
-
- @Override
- public int hashCode() {
- return (int) (value ^ (value >>> 32));
- }
-
- @Override
- public String toString() {
- return String.valueOf(value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
index 86ff47e..7831430 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
@@ -26,11 +26,11 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
* It is only used to provide try-with-resource semantics for
* {@link HiveMetaStoreClient}.
*/
-class HMSClient implements AutoCloseable {
+public class HMSClient implements AutoCloseable {
private final HiveMetaStoreClient client;
private boolean valid;
- HMSClient(HiveMetaStoreClient client) {
+ public HMSClient(HiveMetaStoreClient client) {
this.client = Preconditions.checkNotNull(client);
valid = true;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
deleted file mode 100644
index aa1b6a3..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import org.apache.sentry.core.common.utils.PubSub;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jdo.JDODataStoreException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry.
- * It gets the full update and notification logs from HMS and applies it to
- * update permissions stored in Sentry using SentryStore and also update the < obj,path > state
- * stored for HDFS-Sentry sync.
- */
-public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
- private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
- private static boolean connectedToHms = false;
-
- private SentryHMSClient client;
- private final Configuration authzConf;
- private final SentryStore sentryStore;
- private final NotificationProcessor notificationProcessor;
- private boolean readyToServe;
- private final HiveNotificationFetcher notificationFetcher;
- private final boolean hdfsSyncEnabled;
- private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
-
- private final LeaderStatusMonitor leaderMonitor;
-
- /**
- * Current generation of HMS snapshots. HMSFollower is single-threaded, so no need
- * to protect against concurrent modification.
- */
- private long hmsImageId = SentryStore.EMPTY_PATHS_SNAPSHOT_ID;
-
- /**
- * Configuring Hms Follower thread.
- *
- * @param conf sentry configuration
- * @param store sentry store
- * @param leaderMonitor singleton instance of LeaderStatusMonitor
- */
- HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
- HiveConnectionFactory hiveConnectionFactory) {
- this(conf, store, leaderMonitor, hiveConnectionFactory, null);
- }
-
- /**
- * Constructor should be used only for testing purposes.
- *
- * @param conf sentry configuration
- * @param store sentry store
- * @param leaderMonitor
- * @param authServerName Server that sentry is Authorizing
- */
- @VisibleForTesting
- public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
- HiveConnectionFactory hiveConnectionFactory, String authServerName) {
- LOGGER.info("HMSFollower is being initialized");
- readyToServe = false;
- authzConf = conf;
- this.leaderMonitor = leaderMonitor;
- sentryStore = store;
-
- if (authServerName == null) {
- authServerName = conf.get(AUTHZ_SERVER_NAME.getVar(),
- conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(), AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
- }
-
- notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
- client = new SentryHMSClient(authzConf, hiveConnectionFactory);
- hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync
- notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);
-
- // subscribe to full update notification
- if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
- LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
- PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
- }
-
- }
-
- @VisibleForTesting
- public static boolean isConnectedToHms() {
- return connectedToHms;
- }
-
- @VisibleForTesting
- void setSentryHmsClient(SentryHMSClient client) {
- this.client = client;
- }
-
- @Override
- public void close() {
- if (client != null) {
- // Close any outstanding connections to HMS
- try {
- client.disconnect();
- SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED);
- } catch (Exception failure) {
- LOGGER.error("Failed to close the Sentry Hms Client", failure);
- }
- }
-
- notificationFetcher.close();
- }
-
- @Override
- public void run() {
- SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
- long lastProcessedNotificationId;
- try {
- try {
- // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
- lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
- } catch (Exception e) {
- LOGGER.error("Failed to get the last processed notification id from sentry store, "
- + "Skipping the processing", e);
- return;
- }
- // Wake any clients connected to this service waiting for HMS already processed notifications.
- wakeUpWaitingClientsForSync(lastProcessedNotificationId);
- // Only the leader should listen to HMS updates
- if (!isLeader()) {
- // Close any outstanding connections to HMS
- close();
- return;
- }
- syncupWithHms(lastProcessedNotificationId);
- } finally {
- SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
- }
- }
-
- private boolean isLeader() {
- return (leaderMonitor == null) || leaderMonitor.isLeader();
- }
-
- @VisibleForTesting
- String getAuthServerName() {
- return notificationProcessor.getAuthServerName();
- }
-
- /**
- * Processes new Hive Metastore notifications.
- *
- * <p>If no notifications are processed yet, then it
- * does a full initial snapshot of the Hive Metastore followed by new notifications updates that
- * could have happened after it.
- *
- * <p>Clients connections waiting for an event notification will be
- * woken up afterwards.
- */
- private void syncupWithHms(long notificationId) {
- try {
- client.connect();
- connectedToHms = true;
- SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED);
- } catch (Throwable e) {
- LOGGER.error("HMSFollower cannot connect to HMS!!", e);
- return;
- }
-
- try {
- /* Before getting notifications, it checks if a full HMS snapshot is required. */
- if (isFullSnapshotRequired(notificationId)) {
- createFullSnapshot();
- return;
- }
-
- Collection<NotificationEvent> notifications =
- notificationFetcher.fetchNotifications(notificationId);
-
- // After getting notifications, it checks if the HMS did some clean-up and notifications
- // are out-of-sync with Sentry.
- if (areNotificationsOutOfSync(notifications, notificationId)) {
- createFullSnapshot();
- return;
- }
-
- if (!readyToServe) {
- // Allow users and/or applications who look into the Sentry console output to see
- // when Sentry is ready to serve.
- System.out.println("Sentry HMS support is ready");
- readyToServe = true;
- }
-
- // Continue with processing new notifications if no snapshots are done.
- processNotifications(notifications);
- } catch (TException e) {
- LOGGER.error("An error occurred while fetching HMS notifications: ", e);
- close();
- } catch (Throwable t) {
- // catching errors to prevent the executor to halt.
- LOGGER.error("Exception in HMSFollower! Caused by: " + t.getMessage(), t);
-
- close();
- }
- }
-
- /**
- * Checks if a new full HMS snapshot request is needed by checking if:
- * <ul>
- * <li>Sentry HMS Notification table is EMPTY</li>
- * <li>HDFSSync is enabled and Sentry Authz Snapshot table is EMPTY</li>
- * <li>The current notification Id on the HMS is less than the
- * latest processed by Sentry.</li>
- * <li>Full Snapshot Signal is detected</li>
- * </ul>
- *
- * @param latestSentryNotificationId The notification Id to check against the HMS
- * @return True if a full snapshot is required; False otherwise.
- * @throws Exception If an error occurs while checking the SentryStore or the HMS client.
- */
- private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception {
- if (sentryStore.isHmsNotificationEmpty()) {
- LOGGER.debug("Sentry Store has no HMS Notifications. Create Full HMS Snapshot. "
- + "latest sentry notification Id = {}", latestSentryNotificationId);
- return true;
- }
-
- // Once HDFS sync is enabled, and if MAuthzPathsSnapshotId
- // table is still empty, we need to request a full snapshot
- if(hdfsSyncEnabled && sentryStore.isAuthzPathsSnapshotEmpty()) {
- LOGGER.debug("HDFSSync is enabled and MAuthzPathsSnapshotId table is empty. Need to request a full snapshot");
- return true;
- }
-
- long currentHmsNotificationId = notificationFetcher.getCurrentNotificationId();
- if (currentHmsNotificationId < latestSentryNotificationId) {
- LOGGER.info("The current notification ID on HMS = {} is less than the latest processed Sentry "
- + "notification ID = {}. Need to request a full HMS snapshot",
- currentHmsNotificationId, latestSentryNotificationId);
- return true;
- }
-
- // check if forced full update is required, reset update flag to false
- // to only do it once per forced full update request.
- if (fullUpdateHMS.compareAndSet(true, false)) {
- LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request");
- return true;
- }
-
- return false;
- }
-
-  /**
-   * Checks if the HMS and Sentry processed notifications are out-of-sync.
-   * This could happen because the HMS did some clean-up of old notifications
-   * and Sentry was not requesting notifications during that time.
-   *
-   * @param events All new notifications to check for an out-of-sync. Only the first
-   *               element in iteration order is inspected.
-   * @param latestProcessedId The latest notification processed by Sentry to check against the
-   *        list of notifications events.
-   * @return True if an out-of-sync is found; False otherwise (including when
-   *         {@code events} is empty).
-   */
-  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
-      long latestProcessedId) {
-    if (events.isEmpty()) {
-      return false;
-    }
-
-    /*
-     * If the sequence of notifications has a gap, then an out-of-sync might
-     * have happened due to the following issue:
-     *
-     * - HDFS sync was disabled or Sentry was shutdown for a time period longer than
-     * the HMS notification clean-up thread causing old notifications to be deleted.
-     *
-     * HMS notifications may contain both gaps in the sequence and duplicates
-     * (the same ID repeated more than once for different events).
-     *
-     * To accept duplicates (see NotificationFetcher for more info), then a gap is found
-     * if the 1st notification received is higher than the current ID processed + 1.
-     * i.e.
-     * 1st ID = 3, latest ID = 3 (duplicate found but no gap detected)
-     * 1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected)
-     * 1st ID = 5, latest ID = 3 (a gap is detected)
-     */
-
-    // Take the first element through the iterator rather than casting to List:
-    // the parameter is declared as Collection, so a cast could fail at runtime.
-    long firstNotificationId = events.iterator().next().getEventId();
-
-    if (firstNotificationId > (latestProcessedId + 1)) {
-      LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed "
-          + "notification Id = {} + 1. Need to request a full HMS snapshot.", firstNotificationId, latestProcessedId);
-      return true;
-    }
-
-    return false;
-  }
-
- /**
- * Request for full snapshot and persists it if there is no snapshot available in the sentry
- * store. Also, wakes-up any waiting clients.
- *
- * @return ID of last notification processed.
- * @throws Exception if there are failures
- */
- private long createFullSnapshot() throws Exception {
- LOGGER.debug("Attempting to take full HMS snapshot");
- Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT,
- SentryServiceState.FULL_UPDATE_RUNNING),
- "HMSFollower shown loading full snapshot when it should not be.");
- try {
- // Set that the full update is running
- SentryStateBank
- .enableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
-
- PathsImage snapshotInfo = client.getFullSnapshot();
- if (snapshotInfo.getPathImage().isEmpty()) {
- LOGGER.debug("Received empty path image from HMS while taking a full snapshot");
- return snapshotInfo.getId();
- }
-
- // Check we're still the leader before persisting the new snapshot
- if (!isLeader()) {
- LOGGER.info("Not persisting full snapshot since not a leader");
- return SentryStore.EMPTY_NOTIFICATION_ID;
- }
- try {
- if (hdfsSyncEnabled) {
- LOGGER.info("Persisting full snapshot for notification Id = {}", snapshotInfo.getId());
- sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), snapshotInfo.getId());
- } else {
- // We need to persist latest notificationID for next poll
- LOGGER.info("HDFSSync is disabled. Not Persisting full snapshot, "
- + "but only setting last processed notification Id = {}", snapshotInfo.getId());
- sentryStore.setLastProcessedNotificationID(snapshotInfo.getId());
- }
- } catch (Exception failure) {
- LOGGER.error("Received exception while persisting HMS path full snapshot ");
- throw failure;
- }
- // Wake up any HMS waiters that could have been put on hold before getting the
- // eventIDBefore value.
- wakeUpWaitingClientsForSync(snapshotInfo.getId());
- // HMSFollower connected to HMS and it finished full snapshot if that was required
- // Log this message only once
- LOGGER.info("Sentry HMS support is ready");
- return snapshotInfo.getId();
- } catch(Exception failure) {
- LOGGER.error("Received exception while creating HMS path full snapshot ");
- throw failure;
- } finally {
- SentryStateBank
- .disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
- }
- }
-
-  /**
-   * Process the collection of notifications and wake up any waiting clients.
-   * <p>
-   * Events are handled in iteration order. When an event is not processed
-   * (the processor returned false or threw), its ID is explicitly persisted so
-   * that processing continues with the next notification. Processing stops
-   * early when this instance is no longer the leader, or when a JDO storage
-   * failure is seen for an event whose ID is not greater than the ID already
-   * persisted in the store.
-   *
-   * @param events list of event to be processed
-   * @throws Exception if the sentry store fails while reading or persisting the
-   *         last processed notification ID
-   */
-  public void processNotifications(Collection<NotificationEvent> events) throws Exception {
-    boolean isNotificationProcessed;
-    if (events.isEmpty()) {
-      return;
-    }
-
-    for (NotificationEvent event : events) {
-      isNotificationProcessed = false;
-      try {
-        // Only the leader should process the notifications
-        if (!isLeader()) {
-          LOGGER.debug("Not processing notifications since not a leader");
-          return;
-        }
-        isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
-      } catch (Exception e) {
-        if (e.getCause() instanceof JDODataStoreException) {
-          LOGGER.info("Received JDO Storage Exception, Could be because of processing "
-              + "duplicate notification");
-          if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
-            // Rest of the notifications need not be processed.
-            LOGGER.error("Received event with Id: {} which is smaller then the ID "
-                + "persisted in store", event.getEventId());
-            break;
-          }
-        } else {
-          // The exception is passed as the trailing argument without its own
-          // placeholder so that SLF4J logs it as a throwable with its stack trace.
-          LOGGER.error("Processing the notification with ID:{} failed with exception",
-              event.getEventId(), e);
-        }
-      }
-      if (!isNotificationProcessed) {
-        try {
-          // Update the notification ID in the persistent store even when the notification is
-          // not processed as the content in the notification is not valid.
-          // Continue processing the next notification.
-          LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId());
-          sentryStore.persistLastProcessedNotificationID(event.getEventId());
-        } catch (Exception failure) {
-          LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId());
-          throw failure;
-        }
-      }
-      // Wake up any HMS waiters that are waiting for this ID.
-      wakeUpWaitingClientsForSync(event.getEventId());
-    }
-  }
-
- /**
- * Wakes up HMS waiters waiting for a specific event notification.<p>
- *
- * Verify that HMS image id didn't change since the last time we looked.
- * If id did, it is possible that notifications jumped backward, so reset
- * the counter to the current value.
- *
- * @param eventId Id of a notification
- */
- private void wakeUpWaitingClientsForSync(long eventId) {
- CounterWait counterWait = sentryStore.getCounterWait();
-
- LOGGER.debug("wakeUpWaitingClientsForSync: eventId = {}, hmsImageId = {}", eventId, hmsImageId);
- // Wake up any HMS waiters that are waiting for this ID.
- // counterWait should never be null, but tests mock SentryStore and a mocked one
- // doesn't have it.
- if (counterWait == null) {
- return;
- }
-
- long lastHMSSnapshotId = hmsImageId;
- try {
- // Read actual HMS image ID
- lastHMSSnapshotId = sentryStore.getLastProcessedImageID();
- LOGGER.debug("wakeUpWaitingClientsForSync: lastHMSSnapshotId = {}", lastHMSSnapshotId);
- } catch (Exception e) {
- counterWait.update(eventId);
- LOGGER.error("Failed to get the last processed HMS image id from sentry store");
- return;
- }
-
- // Reset the counter if the persisted image ID is greater than current image ID
- if (lastHMSSnapshotId > hmsImageId) {
- counterWait.reset(eventId);
- hmsImageId = lastHMSSnapshotId;
- LOGGER.debug("wakeUpWaitingClientsForSync: reset counterWait with eventId = {}, new hmsImageId = {}", eventId, hmsImageId);
- }
-
- LOGGER.debug("wakeUpWaitingClientsForSync: update counterWait with eventId = {}, hmsImageId = {}", eventId, hmsImageId);
- counterWait.update(eventId);
- }
-
- /**
- * PubSub.Subscriber callback API
- */
- @Override
- public void onMessage(PubSub.Topic topic, String message) {
- Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS);
- LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message);
- fullUpdateHMS.set(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
index 097aa62..93cc34f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -46,7 +46,7 @@ public final class HiveNotificationFetcher implements AutoCloseable {
private long lastIdFiltered = 0;
private Set<String> cache = new HashSet<>();
- HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
+ public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
this.sentryStore = sentryStore;
this.hmsConnectionFactory = hmsConnectionFactory;
}
@@ -59,7 +59,7 @@ public final class HiveNotificationFetcher implements AutoCloseable {
* @return A list of newer notifications unseen by Sentry.
* @throws Exception If an error occurs on the HMS communication.
*/
- List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
+ public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
return fetchNotifications(lastEventId, Integer.MAX_VALUE);
}
@@ -178,7 +178,7 @@ public final class HiveNotificationFetcher implements AutoCloseable {
* @return the latest notification Id logged by the HMS
* @throws Exception when an error occurs when talking to the HMS client
*/
- long getCurrentNotificationId() throws Exception {
+ public long getCurrentNotificationId() throws Exception {
CurrentNotificationEventId eventId;
try {
eventId = getHmsClient().getCurrentNotificationEventId();
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
index 77634cf..6a19e6b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
@@ -70,7 +70,7 @@ public final class HiveSimpleConnectionFactory implements HiveConnectionFactory
* @throws IOException
* @throws LoginException
*/
- void init() throws IOException, LoginException {
+ public void init() throws IOException, LoginException {
if (insecure) {
LOGGER.info("Using insecure connection to HMS");
return;
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java
deleted file mode 100644
index 8e80d55..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
-
-import javax.annotation.concurrent.ThreadSafe;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.*;
-
-/**
- * LeaderStatusMonitor participates in the distributed leader election protocol
- * and allows clients to access the global leadership status.
- * <p>
- * LeaderStatusMonitor is a singleton that uses Curator framework via
- * {@link HAContext}. The leadership status can be accessed via the
- * {@link #isLeader()} method.<p>
- *
- * Usually leadership re-election is initiated by the Curator framework when one
- * of the nodes disconnects from ZooKeeper, but LeaderStatusMonitor also supports
- * voluntary release of the leadership via the {@link #deactivate()} method. This is
- * intended to be used for debugging purposes.
- * <p>
- * The class also simulates leader election in non-HA environments. In such cases its
- * {@link #isLeader()} method always returns True. The non-HA environment is determined
- * by the absence of the SENTRY_HA_ZOOKEEPER_QUORUM in the configuration.
- *
- * <h2>Implementation notes</h2>
- *
- * <h3>Initialization</h3>
- *
- * Class initialization is split between the constructor and the {@link #init()} method.
- * There are two reasons for it:
- * <ul>
- * <li>We do not want to pass <strong>this</strong> reference to
- * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
- * until it is fully initialized</li>
- * <li>We do not want to call {@link LeaderSelector#start()} method in constructor</li>
- * </ul>
- *
- * Since LeaderStatusMonitor is a singleton and an instance can only be obtained via the
- * {@link #getLeaderStatusMonitor(Configuration)} method, we hide this construction split
- * from the callers.
- *
- * <h3>Synchronization</h3>
- * Singleton synchronization is achieved using the synchronized class builder
- * {@link #getLeaderStatusMonitor(Configuration)}
- * <p>
- * Upon becoming a leader, the code loops in {@link #takeLeadership(CuratorFramework)}
- * until it receives a deactivation signal from {@link #deactivate()}. This is synchronized
- * using a {@link #lock} and condition variable {@link #cond}.
- * <p>
- * Access to the leadership status {@link #isLeader} is also protected by the {@link #lock}.
- * This isn't strictly necessary and a volatile field would be sufficient, but since we
- * already use the {@link #lock} this is more straightforward.
- */
-@ThreadSafe
-final class LeaderStatusMonitor
- extends LeaderSelectorListenerAdapter implements AutoCloseable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(LeaderStatusMonitor.class);
-
- private static final String LEADER_SELECTOR_SUFFIX = "leader";
-
- /** Unique instance of the singleton object */
- private static LeaderStatusMonitor leaderStatusMonitor = null;
-
- private final HAContext haContext;
-
- /** Unique string describing this instance */
- private final String defaultIncarnationId = generateIncarnationId();
- private String incarnationId;
-
- /** True when not using ZooKeeper */
- private final boolean isSingleNodeMode;
-
- /** Lock and condition used to signal the leader to voluntarily release leadership */
- private final Lock lock = new ReentrantLock();
- /** Condition variable used to synchronize voluntary leadership release */
- private final Condition cond = lock.newCondition();
- /** Leadership status - true if leader. */
- private boolean isLeader = false;
-
- /** Curator framework leader monitor */
- private LeaderSelector leaderSelector = null;
-
- /** The number of times this incarnation has become the leader. */
- private final AtomicLong leaderCount = new AtomicLong(0);
-
- /**
- * Constructor. Initialize state and create HA context if configuration
- * specifies ZooKeeper servers.
- * @param conf Configuration. The fields we are interested in are:
- * <ul>
- * <li>SENTRY_HA_ZOOKEEPER_QUORUM</li>
- * </ul>
- * Configuration is also passed to the
- * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
- * which uses more properties.
- * @throws Exception
- */
-
- @VisibleForTesting
- protected LeaderStatusMonitor(Configuration conf) throws Exception {
- // Only enable HA configuration if zookeeper is configured
- String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, "");
- if (zkServers.isEmpty()) {
- isSingleNodeMode = true;
- haContext = null;
- isLeader = true;
- incarnationId = "";
- LOGGER.info("Leader election protocol disabled, assuming single active server");
- return;
- }
- isSingleNodeMode = false;
- incarnationId = defaultIncarnationId;
- haContext = HAContext.getHAServerContext(conf);
-
- LOGGER.info("Created LeaderStatusMonitor(incarnationId={}, "
- + "zkServers='{}')", incarnationId, zkServers);
- }
-
- /**
- * Tests may need to provide custom incarnation ID
- * @param conf configuration
- * @param incarnationId custom incarnation ID
- * @throws Exception
- */
- @VisibleForTesting
- protected LeaderStatusMonitor(Configuration conf, String incarnationId) throws Exception {
- this(conf);
- this.incarnationId = incarnationId;
- }
-
- /**
- * Second half of the constructor links this object with {@link HAContext} and
- * starts leader election protocol.
- */
- @VisibleForTesting
- protected void init() {
- if (isSingleNodeMode) {
- return;
- }
-
- leaderSelector = haContext.newLeaderSelector("/" + LEADER_SELECTOR_SUFFIX, this);
- leaderSelector.setId(incarnationId);
- leaderSelector.autoRequeue();
- leaderSelector.start();
- }
-
- /**
- *
- * @param conf Configuration. See {@link #LeaderStatusMonitor(Configuration)} for details.
- * @return A global LeaderStatusMonitor instance.
- * @throws Exception
- */
- @SuppressWarnings("LawOfDemeter")
- static synchronized LeaderStatusMonitor getLeaderStatusMonitor(Configuration conf)
- throws Exception {
- if (leaderStatusMonitor == null) {
- leaderStatusMonitor = new LeaderStatusMonitor(conf);
- leaderStatusMonitor.init();
- }
- return leaderStatusMonitor;
- }
-
- /**
- * @return number of times this leader was elected. Used for metrics.
- */
- long getLeaderCount() {
- return leaderCount.get();
- }
-
- /**
- * Shut down the LeaderStatusMonitor and wait for it to transition to
- * standby.
- */
- @Override
- public void close() {
- if (leaderSelector != null) {
- // Shut down our Curator hooks.
- leaderSelector.close();
- }
- }
-
- /**
- * Deactivate the current client, if it is active.
- * In non-HA case this is a no-op.
- */
- void deactivate() {
- if (isSingleNodeMode) {
- return;
- }
- lock.lock();
- try {
- cond.signal();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * @return true iff we are the leader.
- * In non-HA case always returns true
- */
- boolean isLeader() {
- if (isSingleNodeMode) {
- return true;
- }
- lock.lock();
- @SuppressWarnings("FieldAccessNotGuarded")
- boolean leader = isLeader;
- lock.unlock();
- return leader;
- }
-
- /**
- * Curator framework callback which is called when we become a leader.
- * Should return only when we decide to resign.
- */
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- leaderCount.incrementAndGet();
- LOGGER.info("Becoming leader in Sentry HA cluster:{}", this);
- lock.lock();
- try {
- isLeader = true;
- // Wait until we are interrupted or receive a signal
- cond.await();
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- LOGGER.error("takeLeadership call interrupted:" + this, ignored);
- } finally {
- isLeader = false;
- lock.unlock();
- LOGGER.info("Resigning from leader status in a Sentry HA cluster:{}", this);
- }
- }
-
- /**
- * Generate ID for the activator. <p>
- *
- * Ideally we would like something like host@pid, but Java doesn't provide a good
- * way to determine pid value, so we use
- * {@link RuntimeMXBean#getName()} which usually contains host
- * name and pid.
- */
- private static String generateIncarnationId() {
- return ManagementFactory.getRuntimeMXBean().getName();
- }
-
- @Override
- public String toString() {
- return isSingleNodeMode?"Leader election disabled":
- String.format("{isSingleNodeMode=%b, incarnationId=%s, isLeader=%b, leaderCount=%d}",
- isSingleNodeMode, incarnationId, isLeader, leaderCount);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
deleted file mode 100644
index d09da5f..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
+++ /dev/null
@@ -1,773 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Timer.Context;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
-import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
-import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
-import org.apache.sentry.core.common.utils.PathUtils;
-import org.apache.sentry.hdfs.PathsUpdate;
-import org.apache.sentry.hdfs.PermissionsUpdate;
-import org.apache.sentry.hdfs.SentryMalformedPathException;
-import org.apache.sentry.hdfs.UniquePathsUpdate;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
-import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import static com.codahale.metrics.MetricRegistry.name;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
-
-
-
-/**
- * NotificationProcessor processes various notification events generated from
- * the Hive MetaStore state change, and applies these changes to the complete
- * HMS Paths snapshot or delta update stored in Sentry using SentryStore.
- *
- * <p>NotificationProcessor should not skip processing notification events for any reason.
- * If some notification events are to be skipped, appropriate logic should be added in
- * HMSFollower before invoking NotificationProcessor.
- */
-final class NotificationProcessor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class);
- private final SentryStore sentryStore;
- private final SentryJSONMessageDeserializer deserializer;
- private final String authServerName;
- // These variables can be updated even after object is instantiated, for testing purposes.
- private boolean syncStoreOnCreate = false;
- private boolean syncStoreOnDrop = false;
- private final boolean hdfsSyncEnabled;
-
- /**
- * Configuring notification processor.
- *
- * @param sentryStore sentry backend store
- * @param authServerName Server that sentry is authorizing
- * @param conf sentry configuration
- */
- NotificationProcessor(SentryStore sentryStore, String authServerName,
- Configuration conf) {
- this.sentryStore = sentryStore;
- deserializer = new SentryJSONMessageDeserializer();
- this.authServerName = authServerName;
- syncStoreOnCreate = Boolean
- .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(),
- AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault()));
- syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(),
- AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault()));
- hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabled(conf);
- }
-
- /**
- * Split path into components on the "/" character.
- * The path should not start with "/".
- * This is consumed by Thrift interface, so the return result should be
- * {@code List<String>}
- *
- * @param path input path, e.g. {@code foo/bar}
- * @return list of components, e.g. [foo, bar]
- */
- private static List<String> splitPath(String path) {
- return Lists.newArrayList(PathUtils.splitPath(path));
- }
-
- /**
- * Constructs permission update to be persisted for drop event that can be persisted
- * from thrift object.
- *
- * @param authorizable thrift object that is dropped.
- * @return update to be persisted
- * @throws SentryInvalidInputException if the required fields are set in argument provided
- */
- @VisibleForTesting
- static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable)
- throws SentryInvalidInputException {
- PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
- String authzObj = SentryServiceUtil.getAuthzObj(authorizable);
- update.addPrivilegeUpdate(authzObj)
- .putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- return update;
- }
-
- @VisibleForTesting
- String getAuthServerName() {
- return authServerName;
- }
-
- /**
- * Constructs permission update to be persisted for rename event that can be persisted from thrift
- * object.
- *
- * @param oldAuthorizable old thrift object
- * @param newAuthorizable new thrift object
- * @return update to be persisted
- * @throws SentryInvalidInputException if the required fields are set in arguments provided
- */
- @VisibleForTesting
- static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable,
- TSentryAuthorizable newAuthorizable)
- throws SentryInvalidInputException {
- String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable);
- String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable);
- PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
- TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
- privUpdate.putToAddPrivileges(newAuthz, newAuthz);
- privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- return update;
- }
-
- /**
- * This function is only used for testing purposes.
- *
- * @param value to be set
- */
- @SuppressWarnings("SameParameterValue")
- @VisibleForTesting
- void setSyncStoreOnCreate(boolean value) {
- syncStoreOnCreate = value;
- }
-
- /**
- * This function is only used for testing purposes.
- *
- * @param value to be set
- */
- @SuppressWarnings("SameParameterValue")
- @VisibleForTesting
- void setSyncStoreOnDrop(boolean value) {
- syncStoreOnDrop = value;
- }
-
- /**
- * Processes the event and persist to sentry store.
- *
- * @param event to be processed
- * @return true, if the event is persisted to sentry store. false, if the event is not persisted.
- * @throws Exception if there is an error processing the event.
- */
- boolean processNotificationEvent(NotificationEvent event) throws Exception {
- LOGGER
- .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType());
-
- // Expose time used for each request time as a metric.
- // We use lower-case version of the event name.
- EventType eventType = EventType.valueOf(event.getEventType());
- Timer timer = SentryMetrics
- .getInstance()
- .getTimer(name(HMSFollower.class, eventType.toString().toLowerCase()));
-
- try (Context ignored = timer.time()) {
- switch (eventType) {
- case CREATE_DATABASE:
- return processCreateDatabase(event);
- case DROP_DATABASE:
- return processDropDatabase(event);
- case CREATE_TABLE:
- return processCreateTable(event);
- case DROP_TABLE:
- return processDropTable(event);
- case ALTER_TABLE:
- return processAlterTable(event);
- case ADD_PARTITION:
- return processAddPartition(event);
- case DROP_PARTITION:
- return processDropPartition(event);
- case ALTER_PARTITION:
- return processAlterPartition(event);
- default:
- LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
- event.getEventType());
- return false;
- }
- }
- }
-
- /**
- * Processes "create database" notification event, and applies its corresponding
- * snapshot change as well as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processCreateDatabase(NotificationEvent event) throws Exception {
- SentryJSONCreateDatabaseMessage message =
- deserializer.getCreateDatabaseMessage(event.getMessage());
- String dbName = message.getDB();
- String location = message.getLocation();
- if ((dbName == null) || (location == null)) {
- LOGGER.error("Create database event "
- + "has incomplete information. dbName: {} location: {}",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(location, "null"));
- return false;
- }
-
- if (syncStoreOnCreate) {
- dropSentryDbPrivileges(dbName, event);
- }
-
- if (hdfsSyncEnabled) {
- List<String> locations = Collections.singletonList(location);
- addPaths(dbName, locations, event);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * Processes "drop database" notification event, and applies its corresponding
- * snapshot change as well as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processDropDatabase(NotificationEvent event) throws Exception {
- SentryJSONDropDatabaseMessage dropDatabaseMessage =
- deserializer.getDropDatabaseMessage(event.getMessage());
- String dbName = dropDatabaseMessage.getDB();
- String location = dropDatabaseMessage.getLocation();
- if (dbName == null) {
- LOGGER.error("Drop database event has incomplete information: dbName = null");
- return false;
- }
- if (syncStoreOnDrop) {
- dropSentryDbPrivileges(dbName, event);
- }
-
- if (hdfsSyncEnabled) {
- List<String> locations = Collections.singletonList(location);
- removePaths(dbName, locations, event);
- return true;
- }
- return false;
- }
-
- /**
- * Processes "create table" notification event, and applies its corresponding
- * snapshot change as well as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processCreateTable(NotificationEvent event)
- throws Exception {
- SentryJSONCreateTableMessage createTableMessage = deserializer
- .getCreateTableMessage(event.getMessage());
- String dbName = createTableMessage.getDB();
- String tableName = createTableMessage.getTable();
- String location = createTableMessage.getLocation();
- if ((dbName == null) || (tableName == null) || (location == null)) {
- LOGGER.error(String.format("Create table event " + "has incomplete information."
- + " dbName = %s, tableName = %s, location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- StringUtils.defaultIfBlank(location, "null")));
- return false;
- }
- if (syncStoreOnCreate) {
- dropSentryTablePrivileges(dbName, tableName, event);
- }
-
- if (hdfsSyncEnabled) {
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- List<String> locations = Collections.singletonList(location);
- addPaths(authzObj, locations, event);
- return true;
- }
-
- return false;
- }
-
- /**
- * Processes "drop table" notification event. It drops all partitions belonging to
- * the table as well. And applies its corresponding snapshot change as well
- * as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processDropTable(NotificationEvent event) throws Exception {
- SentryJSONDropTableMessage dropTableMessage = deserializer
- .getDropTableMessage(event.getMessage());
- String dbName = dropTableMessage.getDB();
- String tableName = dropTableMessage.getTable();
- if ((dbName == null) || (tableName == null)) {
- LOGGER.error("Drop table event "
- + "has incomplete information. dbName: {}, tableName: {}",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"));
- return false;
- }
- if (syncStoreOnDrop) {
- dropSentryTablePrivileges(dbName, tableName, event);
- }
-
- if (hdfsSyncEnabled) {
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- removeAllPaths(authzObj, event);
- return true;
- }
-
- return false;
- }
-
- /**
- * Processes "alter table" notification event, and applies its corresponding
- * snapshot change as well as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processAlterTable(NotificationEvent event) throws Exception {
-
- if (!hdfsSyncEnabled) {
- return false;
- }
-
- SentryJSONAlterTableMessage alterTableMessage =
- deserializer.getAlterTableMessage(event.getMessage());
- String oldDbName = alterTableMessage.getDB();
- String oldTableName = alterTableMessage.getTable();
- String newDbName = event.getDbName();
- String newTableName = event.getTableName();
- String oldLocation = alterTableMessage.getOldLocation();
- String newLocation = alterTableMessage.getNewLocation();
-
- if ((oldDbName == null)
- || (oldTableName == null)
- || (newDbName == null)
- || (newTableName == null)
- || (oldLocation == null)
- || (newLocation == null)) {
- LOGGER.error(String.format("Alter table event "
- + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, "
- + "newDbName = %s, newTableName = %s, newLocation = %s",
- StringUtils.defaultIfBlank(oldDbName, "null"),
- StringUtils.defaultIfBlank(oldTableName, "null"),
- StringUtils.defaultIfBlank(oldLocation, "null"),
- StringUtils.defaultIfBlank(newDbName, "null"),
- StringUtils.defaultIfBlank(newTableName, "null"),
- StringUtils.defaultIfBlank(newLocation, "null")));
- return false;
- }
-
- if ((oldDbName.equals(newDbName))
- && (oldTableName.equals(newTableName))
- && (oldLocation.equals(newLocation))) {
- LOGGER.error(String.format("Alter table notification ignored as neither name nor "
- + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, "
- + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation,
- newDbName + "." + newTableName, newLocation));
- return false;
- }
-
- if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
- // Name has changed
- try {
- renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:"
- + " {}.{}", oldDbName, oldTableName);
- } catch (Exception e) {
- LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e);
- return false;
- }
- }
- String oldAuthzObj = oldDbName + "." + oldTableName;
- String newAuthzObj = newDbName + "." + newTableName;
- renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event);
- return true;
- }
-
- /**
- * Processes "add partition" notification event, and applies its corresponding
- * snapshot change as well as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processAddPartition(NotificationEvent event)
- throws Exception {
- if (!hdfsSyncEnabled) {
- return false;
- }
-
- SentryJSONAddPartitionMessage addPartitionMessage =
- deserializer.getAddPartitionMessage(event.getMessage());
- String dbName = addPartitionMessage.getDB();
- String tableName = addPartitionMessage.getTable();
- List<String> locations = addPartitionMessage.getLocations();
- if ((dbName == null) || (tableName == null) || (locations == null)) {
- LOGGER.error(String.format("Create table event has incomplete information. "
- + "dbName = %s, tableName = %s, locations = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- locations != null ? locations.toString() : "null"));
- return false;
- }
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- addPaths(authzObj, locations, event);
- return true;
- }
-
- /**
- * Processes "drop partition" notification event, and applies its corresponding
- * snapshot change as well as delta path update into Sentry DB.
- *
- * @param event notification event to be processed.
- * @throws Exception if encounters errors while persisting the path change
- */
- private boolean processDropPartition(NotificationEvent event)
- throws Exception {
- if (!hdfsSyncEnabled) {
- return false;
- }
-
- SentryJSONDropPartitionMessage dropPartitionMessage =
- deserializer.getDropPartitionMessage(event.getMessage());
- String dbName = dropPartitionMessage.getDB();
- String tableName = dropPartitionMessage.getTable();
- List<String> locations = dropPartitionMessage.getLocations();
- if ((dbName == null) || (tableName == null) || (locations == null)) {
- LOGGER.error(String.format("Drop partition event "
- + "has incomplete information. dbName = %s, tableName = %s, location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- locations != null ? locations.toString() : "null"));
- return false;
- }
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- removePaths(authzObj, locations, event);
- return true;
- }
-
- /**
-  * Processes an "alter partition" notification event. When the partition
-  * location has changed, the change is passed to {@code renameAuthzPath} with
-  * the same authzObj as both the old and the new name.
-  *
-  * @param event notification event to be processed.
-  * @return {@code true} if the location change was handed to
-  *     {@code renameAuthzPath}; {@code false} if HDFS sync is disabled, the
-  *     event is incomplete, or the location did not change.
-  * @throws Exception if the event cannot be handled or the path change
-  *     cannot be persisted
-  */
- private boolean processAlterPartition(NotificationEvent event) throws Exception {
-   if (!hdfsSyncEnabled) {
-     return false;
-   }
-
-   SentryJSONAlterPartitionMessage alterPartitionMessage =
-       deserializer.getAlterPartitionMessage(event.getMessage());
-   String dbName = alterPartitionMessage.getDB();
-   String tableName = alterPartitionMessage.getTable();
-   String oldLocation = alterPartitionMessage.getOldLocation();
-   String newLocation = alterPartitionMessage.getNewLocation();
-
-   if ((dbName == null)
-       || (tableName == null)
-       || (oldLocation == null)
-       || (newLocation == null)) {
-     LOGGER.error(String.format("Alter partition event "
-         + "has incomplete information. dbName = %s, tableName = %s, "
-         + "oldLocation = %s, newLocation = %s",
-         StringUtils.defaultIfBlank(dbName, "null"),
-         StringUtils.defaultIfBlank(tableName, "null"),
-         StringUtils.defaultIfBlank(oldLocation, "null"),
-         StringUtils.defaultIfBlank(newLocation, "null")));
-     return false;
-   }
-
-   // Built once so the log entry and the rename call use the same name.
-   String authzObj = dbName + "." + tableName;
-
-   if (oldLocation.equals(newLocation)) {
-     // The message previously ran "as" and "location" together and printed
-     // the authzObj with a doubled dot.
-     LOGGER.debug(String.format("Alter partition notification ignored as "
-         + "location has not changed: AuthzObj = %s, Location = %s",
-         authzObj, oldLocation));
-     return false;
-   }
-
-   // Only the location changes for a partition, so old and new authzObj match.
-   renameAuthzPath(authzObj, authzObj, oldLocation, newLocation, event);
-   return true;
- }
-
- /**
-  * Adds a set of paths for the given authzObj to the authzObj -> [Paths] mapping
-  * and persists the corresponding delta path change to Sentry DB.
-  *
-  * <p>A location that {@code getPath} cannot parse is logged and left out; the
-  * store is still called with whatever paths remain.
-  *
-  * @param authzObj the given authzObj; it is lower-cased before use
-  * @param locations a set of paths that need to be added
-  * @param event the NotificationEvent object from where authzObj and locations were obtained
-  * @throws Exception if the path change cannot be persisted
-  */
- private void addPaths(String authzObj, Collection<String> locations, NotificationEvent event)
-     throws Exception {
-   // AuthzObj is case insensitive
-   authzObj = authzObj.toLowerCase();
-
-   UniquePathsUpdate update = new UniquePathsUpdate(event, false);
-   Collection<String> paths = new HashSet<>(locations.size());
-   for (String location : locations) {
-     String pathTree = getPath(location);
-     if (pathTree == null) {
-       // Malformed path: skip this location instead of failing the whole event.
-       LOGGER.debug("HMS Path Update [OP : addPath, authzObj : {}, path : {}, "
-           + "notification event ID: {}] - nothing to add",
-           authzObj, location, event.getEventId());
-       continue;
-     }
-     LOGGER.debug("HMS Path Update [OP : addPath, authzObj : {}, path : {}, "
-         + "notification event ID: {}]",
-         authzObj, location, event.getEventId());
-     update.newPathChange(authzObj).addToAddPaths(splitPath(pathTree));
-     paths.add(pathTree);
-   }
-   sentryStore.addAuthzPathsMapping(authzObj, paths, update);
- }
-
- /**
-  * Removes a set of paths mapped to the given authzObj from the
-  * authzObj -> [Paths] mapping and persists the corresponding delta path
-  * change to Sentry DB.
-  *
-  * <p>A location that {@code getPath} cannot parse is logged and left out; the
-  * store is still called with whatever paths remain.
-  *
-  * @param authzObj the given authzObj; it is lower-cased before use
-  * @param locations a set of paths that need to be removed
-  * @param event the NotificationEvent object from where authzObj and locations were obtained
-  * @throws Exception if the path change cannot be persisted
-  */
- private void removePaths(String authzObj, Collection<String> locations, NotificationEvent event)
-     throws Exception {
-   // AuthzObj is case insensitive
-   authzObj = authzObj.toLowerCase();
-
-   UniquePathsUpdate update = new UniquePathsUpdate(event, false);
-   Collection<String> paths = new HashSet<>(locations.size());
-   for (String location : locations) {
-     String pathTree = getPath(location);
-     if (pathTree == null) {
-       // Malformed path: skip this location instead of failing the whole event.
-       LOGGER.debug("HMS Path Update [OP : removePath, authzObj : {}, path : {}, "
-           + "notification event ID: {}] - nothing to remove",
-           authzObj, location, event.getEventId());
-       continue;
-     }
-     LOGGER.debug("HMS Path Update [OP : removePath, authzObj : {}, path : {}, "
-         + "notification event ID: {}]",
-         authzObj, location, event.getEventId());
-     update.newPathChange(authzObj).addToDelPaths(splitPath(pathTree));
-     paths.add(pathTree);
-   }
-   sentryStore.deleteAuthzPathsMapping(authzObj, paths, update);
- }
-
- /**
- * Removes a given authzObj and all paths belongs to it from the
- * authzObj -> [Paths] mapping as well as persist the corresponding
- * delta path change to Sentry DB.
- *
- * @param authzObj the given authzObj to be deleted
- * @param event the NotificationEvent object from where authzObj and locations were obtained
- */
- private void removeAllPaths(String authzObj, NotificationEvent event)
- throws Exception {
- // AuthzObj is case insensitive
- authzObj = authzObj.toLowerCase();
-
- LOGGER.debug("HMS Path Update ["
- + "OP : removeAllPaths, "
- + "authzObj : " + authzObj + ", "
- + "notification event ID: " + event.getEventId() + "]");
- UniquePathsUpdate update = new UniquePathsUpdate(event, false);
- update.newPathChange(authzObj).addToDelPaths(
- Lists.newArrayList(PathsUpdate.ALL_PATHS));
- sentryStore.deleteAllAuthzPathsMapping(authzObj, update);
- }
-
- /**
-  * Renames a given authzObj and/or changes the path that belongs to it in the
-  * authzObj -> [Paths] mapping, and persists the corresponding delta path
-  * change to Sentry DB.
-  *
-  * <p>If either location cannot be parsed into a path tree, the change is
-  * delegated to {@code updateAuthzPathsMapping}, which applies only the part
-  * that could be parsed.
-  *
-  * @param oldAuthzObj the existing authzObj
-  * @param newAuthzObj the new name to be changed to
-  * @param oldLocation an existing path of the given authzObj
-  * @param newLocation a new path to be changed to
-  * @param event the NotificationEvent object from where authzObj and locations were obtained
-  * @throws SentryInvalidHMSEventException if neither the authzObj name nor the
-  *     location has changed
-  * @throws Exception if the change cannot be persisted
-  */
- private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation,
-     String newLocation, NotificationEvent event) throws Exception {
-   // AuthzObj is case insensitive
-   oldAuthzObj = oldAuthzObj.toLowerCase();
-   newAuthzObj = newAuthzObj.toLowerCase();
-   // Either tree is null when the corresponding location is malformed.
-   String oldPathTree = getPath(oldLocation);
-   String newPathTree = getPath(newLocation);
-
-   LOGGER.debug("HMS Path Update [OP : renameAuthzObject, oldAuthzObj : {}, "
-       + "newAuthzObj : {}, oldLocation : {}, newLocation : {}, "
-       + "notification event ID: {}]",
-       oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId());
-
-   // In the case of HiveObj name has changed
-   if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) {
-     // Skip update if encounter malformed path for both oldLocation and newLocation.
-     if ((oldPathTree != null) && (newPathTree != null)) {
-       UniquePathsUpdate update = new UniquePathsUpdate(event, false);
-       update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
-       update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
-       if (oldLocation.equals(newLocation)) {
-         // Only name has changed
-         // - Alter table rename for an external table
-         sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update);
-       } else {
-         // Both name and location has changed
-         // - Alter table rename for managed table
-         sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree,
-             newPathTree, update);
-       }
-     } else {
-       updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event);
-     }
-   } else if (!oldLocation.equals(newLocation)) {
-     // Only Location has changed, e.g. Alter table set location
-     if ((oldPathTree != null) && (newPathTree != null)) {
-       UniquePathsUpdate update = new UniquePathsUpdate(event, false);
-       update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
-       update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
-       sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
-           newPathTree, update);
-     } else {
-       updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event);
-     }
-   } else {
-     // Neither name nor location changed: the event carries nothing to apply.
-     LOGGER.error("Update Notification for Authorizable object {}, with no change, skipping",
-         oldAuthzObj);
-     throw new SentryInvalidHMSEventException("Update Notification for Authorizable object "
-         + "with no change");
-   }
- }
-
- /**
-  * Applies a one-sided path change, used by {@code renameAuthzPath} when the
-  * old or the new location could not be parsed into a path tree.
-  *
-  * <p>If the old path tree is available it is deleted from {@code oldAuthzObj};
-  * otherwise, if the new path tree is available, it is added to
-  * {@code newAuthzObj}. When both are {@code null} nothing is done.
-  *
-  * @param oldAuthzObj the authzObj the old path belongs to
-  * @param oldPathTree the parsed old path, or {@code null}
-  * @param newAuthzObj the authzObj the new path belongs to
-  * @param newPathTree the parsed new path, or {@code null}
-  * @param event the NotificationEvent object that triggered the change
-  * @throws Exception if the path change cannot be persisted
-  */
- private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree,
-     String newAuthzObj, String newPathTree, NotificationEvent event) throws Exception {
-   if (oldPathTree != null) {
-     // The old path is known: drop it and stop here.
-     UniquePathsUpdate removal = new UniquePathsUpdate(event, false);
-     removal.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
-     sentryStore.deleteAuthzPathsMapping(
-         oldAuthzObj, Collections.singleton(oldPathTree), removal);
-     return;
-   }
-
-   if (newPathTree == null) {
-     // Neither path could be parsed; there is nothing to apply.
-     return;
-   }
-
-   UniquePathsUpdate addition = new UniquePathsUpdate(event, false);
-   addition.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
-   sentryStore.addAuthzPathsMapping(
-       newAuthzObj, Collections.singleton(newPathTree), addition);
- }
-
- /**
-  * Parses a path into its path tree via {@code PathsUpdate.parsePath}.
-  * A {@code SentryMalformedPathException} is logged and turned into a
-  * {@code null} result, so callers can treat {@code null} as "malformed path".
-  *
-  * @param path a path
-  * @return the path tree for the given path, or {@code null} if it is malformed.
-  */
- private String getPath(String path) {
-   String pathTree = null;
-   try {
-     pathTree = PathsUpdate.parsePath(path);
-   } catch (SentryMalformedPathException e) {
-     LOGGER.error("Unexpected path while parsing {}", path, e);
-   }
-   return pathTree;
- }
-
- /**
-  * Drops the Sentry privileges held on a database. Failures never propagate:
-  * a {@code SentryNoSuchObjectException} is logged at info level, any other
-  * exception is logged as an error together with the event.
-  *
-  * @param dbName name of the database whose privileges are dropped
-  * @param event the notification event that triggered the drop; only used for logging
-  */
- private void dropSentryDbPrivileges(String dbName, NotificationEvent event) {
-   try {
-     TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
-     authorizable.setDb(dbName);
-     sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
-   } catch (SentryNoSuchObjectException e) {
-     LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: {}",
-         dbName);
-   } catch (Exception e) {
-     // Separate the sentence from the event dump, as the table variant does.
-     LOGGER.error("Could not process Drop database event. Event: " + event.toString(), e);
-   }
- }
-
- private void dropSentryTablePrivileges(String dbName, String tableName,
- NotificationEvent event) {
- try {
- TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
- authorizable.setDb(dbName);
- authorizable.setTable(tableName);
- sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}",
- dbName, tableName);
- } catch (Exception e) {
- LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e);
- }
- }
-
- private void renamePrivileges(String oldDbName, String oldTableName, String newDbName,
- String newTableName) throws
- Exception {
- TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName);
- oldAuthorizable.setDb(oldDbName);
- oldAuthorizable.setTable(oldTableName);
- TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName);
- newAuthorizable.setDb(newDbName);
- newAuthorizable.setTable(newTableName);
- Update update =
- getPermUpdatableOnRename(oldAuthorizable, newAuthorizable);
- sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
index 7e774b4..6fe9b4e 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -52,7 +52,7 @@ import static java.util.Collections.emptyMap;
* <p>Abstracts communication with HMS and exposes APIs to connect/disconnect to HMS and to
* request HMS snapshots and also for new notifications.
*/
-class SentryHMSClient implements AutoCloseable {
+public class SentryHMSClient implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryHMSClient.class);
private static final String NOT_CONNECTED_MSG = "Client is not connected to HMS";
@@ -69,7 +69,7 @@ class SentryHMSClient implements AutoCloseable {
private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
.getCounter(name(FullUpdateInitializer.class, "failed"));
- SentryHMSClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) {
+ public SentryHMSClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) {
this.conf = conf;
this.hiveConnectionFactory = hiveConnectionFactory;
}
@@ -100,7 +100,7 @@ class SentryHMSClient implements AutoCloseable {
* @throws InterruptedException if connection was interrupted
* @throws MetaException if other errors happened
*/
- void connect()
+ public void connect()
throws IOException, InterruptedException, MetaException {
if (client != null) {
return;
@@ -139,7 +139,7 @@ class SentryHMSClient implements AutoCloseable {
*
* @return Full path snapshot and the last notification id on success
*/
- PathsImage getFullSnapshot() {
+ public PathsImage getFullSnapshot() {
if (client == null) {
LOGGER.error(NOT_CONNECTED_MSG);
return new PathsImage(Collections.<String, Collection<String>>emptyMap(),
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 43535a7..96c6810 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.sentry.Command;
import org.apache.sentry.core.common.utils.SigUtils;
+import org.apache.sentry.provider.db.service.persistent.HMSFollower;
+import org.apache.sentry.provider.db.service.persistent.LeaderStatusMonitor;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.SentryHealthCheckServletContextListener;
import org.apache.sentry.provider.db.service.thrift.SentryMetrics;