You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/01/27 23:20:53 UTC
[6/7] sentry git commit: SENTRY-1613
SENTRY-1613
Change-Id: I6ae45e29ab6d6987e07ef2fa54037732d47f4b8d
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/18be1d5e
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/18be1d5e
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/18be1d5e
Branch: refs/heads/sentry-ha-redesign-1
Commit: 18be1d5e8c0efcb3bbd6d21cd457f20f2075e301
Parents: 813d10e
Author: hahao <ha...@cloudera.com>
Authored: Thu Jan 26 18:11:54 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Fri Jan 27 14:58:11 2017 -0800
----------------------------------------------------------------------
.../org/apache/sentry/hdfs/ImageRetriever.java | 2 +-
.../apache/sentry/hdfs/ThriftSerializer.java | 10 +-
.../org/apache/sentry/hdfs/UpdateRetriever.java | 38 ++
.../apache/sentry/hdfs/DBUpdateForwarder.java | 67 ++++
.../apache/sentry/hdfs/PathImageRetriever.java | 15 +-
.../apache/sentry/hdfs/PathUpdateRetriever.java | 58 +++
.../apache/sentry/hdfs/PermImageRetriever.java | 6 +-
.../apache/sentry/hdfs/PermUpdateRetriever.java | 58 +++
.../sentry/hdfs/SentryHDFSServiceProcessor.java | 29 +-
.../org/apache/sentry/hdfs/SentryPlugin.java | 77 +---
.../org/apache/sentry/hdfs/UpdateForwarder.java | 335 -----------------
.../sentry/hdfs/UpdateablePermissions.java | 63 ----
.../apache/sentry/hdfs/TestUpdateForwarder.java | 359 -------------------
.../db/service/persistent/SentryStore.java | 102 ++++++
.../sentry/service/thrift/SentryService.java | 3 +-
.../tests/e2e/hdfs/TestHDFSIntegration.java | 6 +-
16 files changed, 362 insertions(+), 866 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
index 1147c07..5080559 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
@@ -29,6 +29,6 @@ public interface ImageRetriever<K> {
* @return a full snapshot of type K
* @throws Exception
*/
- K retrieveFullImage(long seqNum) throws Exception;
+ K retrieveFullImage() throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
index 69aa098..d7b9923 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
@@ -25,12 +25,12 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
public class ThriftSerializer {
- final static private TSimpleJSONProtocol.Factory tSimpleJSONProtocol =
- new TSimpleJSONProtocol.Factory();
+ final static private TJSONProtocol.Factory tJSONProtocol =
+ new TJSONProtocol.Factory();
// Use default max thrift message size here.
// TODO: Figure out a way to make maxMessageSize configurable, eg. create a serializer singleton at startup by
@@ -67,13 +67,13 @@ public class ThriftSerializer {
public static String serializeToJSON(TBase base) throws TException {
// Initiate a new TSerializer each time for thread safety.
- TSerializer tSerializer = new TSerializer(tSimpleJSONProtocol);
+ TSerializer tSerializer = new TSerializer(tJSONProtocol);
return tSerializer.toString(base);
}
public static void deserializeFromJSON(TBase base, String dataInJson) throws TException {
// Initiate a new TDeserializer each time for thread safety.
- TDeserializer tDeserializer = new TDeserializer(tSimpleJSONProtocol);
+ TDeserializer tDeserializer = new TDeserializer(tJSONProtocol);
tDeserializer.fromString(base, dataInJson);
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java
new file mode 100644
index 0000000..ef25cfa
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.List;
+
+/**
+ * Interface class for retrieving partial updates.
+ */
+public interface UpdateRetriever<K> {
+
+ /**
+ * Retrieve the partial updates of type K for the given seq number.
+ *
+ * @param seqNum the given seq number
+ * @return a list of partial updates of type K
+ * @throws Exception
+ */
+ List<K> retrievePartialUpdate(long seqNum) throws Exception;
+
+ boolean isPartialUpdateAvailable(long seqNum) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
new file mode 100644
index 0000000..ca70a88
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DBUpdateForwarder is a thread-safe class.
+ */
+public class DBUpdateForwarder<K extends Updateable.Update> {
+
+ private final ImageRetriever<K> imageRetreiver;
+ private final UpdateRetriever<K> updateRetriever;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DBUpdateForwarder.class);
+ private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
+
+ protected DBUpdateForwarder(final ImageRetriever<K> imageRetreiver,
+ final UpdateRetriever<K> updateRetriever) {
+ this.imageRetreiver = imageRetreiver;
+ this.updateRetriever = updateRetriever;
+ }
+
+ /**
+ * Return all updates from requested seqNum (inclusive)
+ *
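+ * @param seqNum the requested sequence number
+ * @return the list of updates; a single full image if seqNum is the initial change ID or no partial update is available for it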
+ */
+ public List<K> getAllUpdatesFrom(long seqNum) throws Exception {
+ List<K> retVal = new LinkedList<>();
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("#### GetAllUpdatesFrom ["
+ + "reqSeqNum=" + seqNum + " ]");
+ }
+
+ if (seqNum == SentryStore.INIT_CHANGE_ID ||
+ !updateRetriever.isPartialUpdateAvailable(seqNum)) {
+ retVal.add(imageRetreiver.retrieveFullImage());
+ } else {
+ retVal.addAll(updateRetriever.retrievePartialUpdate(seqNum));
+ }
+
+ return retVal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index 6cfd6d5..2edfcc1 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -18,13 +18,14 @@
package org.apache.sentry.hdfs;
import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
import org.apache.sentry.hdfs.service.thrift.TPathChanges;
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
@@ -35,7 +36,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
}
@Override
- public PathsUpdate retrieveFullImage(long seqNum) throws Exception {
+ public PathsUpdate retrieveFullImage() throws Exception {
try (final Timer.Context timerContext =
SentryHdfsMetricsUtil.getRetrievePathFullImageTimer.time()) {
@@ -48,7 +49,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
// Generate a corresponding PathsUpdate.
// TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567
- PathsUpdate pathsUpdate = new PathsUpdate(seqNum, true);
+ PathsUpdate pathsUpdate = new PathsUpdate(curSeqNum, true);
for (Map.Entry<String, Set<String>> pathEnt : pathImage.entrySet()) {
TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey());
@@ -59,7 +60,13 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
.getPathChanges().size());
+
+ UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(
+ new String[]{"/"});
+ authzPaths.updatePartial(Lists.newArrayList(pathsUpdate),
+ new ReentrantReadWriteLock());
+ pathsUpdate.toThrift().setPathsDump(authzPaths.getPathsDump().createPathsDump());
return pathsUpdate;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java
new file mode 100644
index 0000000..469ffe7
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+
+import com.google.common.collect.Lists;
+import org.apache.sentry.provider.db.service.model.MSentryPathChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import java.util.List;
+
+public class PathUpdateRetriever implements UpdateRetriever<PathsUpdate> {
+
+ private final SentryStore sentryStore;
+
+ public PathUpdateRetriever(SentryStore sentryStore) {
+ this.sentryStore = sentryStore;
+ }
+
+ @Override
+ public List<PathsUpdate> retrievePartialUpdate(long seqNum) throws Exception {
+ List<MSentryPathChange> mSentryPathChanges =
+ sentryStore.getMSentryPathChanges(seqNum);
+ List<PathsUpdate> updates = Lists.newArrayList();
+ for (MSentryPathChange mSentryPathChange : mSentryPathChanges) {
+ // get changeID from stored MSentryPathChange
+ long changeID = mSentryPathChange.getChangeID();
+ // Create a corresponding PathsUpdate and deserialize the stored
+ // JSON string to TPathsUpdate. Then set the corresponding
+ // changeID.
+ PathsUpdate pathsUpdate = new PathsUpdate();
+ pathsUpdate.JSONDeserialize(mSentryPathChange.getPathChange());
+ pathsUpdate.setSeqNum(changeID);
+ updates.add(pathsUpdate);
+ }
+ return updates;
+ }
+
+ @Override
+ public boolean isPartialUpdateAvailable(long seqNum) throws Exception {
+ return sentryStore.findMSentryPathChangeByID(seqNum);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
index 56985c2..323d090 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
@@ -37,7 +37,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
}
@Override
- public PermissionsUpdate retrieveFullImage(long seqNum) throws Exception {
+ public PermissionsUpdate retrieveFullImage() throws Exception {
try(Timer.Context timerContext =
SentryHdfsMetricsUtil.getRetrievePermFullImageTimer.time()) {
@@ -72,7 +72,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate);
// TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567
- permissionsUpdate.setSeqNum(seqNum);
+ permissionsUpdate.setSeqNum(curSeqNum);
SentryHdfsMetricsUtil.getPrivilegeChangesHistogram.update(
tPermUpdate.getPrivilegeChangesSize());
SentryHdfsMetricsUtil.getRoleChangesHistogram.update(
@@ -81,4 +81,4 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java
new file mode 100644
index 0000000..e7e0e77
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+
+import com.google.common.collect.Lists;
+import org.apache.sentry.provider.db.service.model.MSentryPermChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import java.util.List;
+
+public class PermUpdateRetriever implements UpdateRetriever<PermissionsUpdate> {
+
+ private final SentryStore sentryStore;
+
+ public PermUpdateRetriever(SentryStore sentryStore) {
+ this.sentryStore = sentryStore;
+ }
+
+ @Override
+ public List<PermissionsUpdate> retrievePartialUpdate(long seqNum) throws Exception {
+ List<MSentryPermChange> mSentryPermChanges =
+ sentryStore.getMSentryPermChanges(seqNum);
+ List<PermissionsUpdate> updates = Lists.newArrayList();
+ for (MSentryPermChange mSentryPermChange : mSentryPermChanges) {
+ // get changeID from stored MSentryPermChange
+ long changeID = mSentryPermChange.getChangeID();
+ // Create a corresponding PermissionsUpdate and deserialize the stored
+ // JSON string to TPermissionsUpdate. Then set the corresponding
+ // changeID.
+ PermissionsUpdate permsUpdate = new PermissionsUpdate();
+ permsUpdate.JSONDeserialize(mSentryPermChange.getPermChange());
+ permsUpdate.setSeqNum(changeID);
+ updates.add(permsUpdate);
+ }
+ return updates;
+ }
+
+ @Override
+ public boolean isPartialUpdateAvailable(long seqNum) throws Exception {
+ return sentryStore.findMSentryPermChangeByID(seqNum);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index e4f3f58..5b146b7 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -42,10 +42,6 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
if (SentryPlugin.instance != null) {
- if (SentryPlugin.instance.isOutOfSync()) {
- throw new TException(
- "This Sentry server is not communicating with other nodes and out of sync ");
- }
final Timer.Context timerContext =
SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time();
try {
@@ -99,33 +95,12 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
@Override
public void handle_hms_notification(TPathsUpdate update) throws TException {
- final Timer.Context timerContext =
- SentryHdfsMetricsUtil.getHandleHmsNotificationTimer.time();
- try {
- PathsUpdate hmsUpdate = new PathsUpdate(update);
- if (SentryPlugin.instance != null) {
- SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate);
- LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "]..");
- } else {
- LOGGER.error("SentryPlugin not initialized yet !!");
- }
- } catch (Exception e) {
- LOGGER.error("Error handling notification from HMS", e);
- SentryHdfsMetricsUtil.getFailedHandleHmsNotificationCounter.inc();
- throw new TException(e);
- } finally {
- timerContext.stop();
- SentryHdfsMetricsUtil.getHandleHmsPathChangeHistogram.update(
- update.getPathChangesSize());
- if (update.isHasFullImage()) {
- SentryHdfsMetricsUtil.getHandleHmsHasFullImageCounter.inc();
- }
- }
+ throw new TException("handle_hms_notification is no longer supported");
}
@Override
public long check_hms_seq_num(long pathSeqNum) throws TException {
- return SentryPlugin.instance.getLastSeenHMSPathSeqNum();
+ throw new TException("check_hms_seq_num is no longer supported");
}
/**
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index b99013e..f5ee179 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -108,18 +108,16 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class);
- private final AtomicBoolean fullUpdateHMSWait = new AtomicBoolean(false);
private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false);
public static volatile SentryPlugin instance;
- private UpdateForwarder<PathsUpdate> pathsUpdater;
- private UpdateForwarder<PermissionsUpdate> permsUpdater;
+ private DBUpdateForwarder<PathsUpdate> pathsUpdater;
+ private DBUpdateForwarder<PermissionsUpdate> permsUpdater;
+
// TODO: Each perm change sequence number should be generated during persistence at sentry store.
private final AtomicLong permSeqNum = new AtomicLong(5);
- private PermImageRetriever permImageRetriever;
- private boolean outOfSync = false;
/*
* This number is smaller than starting sequence numbers used by NN and HMS
* so in both cases its effect is to create appearance of out-of-sync
@@ -129,33 +127,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
*/
private static final long NO_LAST_SEEN_HMS_PATH_SEQ_NUM = 0L;
- /*
- * Call from HMS to get the last known update sequence #.
- */
- long getLastSeenHMSPathSeqNum() {
- if (!fullUpdateHMS.getAndSet(false)) {
- return pathsUpdater.getLastSeen();
- } else {
- LOGGER.info("SIGNAL HANDLING: asking for full update from HMS");
- return NO_LAST_SEEN_HMS_PATH_SEQ_NUM;
- }
- }
-
@Override
public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException {
- final String[] pathPrefixes = conf
- .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES,
- ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT);
- final int initUpdateRetryDelayMs =
- conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
- permImageRetriever = new PermImageRetriever(sentryStore);
-
- pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths(
- pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs, false);
- permsUpdater = UpdateForwarder.create(conf,
- new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false),
- permImageRetriever, 100, initUpdateRetryDelayMs, true);
+ PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore);
+ PathImageRetriever pathImageRetriever = new PathImageRetriever(sentryStore);
+ PermUpdateRetriever permUpdateRetriever = new PermUpdateRetriever(sentryStore);
+ PathUpdateRetriever pathUpdateRetriever = new PathUpdateRetriever(sentryStore);
+ pathsUpdater = new DBUpdateForwarder<>(pathImageRetriever, pathUpdateRetriever);
+ permsUpdater = new DBUpdateForwarder<>(permImageRetriever, permUpdateRetriever);
+
LOGGER.info("Sentry HDFS plugin initialized !!");
instance = this;
@@ -181,7 +161,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
if (!fullUpdateNN.get()) {
// Most common case - Sentry is NOT handling a full update.
return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
- } else if (!fullUpdateHMSWait.get()) {
+ } else {
/*
* Sentry is in the middle of signal-triggered full update.
* It already got a full update from HMS
@@ -215,10 +195,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full update to NameNode (???)");
}
return updates;
- } else {
- // Sentry is handling a full update, but not yet received full update from HMS
- LOGGER.warn("SIGNAL HANDLING: sending partial update to NameNode: still waiting for full update from HMS");
- return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
}
}
@@ -226,21 +202,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
return permsUpdater.getAllUpdatesFrom(permSeqNum);
}
- /*
- * Handle partial (most common) or full update from HMS
- */
- public void handlePathUpdateNotification(PathsUpdate update)
- throws SentryPluginException {
- pathsUpdater.handleUpdateNotification(update);
- if (!update.hasFullImage()) { // most common case of partial update
- LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
- } else { // rare case of full update
- LOGGER.warn("Recieved Authz Path FULL update [" + update.getSeqNum() + "]..");
- // indicate that we're ready to send full update to NameNode
- fullUpdateHMSWait.set(false);
- }
- }
-
@Override
public DeltaTransactionBlock onAlterSentryRoleAddGroups(
TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException {
@@ -250,7 +211,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
rUpdate.addToAddGroups(group.getGroupName());
}
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
return new DeltaTransactionBlock(update);
}
@@ -265,7 +225,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
rUpdate.addToDelGroups(group.getGroupName());
}
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
return new DeltaTransactionBlock(update);
}
@@ -301,7 +260,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
update.addPrivilegeUpdate(authzObj).putToAddPrivileges(
roleName, privilege.getAction().toUpperCase());
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "]..");
return update;
}
@@ -316,7 +274,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
privUpdate.putToAddPrivileges(newAuthz, newAuthz);
privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "]..");
return new DeltaTransactionBlock(update);
}
@@ -342,14 +299,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
}
}
- public boolean isOutOfSync() {
- return outOfSync;
- }
-
- public void setOutOfSync(boolean outOfSync) {
- this.outOfSync = outOfSync;
- }
-
private PermissionsUpdate onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege)
throws SentryPluginException {
String authzObj = getAuthzObj(privilege);
@@ -361,7 +310,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
roleName, privilege.getAction().toUpperCase());
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
return update;
}
@@ -374,7 +322,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
return new DeltaTransactionBlock(update);
}
@@ -387,7 +334,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- permsUpdater.handleUpdateNotification(update);
LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
return new DeltaTransactionBlock(update);
}
@@ -396,7 +342,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
public void onSignal(final String sigName) {
LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update");
fullUpdateHMS.set(true);
- fullUpdateHMSWait.set(true);
fullUpdateNN.set(true);
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
deleted file mode 100644
index 4bfc473..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UpdateForwarder<K extends Updateable.Update> implements
- Updateable<K>, Closeable {
-
- private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
- protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
- // Updates should be handled in order
- private final Executor updateHandler = Executors.newSingleThreadExecutor();
-
- // Update log is used when propagate updates to a downstream cache.
- // The preUpdate log stores all commits that were applied to this cache.
- // When the update log is filled to capacity (getMaxUpdateLogSize()), all
- // entries are cleared and a compact image if the state of the cache is
- // appended to the log.
- // The first entry in an update log (consequently the first preUpdate a
- // downstream cache sees) will be a full image. All subsequent entries are
- // partial edits
- protected final LinkedList<K> updateLog = new LinkedList<K>();
- // UpdateLog is disabled when getMaxUpdateLogSize() = 0;
- private final int maxUpdateLogSize;
-
- private final ImageRetriever<K> imageRetreiver;
-
- private volatile Updateable<K> updateable;
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- protected static final long INIT_SEQ_NUM = -2;
- protected static final int INIT_UPDATE_RETRY_DELAY = 5000;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
- private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
-
- public UpdateForwarder(Configuration conf, Updateable<K> updateable,
- ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, boolean shouldInit) {
- this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY, shouldInit);
- }
-
- protected UpdateForwarder(Configuration conf, Updateable<K> updateable, //NOPMD
- ImageRetriever<K> imageRetreiver, int maxUpdateLogSize,
- int initUpdateRetryDelay, boolean shouldInit) {
- this.maxUpdateLogSize = maxUpdateLogSize;
- this.imageRetreiver = imageRetreiver;
- if (shouldInit) {
- spawnInitialUpdater(updateable, initUpdateRetryDelay);
- } else {
- this.updateable = updateable;
- }
- }
-
- public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
- Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
- int maxUpdateLogSize, boolean shouldInit) throws SentryPluginException {
- return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize,
- INIT_UPDATE_RETRY_DELAY, shouldInit);
- }
-
- public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
- Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
- int maxUpdateLogSize, int initUpdateRetryDelay, boolean shouldInit) throws SentryPluginException {
- return new UpdateForwarder<K>(conf, updateable, imageRetreiver,
- maxUpdateLogSize, initUpdateRetryDelay, shouldInit);
- }
-
- private void spawnInitialUpdater(final Updateable<K> updateable,
- final int initUpdateRetryDelay) {
- K firstFullImage = null;
- try {
- firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM);
- } catch (Exception e) {
- LOGGER.warn("InitialUpdater encountered exception !! ", e);
- firstFullImage = null;
- Thread initUpdater = new Thread() {
- @Override
- public void run() {
- while (UpdateForwarder.this.updateable == null) {
- try {
- Thread.sleep(initUpdateRetryDelay);
- } catch (InterruptedException e) {
- LOGGER.warn("Thread interrupted !! ", e);
- break;
- }
- K fullImage = null;
- try {
- fullImage =
- UpdateForwarder.this.imageRetreiver
- .retrieveFullImage(INIT_SEQ_NUM);
- appendToUpdateLog(fullImage);
- } catch (Exception e) {
- LOGGER.warn("InitialUpdater encountered exception !! ", e);
- }
- if (fullImage != null) {
- UpdateForwarder.this.updateable = updateable.updateFull(fullImage);
- }
- }
- }
- };
- initUpdater.start();
- }
- if (firstFullImage != null) {
- try {
- appendToUpdateLog(firstFullImage);
- } catch (Exception e) {
- LOGGER.warn("failed to update append log: ", e);
- }
- this.updateable = updateable.updateFull(firstFullImage);
- }
- }
- /**
- * Handle notifications from HMS plug-in or upstream Cache
- * @param update
- */
- public void handleUpdateNotification(final K update) throws SentryPluginException {
- // Correct the seqNums on the first update
- if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
- K firstUpdate = getUpdateLog().peek();
- long firstSeqNum = update.getSeqNum() - 1;
- if (firstUpdate != null) {
- firstUpdate.setSeqNum(firstSeqNum);
- }
- lastCommittedSeqNum.set(firstSeqNum);
- lastSeenSeqNum.set(firstSeqNum);
- }
- final boolean editNotMissed =
- lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
- if (!editNotMissed) {
- lastSeenSeqNum.set(update.getSeqNum());
- }
- Runnable task = new Runnable() {
- @Override
- public void run() {
- K toUpdate = update;
- if (update.hasFullImage()) {
- updateable = updateable.updateFull(update);
- } else {
- if (editNotMissed) {
- // apply partial preUpdate
- updateable.updatePartial(Collections.singletonList(update), lock);
- } else {
- // Retrieve full update from External Source and
- if (imageRetreiver != null) {
- try {
- toUpdate = imageRetreiver
- .retrieveFullImage(update.getSeqNum());
- } catch (Exception e) {
- LOGGER.warn("failed to retrieve full image: ", e);
- }
- updateable = updateable.updateFull(toUpdate);
- }
- }
- }
- try {
- appendToUpdateLog(toUpdate);
- } catch (Exception e) {
- LOGGER.warn("failed to append to update log", e);
- }
- }
- };
- updateHandler.execute(task);
- }
-
- protected void appendToUpdateLog(K update) throws Exception {
- synchronized (getUpdateLog()) {
- boolean logCompacted = false;
- if (getMaxUpdateLogSize() > 0) {
- if (update.hasFullImage() || getUpdateLog().size() == getMaxUpdateLogSize()) {
- // Essentially a log compaction
- getUpdateLog().clear();
- getUpdateLog().add(update.hasFullImage() ? update
- : createFullImageUpdate(update.getSeqNum()));
- logCompacted = true;
- } else {
- getUpdateLog().add(update);
- }
- }
- lastCommittedSeqNum.set(update.getSeqNum());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("#### Appending to Update Log ["
- + "type=" + update.getClass() + ", "
- + "lastCommit=" + lastCommittedSeqNum.get() + ", "
- + "lastSeen=" + lastSeenSeqNum.get() + ", "
- + "logCompacted=" + logCompacted + "]");
- }
- }
- }
-
- /**
- * Return all updates from requested seqNum (inclusive)
- * @param seqNum
- * @return
- */
- public List<K> getAllUpdatesFrom(long seqNum) throws Exception {
- List<K> retVal = new LinkedList<K>();
- synchronized (getUpdateLog()) {
- long currSeqNum = lastCommittedSeqNum.get();
- if (LOGGER.isDebugEnabled() && updateable != null) {
- LOGGER.debug("#### GetAllUpdatesFrom ["
- + "type=" + updateable.getClass() + ", "
- + "reqSeqNum=" + seqNum + ", "
- + "lastCommit=" + lastCommittedSeqNum.get() + ", "
- + "lastSeen=" + lastSeenSeqNum.get() + ", "
- + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]");
- }
- if (getMaxUpdateLogSize() == 0) {
- // no updatelog configured..
- return retVal;
- }
- K head = getUpdateLog().peek();
- if (head == null) {
- return retVal;
- }
- if (seqNum > currSeqNum + 1) {
- // This process has probably restarted since downstream
- // recieved last update
- retVal.addAll(getUpdateLog());
- return retVal;
- }
- if (head.getSeqNum() > seqNum) {
- // Caller has diverged greatly..
- if (head.hasFullImage()) {
- // head is a refresh(full) image
- // Send full image along with partial updates
- for (K u : getUpdateLog()) {
- retVal.add(u);
- }
- } else {
- // Create a full image
- // clear updateLog
- // add fullImage to head of Log
- // NOTE : This should ideally never happen
- K fullImage = createFullImageUpdate(currSeqNum);
- getUpdateLog().clear();
- getUpdateLog().add(fullImage);
- retVal.add(fullImage);
- }
- } else {
- // increment iterator to requested seqNum
- Iterator<K> iter = getUpdateLog().iterator();
- while (iter.hasNext()) {
- K elem = iter.next();
- if (elem.getSeqNum() >= seqNum) {
- retVal.add(elem);
- }
- }
- }
- }
- return retVal;
- }
-
- public boolean areAllUpdatesCommited() {
- return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
- }
-
- public long getLastCommitted() {
- return lastCommittedSeqNum.get();
- }
-
- public long getLastSeen() {
- return lastSeenSeqNum.get();
- }
-
- @Override
- public Updateable<K> updateFull(K update) {
- return (updateable != null) ? updateable.updateFull(update) : null;
- }
-
- @Override
- public void updatePartial(Iterable<K> updates, ReadWriteLock lock) {
- if (updateable != null) {
- updateable.updatePartial(updates, lock);
- }
- }
-
- @Override
- public long getLastUpdatedSeqNum() {
- return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
- }
-
- @Override
- public K createFullImageUpdate(long currSeqNum) throws Exception {
- return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
- }
-
- @Override
- public String getUpdateableTypeName() {
- // TODO Auto-generated method stub
- return UPDATABLE_TYPE_NAME;
- }
-
- protected LinkedList<K> getUpdateLog() {
- return updateLog;
- }
-
- protected int getMaxUpdateLogSize() {
- return maxUpdateLogSize;
- }
-
- @Override
- public void close() throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
deleted file mode 100644
index 03c67d6..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-
-public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
- private static final String UPDATABLE_TYPE_NAME = "perm_update";
-
- private AtomicLong seqNum = new AtomicLong();
- private final ImageRetriever<PermissionsUpdate> imageRetreiver;
-
- public UpdateablePermissions(
- ImageRetriever<PermissionsUpdate> imageRetreiver) {
- this.imageRetreiver = imageRetreiver;
- }
-
- @Override
- public PermissionsUpdate createFullImageUpdate(long currSeqNum) throws Exception {
- return imageRetreiver.retrieveFullImage(currSeqNum);
- }
-
- @Override
- public long getLastUpdatedSeqNum() {
- return seqNum.get();
- }
-
- @Override
- public void updatePartial(Iterable<PermissionsUpdate> update,
- ReadWriteLock lock) {
- for (PermissionsUpdate permsUpdate : update) {
- seqNum.set(permsUpdate.getSeqNum());
- }
- }
-
- @Override
- public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) {
- UpdateablePermissions other = new UpdateablePermissions(imageRetreiver);
- other.seqNum.set(update.getSeqNum());
- return other;
- }
-
- @Override
- public String getUpdateableTypeName() {
- return UPDATABLE_TYPE_NAME;
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
deleted file mode 100644
index d12b134..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.thrift.TException;
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Test;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-public class TestUpdateForwarder {
-
- public static class DummyUpdate implements Update {
- private long seqNum = 0;
- private boolean hasFullUpdate = false;
- private String state;
- public DummyUpdate() {
- this(0, false);
- }
- public DummyUpdate(long seqNum, boolean hasFullUpdate) {
- this.seqNum = seqNum;
- this.hasFullUpdate = hasFullUpdate;
- }
- public String getState() {
- return state;
- }
- public DummyUpdate setState(String stuff) {
- this.state = stuff;
- return this;
- }
- @Override
- public boolean hasFullImage() {
- return hasFullUpdate;
- }
- @Override
- public long getSeqNum() {
- return seqNum;
- }
- @Override
- public void setSeqNum(long seqNum) {
- this.seqNum = seqNum;
- }
- @Override
- public byte[] serialize() throws IOException {
- return state.getBytes();
- }
-
- @Override
- public void deserialize(byte[] data) throws IOException {
- state = new String(data);
- }
-
- @Override
- public String JSONSerialize() throws TException {
- return state;
- }
-
- @Override
- public void JSONDeserialize(String update) throws TException {
- state = new String(update);
- }
- }
-
- static class DummyUpdatable implements Updateable<DummyUpdate> {
-
- private List<String> state = new LinkedList<String>();
- private long lastUpdatedSeqNum = 0;
-
- @Override
- public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
- for (DummyUpdate u : update) {
- state.add(u.getState());
- lastUpdatedSeqNum = u.seqNum;
- }
- }
-
- @Override
- public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
- DummyUpdatable retVal = new DummyUpdatable();
- retVal.lastUpdatedSeqNum = update.seqNum;
- retVal.state = Lists.newArrayList(update.state.split(","));
- return retVal;
- }
-
- @Override
- public long getLastUpdatedSeqNum() {
- return lastUpdatedSeqNum;
- }
-
- @Override
- public DummyUpdate createFullImageUpdate(long currSeqNum) {
- DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
- retVal.state = Joiner.on(",").join(state);
- return retVal;
- }
-
- public String getState() {
- return Joiner.on(",").join(state);
- }
-
- @Override
- public String getUpdateableTypeName() {
- // TODO Auto-generated method stub
- return "DummyUpdator";
- }
- }
-
- static class DummyImageRetreiver implements ImageRetriever<DummyUpdate> {
-
- private String state;
- public void setState(String state) {
- this.state = state;
- }
- @Override
- public DummyUpdate retrieveFullImage(long currSeqNum) {
- DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
- retVal.state = state;
- return retVal;
- }
- }
-
- protected Configuration testConf = new Configuration();
- protected UpdateForwarder<DummyUpdate> updateForwarder;
-
- @After
- public void cleanup() throws Exception {
- if (updateForwarder != null) {
- updateForwarder.close();
- updateForwarder = null;
- }
- }
-
- @Test
- public void testInit() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 10, true);
- Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertTrue(allUpdates.size() == 1);
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
-
- // If the current process has restarted the input seqNum will be > currSeq
- allUpdates = updateForwarder.getAllUpdatesFrom(100);
- Assert.assertTrue(allUpdates.size() == 1);
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
- Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(-1);
- Assert.assertEquals(0, allUpdates.size());
- }
-
- @Test
- public void testUpdateReceive() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
- Assert.assertEquals("d", allUpdates.get(1).getState());
- }
-
- // This happens when we the first update from HMS is a -1 (If the heartbeat
- // thread checks Sentry's current seqNum before any update has come in)..
- // This will lead the first and second entries in the updatelog to differ
- // by more than +1..
- @Test
- public void testUpdateReceiveWithNullImageRetriver() throws Exception {
- Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
- false));
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), null, 5, false);
- updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1);
- Assert.assertEquals("a", allUpdates.get(0).getState());
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
- Assert.assertEquals("b", allUpdates.get(0).getState());
- Assert.assertEquals("c", allUpdates.get(1).getState());
- }
-
- @Test
- public void testGetUpdates() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(4, allUpdates.size());
- Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
- Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
- Assert.assertEquals("d", allUpdates.get(1).getState());
- Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
- Assert.assertEquals("e", allUpdates.get(2).getState());
- Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
- Assert.assertEquals("f", allUpdates.get(3).getState());
- Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(8);
- Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("g", allUpdates.get(0).getState());
- }
-
- @Test
- public void testGetUpdatesAfterExternalEntityReset() throws Exception {
- /*
- * Disabled for Sentry HA. Since the sequence numbers are trakced in ZK, the
- * lower sequence updates are ignored which causes this test to fail in HA
- * mode
- */
- Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
- false));
-
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(4, allUpdates.size());
- Assert.assertEquals("f", allUpdates.get(3).getState());
- Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(8);
- Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("g", allUpdates.get(0).getState());
-
- imageRetreiver.setState("a,b,c,d,e,f,g,h");
-
- // New update comes with SeqNum = 1
- updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- // NN plugin asks for next update
- allUpdates = updateForwarder.getAllUpdatesFrom(9);
- Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
- // Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
- }
-
- @Test
- public void testUpdateLogCompression() throws Exception {
- DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
- imageRetreiver.setState("a,b,c");
- updateForwarder = UpdateForwarder.create(
- testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
- List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(2, allUpdates.size());
-
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j"));
-
- while(!updateForwarder.areAllUpdatesCommited()) {
- Thread.sleep(100);
- }
- Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
- allUpdates = updateForwarder.getAllUpdatesFrom(0);
- Assert.assertEquals(3, allUpdates.size());
- Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
- Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
- Assert.assertEquals("i", allUpdates.get(1).getState());
- Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
- Assert.assertEquals("j", allUpdates.get(2).getState());
- Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 3536579..d1edcb1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -3404,6 +3404,33 @@ public class SentryStore {
}
/**
+ * Get the list of MSentryPermChange objects whose ChangeID is greater
+ * than or equal to the given ChangeID, ordered by ascending ChangeID.
+ *
+ * @param changeID the lowest ChangeID to include in the result
+ * @return the list of MSentryPermChange objects
+ * @throws Exception propagated from the enclosing transaction
+ */
+ public List<MSentryPermChange> getMSentryPermChanges(final long changeID) throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<List<MSentryPermChange>>() {
+ public List<MSentryPermChange> execute(PersistenceManager pm) throws Exception {
+ Query query = pm.newQuery(MSentryPermChange.class);
+ query.setFilter("this.changeID >= t");
+ query.declareParameters("long t");
+ // Without an explicit ordering the order of the rows is datastore-dependent.
+ query.setOrdering("this.changeID ascending");
+ List<MSentryPermChange> permChanges = (List<MSentryPermChange>)query.execute(changeID);
+ if (permChanges == null) {
+ noSuchUpdate(changeID);
+ }
+
+ return permChanges;
+ }
+ });
+ }
+
+ /**
* Get the MSentryPermChange object by ChangeID.
*
* @param changeID the given changeID.
@@ -3429,6 +3456,57 @@ public class SentryStore {
}
/**
+ * Check whether a MSentryPermChange object with the given ChangeID exists.
+ *
+ * @param changeID the ChangeID to look up
+ * @return true if at least one MSentryPermChange object has that ChangeID,
+ * otherwise false.
+ * @throws Exception propagated from the enclosing transaction
+ */
+ public boolean findMSentryPermChangeByID(final long changeID) throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Boolean>() {
+ public Boolean execute(PersistenceManager pm) throws Exception {
+ Query query = pm.newQuery(MSentryPermChange.class);
+ query.setFilter("this.changeID == t");
+ query.declareParameters("long t");
+ List<MSentryPermChange> permChanges =
+ (List<MSentryPermChange>)query.execute(changeID);
+ // A JDO query that matches nothing yields an empty list rather than
+ // null, so a null test alone would report every ChangeID as found.
+ // Existence is therefore decided by emptiness.
+ return permChanges != null && !permChanges.isEmpty();
+ }
+ });
+ }
+
+ /**
+ * Get the list of MSentryPathChange objects whose ChangeID is greater
+ * than or equal to the given ChangeID, ordered by ascending ChangeID.
+ *
+ * @param changeID the lowest ChangeID to include in the result
+ * @return the list of MSentryPathChange objects
+ * @throws Exception propagated from the enclosing transaction
+ */
+ public List<MSentryPathChange> getMSentryPathChanges(final long changeID) throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<List<MSentryPathChange>>() {
+ public List<MSentryPathChange> execute(PersistenceManager pm) throws Exception {
+ Query query = pm.newQuery(MSentryPathChange.class);
+ query.setFilter("this.changeID >= t");
+ query.declareParameters("long t");
+ // Without an explicit ordering the order of the rows is datastore-dependent.
+ query.setOrdering("this.changeID ascending");
+ List<MSentryPathChange> pathChanges = (List<MSentryPathChange>)query.execute(changeID);
+ if (pathChanges == null) {
+ noSuchUpdate(changeID);
+ }
+ return pathChanges;
+ }
+ });
+ }
+
+ /**
* Get the MSentryPathChange object by ChangeID.
*/
public MSentryPathChange getMSentryPathChangeByID(final long changeID) throws Exception {
@@ -3451,6 +3529,30 @@ public class SentryStore {
}
/**
+ * Check whether a MSentryPathChange object with the given ChangeID exists.
+ *
+ * @param changeID the ChangeID to look up
+ * @return true if at least one MSentryPathChange object has that ChangeID,
+ * otherwise false.
+ * @throws Exception propagated from the enclosing transaction
+ */
+ public boolean findMSentryPathChangeByID(final long changeID) throws Exception {
+ return tm.executeTransaction(
+ new TransactionBlock<Boolean>() {
+ public Boolean execute(PersistenceManager pm) throws Exception {
+ Query query = pm.newQuery(MSentryPathChange.class);
+ query.setFilter("this.changeID == t");
+ query.declareParameters("long t");
+ List<MSentryPathChange> pathChanges =
+ (List<MSentryPathChange>)query.execute(changeID);
+ // A JDO query that matches nothing yields an empty list rather than
+ // null, so a null test alone would report every ChangeID as found.
+ // Existence is therefore decided by emptiness.
+ return pathChanges != null && !pathChanges.isEmpty();
+ }
+ });
+ }
+ /**
* Execute Perm/Path UpdateTransaction and corresponding actual
* action transaction, e.g dropSentryRole, in a single transaction.
* The order of the transaction does not matter because there is no
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index be59c1e..7d818c1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -158,8 +158,9 @@ public class SentryService implements Callable, SigUtils.SigListener {
if (notificationLogEnabled) {
try {
hmsFollowerExecutor = Executors.newScheduledThreadPool(1);
+ //TODO: make initialDelay and period configurable
hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, leaderMonitor),
- 60000, 500, TimeUnit.MILLISECONDS);
+ 30000, 500, TimeUnit.MILLISECONDS);
} catch (Exception e) {
//TODO: Handle
LOGGER.error("Could not start HMSFollower");
http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 4f4d3e6..7b6d753 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -221,7 +221,7 @@ public class TestHDFSIntegration {
@Override
public Void run() throws Exception {
HiveConf hiveConf = new HiveConf();
- hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin");
+ //hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin");
hiveConf.set("sentry.service.client.server.rpc-address", "localhost");
hiveConf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
hiveConf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
@@ -261,7 +261,8 @@ public class TestHDFSIntegration {
hiveConf.set("hive.metastore.authorization.storage.checks", "true");
hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort);
hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding");
- hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListener");
+ hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListenerNotificationLog");
+ hiveConf.set("hcatalog.message.factory.impl.json","org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory");
hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl");
hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook");
hiveConf.set("sentry.metastore.service.users", "hive");// queries made by hive user (beeline) skip meta store check
@@ -451,6 +452,7 @@ public class TestHDFSIntegration {
properties.put(ServerConfig.RPC_ADDRESS, "localhost");
properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort > 0 ? sentryPort : 0));
properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+ properties.put(ServerConfig.SENTRY_NOTIFICATION_LOG_ENABLED,"true");
properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());