You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/03/24 19:53:09 UTC
[1/2] sentry git commit: SENTRY-1613: Add propagating logic for
Perm/Path updates in Sentry service (Hao Hao,
Reviewed by: Alexander Kolbasov and Lei Xu)
Repository: sentry
Updated Branches:
refs/heads/sentry-ha-redesign 268ee50ef -> 2811311ea
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
deleted file mode 100644
index d12b134..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.thrift.TException;
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Test;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-public class TestUpdateForwarder {
-
- public static class DummyUpdate implements Update {
- private long seqNum = 0;
- private boolean hasFullUpdate = false;
- private String state;
- public DummyUpdate() {
- this(0, false);
- }
- public DummyUpdate(long seqNum, boolean hasFullUpdate) {
- this.seqNum = seqNum;
- this.hasFullUpdate = hasFullUpdate;
- }
- public String getState() {
- return state;
- }
- public DummyUpdate setState(String stuff) {
- this.state = stuff;
- return this;
- }
- @Override
- public boolean hasFullImage() {
- return hasFullUpdate;
- }
- @Override
- public long getSeqNum() {
- return seqNum;
- }
- @Override
- public void setSeqNum(long seqNum) {
- this.seqNum = seqNum;
- }
- @Override
- public byte[] serialize() throws IOException {
- return state.getBytes();
- }
-
- @Override
- public void deserialize(byte[] data) throws IOException {
- state = new String(data);
- }
-
- @Override
- public String JSONSerialize() throws TException {
- return state;
- }
-
- @Override
- public void JSONDeserialize(String update) throws TException {
- state = new String(update);
- }
- }
-
- static class DummyUpdatable implements Updateable<DummyUpdate> {
-
- private List<String> state = new LinkedList<String>();
- private long lastUpdatedSeqNum = 0;
-
- @Override
- public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
- for (DummyUpdate u : update) {
- state.add(u.getState());
- lastUpdatedSeqNum = u.seqNum;
- }
- }
-
- @Override
- public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
- DummyUpdatable retVal = new DummyUpdatable();
- retVal.lastUpdatedSeqNum = update.seqNum;
- retVal.state = Lists.newArrayList(update.state.split(","));
- return retVal;
- }
-
- @Override
- public long getLastUpdatedSeqNum() {
- return lastUpdatedSeqNum;
- }
-
- @Override
- public DummyUpdate createFullImageUpdate(long currSeqNum) {
- DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
- retVal.state = Joiner.on(",").join(state);
- return retVal;
- }
-
- public String getState() {
- return Joiner.on(",").join(state);
- }
-
- @Override
- public String getUpdateableTypeName() {
- // TODO Auto-generated method stub
- return "DummyUpdator";
- }
- }
-
- static class DummyImageRetreiver implements ImageRetriever<DummyUpdate> {
-
- private String state;
- public void setState(String state) {
- this.state = state;
- }
- @Override
- public DummyUpdate retrieveFullImage(long currSeqNum) {
- DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
- retVal.state = state;
- return retVal;
- }
- }
-
- protected Configuration testConf = new Configuration();
- protected UpdateForwarder<DummyUpdate> updateForwarder;
-
- @After
- public void cleanup() throws Exception {
- if (updateForwarder != null) {
- updateForwarder.close();
- updateForwarder = null;
- }
- }
-
- @Test
- public void testInit() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 10, true);
- Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertTrue(allUpdates.size() == 1);
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
-
- // If the current process has restarted the input seqNum will be > currSeq
- allUpdates = updateForwarder.getAllUpdatesFrom(100);
- Assert.assertTrue(allUpdates.size() == 1);
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
- Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(-1);
- Assert.assertEquals(0, allUpdates.size());
- }
-
- @Test
- public void testUpdateReceive() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
- Assert.assertEquals("d", allUpdates.get(1).getState());
- }
-
- // This happens when we the first update from HMS is a -1 (If the heartbeat
- // thread checks Sentry's current seqNum before any update has come in)..
- // This will lead the first and second entries in the updatelog to differ
- // by more than +1..
- @Test
- public void testUpdateReceiveWithNullImageRetriver() throws Exception {
- Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
- false));
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), null, 5, false);
- updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1);
- Assert.assertEquals("a", allUpdates.get(0).getState());
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
- Assert.assertEquals("b", allUpdates.get(0).getState());
- Assert.assertEquals("c", allUpdates.get(1).getState());
- }
-
- @Test
- public void testGetUpdates() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(4, allUpdates.size());
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
- Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
- Assert.assertEquals("d", allUpdates.get(1).getState());
- Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
- Assert.assertEquals("e", allUpdates.get(2).getState());
- Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
- Assert.assertEquals("f", allUpdates.get(3).getState());
- Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(8);
- Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("g", allUpdates.get(0).getState());
- }
-
- @Test
- public void testGetUpdatesAfterExternalEntityReset() throws Exception {
- /*
- * Disabled for Sentry HA. Since the sequence numbers are trakced in ZK, the
- * lower sequence updates are ignored which causes this test to fail in HA
- * mode
- */
- Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
- false));
-
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(4, allUpdates.size());
- Assert.assertEquals("f", allUpdates.get(3).getState());
- Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(8);
- Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("g", allUpdates.get(0).getState());
-
- imageRetreiver.setState("a,b,c,d,e,f,g,h");
-
- // New update comes with SeqNum = 1
- updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- // NN plugin asks for next update
- allUpdates = updateForwarder.getAllUpdatesFrom(9);
- Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
- // Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
- }
-
- @Test
- public void testUpdateLogCompression() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j"));
-
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(3, allUpdates.size());
- Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
- Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
- Assert.assertEquals("i", allUpdates.get(1).getState());
- Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
- Assert.assertEquals("j", allUpdates.get(2).getState());
- Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 6ea6d3f..bbfa713 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -3156,7 +3156,7 @@ public class SentryStore {
}
/**
- * Get the last processed change ID for perm/path delta changes.
+ * Gets the last processed change ID for perm/path delta changes.
*
* @param pm the PersistenceManager
* @param changeCls the class of a delta c
@@ -3164,7 +3164,7 @@ public class SentryStore {
* @return the last processed changedID for the delta changes. If no
* change found then return 0.
*/
- private <T extends MSentryChange> long getLastProcessedChangeIDCore(
+ private <T extends MSentryChange> Long getLastProcessedChangeIDCore(
PersistenceManager pm, Class<T> changeCls) {
Query query = pm.newQuery(changeCls);
query.setResult("max(changeID)");
@@ -3173,14 +3173,13 @@ public class SentryStore {
}
/**
- * Get the last processed change ID for perm delta changes.
+ * Gets the last processed change ID for perm delta changes.
*
* Internally invoke {@link #getLastProcessedChangeIDCore(PersistenceManager, Class)}
*
* @return latest perm change ID.
*/
- @VisibleForTesting
- long getLastProcessedPermChangeID() throws Exception {
+ public Long getLastProcessedPermChangeID() throws Exception {
return tm.executeTransaction(
new TransactionBlock<Long>() {
public Long execute(PersistenceManager pm) throws Exception {
@@ -3190,12 +3189,26 @@ public class SentryStore {
}
/**
+ * Gets the last processed change ID for path delta changes.
+ *
+ * @return latest path change ID.
+ */
+ public Long getLastProcessedPathChangeID() throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Long>() {
+ public Long execute(PersistenceManager pm) throws Exception {
+ return getLastProcessedChangeIDCore(pm, MSentryPathChange.class);
+ }
+ });
+ }
+
+ /**
* Get the notification ID of last processed path delta change.
*
* @return the notification ID of latest path change. If no change
* found then return 0.
*/
- public long getLastProcessedNotificationID() throws Exception {
+ public Long getLastProcessedNotificationID() throws Exception {
return tm.executeTransaction(
new TransactionBlock<Long>() {
public Long execute(PersistenceManager pm) throws Exception {
@@ -3221,8 +3234,8 @@ public class SentryStore {
new TransactionBlock<Object>() {
public Object execute(PersistenceManager pm) throws Exception {
Query query = pm.newQuery(MSentryPermChange.class);
- query.setFilter("this.changeID == t");
- query.declareParameters("long t");
+ query.setFilter("this.changeID == id");
+ query.declareParameters("long id");
List<MSentryPermChange> permChanges = (List<MSentryPermChange>)query.execute(changeID);
if (permChanges == null) {
noSuchUpdate(changeID);
@@ -3244,7 +3257,8 @@ public class SentryStore {
* @throws Exception
*/
@SuppressWarnings("unchecked")
- private <T extends MSentryChange> List<T> getMSentryChanges(final Class<T> cls) throws Exception {
+ private <T extends MSentryChange> List<T> getMSentryChanges(final Class<T> cls)
+ throws Exception {
return tm.executeTransaction(
new TransactionBlock<List<T>>() {
public List<T> execute(PersistenceManager pm) throws Exception {
@@ -3266,15 +3280,71 @@ public class SentryStore {
}
/**
- * Get the MSentryPathChange object by ChangeID.
+ * Checks if any MSentryChange object exists with the given changeID.
+ *
+ * @param pm PersistenceManager
+ * @param changeCls class instance of type {@link MSentryChange}
+ * @param changeID changeID
+ * @return true if found the MSentryChange object, otherwise false.
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ private <T extends MSentryChange> Boolean changeExistsCore(
+ PersistenceManager pm, Class<T> changeCls, final long changeID)
+ throws Exception {
+ Query query = pm.newQuery(changeCls);
+ query.setFilter("this.changeID == id");
+ query.declareParameters("long id");
+ List<T> changes = (List<T>)query.execute(changeID);
+ return !changes.isEmpty();
+ }
+
+ /**
+ * Checks if any MSentryPermChange object exists with the given changeID.
+ *
+ * @param changeID
+ * @return true if found the MSentryPermChange object, otherwise false.
+ * @throws Exception
+ */
+ public Boolean permChangeExists(final long changeID) throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Boolean>() {
+ public Boolean execute(PersistenceManager pm) throws Exception {
+ return changeExistsCore(pm, MSentryPermChange.class, changeID);
+ }
+ });
+ }
+
+ /**
+ * Checks if any MSentryPathChange object exists with the given changeID.
+ *
+ * @param changeID
+ * @return true if found the MSentryPathChange object, otherwise false.
+ * @throws Exception
+ */
+ public Boolean pathChangeExists(final long changeID) throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Boolean>() {
+ public Boolean execute(PersistenceManager pm) throws Exception {
+ return changeExistsCore(pm, MSentryPathChange.class, changeID);
+ }
+ });
+ }
+
+ /**
+ * Gets the MSentryPathChange object by ChangeID.
+ *
+ * @param changeID the given changeID
+ * @return the MSentryPathChange object with corresponding changeID.
+ * @throws Exception
*/
public MSentryPathChange getMSentryPathChangeByID(final long changeID) throws Exception {
return (MSentryPathChange) tm.executeTransaction(
new TransactionBlock<Object>() {
public Object execute(PersistenceManager pm) throws Exception {
Query query = pm.newQuery(MSentryPathChange.class);
- query.setFilter("this.changeID == t");
- query.declareParameters("long t");
+ query.setFilter("this.changeID == id");
+ query.declareParameters("long id");
List<MSentryPathChange> pathChanges = (List<MSentryPathChange>)query.execute(changeID);
if (pathChanges == null) {
noSuchUpdate(changeID);
@@ -3297,6 +3367,88 @@ public class SentryStore {
}
/**
+ * Gets a list of MSentryChange objects greater than or equal to the given changeID.
+ *
+ * @param changeID
+ * @return a list of MSentryChange objects. It can return an empty list.
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ private <T extends MSentryChange> List<T> getMSentryChangesCore(PersistenceManager pm,
+ Class<T> changeCls, final long changeID) throws Exception {
+ Query query = pm.newQuery(changeCls);
+ query.setFilter("this.changeID >= t");
+ query.declareParameters("long t");
+ List<T> changes = (List<T>) query.execute(changeID);
+ return changes;
+ }
+
+ /**
+ * Gets a list of MSentryPathChange objects greater than or equal to the given changeID.
+ * If any path deltas are missing from the {@link MSentryPathChange} table, i.e.
+ * fewer path deltas are retrieved than expected, an empty list is returned to
+ * the caller.
+ *
+ * @param changeID
+ * @return a list of MSentryPathChange objects. It can return an empty list.
+ * @throws Exception
+ */
+ public List<MSentryPathChange> getMSentryPathChanges(final long changeID)
+ throws Exception {
+ return tm.executeTransaction(new TransactionBlock<List<MSentryPathChange>>() {
+ public List<MSentryPathChange> execute(PersistenceManager pm) throws Exception {
+ List<MSentryPathChange> pathChanges =
+ getMSentryChangesCore(pm, MSentryPathChange.class, changeID);
+ long curChangeID = getLastProcessedChangeIDCore(pm, MSentryPathChange.class);
+ long expectedSize = curChangeID - changeID + 1;
+ long actualSize = pathChanges.size();
+ if (actualSize < expectedSize) {
+ LOGGER.error(String.format("Certain path delta is missing in " +
+ "SENTRY_PATH_CHANEG table! Current size of elements = %s and expected size = %s, " +
+ "from changeID: %s. The table may get corrupted.",
+ actualSize, expectedSize, changeID));
+ return Collections.emptyList();
+ } else {
+ return pathChanges;
+ }
+ }
+ });
+ }
+
+ /**
+ * Gets a list of MSentryPermChange objects greater than or equal to the given ChangeID.
+ * If any perm deltas are missing from the {@link MSentryPermChange} table, i.e.
+ * fewer perm deltas are retrieved than expected, an empty list is returned to
+ * the caller.
+ *
+ * @param changeID
+ * @return a list of MSentryPermChange objects
+ * @throws Exception
+ */
+ public List<MSentryPermChange> getMSentryPermChanges(final long changeID)
+ throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<List<MSentryPermChange>>() {
+ public List<MSentryPermChange> execute(PersistenceManager pm) throws Exception {
+ List<MSentryPermChange> permChanges =
+ getMSentryChangesCore(pm, MSentryPermChange.class, changeID);
+ long curChangeID = getLastProcessedChangeIDCore(pm, MSentryPermChange.class);
+ long expectedSize = curChangeID - changeID + 1;
+ long actualSize = permChanges.size();
+ if (actualSize < expectedSize) {
+ LOGGER.error(String.format("Certain perm delta is missing in " +
+ "SENTRY_PERM_CHANEG table! Current size of elements = %s and expected size = %s, " +
+ "from changeID: %s. The table may get corrupted.",
+ actualSize, expectedSize, changeID));
+ return Collections.emptyList();
+ } else {
+ return permChanges;
+ }
+ }
+ });
+ }
+
+ /**
* Execute Perm/Path UpdateTransaction and corresponding actual
* action transaction, e.g dropSentryRole, in a single transaction.
* The order of the transaction does not matter because there is no
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index f3f51da..6c14f5e 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -32,9 +32,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.core.common.exception.*;
-import org.apache.sentry.hdfs.PathsUpdate;
import org.apache.sentry.hdfs.PermissionsUpdate;
-import org.apache.sentry.hdfs.UpdateableAuthzPaths;
import org.apache.sentry.hdfs.FullUpdateInitializer;
import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 75f855c..aaa0b9f 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -2371,8 +2371,17 @@ public class TestSentryStore extends org.junit.Assert {
assertEquals(0, privileges.size());
// Query the persisted perm change and ensure it equals to the original one
- MSentryPermChange delPermChange = sentryStore.getMSentryPermChangeByID(lastChangeID + 1);
+ lastChangeID = sentryStore.getLastProcessedPermChangeID();
+ MSentryPermChange delPermChange = sentryStore.getMSentryPermChangeByID(lastChangeID);
assertEquals(delUpdate.JSONSerialize(), delPermChange.getPermChange());
+
+ // Verify getMSentryPermChanges will return all MSentryPermChanges whose
+ // changeID is greater than or equal to the given changeID.
+ List<MSentryPermChange> mSentryPermChanges = sentryStore.getMSentryPermChanges(1);
+ assertEquals(lastChangeID, mSentryPermChanges.size());
+
+ // Verify permChangeExists will return true for persisted MSentryPermChange.
+ assertEquals(true, sentryStore.permChangeExists(1));
}
@Test
@@ -2480,7 +2489,7 @@ public class TestSentryStore extends org.junit.Assert {
@Test
public void testRenameObjWithPermUpdate() throws Exception {
- String roleName1 = "role1", roleName2 = "role2", roleName3 = "role3";
+ String roleName1 = "role1";
String grantor = "g1";
String table1 = "tbl1", table2 = "tbl2";
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 5e8b2fa..1530eb2 100644
--- a/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive-v2/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -216,7 +216,6 @@ public class TestHDFSIntegration {
@Override
public Void run() throws Exception {
HiveConf hiveConf = new HiveConf();
- hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin");
hiveConf.set("sentry.service.client.server.rpc-address", "localhost");
hiveConf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
hiveConf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
@@ -444,6 +443,7 @@ public class TestHDFSIntegration {
properties.put(ServerConfig.RPC_ADDRESS, "localhost");
properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort > 0 ? sentryPort : 0));
properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+ properties.put(ServerConfig.SENTRY_NOTIFICATION_LOG_ENABLED,"true");
properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
index 1b5eb53..8de4f29 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
/**
* Advanced tests for HDFS Sync integration
*/
+@Ignore
public class TestHDFSIntegrationAdvanced extends TestHDFSIntegrationBase {
private static final Logger LOGGER = LoggerFactory
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
index 859c8f8..7769f24 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java
@@ -497,7 +497,7 @@ public abstract class TestHDFSIntegrationBase {
hiveConf.set("hive.metastore.authorization.storage.checks", "true");
hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort);
hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding");
- hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListener");
+ hiveConf.set("hive.metastore.transactional.event.listeners", "org.apache.hive.hcatalog.listener.DbNotificationListener");
hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl");
hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook");
hiveConf.set("sentry.metastore.service.users", "hive");// queries made by hive user (beeline) skip meta store check
@@ -696,6 +696,7 @@ public abstract class TestHDFSIntegrationBase {
properties.put(ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy");
properties.put("sentry.service.processor.factories",
"org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
+ properties.put(ServerConfig.SENTRY_NOTIFICATION_LOG_ENABLED,"true");
properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin");
properties.put(ServerConfig.RPC_MIN_THREADS, "3");
for (Map.Entry<String, String> entry : properties.entrySet()) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java
index c791272..0e97466 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
/**
* This test class includes all HDFS Sync smoke tests
*/
+@Ignore
public class TestHDFSIntegrationEnd2End extends TestHDFSIntegrationBase {
private static final Logger LOGGER = LoggerFactory
.getLogger(TestHDFSIntegrationEnd2End.class);
[2/2] sentry git commit: SENTRY-1613: Add propagating logic for
Perm/Path updates in Sentry service (Hao Hao,
Reviewed by: Alexander Kolbasov and Lei Xu)
Posted by ha...@apache.org.
SENTRY-1613: Add propagating logic for Perm/Path updates in Sentry service (Hao Hao, Reviewed by: Alexander Kolbasov and Lei Xu)
Change-Id: I1223a45df8ab1c169772b2ffe92762f0dcc4e82e
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/2811311e
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/2811311e
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/2811311e
Branch: refs/heads/sentry-ha-redesign
Commit: 2811311ea6dfe2e26af67a545333486ca0e89092
Parents: 268ee50
Author: hahao <ha...@cloudera.com>
Authored: Thu Mar 23 17:51:15 2017 -0700
Committer: hahao <ha...@cloudera.com>
Committed: Fri Mar 24 12:51:39 2017 -0700
----------------------------------------------------------------------
.../org/apache/sentry/hdfs/DeltaRetriever.java | 67 ++++
.../org/apache/sentry/hdfs/ImageRetriever.java | 7 +-
.../apache/sentry/hdfs/ThriftSerializer.java | 10 +-
.../apache/sentry/hdfs/DBUpdateForwarder.java | 88 ++++
.../org/apache/sentry/hdfs/MetastorePlugin.java | 397 -------------------
.../apache/sentry/hdfs/PathDeltaRetriever.java | 76 ++++
.../apache/sentry/hdfs/PathImageRetriever.java | 26 +-
.../apache/sentry/hdfs/PermDeltaRetriever.java | 76 ++++
.../apache/sentry/hdfs/PermImageRetriever.java | 14 +-
.../sentry/hdfs/SentryHDFSServiceProcessor.java | 29 +-
.../sentry/hdfs/SentryHdfsMetricsUtil.java | 19 -
.../org/apache/sentry/hdfs/SentryPlugin.java | 168 +++-----
.../org/apache/sentry/hdfs/UpdateForwarder.java | 335 ----------------
.../sentry/hdfs/UpdateablePermissions.java | 63 ---
.../apache/sentry/hdfs/TestUpdateForwarder.java | 359 -----------------
.../db/service/persistent/SentryStore.java | 176 +++++++-
.../sentry/service/thrift/HMSFollower.java | 2 -
.../db/service/persistent/TestSentryStore.java | 13 +-
.../tests/e2e/hdfs/TestHDFSIntegration.java | 2 +-
.../e2e/hdfs/TestHDFSIntegrationAdvanced.java | 1 +
.../tests/e2e/hdfs/TestHDFSIntegrationBase.java | 3 +-
.../e2e/hdfs/TestHDFSIntegrationEnd2End.java | 1 +
22 files changed, 572 insertions(+), 1360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java
new file mode 100644
index 0000000..0e58593
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.Collection;
+
+import static org.apache.sentry.hdfs.Updateable.Update;
+
+/**
+ * DeltaRetriever obtains a delta update of either Sentry Permissions or Sentry
+ * representation of HMS Paths.
+ * <p>
+ * Sentry permissions are represented as {@link PermissionsUpdate} and HMS Paths
+ * are represented as {@link PathsUpdate}. A delta update contains the changes
+ * from one state to another.
+ * The {@link #retrieveDelta(long)} method obtains such delta updates from a persistent storage.
+ * Delta updates are propagated to a consumer of Sentry, such as HDFS NameNode, whenever
+ * the consumer needs to synchronize the update.
+ */
+public interface DeltaRetriever<K extends Update> {
+
+ /**
+ * Retrieves all delta updates of type {@link Update} newer than or equal to
+ * the given sequence number/change ID (inclusive) from a persistent storage.
+ * An empty collection can be returned.
+ *
+ * @param seqNum the given seq number
+ * @return a collection of delta updates of type K
+ * @throws Exception when there is an error in operation on persistent storage
+ */
+ Collection<K> retrieveDelta(long seqNum) throws Exception;
+
+ /**
+ * Checks if the delta update is available, given the sequence number/change
+ * ID, from a persistent storage.
+ *
+ * @param seqNum the given seq number
+ * @return true if there are such delta updates available.
+ * Otherwise it will be false.
+ * @throws Exception when there is an error in operation on persistent storage
+ */
+ boolean isDeltaAvailable(long seqNum) throws Exception;
+
+ /**
+ * Gets the latest updated delta ID.
+ *
+ * @return the latest updated delta ID.
+ * @throws Exception when there is an error in operation on persistent storage
+ */
+ long getLatestDeltaID() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
index 0e40756..e96140d 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
@@ -25,7 +25,7 @@ import static org.apache.sentry.hdfs.Updateable.Update;
* ({@code PathsUpdate}).
* <p>
* The snapshot image should represent a consistent state.
- * The {@link #retrieveFullImage(long)} method obtains such state snapshot from
+ * The {@link #retrieveFullImage()} method obtains such state snapshot from
* a persistent storage.
* The Snapshots are propagated to a consumer of Sentry, such as HDFS NameNode,
* whenever the consumer needs to synchronize its full state.
@@ -33,13 +33,12 @@ import static org.apache.sentry.hdfs.Updateable.Update;
public interface ImageRetriever<K extends Update> {
/**
- * Retrieve a complete snapshot of type {@code Update} from a persistent storage.
+ * Retrieves a complete snapshot of type {@code Update} from a persistent storage.
*
- * @param seqNum
* @return a complete snapshot of type {@link Update}, e.g {@link PermissionsUpdate}
* or {@link PathsUpdate}
* @throws Exception
*/
- K retrieveFullImage(long seqNum) throws Exception;
+ K retrieveFullImage() throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
index 69aa098..d7b9923 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
@@ -25,12 +25,12 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
public class ThriftSerializer {
- final static private TSimpleJSONProtocol.Factory tSimpleJSONProtocol =
- new TSimpleJSONProtocol.Factory();
+ final static private TJSONProtocol.Factory tJSONProtocol =
+ new TJSONProtocol.Factory();
// Use default max thrift message size here.
// TODO: Figure out a way to make maxMessageSize configurable, eg. create a serializer singleton at startup by
@@ -67,13 +67,13 @@ public class ThriftSerializer {
public static String serializeToJSON(TBase base) throws TException {
// Initiate a new TSerializer each time for thread safety.
- TSerializer tSerializer = new TSerializer(tSimpleJSONProtocol);
+ TSerializer tSerializer = new TSerializer(tJSONProtocol);
return tSerializer.toString(base);
}
public static void deserializeFromJSON(TBase base, String dataInJson) throws TException {
// Initiate a new TDeserializer each time for thread safety.
- TDeserializer tDeserializer = new TDeserializer(tSimpleJSONProtocol);
+ TDeserializer tDeserializer = new TDeserializer(tJSONProtocol);
tDeserializer.fromString(base, dataInJson);
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
new file mode 100644
index 0000000..b8542b3
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * DBUpdateForwarder propagates a complete snapshot or delta update of either
+ * Sentry Permissions ({@code PermissionsUpdate}) or Sentry representation of
+ * HMS Paths ({@code PathsUpdate}), retrieved from a persistent storage, to a
+ * Sentry client, e.g HDFS NameNode.
+ * <p>
+ * It is a thread-safe class, as all the underlying database operations are thread safe.
+ */
+@ThreadSafe
+class DBUpdateForwarder<K extends Updateable.Update> {
+
+ private final ImageRetriever<K> imageRetriever;
+ private final DeltaRetriever<K> deltaRetriever;
+ private static final Logger LOGGER = LoggerFactory.getLogger(DBUpdateForwarder.class);
+
+ DBUpdateForwarder(final ImageRetriever<K> imageRetriever,
+ final DeltaRetriever<K> deltaRetriever) {
+ this.imageRetriever = imageRetriever;
+ this.deltaRetriever = deltaRetriever;
+ }
+
+ /**
+ * Retrieves all delta updates from the requested sequence number (inclusive) from
+ * a persistent storage.
+ * It first checks whether such newer deltas exist in the persistent storage.
+ * If they do, returns a list of delta updates.
+ * Otherwise, a complete snapshot will be returned.
+ *
+ * @param seqNum the requested sequence number
+ * @return an empty list if seqNum is greater than the latest delta ID; otherwise a list of delta updates, or a single full image, e.g. {@link PathsUpdate} or {@link PermissionsUpdate}
+ */
+ List<K> getAllUpdatesFrom(long seqNum) throws Exception {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("#### GetAllUpdatesFrom [reqSeqNum = {} ]", seqNum);
+ }
+
+ // No newer updates available than the requested one.
+ long curSeqNum = deltaRetriever.getLatestDeltaID();
+ if (seqNum > curSeqNum) {
+ return Collections.emptyList();
+ }
+
+ // Checks if newer deltas exist in the persistent storage (skipped for the initial change ID).
+ // If they do, and the retrieved collection is not empty, returns a list of delta updates.
+ if ((seqNum != SentryStore.INIT_CHANGE_ID) &&
+ deltaRetriever.isDeltaAvailable(seqNum)) {
+ Collection<K> deltas = deltaRetriever.retrieveDelta(seqNum);
+ if (!deltas.isEmpty()) {
+ return new LinkedList<>(deltas);
+ }
+ }
+
+ // Otherwise, a complete snapshot will be returned as a single-element list.
+ List<K> retVal = new LinkedList<>();
+ retVal.add(imageRetriever.retrieveFullImage());
+ return retVal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
deleted file mode 100644
index 16ffa1b..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks
- * into the sites in the {@link MetaStorePreEventListener} that deal with
- * creation/updation and deletion for paths.
- */
-public class MetastorePlugin extends SentryMetastoreListenerPlugin {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class);
-
- private static final String initializationFailureMsg = "Cache failed to initialize, cannot send path updates to Sentry." +
- " Please review HMS error logs during startup for additional information. If the initialization failure is due" +
- " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS";
-
- class SyncTask implements Runnable {
- @Override
- public void run() {
- if (!notificiationLock.tryLock()) {
- // No need to sync.. as metastore is in the process of pushing an update..
- return;
- }
- if (MetastorePlugin.this.authzPaths == null) {
- LOGGER.warn(initializationFailureMsg);
- return;
- }
- try {
- long lastSeenBySentry =
- MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
- long lastSent = lastSentSeqNum;
- if (lastSeenBySentry != lastSent) {
- LOGGER.warn("#### Sentry not in sync with HMS [" + lastSeenBySentry + ", "
- + lastSent + "]");
- PathsUpdate fullImageUpdate =
- MetastorePlugin.this.authzPaths.createFullImageUpdate(lastSent);
- notifySentryNoLock(fullImageUpdate);
- LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]");
- }
- } catch (Exception e) {
- sentryClient = null;
- LOGGER.error("Error talking to Sentry HDFS Service !!", e);
- } finally {
- syncSent = true;
- notificiationLock.unlock();
- }
- }
- }
-
- private final Configuration conf;
- private SentryHDFSServiceClient sentryClient;
- private volatile UpdateableAuthzPaths authzPaths;
- private Lock notificiationLock;
-
- // Initialized to some value > 1.
- protected static final AtomicLong seqNum = new AtomicLong(5);
-
- // Has to match the value of seqNum
- protected static volatile long lastSentSeqNum = seqNum.get();
- private volatile boolean syncSent = false;
- private volatile boolean initComplete = false;
- private volatile boolean queueFlushComplete = false;
- private volatile Throwable initError = null;
- private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>();
-
- private final ExecutorService threadPool; //NOPMD
- private final Configuration sentryConf;
-
- static class ProxyHMSHandler extends HMSHandler {
- public ProxyHMSHandler(String name, HiveConf conf) throws MetaException {
- super(name, conf);
- }
- }
-
- public MetastorePlugin(Configuration conf, Configuration sentryConf) {
- this.notificiationLock = new ReentrantLock();
-
- if (!(conf instanceof HiveConf)) {
- String error = "Configuration is not an instanceof HiveConf";
- LOGGER.error(error);
- throw new RuntimeException(error);
- }
- this.conf = new HiveConf((HiveConf)conf);
-
- this.sentryConf = new Configuration(sentryConf);
- this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname);
- this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
- this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
- this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
- Thread initUpdater = new Thread() {
- @Override
- public void run() {
- MetastoreCacheInitializer cacheInitializer = null;
- try {
- cacheInitializer =
- new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs",
- (HiveConf) MetastorePlugin.this.conf),
- MetastorePlugin.this.conf);
- MetastorePlugin.this.authzPaths =
- cacheInitializer.createInitialUpdate();
- LOGGER.info("#### Metastore Plugin initialization complete !!");
- synchronized (updateQueue) {
- while (!updateQueue.isEmpty()) {
- PathsUpdate update = updateQueue.poll();
- if (update != null) {
- processUpdate(update);
- }
- }
- queueFlushComplete = true;
- }
- LOGGER.info("#### Finished flushing queued updates to Sentry !!");
- } catch (Exception e) {
- LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e);
- initError = e;
- } finally {
- if (cacheInitializer != null) {
- try {
- cacheInitializer.close();
- } catch (Exception e) {
- LOGGER.info("#### Exception while closing cacheInitializer !!", e);
- }
- }
- initComplete = true;
- }
- }
- };
- if (this.conf.getBoolean(
- ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE,
- ServerConfig
- .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) {
- LOGGER.warn("#### Metastore Cache initialization is set to aync..." +
- "HDFS ACL synchronization will not happen until metastore" +
- "cache initialization is completed !!");
- initUpdater.start();
- } else {
- initUpdater.run(); //NOPMD
- }
- try {
- sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
- } catch (Exception e) {
- sentryClient = null;
- LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
- }
- ScheduledExecutorService newThreadPool = Executors.newScheduledThreadPool(1);
- newThreadPool.scheduleWithFixedDelay(new SyncTask(),
- this.conf.getLong(ServerConfig
- .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
- this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
- ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
- TimeUnit.MILLISECONDS);
- this.threadPool = newThreadPool;
- }
-
- @Override
- public void addPath(String authzObj, String path) {
- List<String> pathTree = null;
- try {
- pathTree = PathsUpdate.parsePath(path);
- } catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path);
- e.printStackTrace();
- return;
- }
- if(pathTree == null) {
- return;
- }
- LOGGER.debug("#### HMS Path Update ["
- + "OP : addPath, "
- + "authzObj : " + authzObj.toLowerCase() + ", "
- + "path : " + path + "]");
- PathsUpdate update = createHMSUpdate();
- update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree);
- notifySentryAndApplyLocal(update);
- }
-
- @Override
- public void removeAllPaths(String authzObj, List<String> childObjects) {
- LOGGER.debug("#### HMS Path Update ["
- + "OP : removeAllPaths, "
- + "authzObj : " + authzObj.toLowerCase() + ", "
- + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]");
- PathsUpdate update = createHMSUpdate();
- if (childObjects != null) {
- for (String childObj : childObjects) {
- update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths(
- Lists.newArrayList(PathsUpdate.ALL_PATHS));
- }
- }
- update.newPathChange(authzObj.toLowerCase()).addToDelPaths(
- Lists.newArrayList(PathsUpdate.ALL_PATHS));
- notifySentryAndApplyLocal(update);
- }
-
- @Override
- public void removePath(String authzObj, String path) {
- if ("*".equals(path)) {
- removeAllPaths(authzObj.toLowerCase(), null);
- } else {
- List<String> pathTree = null;
- try {
- pathTree = PathsUpdate.parsePath(path);
- } catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path);
- e.printStackTrace();
- return;
- }
- if(pathTree == null) {
- return;
- }
- LOGGER.debug("#### HMS Path Update ["
- + "OP : removePath, "
- + "authzObj : " + authzObj.toLowerCase() + ", "
- + "path : " + path + "]");
- PathsUpdate update = createHMSUpdate();
- update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree);
- notifySentryAndApplyLocal(update);
- }
- }
-
- @Override
- public void renameAuthzObject(String oldName, String oldPath, String newName,
- String newPath) {
- String oldNameLC = oldName != null ? oldName.toLowerCase() : null;
- String newNameLC = newName != null ? newName.toLowerCase() : null;
- PathsUpdate update = createHMSUpdate();
- LOGGER.debug("#### HMS Path Update ["
- + "OP : renameAuthzObject, "
- + "oldName : " + oldNameLC + ","
- + "oldPath : " + oldPath + ","
- + "newName : " + newNameLC + ","
- + "newPath : " + newPath + "]");
- List<String> newPathTree = null;
- try {
- newPathTree = PathsUpdate.parsePath(newPath);
- } catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath +
- ", newName=" + newName + ", newPath=" + newPath);
- e.printStackTrace();
- return;
- }
-
- if( newPathTree != null ) {
- update.newPathChange(newNameLC).addToAddPaths(newPathTree);
- }
- List<String> oldPathTree = null;
- try {
- oldPathTree = PathsUpdate.parsePath(oldPath);
- } catch (SentryMalformedPathException e) {
- LOGGER.error("Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath +
- ", newName=" + newName + ", newPath=" + newPath);
- e.printStackTrace();
- return;
- }
-
- if( oldPathTree != null ) {
- update.newPathChange(oldNameLC).addToDelPaths(oldPathTree);
- }
- notifySentryAndApplyLocal(update);
- }
-
- private SentryHDFSServiceClient getClient() {
- if (sentryClient == null) {
- try {
- sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
- } catch (Exception e) {
- sentryClient = null;
- LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
- }
- }
- return sentryClient;
- }
-
- private PathsUpdate createHMSUpdate() {
- PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false);
- LOGGER.debug("#### Creating HMS Path Update SeqNum : [" + seqNum.get() + "]");
- return update;
- }
-
- protected void notifySentryNoLock(PathsUpdate update) {
- final Timer.Context timerContext =
- SentryHdfsMetricsUtil.getNotifyHMSUpdateTimer.time();
- try {
- getClient().notifyHMSUpdate(update);
- } catch (Exception e) {
- LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
- SentryHdfsMetricsUtil.getFailedNotifyHMSUpdateCounter.inc();
- } finally {
- timerContext.stop();
- }
- }
-
- protected void notifySentry(PathsUpdate update) {
- notificiationLock.lock();
- try {
- if (!syncSent) {
- new SyncTask().run();
- }
-
- notifySentryNoLock(update);
- } finally {
- lastSentSeqNum = update.getSeqNum();
- notificiationLock.unlock();
- LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
- }
- }
-
- protected void applyLocal(PathsUpdate update) {
- final Timer.Context timerContext =
- SentryHdfsMetricsUtil.getApplyLocalUpdateTimer.time();
- if(authzPaths == null) {
- LOGGER.error(initializationFailureMsg);
- return;
- }
- authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
- timerContext.stop();
- SentryHdfsMetricsUtil.getApplyLocalUpdateHistogram.update(
- update.getPathChanges().size());
- }
-
- private void notifySentryAndApplyLocal(PathsUpdate update) {
- if(authzPaths == null) {
- LOGGER.error(initializationFailureMsg);
- return;
- }
- if (initComplete) {
- processUpdate(update);
- } else {
- if (initError == null) {
- synchronized (updateQueue) {
- if (!queueFlushComplete) {
- updateQueue.add(update);
- } else {
- processUpdate(update);
- }
- }
- } else {
- StringWriter sw = new StringWriter();
- initError.printStackTrace(new PrintWriter(sw));
- LOGGER.error("#### Error initializing Metastore Plugin" +
- "[" + sw.toString() + "] !!");
- throw new RuntimeException(initError);
- }
- LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." +
- "Metastore hasn't been initialized yet !!");
- }
- }
-
- protected void processUpdate(PathsUpdate update) {
- applyLocal(update);
- notifySentry(update);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java
new file mode 100644
index 0000000..cea5b9d
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.sentry.provider.db.service.model.MSentryPathChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * PathDeltaRetriever retrieves delta updates of Hive Paths from a persistent
+ * storage and translates them into a collection of {@code PathsUpdate} that the
+ * consumers, such as HDFS NameNode, can understand.
+ * <p>
+ * It is a thread-safe class, as all the underlying database operations are thread safe.
+ */
+@ThreadSafe
+public class PathDeltaRetriever implements DeltaRetriever<PathsUpdate> {
+
+ private final SentryStore sentryStore;
+
+ PathDeltaRetriever(SentryStore sentryStore) {
+ this.sentryStore = sentryStore;
+ }
+
+ @Override
+ public Collection<PathsUpdate> retrieveDelta(long seqNum) throws Exception {
+ Collection<MSentryPathChange> mSentryPathChanges =
+ sentryStore.getMSentryPathChanges(seqNum);
+ if (mSentryPathChanges.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Collection<PathsUpdate> updates = new ArrayList<>(mSentryPathChanges.size());
+ for (MSentryPathChange mSentryPathChange : mSentryPathChanges) {
+ // Gets the changeID from the persisted MSentryPathChange.
+ long changeID = mSentryPathChange.getChangeID();
+ // Creates a corresponding PathsUpdate, deserializes the persisted
+ // delta update (stored in JSON format) into it, and then labels it
+ // with the associated changeID as its sequence number.
+ PathsUpdate pathsUpdate = new PathsUpdate();
+ pathsUpdate.JSONDeserialize(mSentryPathChange.getPathChange());
+ pathsUpdate.setSeqNum(changeID);
+ updates.add(pathsUpdate);
+ }
+ return updates;
+ }
+
+ @Override
+ public boolean isDeltaAvailable(long seqNum) throws Exception {
+ return sentryStore.pathChangeExists(seqNum);
+ }
+
+ @Override
+ public long getLatestDeltaID() throws Exception {
+ return sentryStore.getLastProcessedPathChangeID();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index 16a1604..0eaac80 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -18,28 +18,35 @@
package org.apache.sentry.hdfs;
import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
import org.apache.sentry.hdfs.service.thrift.TPathChanges;
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import javax.annotation.concurrent.ThreadSafe;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent
- * storage and translate it into {@code PathsUpdate} that the consumers, such as
- * HDFS NameNod, can understand.
+ * storage and translates it into {@code PathsUpdate} that the consumers, such as
+ * HDFS NameNode, can understand.
+ * <p>
+ * It is a thread-safe class, as all the underlying database operations are thread-safe.
*/
+@ThreadSafe
public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
private final SentryStore sentryStore;
+ private final static String[] root = {"/"};
PathImageRetriever(SentryStore sentryStore) {
this.sentryStore = sentryStore;
}
@Override
- public PathsUpdate retrieveFullImage(long seqNum) throws Exception {
+ public PathsUpdate retrieveFullImage() throws Exception {
try (final Timer.Context timerContext =
SentryHdfsMetricsUtil.getRetrievePathFullImageTimer.time()) {
@@ -54,8 +61,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
// Adds all <hiveObj, paths> mapping to be included in this paths update.
// And label it with the latest delta change sequence number for consumer
// to be aware of the next delta change it should continue with.
- // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1613
- PathsUpdate pathsUpdate = new PathsUpdate(seqNum, true);
+ PathsUpdate pathsUpdate = new PathsUpdate(curSeqNum, true);
for (Map.Entry<String, Set<String>> pathEnt : pathImage.entrySet()) {
TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey());
@@ -66,7 +72,15 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
.getPathChanges().size());
+
+ // Translate PathsUpdate that contains a full image to TPathsDump for
+ // consumer (NN) to be able to quickly construct UpdateableAuthzPaths
+ // from TPathsDump.
+ UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(root);
+ authzPaths.updatePartial(Lists.newArrayList(pathsUpdate),
+ new ReentrantReadWriteLock());
+ pathsUpdate.toThrift().setPathsDump(authzPaths.getPathsDump().createPathsDump());
return pathsUpdate;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java
new file mode 100644
index 0000000..9649b02
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.sentry.provider.db.service.model.MSentryPermChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * PermDeltaRetriever retrieves delta updates of Sentry permissions from a persistent
+ * storage and translates them into a collection of {@code PermissionsUpdate} that the
+ * consumers, such as HDFS NameNode, can understand.
+ * <p>
+ * It is a thread-safe class, as all the underlying database operations are thread-safe.
+ */
+@ThreadSafe
+public class PermDeltaRetriever implements DeltaRetriever<PermissionsUpdate> {
+
+ private final SentryStore sentryStore; // store that all permission-change lookups below are delegated to
+
+ PermDeltaRetriever(SentryStore sentryStore) {
+ this.sentryStore = sentryStore;
+ }
+
+ @Override
+ public Collection<PermissionsUpdate> retrieveDelta(long seqNum) throws Exception {
+ Collection<MSentryPermChange> mSentryPermChanges =
+ sentryStore.getMSentryPermChanges(seqNum);
+ if (mSentryPermChanges.isEmpty()) { // the store returned no changes: hand back an empty list
+ return Collections.emptyList();
+ }
+
+ Collection<PermissionsUpdate> updates = new ArrayList<>(mSentryPermChanges.size()); // presized: one update per persisted change
+ for (MSentryPermChange mSentryPermChange : mSentryPermChanges) {
+ // Read the change ID stored with the persisted MSentryPermChange.
+ long changeID = mSentryPermChange.getChangeID();
+ // Create a corresponding PermissionsUpdate, deserialize the
+ // persisted JSON delta into it, and set the change ID as the
+ // update's sequence number.
+ PermissionsUpdate permsUpdate = new PermissionsUpdate();
+ permsUpdate.JSONDeserialize(mSentryPermChange.getPermChange());
+ permsUpdate.setSeqNum(changeID);
+ updates.add(permsUpdate);
+ }
+ return updates;
+ }
+
+ @Override
+ public boolean isDeltaAvailable(long seqNum) throws Exception {
+ return sentryStore.permChangeExists(seqNum); // existence check is answered by the store
+ }
+
+ @Override
+ public long getLatestDeltaID() throws Exception {
+ return sentryStore.getLastProcessedPermChangeID(); // last processed permission change ID, as reported by the store
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
index 3017c9e..5964f17 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
@@ -24,6 +24,7 @@ import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
import org.apache.sentry.provider.db.service.persistent.PermissionsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import javax.annotation.concurrent.ThreadSafe;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -31,9 +32,12 @@ import java.util.Map;
/**
* PermImageRetriever obtains a complete snapshot of Sentry permission from a persistent
- * storage and translate it into {@code PermissionsUpdate} that the consumers, such as
- * HDFS NameNod, can understand.
+ * storage and translates it into {@code PermissionsUpdate} that the consumers, such as
+ * HDFS NameNode, can understand.
+ * <p>
+ * It is a thread-safe class, as all the underlying database operations are thread-safe.
*/
+@ThreadSafe
public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
private final SentryStore sentryStore;
@@ -43,7 +47,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
}
@Override
- public PermissionsUpdate retrieveFullImage(long seqNum) throws Exception {
+ public PermissionsUpdate retrieveFullImage() throws Exception {
try(Timer.Context timerContext =
SentryHdfsMetricsUtil.getRetrievePermFullImageTimer.time()) {
@@ -80,8 +84,6 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
}
PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate);
- // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567
- permissionsUpdate.setSeqNum(seqNum);
SentryHdfsMetricsUtil.getPrivilegeChangesHistogram.update(
tPermUpdate.getPrivilegeChangesSize());
SentryHdfsMetricsUtil.getRoleChangesHistogram.update(
@@ -90,4 +92,4 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index e4f3f58..395618a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -42,10 +42,6 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
if (SentryPlugin.instance != null) {
- if (SentryPlugin.instance.isOutOfSync()) {
- throw new TException(
- "This Sentry server is not communicating with other nodes and out of sync ");
- }
final Timer.Context timerContext =
SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time();
try {
@@ -99,33 +95,12 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
@Override
public void handle_hms_notification(TPathsUpdate update) throws TException {
- final Timer.Context timerContext =
- SentryHdfsMetricsUtil.getHandleHmsNotificationTimer.time();
- try {
- PathsUpdate hmsUpdate = new PathsUpdate(update);
- if (SentryPlugin.instance != null) {
- SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate);
- LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "]..");
- } else {
- LOGGER.error("SentryPlugin not initialized yet !!");
- }
- } catch (Exception e) {
- LOGGER.error("Error handling notification from HMS", e);
- SentryHdfsMetricsUtil.getFailedHandleHmsNotificationCounter.inc();
- throw new TException(e);
- } finally {
- timerContext.stop();
- SentryHdfsMetricsUtil.getHandleHmsPathChangeHistogram.update(
- update.getPathChangesSize());
- if (update.isHasFullImage()) {
- SentryHdfsMetricsUtil.getHandleHmsHasFullImageCounter.inc();
- }
- }
+ throw new UnsupportedOperationException("handle_hms_notification");
}
@Override
public long check_hms_seq_num(long pathSeqNum) throws TException {
- return SentryPlugin.instance.getLastSeenHMSPathSeqNum();
+ throw new UnsupportedOperationException("check_hms_seq_num");
}
/**
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
index be14569..28bf20e 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
@@ -83,25 +83,6 @@ public class SentryHdfsMetricsUtil {
MetricRegistry.name(PathImageRetriever.class, "retrieve-path-full-image",
"path-changes-size"));
-
- // Metrics for notifySentry HMS update in MetaStorePlugin
- // The timer used for each notifySentry
- public static final Timer getNotifyHMSUpdateTimer = sentryMetrics.getTimer(
- MetricRegistry.name(MetastorePlugin.class, "notify-sentry-HMS-update"));
- // The number of failed notifySentry
- public static final Counter getFailedNotifyHMSUpdateCounter = sentryMetrics.getCounter(
- MetricRegistry.name(MetastorePlugin.class, "notify-sentry-HMS-update",
- "failed-num"));
-
- // Metrics for applyLocal update in MetastorePlugin
- // The time used for each applyLocal
- public static final Timer getApplyLocalUpdateTimer = sentryMetrics.getTimer(
- MetricRegistry.name(MetastorePlugin.class, "apply-local-update"));
- // The size of path changes for each applyLocal
- public static final Histogram getApplyLocalUpdateHistogram = sentryMetrics.getHistogram(
- MetricRegistry.name(MetastorePlugin.class, "apply-local-update",
- "path-change-size"));
-
private SentryHdfsMetricsUtil() {
// Make constructor private to avoid instantiation
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index 029f9d5..0bd0833 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -46,22 +46,22 @@ import org.slf4j.LoggerFactory;
import static org.apache.sentry.hdfs.Updateable.Update;
/**
- * SentryPlugin facilitates HDFS synchronization between HMS and NameNode.
+ * SentryPlugin listens to all Sentry permission update events and persists permission
+ * changes into the database. It also facilitates HDFS synchronization between HMS and NameNode.
* <p>
- * Normally, synchronization happens via partial (incremental) updates:
+ * Synchronization happens via a complete snapshot or partial (incremental) updates.
+ * Normally, it is the latter:
* <ol>
* <li>
- * Whenever updates happen on HMS, they are immediately pushed to Sentry.
- * Commonly, it's a single update per remote call.
+ * Whenever updates happen on HMS, a corresponding notification log is generated,
+ * and {@link HMSFollower} will process the notification event and persist it in database.
* <li>
* The NameNode periodically asks Sentry for updates. Sentry may return zero
- * or more updates previously received from HMS.
+ * or more updates previously received via HMS notification log.
* </ol>
* <p>
- * Each individual update is assigned a corresponding sequence number. Those
- * numbers serve to detect the out-of-sync situations between HMS and Sentry and
- * between Sentry and NameNode. Detecting out-of-sync situation triggers full
- * update between the components that are out-of-sync.
+ * Each individual update is assigned a corresponding sequence number to synchronize
+ * updates between Sentry and NameNode.
* <p>
* SentryPlugin also implements signal-triggered mechanism of full path
* updates from HMS to Sentry and from Sentry to NameNode, to address
@@ -69,39 +69,18 @@ import static org.apache.sentry.hdfs.Updateable.Update;
* Those out-of-sync situations may not be detectable via the existing sequence
* numbers mechanism (most likely due to the implementation bugs).
* <p>
- * To facilitate signal-triggered full update from HMS to Sentry and from Sentry
- * to the NameNode, the following 3 boolean variables are defined:
- * fullUpdateHMS, fullUpdateHMSWait, and fullUpdateNN.
- * <ol>
- * <li>
- * The purpose of fullUpdateHMS is to ensure that Sentry asks HMS for full
- * update, and does so only once per signal.
- * <li>
- * The purpose of fullUpdateNN is to ensure that Sentry sends full update
- * to NameNode, and does so only once per signal.
- * <li>
- * The purpose of fullUpdateHMSWait is to ensure that NN update only happens
- * after HMS update.
+ * To facilitate signal-triggered full update from Sentry to NameNode,
+ * the boolean variable 'fullUpdateNN' is used to ensure that Sentry sends full
+ * update to NameNode, and does so only once per signal.
* </ol>
* The details:
* <ol>
* <li>
- * Upon receiving a signal, fullUpdateHMS, fullUpdateHMSWait, and fullUpdateNN
- * are all set to true.
- * <li>
- * On the next call to getLastSeenHMSPathSeqNum() from HMS, Sentry checks if
- * fullUpdateHMS == true. If yes, it returns invalid (zero) sequence number
- * to HMS, so HMS would push full update by calling handlePathUpdateNotification()
- * next time. fullUpdateHMS is immediately reset to false, to only trigger one
- * full update request to HMS per signal.
- * <li>
- * When HMS calls handlePathUpdateNotification(), Sentry checks if the update
- * is a full image. If it is, fullUpdateHMSWait is set to false.
+ * Upon receiving a signal, fullUpdateNN is set to true.
* <li>
* When NameNode calls getAllPathsUpdatesFrom() asking for partial update,
- * Sentry checks if both fullUpdateNN == true and fullUpdateHMSWait == false.
- * If yes, it sends full update back to NameNode and immediately resets
- * fullUpdateNN to false.
+ * Sentry checks if fullUpdateNN == true. If yes, it sends full update back
+ * to NameNode and immediately resets fullUpdateNN to false.
* </ol>
*/
@@ -109,18 +88,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class);
- private final AtomicBoolean fullUpdateHMSWait = new AtomicBoolean(false);
- private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false);
-
public static volatile SentryPlugin instance;
- private UpdateForwarder<PathsUpdate> pathsUpdater;
- private UpdateForwarder<PermissionsUpdate> permsUpdater;
- // TODO: Each perm change sequence number should be generated during persistence at sentry store.
- private final AtomicLong permSeqNum = new AtomicLong(5);
- private PermImageRetriever permImageRetriever;
- private boolean outOfSync = false;
+ private DBUpdateForwarder<PathsUpdate> pathsUpdater;
+ private DBUpdateForwarder<PermissionsUpdate> permsUpdater;
+
/*
* This number is smaller than starting sequence numbers used by NN and HMS
* so in both cases its effect is to create appearance of out-of-sync
@@ -130,33 +103,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
*/
private static final long NO_LAST_SEEN_HMS_PATH_SEQ_NUM = 0L;
- /*
- * Call from HMS to get the last known update sequence #.
- */
- long getLastSeenHMSPathSeqNum() {
- if (!fullUpdateHMS.getAndSet(false)) {
- return pathsUpdater.getLastSeen();
- } else {
- LOGGER.info("SIGNAL HANDLING: asking for full update from HMS");
- return NO_LAST_SEEN_HMS_PATH_SEQ_NUM;
- }
- }
-
@Override
public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException {
- final String[] pathPrefixes = conf
- .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES,
- ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT);
- final int initUpdateRetryDelayMs =
- conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
- permImageRetriever = new PermImageRetriever(sentryStore);
-
- pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths(
- pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs, false);
- permsUpdater = UpdateForwarder.create(conf,
- new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false),
- permImageRetriever, 100, initUpdateRetryDelayMs, true);
+ PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore);
+ PathImageRetriever pathImageRetriever = new PathImageRetriever(sentryStore);
+ PermDeltaRetriever permDeltaRetriever = new PermDeltaRetriever(sentryStore);
+ PathDeltaRetriever pathDeltaRetriever = new PathDeltaRetriever(sentryStore);
+ pathsUpdater = new DBUpdateForwarder<>(pathImageRetriever, pathDeltaRetriever);
+ permsUpdater = new DBUpdateForwarder<>(permImageRetriever, permDeltaRetriever);
+
LOGGER.info("Sentry HDFS plugin initialized !!");
instance = this;
@@ -182,7 +137,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
if (!fullUpdateNN.get()) {
// Most common case - Sentry is NOT handling a full update.
return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
- } else if (!fullUpdateHMSWait.get()) {
+ } else {
/*
* Sentry is in the middle of signal-triggered full update.
* It already got a full update from HMS
@@ -216,10 +171,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full update to NameNode (???)");
}
return updates;
- } else {
- // Sentry is handling a full update, but not yet received full update from HMS
- LOGGER.warn("SIGNAL HANDLING: sending partial update to NameNode: still waiting for full update from HMS");
- return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
}
}
@@ -227,32 +178,17 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
return permsUpdater.getAllUpdatesFrom(permSeqNum);
}
- /*
- * Handle partial (most common) or full update from HMS
- */
- public void handlePathUpdateNotification(PathsUpdate update)
- throws SentryPluginException {
- pathsUpdater.handleUpdateNotification(update);
- if (!update.hasFullImage()) { // most common case of partial update
- LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
- } else { // rare case of full update
- LOGGER.warn("Recieved Authz Path FULL update [" + update.getSeqNum() + "]..");
- // indicate that we're ready to send full update to NameNode
- fullUpdateHMSWait.set(false);
- }
- }
-
@Override
public Update onAlterSentryRoleAddGroups(
TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException {
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
for (TSentryGroup group : request.getGroups()) {
rUpdate.addToAddGroups(group.getGroupName());
}
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+ LOGGER.debug(String.format("onAlterSentryRoleAddGroups, Authz Perm preUpdate[ %s ]",
+ request.getRoleName()));
return update;
}
@@ -260,14 +196,14 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
public Update onAlterSentryRoleDeleteGroups(
TAlterSentryRoleDeleteGroupsRequest request)
throws SentryPluginException {
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
for (TSentryGroup group : request.getGroups()) {
rUpdate.addToDelGroups(group.getGroupName());
}
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+ LOGGER.debug(String.format("onAlterSentryRoleDeleteGroups, Authz Perm preUpdate [ %s ]",
+ request.getRoleName()));
return update;
}
@@ -296,12 +232,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
return null;
}
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
update.addPrivilegeUpdate(authzObj).putToAddPrivileges(
roleName, privilege.getAction().toUpperCase());
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "]..");
+ LOGGER.debug(String.format("onAlterSentryRoleGrantPrivilegeCore, Authz Perm preUpdate [ %s ]",
+ authzObj));
return update;
}
@@ -310,13 +246,13 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
throws SentryPluginException {
String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable());
String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable());
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
privUpdate.putToAddPrivileges(newAuthz, newAuthz);
privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "]..");
+ LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]",
+ oldAuthz));
return update;
}
@@ -339,14 +275,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
}
}
- public boolean isOutOfSync() {
- return outOfSync;
- }
-
- public void setOutOfSync(boolean outOfSync) {
- this.outOfSync = outOfSync;
- }
-
private PermissionsUpdate onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege)
throws SentryPluginException {
String authzObj = getAuthzObj(privilege);
@@ -354,46 +282,44 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
return null;
}
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
roleName, privilege.getAction().toUpperCase());
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
+ LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]",
+ authzObj));
return update;
}
@Override
public Update onDropSentryRole(TDropSentryRoleRequest request)
throws SentryPluginException {
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
update.addPrivilegeUpdate(PermissionsUpdate.ALL_AUTHZ_OBJ).putToDelPrivileges(
request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+ LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]",
+ request.getRoleName()));
return update;
}
@Override
public Update onDropSentryPrivilege(TDropPrivilegesRequest request)
throws SentryPluginException {
- PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+ PermissionsUpdate update = new PermissionsUpdate();
String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable());
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- permsUpdater.handleUpdateNotification(update);
- LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
+ LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]",
+ authzObj));
return update;
}
@Override
public void onSignal(final String sigName) {
LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update");
- fullUpdateHMS.set(true);
- fullUpdateHMSWait.set(true);
fullUpdateNN.set(true);
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
deleted file mode 100644
index 22c5769..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UpdateForwarder<K extends Updateable.Update> implements
- Updateable<K>, Closeable {
-
- private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
- protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
- // Updates should be handled in order
- private final Executor updateHandler = Executors.newSingleThreadExecutor();
-
- // Update log is used when propagating updates to a downstream cache.
- // The preUpdate log stores all commits that were applied to this cache.
- // When the update log is filled to capacity (getMaxUpdateLogSize()), all
- // entries are cleared and a compact image of the state of the cache is
- // appended to the log.
- // The first entry in an update log (consequently the first preUpdate a
- // downstream cache sees) will be a full image. All subsequent entries are
- // partial edits
- protected final LinkedList<K> updateLog = new LinkedList<K>();
- // UpdateLog is disabled when getMaxUpdateLogSize() = 0;
- private final int maxUpdateLogSize;
-
- private final ImageRetriever<K> imageRetreiver;
-
- private volatile Updateable<K> updateable;
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- protected static final long INIT_SEQ_NUM = -2;
- protected static final int INIT_UPDATE_RETRY_DELAY = 5000;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
- private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
-
- public UpdateForwarder(Configuration conf, Updateable<K> updateable,
- ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, boolean shouldInit) {
- this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY, shouldInit);
- }
-
- protected UpdateForwarder(Configuration conf, Updateable<K> updateable, //NOPMD
- ImageRetriever<K> imageRetreiver, int maxUpdateLogSize,
- int initUpdateRetryDelay, boolean shouldInit) {
- this.maxUpdateLogSize = maxUpdateLogSize;
- this.imageRetreiver = imageRetreiver;
- if (shouldInit) {
- spawnInitialUpdater(updateable, initUpdateRetryDelay);
- } else {
- this.updateable = updateable;
- }
- }
-
- public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
- Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
- int maxUpdateLogSize, boolean shouldInit) throws SentryPluginException {
- return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize,
- INIT_UPDATE_RETRY_DELAY, shouldInit);
- }
-
- public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
- Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
- int maxUpdateLogSize, int initUpdateRetryDelay, boolean shouldInit) throws SentryPluginException {
- return new UpdateForwarder<K>(conf, updateable, imageRetreiver,
- maxUpdateLogSize, initUpdateRetryDelay, shouldInit);
- }
-
- private void spawnInitialUpdater(final Updateable<K> updateable,
- final int initUpdateRetryDelay) {
- K firstFullImage = null;
- try {
- firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM);
- } catch (Exception e) {
- LOGGER.warn("InitialUpdater encountered exception !! ", e);
- firstFullImage = null;
- Thread initUpdater = new Thread() {
- @Override
- public void run() {
- while (UpdateForwarder.this.updateable == null) {
- try {
- Thread.sleep(initUpdateRetryDelay);
- } catch (InterruptedException e) {
- LOGGER.warn("Thread interrupted !! ", e);
- break;
- }
- K fullImage = null;
- try {
- fullImage =
- UpdateForwarder.this.imageRetreiver
- .retrieveFullImage(INIT_SEQ_NUM);
- appendToUpdateLog(fullImage);
- } catch (Exception e) {
- LOGGER.warn("InitialUpdater encountered exception !! ", e);
- }
- if (fullImage != null) {
- UpdateForwarder.this.updateable = updateable.updateFull(fullImage);
- }
- }
- }
- };
- initUpdater.start();
- }
- if (firstFullImage != null) {
- try {
- appendToUpdateLog(firstFullImage);
- } catch (Exception e) {
- LOGGER.warn("failed to update append log: ", e);
- }
- this.updateable = updateable.updateFull(firstFullImage);
- }
- }
- /**
- * Handle notifications from HMS plug-in or upstream Cache
- * @param update
- */
- public void handleUpdateNotification(final K update) throws SentryPluginException {
- // Correct the seqNums on the first update
- if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
- K firstUpdate = getUpdateLog().peek();
- long firstSeqNum = update.getSeqNum() - 1;
- if (firstUpdate != null) {
- firstUpdate.setSeqNum(firstSeqNum);
- }
- lastCommittedSeqNum.set(firstSeqNum);
- lastSeenSeqNum.set(firstSeqNum);
- }
- final boolean editNotMissed =
- lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
- if (!editNotMissed) {
- lastSeenSeqNum.set(update.getSeqNum());
- }
- Runnable task = new Runnable() {
- @Override
- public void run() {
- K toUpdate = update;
- if (update.hasFullImage()) {
- updateable = updateable.updateFull(update);
- } else {
- if (editNotMissed) {
- // apply partial preUpdate
- updateable.updatePartial(Collections.singletonList(update), lock);
- } else {
- // Retrieve full update from External Source and apply it
- if (imageRetreiver != null) {
- try {
- toUpdate = imageRetreiver
- .retrieveFullImage(update.getSeqNum());
- } catch (Exception e) {
- LOGGER.warn("failed to retrieve full image: ", e);
- }
- updateable = updateable.updateFull(toUpdate);
- }
- }
- }
- try {
- appendToUpdateLog(toUpdate);
- } catch (Exception e) {
- LOGGER.warn("failed to append to update log", e);
- }
- }
- };
- updateHandler.execute(task);
- }
-
- protected void appendToUpdateLog(K update) throws Exception {
- synchronized (getUpdateLog()) {
- boolean logCompacted = false;
- if (getMaxUpdateLogSize() > 0) {
- if (update.hasFullImage() || getUpdateLog().size() == getMaxUpdateLogSize()) {
- // Essentially a log compaction
- getUpdateLog().clear();
- getUpdateLog().add(update.hasFullImage() ? update
- : createFullImageUpdate(update.getSeqNum()));
- logCompacted = true;
- } else {
- getUpdateLog().add(update);
- }
- }
- lastCommittedSeqNum.set(update.getSeqNum());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("#### Appending to Update Log ["
- + "type=" + update.getClass() + ", "
- + "lastCommit=" + lastCommittedSeqNum.get() + ", "
- + "lastSeen=" + lastSeenSeqNum.get() + ", "
- + "logCompacted=" + logCompacted + "]");
- }
- }
- }
-
- /**
- * Return all updates from requested seqNum (inclusive)
- * @param seqNum
- * @return
- */
- public List<K> getAllUpdatesFrom(long seqNum) throws Exception {
- List<K> retVal = new LinkedList<K>();
- synchronized (getUpdateLog()) {
- long currSeqNum = lastCommittedSeqNum.get();
- if (LOGGER.isDebugEnabled() && updateable != null) {
- LOGGER.debug("#### GetAllUpdatesFrom ["
- + "type=" + updateable.getClass() + ", "
- + "reqSeqNum=" + seqNum + ", "
- + "lastCommit=" + lastCommittedSeqNum.get() + ", "
- + "lastSeen=" + lastSeenSeqNum.get() + ", "
- + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]");
- }
- if (getMaxUpdateLogSize() == 0) {
- // no updatelog configured..
- return retVal;
- }
- K head = getUpdateLog().peek();
- if (head == null) {
- return retVal;
- }
- if (seqNum > currSeqNum + 1) {
- // This process has probably restarted since downstream
- // received last update
- retVal.addAll(getUpdateLog());
- return retVal;
- }
- if (head.getSeqNum() > seqNum) {
- // Caller has diverged greatly..
- if (head.hasFullImage()) {
- // head is a refresh(full) image
- // Send full image along with partial updates
- for (K u : getUpdateLog()) {
- retVal.add(u);
- }
- } else {
- // Create a full image
- // clear updateLog
- // add fullImage to head of Log
- // NOTE : This should ideally never happen
- K fullImage = createFullImageUpdate(currSeqNum);
- getUpdateLog().clear();
- getUpdateLog().add(fullImage);
- retVal.add(fullImage);
- }
- } else {
- // increment iterator to requested seqNum
- Iterator<K> iter = getUpdateLog().iterator();
- while (iter.hasNext()) {
- K elem = iter.next();
- if (elem.getSeqNum() >= seqNum) {
- retVal.add(elem);
- }
- }
- }
- }
- return retVal;
- }
-
- public boolean areAllUpdatesCommited() {
- return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
- }
-
- public long getLastCommitted() {
- return lastCommittedSeqNum.get();
- }
-
- public long getLastSeen() {
- return lastSeenSeqNum.get();
- }
-
- @Override
- public Updateable<K> updateFull(K update) {
- return (updateable != null) ? updateable.updateFull(update) : null;
- }
-
- @Override
- public void updatePartial(Iterable<K> updates, ReadWriteLock lock) {
- if (updateable != null) {
- updateable.updatePartial(updates, lock);
- }
- }
-
- @Override
- public long getLastUpdatedSeqNum() {
- return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
- }
-
- @Override
- public K createFullImageUpdate(long currSeqNum) throws Exception {
- return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
- }
-
- @Override
- public String getUpdateableTypeName() {
- // TODO Auto-generated method stub
- return UPDATABLE_TYPE_NAME;
- }
-
- protected LinkedList<K> getUpdateLog() {
- return updateLog;
- }
-
- protected int getMaxUpdateLogSize() {
- return maxUpdateLogSize;
- }
-
- @Override
- public void close() throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
deleted file mode 100644
index 03c67d6..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-
-public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
- private static final String UPDATABLE_TYPE_NAME = "perm_update";
-
- private AtomicLong seqNum = new AtomicLong();
- private final ImageRetriever<PermissionsUpdate> imageRetreiver;
-
- public UpdateablePermissions(
- ImageRetriever<PermissionsUpdate> imageRetreiver) {
- this.imageRetreiver = imageRetreiver;
- }
-
- @Override
- public PermissionsUpdate createFullImageUpdate(long currSeqNum) throws Exception {
- return imageRetreiver.retrieveFullImage(currSeqNum);
- }
-
- @Override
- public long getLastUpdatedSeqNum() {
- return seqNum.get();
- }
-
- @Override
- public void updatePartial(Iterable<PermissionsUpdate> update,
- ReadWriteLock lock) {
- for (PermissionsUpdate permsUpdate : update) {
- seqNum.set(permsUpdate.getSeqNum());
- }
- }
-
- @Override
- public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) {
- UpdateablePermissions other = new UpdateablePermissions(imageRetreiver);
- other.seqNum.set(update.getSeqNum());
- return other;
- }
-
- @Override
- public String getUpdateableTypeName() {
- return UPDATABLE_TYPE_NAME;
- }
-}