You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ls...@apache.org on 2014/11/13 08:43:39 UTC
[2/9] incubator-sentry git commit: SENTRY-432: Synchronization of
HDFS permissions to Sentry permissions (Arun Suresh via Lenni Kuff)
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
new file mode 100644
index 0000000..6b3e2e2
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.hdfs.Updateable;
+import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
+
+public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
+
+ private AtomicLong seqNum = new AtomicLong();
+ private final ExternalImageRetriever<PermissionsUpdate> imageRetreiver;
+
+ public UpdateablePermissions(
+ ExternalImageRetriever<PermissionsUpdate> imageRetreiver) {
+ this.imageRetreiver = imageRetreiver;
+ }
+
+ @Override
+ public PermissionsUpdate createFullImageUpdate(long currSeqNum) {
+ return imageRetreiver.retrieveFullImage(currSeqNum);
+ }
+
+ @Override
+ public long getLastUpdatedSeqNum() {
+ return seqNum.get();
+ }
+
+ @Override
+ public void updatePartial(Iterable<PermissionsUpdate> update,
+ ReadWriteLock lock) {
+ for (PermissionsUpdate permsUpdate : update) {
+ seqNum.set(permsUpdate.getSeqNum());
+ }
+ }
+
+ @Override
+ public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) {
+ UpdateablePermissions other = new UpdateablePermissions(imageRetreiver);
+ other.seqNum.set(update.getSeqNum());
+ return other;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
new file mode 100644
index 0000000..0c55bb1
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import junit.framework.Assert;
+
+import org.apache.sentry.hdfs.UpdateForwarder;
+import org.apache.sentry.hdfs.Updateable;
+import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link UpdateForwarder} using in-memory dummy implementations of
+ * {@link Update}, {@link Updateable} and {@link ExternalImageRetriever} whose
+ * state is a comma separated list of strings.
+ */
+public class TestUpdateForwarder {
+
+  /** Interval between polls while waiting for queued updates to be committed. */
+  private static final long COMMIT_POLL_MS = 100;
+
+  /**
+   * Upper bound for a single wait. Without it a forwarder that never commits
+   * would hang the build instead of failing the test.
+   */
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+
+  /** Minimal update: a sequence number, a full-image flag and a string payload. */
+  static class DummyUpdate implements Update {
+    private long seqNum = 0;
+    private boolean hasFullUpdate = false;
+    private String state;
+    public DummyUpdate(long seqNum, boolean hasFullUpdate) {
+      this.seqNum = seqNum;
+      this.hasFullUpdate = hasFullUpdate;
+    }
+    public String getState() {
+      return state;
+    }
+    /** Sets the payload and returns {@code this} so calls can be chained. */
+    public DummyUpdate setState(String stuff) {
+      this.state = stuff;
+      return this;
+    }
+    @Override
+    public boolean hasFullImage() {
+      return hasFullUpdate;
+    }
+    @Override
+    public long getSeqNum() {
+      return seqNum;
+    }
+    @Override
+    public void setSeqNum(long seqNum) {
+      this.seqNum = seqNum;
+    }
+  }
+
+  /**
+   * Updateable whose state is a list of strings: partial updates append their
+   * payload, full updates replace the list with the payload split on ",".
+   */
+  static class DummyUpdatable implements Updateable<DummyUpdate> {
+
+    private List<String> state = new LinkedList<String>();
+    private long lastUpdatedSeqNum = 0;
+
+    @Override
+    public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
+      for (DummyUpdate u : update) {
+        state.add(u.getState());
+        lastUpdatedSeqNum = u.seqNum;
+      }
+    }
+
+    @Override
+    public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
+      DummyUpdatable retVal = new DummyUpdatable();
+      retVal.lastUpdatedSeqNum = update.seqNum;
+      retVal.state = Lists.newArrayList(update.state.split(","));
+      return retVal;
+    }
+
+    @Override
+    public long getLastUpdatedSeqNum() {
+      return lastUpdatedSeqNum;
+    }
+
+    @Override
+    public DummyUpdate createFullImageUpdate(long currSeqNum) {
+      DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
+      retVal.state = Joiner.on(",").join(state);
+      return retVal;
+    }
+
+    public String getState() {
+      return Joiner.on(",").join(state);
+    }
+  }
+
+  /** Image retriever returning a full-image update with a configurable payload. */
+  static class DummyImageRetreiver implements ExternalImageRetriever<DummyUpdate> {
+
+    private String state;
+    public void setState(String state) {
+      this.state = state;
+    }
+    @Override
+    public DummyUpdate retrieveFullImage(long currSeqNum) {
+      DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
+      retVal.state = state;
+      return retVal;
+    }
+  }
+
+  /**
+   * Blocks until the forwarder reports that all queued updates are committed,
+   * failing the test if that takes longer than {@link #COMMIT_TIMEOUT_MS}.
+   */
+  private static void waitForCommit(UpdateForwarder<DummyUpdate> updateForwarder)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + COMMIT_TIMEOUT_MS;
+    while (!updateForwarder.areAllUpdatesCommited()) {
+      if (System.currentTimeMillis() > deadline) {
+        Assert.fail("Timed out waiting for UpdateForwarder to commit pending updates");
+      }
+      Thread.sleep(COMMIT_POLL_MS);
+    }
+  }
+
+  @Test
+  public void testInit() {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 10);
+    Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    // assertEquals (rather than assertTrue on a comparison) reports the actual size on failure
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
+
+    // If the current process has restarted the input seqNum will be > currSeq
+    allUpdates = updateForwarder.getAllUpdatesFrom(100);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
+    Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(-1);
+    Assert.assertEquals(0, allUpdates.size());
+  }
+
+  @Test
+  public void testUpdateReceive() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
+    Assert.assertEquals("d", allUpdates.get(1).getState());
+  }
+
+  // This happens when the first update from HMS is a -1 (if the heartbeat
+  // thread checks Sentry's current seqNum before any update has come in).
+  // This will lead the first and second entries in the update log to differ
+  // by more than +1.
+  @Test
+  public void testUpdateReceiveWithNullImageRetriver() throws Exception {
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), null, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
+    waitForCommit(updateForwarder);
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1);
+    Assert.assertEquals("a", allUpdates.get(0).getState());
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b"));
+    waitForCommit(updateForwarder);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c"));
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+    Assert.assertEquals("b", allUpdates.get(0).getState());
+    Assert.assertEquals("c", allUpdates.get(1).getState());
+  }
+
+  @Test
+  public void testGetUpdates() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
+
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(4, allUpdates.size());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
+    Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
+    Assert.assertEquals("d", allUpdates.get(1).getState());
+    Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
+    Assert.assertEquals("e", allUpdates.get(2).getState());
+    Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
+    Assert.assertEquals("f", allUpdates.get(3).getState());
+    Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(8);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("g", allUpdates.get(0).getState());
+  }
+
+  @Test
+  public void testGetUpdatesAfterExternalEntityReset() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
+    waitForCommit(updateForwarder);
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
+
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(4, allUpdates.size());
+    Assert.assertEquals("f", allUpdates.get(3).getState());
+    Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(8);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("g", allUpdates.get(0).getState());
+
+    imageRetreiver.setState("a,b,c,d,e,f,g,h");
+
+    // New update comes with SeqNum = 1
+    updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h"));
+    waitForCommit(updateForwarder);
+    // NN plugin asks for next update
+    allUpdates = updateForwarder.getAllUpdatesFrom(9);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
+    Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
+  }
+
+  @Test
+  public void testUpdateLogCompression() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j"));
+
+    waitForCommit(updateForwarder);
+    Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(3, allUpdates.size());
+    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
+    Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
+    Assert.assertEquals("i", allUpdates.get(1).getState());
+    Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
+    Assert.assertEquals("j", allUpdates.get(2).getState());
+    Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml
index fbf831a..e2f035f 100644
--- a/sentry-provider/sentry-provider-db/pom.xml
+++ b/sentry-provider/sentry-provider-db/pom.xml
@@ -42,6 +42,11 @@ limitations under the License.
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -89,6 +94,11 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.13.1-cdh5.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
<scope>provided</scope>
</dependency>
@@ -163,6 +173,11 @@ limitations under the License.
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -214,68 +229,5 @@ limitations under the License.
</plugin>
</plugins>
</build>
- <profiles>
- <profile>
- <id>thriftif</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <id>generate-thrift-sources</id>
- <phase>generate-sources</phase>
- <configuration>
- <target>
- <taskdef name="for" classname="net.sf.antcontrib.logic.ForTask"
- classpathref="maven.plugin.classpath" />
- <property name="thrift.args" value="-I ${thrift.home} --gen java:beans,hashcode"/>
- <property name="thrift.gen.dir" value="${basedir}/src/gen/thrift"/>
- <delete dir="${thrift.gen.dir}"/>
- <mkdir dir="${thrift.gen.dir}"/>
- <for param="thrift.file">
- <path>
- <fileset dir="${basedir}/src/main/resources/" includes="**/*.thrift" />
- </path>
- <sequential>
- <echo message="Generating Thrift code for @{thrift.file}"/>
- <exec executable="${thrift.home}/bin/thrift" failonerror="true" dir=".">
- <arg line="${thrift.args} -I ${basedir}/src/main/resources/ -o ${thrift.gen.dir} @{thrift.file} " />
- </exec>
- </sequential>
- </for>
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <execution>
- <id>enforce-property</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <rules>
- <requireProperty>
- <property>thrift.home</property>
- </requireProperty>
- </rules>
- <fail>true</fail>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java
new file mode 100644
index 0000000..79cf4a4
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryMetastoreListenerPlugin.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.db;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Base class for plugins that want to be told about authorizable-object path
+ * changes: paths being added, removed, or an object being renamed.
+ * Implementations register themselves in a static, process-wide registry via
+ * {@link #addToRegistry(SentryMetastoreListenerPlugin)}.
+ */
+public abstract class SentryMetastoreListenerPlugin {
+
+  // Shared by every caller in the JVM; a plain LinkedList with no
+  // synchronization, and getPlugins() hands out this same instance.
+  private static final List<SentryMetastoreListenerPlugin> registry =
+      new LinkedList<SentryMetastoreListenerPlugin>();
+
+  /** @return the live registry list itself (not a copy) */
+  public static List<SentryMetastoreListenerPlugin> getPlugins() {
+    return registry;
+  }
+
+  /** Appends {@code plugin} to the registry; duplicates are not filtered. */
+  public static void addToRegistry(SentryMetastoreListenerPlugin plugin) {
+    registry.add(plugin);
+  }
+
+  /** Hook for an authorizable object (and its path) being renamed. */
+  public abstract void renameAuthzObject(String oldName, String oldPath,
+      String newName, String newPath);
+
+  /** Hook for {@code path} being associated with {@code authzObj}. */
+  public abstract void addPath(String authzObj, String path);
+
+  /** Hook for {@code path} no longer being associated with {@code authzObj}. */
+  public abstract void removePath(String authzObj, String path);
+
+  /**
+   * Hook for every path of {@code authzObj} being removed.
+   * NOTE(review): {@code childObjects} presumably names nested objects (e.g.
+   * tables of a database) whose paths go too; confirm against the caller.
+   */
+  public abstract void removeAllPaths(String authzObj, List<String> childObjects);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
new file mode 100644
index 0000000..998a48b
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.SentryUserException;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleRevokePrivilegeRequest;
+import org.apache.sentry.provider.db.service.thrift.TDropPrivilegesRequest;
+import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
+import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest;
+
+/**
+ * Callback interface for plugins that observe policy-store requests. Each
+ * {@code on...} method receives the Thrift request object for one kind of
+ * operation and may reject or report a problem by throwing
+ * {@link SentryPluginException}.
+ */
+public interface SentryPolicyStorePlugin {
+
+  /** Failure raised by a plugin; a {@link SentryUserException} subtype. */
+  @SuppressWarnings("serial")
+  class SentryPluginException extends SentryUserException {
+    public SentryPluginException(String msg, Throwable t) {
+      super(msg, t);
+    }
+    public SentryPluginException(String msg) {
+      super(msg);
+    }
+  }
+
+  /** Gives the plugin its configuration and the store it works against. */
+  void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException;
+
+  /** Callback for a request that adds groups to a role. */
+  void onAlterSentryRoleAddGroups(TAlterSentryRoleAddGroupsRequest tRequest) throws SentryPluginException;
+
+  /** Callback for a request that removes groups from a role. */
+  void onAlterSentryRoleDeleteGroups(TAlterSentryRoleDeleteGroupsRequest tRequest) throws SentryPluginException;
+
+  /** Callback for a request that grants a privilege to a role. */
+  void onAlterSentryRoleGrantPrivilege(TAlterSentryRoleGrantPrivilegeRequest tRequest) throws SentryPluginException;
+
+  /** Callback for a request that revokes a privilege from a role. */
+  void onAlterSentryRoleRevokePrivilege(TAlterSentryRoleRevokePrivilegeRequest tRequest) throws SentryPluginException;
+
+  /** Callback for a request that drops a role. */
+  void onDropSentryRole(TDropSentryRoleRequest tRequest) throws SentryPluginException;
+
+  /** Callback for a request that renames privileges. */
+  void onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws SentryPluginException;
+
+  /** Callback for a request that drops privileges. */
+  void onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
index b66037a..5f34b4c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
@@ -39,9 +39,10 @@ public class SimpleDBProviderBackend implements ProviderBackend {
private static final Logger LOGGER = LoggerFactory
.getLogger(SimpleDBProviderBackend.class);
- private final SentryPolicyServiceClient policyServiceClient;
+ private SentryPolicyServiceClient policyServiceClient;
private volatile boolean initialized;
+ private Configuration conf;
public SimpleDBProviderBackend(Configuration conf, String resourcePath) throws IOException {
// DB Provider doesn't use policy file path
@@ -50,6 +51,8 @@ public class SimpleDBProviderBackend implements ProviderBackend {
public SimpleDBProviderBackend(Configuration conf) throws IOException {
this(new SentryPolicyServiceClient(conf));
+ this.initialized = false;
+ this.conf = conf;
}
@VisibleForTesting
@@ -74,14 +77,28 @@ public class SimpleDBProviderBackend implements ProviderBackend {
*/
@Override
public ImmutableSet<String> getPrivileges(Set<String> groups, ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) {
+ // Permit one retry with a freshly created client if the first attempt fails.
+ return getPrivileges(1, groups, roleSet, authorizableHierarchy);
+ }
+
+ /**
+  * Asks the Sentry service for the privileges of {@code groups}. After a
+  * failed attempt the current client is closed and discarded, and the call is
+  * repeated with a new client while {@code retryCount} is positive.
+  *
+  * @param retryCount number of further attempts allowed after a failure
+  * @return the privileges reported by the server, or an empty set when every
+  *         attempt failed (the last failure is logged)
+  * @throws IllegalStateException if the backend has not been initialized
+  */
+ private ImmutableSet<String> getPrivileges(int retryCount, Set<String> groups, ActiveRoleSet roleSet, Authorizable... authorizableHierarchy) {
if (!initialized) {
throw new IllegalStateException("Backend has not been properly initialized");
}
try {
- return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, roleSet, authorizableHierarchy));
- } catch (SentryUserException e) {
- String msg = "Unable to obtain privileges from server: " + e.getMessage();
- LOGGER.error(msg, e);
+ return ImmutableSet.copyOf(getSentryClient().listPrivilegesForProvider(groups, roleSet, authorizableHierarchy));
+ } catch (Exception e) {
+   // Close the failed client through a saved reference *before* clearing the
+   // field. Calling close() on the field after nulling it would only raise a
+   // swallowed NullPointerException and leak the underlying connection.
+   SentryPolicyServiceClient failedClient = policyServiceClient;
+   policyServiceClient = null;
+   if (failedClient != null) {
+     try {
+       failedClient.close();
+     } catch (Exception ex2) {
+       // Best-effort cleanup of a client that is already considered broken.
+     }
+   }
+   if (retryCount > 0) {
+     return getPrivileges(retryCount - 1, groups, roleSet, authorizableHierarchy);
+   }
+   String msg = "Unable to obtain privileges from server: " + e.getMessage();
+   LOGGER.error(msg, e);
}
return ImmutableSet.of();
}
@@ -101,6 +118,19 @@ public class SimpleDBProviderBackend implements ProviderBackend {
}
}
+ /**
+  * Returns the cached policy service client, creating one from {@code conf}
+  * when none is cached.
+  *
+  * @return the client, or {@code null} if a new client could not be created
+  *         (the failure message is logged)
+  */
+ private SentryPolicyServiceClient getSentryClient() {
+   if (policyServiceClient != null) {
+     return policyServiceClient;
+   }
+   try {
+     policyServiceClient = new SentryPolicyServiceClient(conf);
+   } catch (Exception e) {
+     LOGGER.error("Error connecting to Sentry ['{}'] !!",
+         e.getMessage());
+     policyServiceClient = null;
+     return null;
+   }
+   return policyServiceClient;
+ }
/**
* SimpleDBProviderBackend does not implement validatePolicy()
*/
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index f6699d2..743900b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -24,6 +24,7 @@ import static org.apache.sentry.provider.common.ProviderConstants.KV_JOINER;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -86,6 +87,7 @@ public class SentryStore {
public static String NULL_COL = "__NULL__";
static final String DEFAULT_DATA_DIR = "sentry_policy_db";
+
/**
* Commit order sequence id. This is used by notification handlers
* to know the order in which events where committed to the database.
@@ -762,7 +764,6 @@ public class SentryStore {
}
}
-
List<MSentryPrivilege> getMSentryPrivileges(Set<String> roleNames, TSentryAuthorizable authHierarchy) {
if ((roleNames.size() == 0)||(roleNames == null)) return new ArrayList<MSentryPrivilege>();
boolean rollbackTransaction = true;
@@ -1506,4 +1507,82 @@ public class SentryStore {
return Sets.newHashSet(conf.getStrings(
ServerConfig.ADMIN_GROUPS, new String[]{}));
}
+
+ /**
+  * Builds a full snapshot of object privileges.
+  * Only privileges with a server name and database name set and no URI are
+  * read. Each is keyed by its authorizable object ("db", or "db.table" when a
+  * table name is set).
+  *
+  * @return a mapping of AuthZObj(db/table) -> (Role -> permission), where the
+  *         permission is a comma separated list of upper-cased actions
+  */
+ public Map<String, HashMap<String, String>> retrieveFullPrivilegeImage() {
+   Map<String, HashMap<String, String>> retVal = new HashMap<String, HashMap<String,String>>();
+   boolean rollbackTransaction = true;
+   PersistenceManager pm = null;
+   try {
+     pm = openTransaction();
+     Query query = pm.newQuery(MSentryPrivilege.class);
+     String filters = "(serverName != \"__NULL__\") "
+         + "&& (dbName != \"__NULL__\") " + "&& (URI == \"__NULL__\")";
+     query.setFilter(filters);
+     query
+         .setOrdering("serverName ascending, dbName ascending, tableName ascending");
+     @SuppressWarnings("unchecked")
+     List<MSentryPrivilege> privileges = (List<MSentryPrivilege>) query
+         .execute();
+     for (MSentryPrivilege mPriv : privileges) {
+       String authzObj = mPriv.getDbName();
+       if (!isNULL(mPriv.getTableName())) {
+         authzObj = authzObj + "." + mPriv.getTableName();
+       }
+       HashMap<String, String> pUpdate = retVal.get(authzObj);
+       if (pUpdate == null) {
+         pUpdate = new HashMap<String, String>();
+         retVal.put(authzObj, pUpdate);
+       }
+       for (MSentryRole mRole : mPriv.getRoles()) {
+         String existingPriv = pUpdate.get(mRole.getRoleName());
+         if (existingPriv == null) {
+           pUpdate.put(mRole.getRoleName(), mPriv.getAction().toUpperCase());
+         } else {
+           pUpdate.put(mRole.getRoleName(), existingPriv + ","
+               + mPriv.getAction().toUpperCase());
+         }
+       }
+     }
+     commitTransaction(pm);
+     // Clear the flag only after a successful commit, so a failure while
+     // walking the results or committing still rolls the transaction back.
+     rollbackTransaction = false;
+     return retVal;
+   } finally {
+     if (rollbackTransaction) {
+       rollbackTransaction(pm);
+     }
+   }
+ }
+
+ /**
+  * Builds a full snapshot of role membership by reading every stored group
+  * and inverting its role list.
+  *
+  * @return a mapping of Role -> [Groups]; roles with no group do not appear
+  */
+ public Map<String, LinkedList<String>> retrieveFullRoleImage() {
+   Map<String, LinkedList<String>> retVal = new HashMap<String, LinkedList<String>>();
+   boolean rollbackTransaction = true;
+   PersistenceManager pm = null;
+   try {
+     pm = openTransaction();
+     Query query = pm.newQuery(MSentryGroup.class);
+     @SuppressWarnings("unchecked")
+     List<MSentryGroup> groups = (List<MSentryGroup>) query.execute();
+     for (MSentryGroup mGroup : groups) {
+       for (MSentryRole role : mGroup.getRoles()) {
+         LinkedList<String> rUpdate = retVal.get(role.getRoleName());
+         if (rUpdate == null) {
+           rUpdate = new LinkedList<String>();
+           retVal.put(role.getRoleName(), rUpdate);
+         }
+         rUpdate.add(mGroup.getGroupName());
+       }
+     }
+     commitTransaction(pm);
+     // The flag was previously never cleared, so rollbackTransaction(pm) was
+     // invoked even after a successful commit.
+     rollbackTransaction = false;
+     return retVal;
+   } finally {
+     if (rollbackTransaction) {
+       rollbackTransaction(pm);
+     }
+   }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index b20e71e..4774b90 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -18,16 +18,26 @@
package org.apache.sentry.provider.db.service.thrift;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.sentry.SentryUserException;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.provider.common.GroupMappingService;
@@ -35,12 +45,16 @@ import org.apache.sentry.provider.db.SentryAccessDeniedException;
import org.apache.sentry.provider.db.SentryAlreadyExistsException;
import org.apache.sentry.provider.db.SentryInvalidInputException;
import org.apache.sentry.provider.db.SentryNoSuchObjectException;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory;
import org.apache.sentry.provider.db.log.util.Constants;
import org.apache.sentry.provider.db.service.persistent.CommitContext;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.ConfUtilties;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.apache.sentry.service.thrift.ProcessorFactory;
import org.apache.sentry.service.thrift.Status;
import org.apache.sentry.service.thrift.TSentryResponseStatus;
import org.apache.thrift.TException;
@@ -62,6 +76,8 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
public static final String SENTRY_POLICY_SERVICE_NAME = "SentryPolicyService";
+ public static volatile SentryPolicyStoreProcessor instance;
+
private final String name;
private final Configuration conf;
private final SentryStore sentryStore;
@@ -70,6 +86,8 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
private boolean isReady;
SentryMetrics sentryMetrics;
+ private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>();
+
public SentryPolicyStoreProcessor(String name, Configuration conf) throws Exception {
super();
this.name = name;
@@ -81,6 +99,23 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
isReady = true;
adminGroups = ImmutableSet.copyOf(toTrimedLower(Sets.newHashSet(conf.getStrings(
ServerConfig.ADMIN_GROUPS, new String[]{}))));
+ Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER
+ .split(conf.get(ServerConfig.SENTRY_POLICY_STORE_PLUGINS,
+ ServerConfig.SENTRY_POLICY_STORE_PLUGINS_DEFAULT).trim());
+ for (String pluginClassStr : pluginClasses) {
+ Class<?> clazz = conf.getClassByName(pluginClassStr);
+ if (!SentryPolicyStorePlugin.class.isAssignableFrom(clazz)) {
+ throw new IllegalArgumentException("Sentry Plugin ["
+ + pluginClassStr + "] is not a "
+ + SentryPolicyStorePlugin.class.getName());
+ }
+ SentryPolicyStorePlugin plugin = (SentryPolicyStorePlugin)clazz.newInstance();
+ plugin.initialize(conf, sentryStore);
+ sentryPlugins.add(plugin);
+ }
+ if (instance == null) {
+ instance = this;
+ }
initMetrics();
}
@@ -108,6 +143,11 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
}
}
+ /**
+  * Initializes the given plugin with this processor's configuration and
+  * {@code SentryStore}, then appends it to {@code sentryPlugins}, the list whose
+  * members are called back from the request handlers of this class.
+  *
+  * The plugin is only added if {@code initialize} returns normally.
+  *
+  * @param plugin the plugin to initialize and register
+  * @throws SentryPluginException presumably raised by {@code plugin.initialize} - confirm against the plugin interface
+  */
+ public void registerPlugin(SentryPolicyStorePlugin plugin) throws SentryPluginException {
+ plugin.initialize(conf, sentryStore);
+ sentryPlugins.add(plugin);
+ }
+
@VisibleForTesting
static List<NotificationHandler> createHandlers(Configuration conf)
throws SentryConfigurationException {
@@ -211,6 +251,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
response.setPrivilege(request.getPrivilege());
notificationHandlerInvoker.alter_sentry_role_grant_privilege(commitContext,
request, response);
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onAlterSentryRoleGrantPrivilege(request);
+ }
} catch (SentryNoSuchObjectException e) {
String msg = "Role: " + request.getRoleName() + " doesn't exist.";
LOGGER.error(msg, e);
@@ -246,12 +289,15 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
response.setStatus(Status.OK());
notificationHandlerInvoker.alter_sentry_role_revoke_privilege(commitContext,
request, response);
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onAlterSentryRoleRevokePrivilege(request);
+ }
} catch (SentryNoSuchObjectException e) {
String msg = "Privilege: [server=" + request.getPrivilege().getServerName() +
- ",db=" + request.getPrivilege().getDbName() +
- ",table=" + request.getPrivilege().getTableName() +
- ",URI=" + request.getPrivilege().getURI() +
- ",action=" + request.getPrivilege().getAction() + "] doesn't exist.";
+ ",db=" + request.getPrivilege().getDbName() +
+ ",table=" + request.getPrivilege().getTableName() +
+ ",URI=" + request.getPrivilege().getURI() +
+ ",action=" + request.getPrivilege().getAction() + "] doesn't exist.";
LOGGER.error(msg, e);
response.setStatus(Status.NoSuchObject(msg, e));
} catch (SentryInvalidInputException e) {
@@ -287,6 +333,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
response.setStatus(Status.OK());
notificationHandlerInvoker.drop_sentry_role(commitContext,
request, response);
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onDropSentryRole(request);
+ }
} catch (SentryNoSuchObjectException e) {
String msg = "Role :" + request + " does not exist.";
LOGGER.error(msg, e);
@@ -320,6 +369,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
response.setStatus(Status.OK());
notificationHandlerInvoker.alter_sentry_role_add_groups(commitContext,
request, response);
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onAlterSentryRoleAddGroups(request);
+ }
} catch (SentryNoSuchObjectException e) {
String msg = "Role: " + request + " does not exist.";
LOGGER.error(msg, e);
@@ -353,6 +405,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
response.setStatus(Status.OK());
notificationHandlerInvoker.alter_sentry_role_delete_groups(commitContext,
request, response);
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onAlterSentryRoleDeleteGroups(request);
+ }
} catch (SentryNoSuchObjectException e) {
String msg = "Role: " + request + " does not exist.";
LOGGER.error(msg, e);
@@ -548,7 +603,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
try {
authorize(request.getRequestorUserName(), adminGroups);
sentryStore.dropPrivilege(request.getAuthorizable());
- response.setStatus(Status.OK());
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onDropSentryPrivilege(request);
+ }
+ response.setStatus(Status.OK());
} catch (SentryAccessDeniedException e) {
LOGGER.error(e.getMessage(), e);
response.setStatus(Status.AccessDenied(e.getMessage(), e));
@@ -572,6 +630,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
authorize(request.getRequestorUserName(), adminGroups);
sentryStore.renamePrivilege(request.getOldAuthorizable(),
request.getNewAuthorizable());
+ for (SentryPolicyStorePlugin plugin : sentryPlugins) {
+ plugin.onRenameSentryPrivilege(request);
+ }
response.setStatus(Status.OK());
} catch (SentryAccessDeniedException e) {
LOGGER.error(e.getMessage(), e);
@@ -633,6 +694,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
}
response.setPrivilegesMapByAuth(authRoleMap);
response.setStatus(Status.OK());
+ // TODO : Sentry - HDFS : Have to handle this
} catch (SentryAccessDeniedException e) {
LOGGER.error(e.getMessage(), e);
response.setStatus(Status.AccessDenied(e.getMessage(), e));
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 1e20ff1..b19b79c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -54,6 +54,7 @@ import org.apache.sentry.service.thrift.ServiceConstants.ConfUtilties;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TSaslServerTransport;
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index bc86963..03ed378 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -67,6 +67,13 @@ public class ServiceConstants {
public static final String RPC_MIN_THREADS = "sentry.service.server-min-threads";
public static final int RPC_MIN_THREADS_DEFAULT = 10;
public static final String ALLOW_CONNECT = "sentry.service.allow.connect";
+
+ public static final String SENTRY_POLICY_STORE_PLUGINS = "sentry.policy.store.plugins";
+ public static final String SENTRY_POLICY_STORE_PLUGINS_DEFAULT = "";
+
+ public static final String SENTRY_METASTORE_PLUGINS = "sentry.metastore.plugins";
+ public static final String SENTRY_METASTORE_PLUGINS_DEFAULT = "";
+
public static final String PROCESSOR_FACTORIES = "sentry.service.processor.factories";
public static final String PROCESSOR_FACTORIES_DEFAULT =
"org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory";
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
index 46f8fb8..ea4e967 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
@@ -21,6 +21,7 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java
index e5238a6..777c6d8 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServerWithoutKerberos.java
@@ -18,7 +18,6 @@
package org.apache.sentry.provider.db.service.thrift;
import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.assertEquals;
import java.util.HashSet;
import java.util.Set;
@@ -77,7 +76,6 @@ public class TestSentryServerWithoutKerberos extends SentryServiceIntegrationBas
client.grantTablePrivilege(requestorUserName, roleName1, "server", "db2", "table3", "ALL");
client.grantTablePrivilege(requestorUserName, roleName1, "server", "db2", "table4", "ALL");
-
client.dropRoleIfExists(requestorUserName, roleName2);
client.createRole(requestorUserName, roleName2);
client.grantRoleToGroup(requestorUserName, group1, roleName2);
@@ -89,6 +87,7 @@ public class TestSentryServerWithoutKerberos extends SentryServiceIntegrationBas
client.grantTablePrivilege(requestorUserName, roleName2, "server", "db2", "table4", "ALL");
client.grantTablePrivilege(requestorUserName, roleName2, "server", "db3", "table5", "ALL");
+
Set<TSentryPrivilege> listPrivilegesByRoleName = client.listPrivilegesByRoleName(requestorUserName, roleName2, Lists.newArrayList(new Server("server"), new Database("db1")));
assertEquals("Privilege not assigned to role2 !!", 2, listPrivilegesByRoleName.size());
@@ -162,4 +161,5 @@ public class TestSentryServerWithoutKerberos extends SentryServiceIntegrationBas
assertEquals(0, client.listPrivilegesForProvider(requestorUserGroupNames,
ActiveRoleSet.ALL).size());
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-tests/sentry-tests-hive/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml
index 10415fc..a3c3295 100644
--- a/sentry-tests/sentry-tests-hive/pom.xml
+++ b/sentry-tests/sentry-tests-hive/pom.xml
@@ -222,6 +222,21 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.apache.sentry</groupId>
+ <artifactId>sentry-hdfs-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sentry</groupId>
+ <artifactId>sentry-hdfs-service</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sentry</groupId>
+ <artifactId>sentry-hdfs-namenode-plugin</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sentry</groupId>
<artifactId>sentry-policy-db</artifactId>
<scope>test</scope>
</dependency>
@@ -229,12 +244,14 @@ limitations under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
+<!--
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
</exclusion>
</exclusions>
+-->
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
new file mode 100644
index 0000000..a488c94
--- /dev/null
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.hdfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeoutException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.hdfs.SentryAuthorizationProvider;
+import org.apache.sentry.provider.db.SimpleDBProviderBackend;
+import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.SentryService;
+import org.apache.sentry.service.thrift.SentryServiceFactory;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.apache.sentry.tests.e2e.hive.StaticUserGroup;
+import org.apache.sentry.tests.e2e.hive.fs.MiniDFS;
+import org.apache.sentry.tests.e2e.hive.hiveserver.HiveServerFactory;
+import org.apache.sentry.tests.e2e.hive.hiveserver.InternalHiveServer;
+import org.apache.sentry.tests.e2e.hive.hiveserver.InternalMetastoreServer;
+import org.fest.reflect.core.Reflection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+
+public class TestHDFSIntegration {
+
+ public static class WordCountMapper extends MapReduceBase implements
+ Mapper<LongWritable, Text, String, Long> {
+
+ public void map(LongWritable key, Text value,
+ OutputCollector<String, Long> output, Reporter reporter)
+ throws IOException {
+ StringTokenizer st = new StringTokenizer(value.toString());
+ while (st.hasMoreTokens()) {
+ output.collect(st.nextToken(), 1L);
+ }
+ }
+
+ }
+
+ public static class SumReducer extends MapReduceBase implements
+ Reducer<Text, Long, Text, Long> {
+
+ public void reduce(Text key, Iterator<Long> values,
+ OutputCollector<Text, Long> output, Reporter reporter)
+ throws IOException {
+
+ long sum = 0;
+ while (values.hasNext()) {
+ sum += values.next();
+ }
+ output.collect(key, sum);
+ }
+
+ }
+
+ private static final int NUM_RETRIES = 10;
+ private static final int RETRY_WAIT = 1000;
+
+ private MiniDFSCluster miniDFS;
+ private MiniMRClientCluster miniMR;
+ private InternalHiveServer hiveServer2;
+ private InternalMetastoreServer metastore;
+ private SentryService sentryService;
+ private String fsURI;
+ private int hmsPort;
+ private int sentryPort = -1;
+ private File baseDir;
+ private File policyFileLocation;
+ private UserGroupInformation adminUgi;
+ private UserGroupInformation hiveUgi;
+
+ /**
+  * Ensures {@code dir} exists as a directory, creating it (and any missing
+  * parents) when necessary; fails the test if the creation does not succeed.
+  *
+  * @param dir the directory that must exist
+  * @return the same {@code dir} instance, for call chaining
+  */
+ protected static File assertCreateDir(File dir) {
+   if (dir.isDirectory()) {
+     // Already present - nothing to create.
+     return dir;
+   }
+   boolean created = dir.mkdirs();
+   Assert.assertTrue("Failed creating " + dir, created);
+   return dir;
+ }
+
+ /**
+  * Binds a throw-away server socket to port 0, reads back the port that was
+  * assigned to it, and closes the socket again.
+  *
+  * @return the local port the temporary socket was bound to
+  * @throws IOException if the socket cannot be opened or closed
+  */
+ private static int findPort() throws IOException {
+   ServerSocket probe = new ServerSocket(0);
+   try {
+     return probe.getLocalPort();
+   } finally {
+     probe.close();
+   }
+ }
+
+ /**
+  * Starts {@code sentryService} and polls once per second until it reports
+  * that it is running.
+  *
+  * @throws TimeoutException if the service is still not running when more than
+  *         60 seconds have elapsed (checked after each one-second sleep)
+  * @throws Exception anything thrown by {@code sentryService.start()} or the sleep
+  */
+ private void waitOnSentryService() throws Exception {
+   sentryService.start();
+   final long deadline = System.currentTimeMillis() + 60000L;
+   for (;;) {
+     if (sentryService.isRunning()) {
+       return;
+     }
+     Thread.sleep(1000);
+     if (System.currentTimeMillis() > deadline) {
+       throw new TimeoutException("Server did not start after 60 seconds");
+     }
+   }
+ }
+
+ /**
+  * Per-test fixture: creates a temporary base directory and policy file, the
+  * two test users, and then brings up Sentry, HDFS and Hive in that order.
+  *
+  * NOTE(review): startSentry() builds "hive.metastore.uris" from hmsPort, but
+  * hmsPort is only assigned later inside startHiveAndMetastore(), so Sentry is
+  * configured with the field's default value (0) - confirm whether that is intended.
+  */
+ @Before
+ public void setup() throws Exception {
+ // Load the Hive JDBC driver class up front.
+ Class.forName("org.apache.hive.jdbc.HiveDriver");
+ baseDir = Files.createTempDir();
+ // Policy file that makes "hive" the admin and carries the static user->group mapping.
+ policyFileLocation = new File(baseDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME);
+ PolicyFile policyFile = PolicyFile.setAdminOnServer1("hive")
+ .setUserGroupMapping(StaticUserGroup.getStaticMapping());
+ policyFile.write(policyFileLocation);
+
+ // Test user named after the OS user running the test, in group "supergroup"; used to start HDFS.
+ adminUgi = UserGroupInformation.createUserForTesting(
+ System.getProperty("user.name"), new String[] { "supergroup" });
+
+ // Test user "hive" in group "hive"; used to start Sentry, the metastore and HiveServer2.
+ hiveUgi = UserGroupInformation.createUserForTesting(
+ "hive", new String[] { "hive" });
+
+ // Start Sentry first: it assigns sentryPort, which the HDFS and Hive configuration read.
+ startSentry();
+
+ // Start HDFS (the MiniMR/YARN cluster creation is currently commented out in startDFSandYARN)
+ startDFSandYARN();
+
+ // Start HiveServer2 and Metastore
+ startHiveAndMetastore();
+
+ }
+
+ private void startHiveAndMetastore() throws IOException, InterruptedException {
+ hiveUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin");
+ hiveConf.set("sentry.service.client.server.rpc-address", "localhost");
+ hiveConf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
+ hiveConf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
+ hiveConf.set("sentry.service.client.server.rpc-port", String.valueOf(sentryPort));
+// hiveConf.set("sentry.service.server.compact.transport", "true");
+// hiveConf.set("sentry.service.client.compact.transport", "true");
+ hiveConf.set("sentry.service.security.mode", "none");
+ hiveConf.set("sentry.hdfs.service.security.mode", "none");
+ hiveConf.set("sentry.hdfs.init.update.retry.delay.ms", "500");
+ hiveConf.set("sentry.hive.provider.backend", "org.apache.sentry.provider.db.SimpleDBProviderBackend");
+ hiveConf.set("sentry.provider", LocalGroupResourceAuthorizationProvider.class.getName());
+ hiveConf.set("sentry.hive.provider", LocalGroupResourceAuthorizationProvider.class.getName());
+ hiveConf.set("sentry.hive.provider.resource", policyFileLocation.getPath());
+ hiveConf.set("sentry.hive.testing.mode", "true");
+ hiveConf.set("sentry.hive.server", "server1");
+
+ hiveConf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
+ hiveConf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
+ hiveConf.set("fs.defaultFS", fsURI);
+ hiveConf.set("fs.default.name", fsURI);
+ hiveConf.set("hive.metastore.execute.setugi", "true");
+ hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=" + baseDir.getAbsolutePath() + "/metastore_db;create=true");
+ hiveConf.set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
+ hiveConf.set("javax.jdo.option.ConnectionUserName", "hive");
+ hiveConf.set("javax.jdo.option.ConnectionPassword", "hive");
+ hiveConf.set("datanucleus.autoCreateSchema", "true");
+ hiveConf.set("datanucleus.fixedDatastore", "false");
+ hiveConf.set("datanucleus.autoStartMechanism", "SchemaTable");
+ hmsPort = findPort();
+ System.out.println("\n\n HMS port : " + hmsPort + "\n\n");
+ hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort);
+ hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding");
+ hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListener");
+ hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl");
+ hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook");
+
+ HiveAuthzConf authzConf = new HiveAuthzConf(Resources.getResource("sentry-site.xml"));
+ authzConf.addResource(hiveConf);
+ File confDir = assertCreateDir(new File(baseDir, "etc"));
+ File accessSite = new File(confDir, HiveAuthzConf.AUTHZ_SITE_FILE);
+ OutputStream out = new FileOutputStream(accessSite);
+ authzConf.set("fs.defaultFS", fsURI);
+ authzConf.writeXml(out);
+ out.close();
+
+ hiveConf.set("hive.sentry.conf.url", accessSite.getPath());
+ System.out.println("Sentry client file : " + accessSite.getPath());
+
+ File hiveSite = new File(confDir, "hive-site.xml");
+ hiveConf.set("hive.server2.enable.doAs", "false");
+ hiveConf.set(HiveAuthzConf.HIVE_SENTRY_CONF_URL, accessSite.toURI().toURL()
+ .toExternalForm());
+ out = new FileOutputStream(hiveSite);
+ hiveConf.writeXml(out);
+ out.close();
+
+ Reflection.staticField("hiveSiteURL")
+ .ofType(URL.class)
+ .in(HiveConf.class)
+ .set(hiveSite.toURI().toURL());
+
+ metastore = new InternalMetastoreServer(hiveConf);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ metastore.start();
+ while(true){}
+ } catch (Exception e) {
+ System.out.println("Could not start Hive Server");
+ }
+ }
+ }.start();
+
+ hiveServer2 = new InternalHiveServer(hiveConf);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ hiveServer2.start();
+ while(true){}
+ } catch (Exception e) {
+ System.out.println("Could not start Hive Server");
+ }
+ }
+ }.start();
+
+ Thread.sleep(10000);
+ return null;
+ }
+ });
+ }
+
+ /**
+  * Starts a {@code MiniDFSCluster} as {@code adminUgi} with
+  * {@code SentryAuthorizationProvider} configured as the NameNode authorization
+  * provider, creates {@code /tmp} and {@code /user/hive/warehouse}, hands the
+  * Hive directories to user/group "hive", records the filesystem URI in
+  * {@code fsURI}, and finally waits for HDFS to leave safe mode.
+  *
+  * Despite the method name, no YARN/MR cluster is started: that call is
+  * commented out below.
+  *
+  * @throws IOException if an HDFS operation fails
+  * @throws InterruptedException if the calling thread is interrupted
+  */
+ private void startDFSandYARN() throws IOException,
+     InterruptedException {
+   adminUgi.doAs(new PrivilegedExceptionAction<Void>() {
+     @Override
+     public Void run() throws Exception {
+       System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
+       Configuration conf = new HdfsConfiguration();
+       conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
+           SentryAuthorizationProvider.class.getName());
+       conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+       conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+       File dfsDir = assertCreateDir(new File(baseDir, "dfs"));
+       conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
+       conf.set("hadoop.security.group.mapping",
+           MiniDFS.PseudoGroupMappingService.class.getName());
+       Configuration.addDefaultResource("test.xml");
+
+       conf.set("sentry.authorization-provider.hdfs-path-prefixes", "/user/hive/warehouse,/tmp/external");
+       conf.set("sentry.authorization-provider.cache-refresh-retry-wait.ms", "5000");
+       conf.set("sentry.authorization-provider.cache-stale-threshold.ms", "3000");
+
+       conf.set("sentry.hdfs.service.security.mode", "none");
+       conf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
+       conf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
+       EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+       miniDFS = new MiniDFSCluster.Builder(conf).build();
+       Path tmpPath = new Path("/tmp");
+       Path hivePath = new Path("/user/hive");
+       Path warehousePath = new Path(hivePath, "warehouse");
+       miniDFS.getFileSystem().mkdirs(warehousePath);
+       boolean directory = miniDFS.getFileSystem().isDirectory(warehousePath);
+       System.out.println("\n\n Is dir :" + directory + "\n\n");
+       System.out.println("\n\n DefaultFS :" + miniDFS.getFileSystem().getUri() + "\n\n");
+       fsURI = miniDFS.getFileSystem().getUri().toString();
+       conf.set("fs.defaultFS", fsURI);
+
+       // Create Yarn cluster
+       // miniMR = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
+
+       miniDFS.getFileSystem().mkdirs(tmpPath);
+       miniDFS.getFileSystem().setPermission(tmpPath, FsPermission.valueOf("drwxrwxrwx"));
+       miniDFS.getFileSystem().setOwner(hivePath, "hive", "hive");
+       miniDFS.getFileSystem().setOwner(warehousePath, "hive", "hive");
+       System.out.println("\n\n Owner :"
+           + miniDFS.getFileSystem().getFileStatus(warehousePath).getOwner()
+           + ", "
+           + miniDFS.getFileSystem().getFileStatus(warehousePath).getGroup()
+           + "\n\n");
+       System.out.println("\n\n Owner tmp :"
+           + miniDFS.getFileSystem().getFileStatus(tmpPath).getOwner() + ", "
+           + miniDFS.getFileSystem().getFileStatus(tmpPath).getGroup() + ", "
+           + miniDFS.getFileSystem().getFileStatus(tmpPath).getPermission() + ", "
+           + "\n\n");
+
+       // Poll for safe-mode exit, pausing between attempts so the retry budget
+       // is a real waiting window rather than 30 back-to-back calls.
+       final int dfsSafeCheckRetry = 30;
+       final long dfsSafeCheckWaitMs = 1000L;
+       boolean hasStarted = false;
+       for (int attempt = 1; attempt <= dfsSafeCheckRetry; attempt++) {
+         if (!miniDFS.getFileSystem().isInSafeMode()) {
+           hasStarted = true;
+           System.out.println("HDFS safemode check num times : " + attempt);
+           break;
+         }
+         if (attempt < dfsSafeCheckRetry) {
+           Thread.sleep(dfsSafeCheckWaitMs);
+         }
+       }
+       if (!hasStarted) {
+         throw new RuntimeException("HDFS hasnt exited safe mode yet..");
+       }
+
+       return null;
+     }
+   });
+ }
+
+ /**
+  * Builds the Sentry server configuration, creates the {@code SentryService}
+  * through {@code SentryServiceFactory}, starts it as {@code hiveUgi}, and
+  * stores the port it reports in {@code sentryPort}.
+  *
+  * NOTE(review): "hive.metastore.uris" is built from hmsPort here, but no code
+  * in this method assigns hmsPort - confirm that the value is set before this runs.
+  *
+  * @throws IOException declared by {@code UserGroupInformation.doAs}
+  * @throws InterruptedException declared by {@code UserGroupInformation.doAs}
+  */
+ private void startSentry() throws IOException,
+ InterruptedException {
+ hiveUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Start from an empty Configuration (no default resources loaded).
+ Configuration sentryConf = new Configuration(false);
+ // Settings are collected in a map first and copied into sentryConf below.
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND,
+ SimpleDBProviderBackend.class.getName());
+ properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname,
+ SentryHiveAuthorizationTaskFactoryImpl.class.getName());
+ properties
+ .put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2");
+ properties.put("hive.metastore.uris", "thrift://localhost:" + hmsPort);
+ properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);
+// properties.put("sentry.service.server.compact.transport", "true");
+ properties.put("sentry.hive.testing.mode", "true");
+ properties.put("sentry.service.reporting", "JMX");
+ properties.put(ServerConfig.ADMIN_GROUPS, "hive,admin");
+ properties.put(ServerConfig.RPC_ADDRESS, "localhost");
+ // sentryPort starts at -1, so the first start passes port 0 - presumably "pick any free port"; confirm.
+ properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort < 0 ? 0 : sentryPort));
+ properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+
+ properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
+ properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
+ properties.put(ServerConfig.SENTRY_STORE_JDBC_URL,
+ "jdbc:derby:;databaseName=" + baseDir.getPath()
+ + "/sentrystore_db;create=true");
+ // Register both the policy-store and the HDFS processor factories, plus the HDFS policy-store plugin.
+ properties.put("sentry.service.processor.factories",
+ "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
+ properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin");
+ properties.put(ServerConfig.RPC_MIN_THREADS, "3");
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ sentryConf.set(entry.getKey(), entry.getValue());
+ }
+ sentryService = new SentryServiceFactory().create(sentryConf);
+ // The copy loop above has already run, so the client address/port are
+ // written to both the map and sentryConf directly.
+ properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress()
+ .getHostName());
+ sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress()
+ .getHostName());
+ properties.put(ClientConfig.SERVER_RPC_PORT,
+ String.valueOf(sentryService.getAddress().getPort()));
+ sentryConf.set(ClientConfig.SERVER_RPC_PORT,
+ String.valueOf(sentryService.getAddress().getPort()));
+ // Starts the service and waits until it reports that it is running.
+ waitOnSentryService();
+ // Remember the service's port; the HDFS and Hive configurations read this field.
+ sentryPort = sentryService.getAddress().getPort();
+ System.out.println("\n\n Sentry port : " + sentryPort + "\n\n");
+ return null;
+ }
+ });
+ }
+
+  /**
+   * Shuts down the mini DFS, HiveServer2, the metastore and the Sentry service
+   * after each test.
+   *
+   * The shutdowns are nested in {@code finally} blocks so that a failure in one
+   * of them does not prevent the remaining components from being shut down.
+   * Each component is skipped when its field is still {@code null}.
+   *
+   * @throws Exception if any of the shutdown calls fails
+   */
+  @After
+  public void cleanUp() throws Exception {
+    try {
+      if (miniDFS != null) {
+        miniDFS.shutdown();
+      }
+    } finally {
+      try {
+        if (hiveServer2 != null) {
+          hiveServer2.shutdown();
+        }
+      } finally {
+        try {
+          if (metastore != null) {
+            metastore.shutdown();
+          }
+        } finally {
+          // The Sentry service created in startSentry() must be stopped too,
+          // otherwise it stays alive after the test.
+          if (sentryService != null) {
+            sentryService.stop();
+          }
+        }
+      }
+    }
+  }
+
+ /**
+ * End-to-end scenario: issues grants, revokes and DDL through a HiveServer2
+ * JDBC connection and, after each step, checks which group ACL entry (if any)
+ * HDFS reports for group "hbase" on the affected paths.
+ *
+ * The checks go through verifyOnAllSubDirs / verifyOnPath, which retry up to
+ * NUM_RETRIES times with RETRY_WAIT between attempts, so each expectation only
+ * has to become true eventually.
+ *
+ * Covered in order: managed partitioned table, table and partition rename,
+ * Sentry stop and restart, a table created after the restart, a table with an
+ * explicit location, database-level grant and revoke, drop cleanup, and
+ * changing the location of an external table and its partitions.
+ */
+ @Test
+ public void testEnd2End() throws Throwable {
+
+ Connection conn = hiveServer2.createConnection("hive", "hive");
+ Statement stmt = conn.createStatement();
+ stmt.execute("create role admin_role");
+ stmt.execute("grant role admin_role to group hive");
+ stmt.execute("grant all on server server1 to role admin_role");
+ stmt.execute("create table p1 (s string) partitioned by (month int, day int)");
+ stmt.execute("alter table p1 add partition (month=1, day=1)");
+ stmt.execute("alter table p1 add partition (month=1, day=2)");
+ stmt.execute("alter table p1 add partition (month=2, day=1)");
+ stmt.execute("alter table p1 add partition (month=2, day=2)");
+
+ stmt.execute("create role p1_admin");
+ stmt.execute("grant role p1_admin to group hbase");
+
+ // p1_admin holds no privileges yet, so no "hbase" ACL entry is expected.
+ verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false);
+
+ loadData(stmt);
+
+ // Grants select on p1 to p1_admin as part of its checks.
+ verifyHDFSandMR(stmt);
+
+ // Undo the select grant made inside verifyHDFSandMR.
+ stmt.execute("revoke select on table p1 from role p1_admin");
+ verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false);
+
+ stmt.execute("grant all on table p1 to role p1_admin");
+ verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.ALL, "hbase", true);
+
+ // With select revoked from ALL, the expected remaining action is WRITE_EXECUTE.
+ stmt.execute("revoke select on table p1 from role p1_admin");
+ verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true);
+
+ // Verify table rename works
+ stmt.execute("alter table p1 rename to p3");
+ verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true);
+
+ stmt.execute("alter table p3 partition (month=1, day=1) rename to partition (month=1, day=3)");
+ verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true);
+ verifyOnAllSubDirs("/user/hive/warehouse/p3/month=1/day=3", FsAction.WRITE_EXECUTE, "hbase", true);
+
+ sentryService.stop();
+ // Verify that Sentry permission are still enforced for the "stale" period
+ verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true);
+
+ // Verify that Sentry permission are NOT enforced AFTER "stale" period
+ // (this relies on the retry loop in verifyOnAllSubDirs to wait until the entry disappears)
+ verifyOnAllSubDirs("/user/hive/warehouse/p3", null, "hbase", false);
+
+ startSentry();
+ // Verify that After Sentry restart permissions are re-enforced
+ verifyOnAllSubDirs("/user/hive/warehouse/p3", FsAction.WRITE_EXECUTE, "hbase", true);
+
+ // Create new table and verify everything is fine after restart...
+ stmt.execute("create table p2 (s string) partitioned by (month int, day int)");
+ stmt.execute("alter table p2 add partition (month=1, day=1)");
+ stmt.execute("alter table p2 add partition (month=1, day=2)");
+ stmt.execute("alter table p2 add partition (month=2, day=1)");
+ stmt.execute("alter table p2 add partition (month=2, day=2)");
+
+ verifyOnAllSubDirs("/user/hive/warehouse/p2", null, "hbase", false);
+
+ stmt.execute("grant select on table p2 to role p1_admin");
+ verifyOnAllSubDirs("/user/hive/warehouse/p2", FsAction.READ_EXECUTE, "hbase", true);
+
+ // Same grant issued a second time; the expected ACL is unchanged.
+ stmt.execute("grant select on table p2 to role p1_admin");
+ verifyOnAllSubDirs("/user/hive/warehouse/p2", FsAction.READ_EXECUTE, "hbase", true);
+
+ // Create external table
+ // NOTE(review): the statement below has no "external" keyword, only an explicit location - confirm this is intended.
+ writeToPath("/tmp/external/ext1", 5, "foo", "bar");
+
+ stmt.execute("create table ext1 (s string) location \'/tmp/external/ext1\'");
+ verifyQuery(stmt, "ext1", 5);
+
+ // Ensure existing group permissions are never returned..
+ verifyOnAllSubDirs("/tmp/external/ext1", null, "bar", false);
+ verifyOnAllSubDirs("/tmp/external/ext1", null, "hbase", false);
+
+ stmt.execute("grant all on table ext1 to role p1_admin");
+ verifyOnAllSubDirs("/tmp/external/ext1", FsAction.ALL, "hbase", true);
+
+ stmt.execute("revoke select on table ext1 from role p1_admin");
+ verifyOnAllSubDirs("/tmp/external/ext1", FsAction.WRITE_EXECUTE, "hbase", true);
+
+ // Verify database operations works correctly
+ stmt.execute("create database db1");
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db", null, "hbase", false);
+
+ stmt.execute("create table db1.tbl1 (s string)");
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", null, "hbase", false);
+ stmt.execute("create table db1.tbl2 (s string)");
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", null, "hbase", false);
+
+ // Verify db privileges are propagated to tables
+ stmt.execute("grant select on database db1 to role p1_admin");
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", FsAction.READ_EXECUTE, "hbase", true);
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", FsAction.READ_EXECUTE, "hbase", true);
+
+ stmt.execute("use db1");
+ stmt.execute("grant all on table tbl1 to role p1_admin");
+
+ // Only tbl1 received the table-level grant; tbl2 keeps the database-level select.
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", FsAction.ALL, "hbase", true);
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", FsAction.READ_EXECUTE, "hbase", true);
+
+ // Verify recursive revoke
+ stmt.execute("revoke select on database db1 from role p1_admin");
+
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl1", FsAction.WRITE_EXECUTE, "hbase", true);
+ verifyOnAllSubDirs("/user/hive/warehouse/db1.db/tbl2", null, "hbase", false);
+
+ // Verify cleanup..
+ stmt.execute("drop table tbl1");
+ Assert.assertFalse(miniDFS.getFileSystem().exists(new Path("/user/hive/warehouse/db1.db/tbl1")));
+
+ stmt.execute("drop table tbl2");
+ Assert.assertFalse(miniDFS.getFileSystem().exists(new Path("/user/hive/warehouse/db1.db/tbl2")));
+
+ stmt.execute("use default");
+ stmt.execute("drop database db1");
+ Assert.assertFalse(miniDFS.getFileSystem().exists(new Path("/user/hive/warehouse/db1.db")));
+
+ // START : Verify external table set location..
+ writeToPath("/tmp/external/tables/ext2_before/i=1", 5, "foo", "bar");
+ writeToPath("/tmp/external/tables/ext2_before/i=2", 5, "foo", "bar");
+
+ stmt.execute("create external table ext2 (s string) partitioned by (i int) location \'/tmp/external/tables/ext2_before\'");
+ stmt.execute("alter table ext2 add partition (i=1)");
+ stmt.execute("alter table ext2 add partition (i=2)");
+ verifyQuery(stmt, "ext2", 10);
+ verifyOnAllSubDirs("/tmp/external/tables/ext2_before", null, "hbase", false);
+ stmt.execute("grant all on table ext2 to role p1_admin");
+ verifyOnPath("/tmp/external/tables/ext2_before", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=1", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=2", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=1/stuff.txt", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=2/stuff.txt", FsAction.ALL, "hbase", true);
+
+ writeToPath("/tmp/external/tables/ext2_after/i=1", 6, "foo", "bar");
+ writeToPath("/tmp/external/tables/ext2_after/i=2", 6, "foo", "bar");
+
+ stmt.execute("alter table ext2 set location \'hdfs:///tmp/external/tables/ext2_after\'");
+ // Even though table location is altered, partition location is still old (still 10 rows)
+ verifyQuery(stmt, "ext2", 10);
+ // You have to explicitly alter partition location..
+ verifyOnPath("/tmp/external/tables/ext2_before", null, "hbase", false);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=1", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=2", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=1/stuff.txt", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=2/stuff.txt", FsAction.ALL, "hbase", true);
+
+ stmt.execute("alter table ext2 partition (i=1) set location \'hdfs:///tmp/external/tables/ext2_after/i=1\'");
+ stmt.execute("alter table ext2 partition (i=2) set location \'hdfs:///tmp/external/tables/ext2_after/i=2\'");
+ // Now that partition location is altered, it picks up new data (12 rows instead of 10)
+ verifyQuery(stmt, "ext2", 12);
+
+ // Old locations must have lost the entry; new locations must carry it.
+ verifyOnPath("/tmp/external/tables/ext2_before", null, "hbase", false);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=1", null, "hbase", false);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=2", null, "hbase", false);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=1/stuff.txt", null, "hbase", false);
+ verifyOnPath("/tmp/external/tables/ext2_before/i=2/stuff.txt", null, "hbase", false);
+ verifyOnPath("/tmp/external/tables/ext2_after", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_after/i=1", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_after/i=2", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_after/i=1/stuff.txt", FsAction.ALL, "hbase", true);
+ verifyOnPath("/tmp/external/tables/ext2_after/i=2/stuff.txt", FsAction.ALL, "hbase", true);
+ // END : Verify external table set location..
+
+ stmt.close();
+ conn.close();
+ }
+
+  /**
+   * Checks that {@code select * from <table>} returns exactly {@code n} rows,
+   * retrying up to {@code NUM_RETRIES} additional times.
+   *
+   * @param stmt  statement used to run the query
+   * @param table table name, concatenated into the query text as-is
+   * @param n     expected number of rows
+   * @throws Throwable the failure of the last attempt
+   */
+  private void verifyQuery(Statement stmt, String table, int n) throws Throwable {
+    verifyQuery(stmt, table, n, NUM_RETRIES);
+  }
+
+  /**
+   * Checks that {@code select * from <table>} returns exactly {@code n} rows.
+   *
+   * Any failure (query error or row-count mismatch) is retried after sleeping
+   * {@code RETRY_WAIT}, until {@code retry} additional attempts have been used;
+   * the failure of the final attempt is rethrown. The result set of every
+   * attempt is closed before the next attempt starts.
+   *
+   * @param stmt  statement used to run the query
+   * @param table table name, concatenated into the query text as-is
+   * @param n     expected number of rows
+   * @param retry number of additional attempts allowed after the first one
+   * @throws Throwable the failure of the last attempt
+   */
+  private void verifyQuery(Statement stmt, String table, int n, int retry) throws Throwable {
+    for (int attemptsLeft = retry; ; attemptsLeft--) {
+      ResultSet rs = null;
+      try {
+        rs = stmt.executeQuery("select * from " + table);
+        int numRows = 0;
+        while (rs.next()) {
+          numRows++;
+        }
+        Assert.assertEquals(n, numRows);
+        return;
+      } catch (Throwable th) {
+        if (attemptsLeft <= 0) {
+          throw th;
+        }
+      } finally {
+        if (rs != null) {
+          try {
+            rs.close();
+          } catch (SQLException ignored) {
+            // Best effort: a close failure must not hide the verification result.
+          }
+        }
+      }
+      Thread.sleep(RETRY_WAIT);
+    }
+  }
+
+  /**
+   * Writes two three-line files to HDFS, loads them into partitions
+   * (month=1, day=1) and (month=2, day=2) of table p1, and asserts that
+   * {@code select * from p1} then returns six rows.
+   *
+   * @param stmt statement used for the load and the verification query
+   * @throws IOException  if writing the input files fails
+   * @throws SQLException if a statement fails
+   */
+  private void loadData(Statement stmt) throws IOException, SQLException {
+    writeCharsToFile(new Path("/tmp/f1.txt"), "m1d1_t1\n", "m1d1_t2\n", "m1d1_t3\n");
+    stmt.execute("load data inpath \'/tmp/f1.txt\' overwrite into table p1 partition (month=1, day=1)");
+
+    writeCharsToFile(new Path("/tmp/f2.txt"), "m2d2_t4\n", "m2d2_t5\n", "m2d2_t6\n");
+    stmt.execute("load data inpath \'/tmp/f2.txt\' overwrite into table p1 partition (month=2, day=2)");
+
+    ResultSet rs = stmt.executeQuery("select * from p1");
+    try {
+      List<String> vals = new ArrayList<String>();
+      while (rs.next()) {
+        vals.add(rs.getString(1));
+      }
+      Assert.assertEquals(6, vals.size());
+    } finally {
+      // Close the result set even when the assertion fails.
+      rs.close();
+    }
+  }
+
+  /**
+   * Creates {@code file} on the mini DFS and writes each chunk with
+   * {@code writeChars}, closing the stream even if a write fails.
+   *
+   * Note that {@code writeChars} emits two bytes per character, so the file is
+   * not plain single-byte text; callers in this test only rely on line counts.
+   */
+  private void writeCharsToFile(Path file, String... chunks) throws IOException {
+    FSDataOutputStream out = miniDFS.getFileSystem().create(file);
+    try {
+      for (String chunk : chunks) {
+        out.writeChars(chunk);
+      }
+      out.flush();
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Creates directory {@code path}, assigns it to {@code user}/{@code group},
+   * and writes {@code numRows} lines ("random0", "random1", ...) into
+   * {@code <path>/stuff.txt}.
+   *
+   * The data file is afterwards owned by "asuresh"/"supergroup" with permission
+   * "-rwxrwx---", independent of the {@code user}/{@code group} arguments.
+   *
+   * @param path    HDFS directory to create
+   * @param numRows number of lines to write
+   * @param user    owner to set on the directory
+   * @param group   group to set on the directory
+   * @throws IOException if any file system operation fails
+   */
+  private void writeToPath(String path, int numRows, String user, String group) throws IOException {
+    Path dir = new Path(path);
+    Path dataFile = new Path(path + "/stuff.txt");
+
+    miniDFS.getFileSystem().mkdirs(dir);
+    miniDFS.getFileSystem().setOwner(dir, user, group);
+
+    FSDataOutputStream out = miniDFS.getFileSystem().create(dataFile);
+    try {
+      for (int i = 0; i < numRows; i++) {
+        out.writeChars("random" + i + "\n");
+      }
+      out.flush();
+    } finally {
+      // Release the stream even when a write fails.
+      out.close();
+    }
+
+    miniDFS.getFileSystem().setOwner(dataFile, "asuresh", "supergroup");
+    miniDFS.getFileSystem().setPermission(dataFile, FsPermission.valueOf("-rwxrwx---"));
+  }
+
+  /**
+   * Verifies HDFS read access for a test user "hbase" (group "hbase") before
+   * and after select is granted on table p1.
+   *
+   * First, opening a file under partition (month=1, day=1) must fail with a
+   * message containing "Permission denied: user=hbase". Then select on p1 is
+   * granted to role p1_admin, the READ_EXECUTE ACL for group hbase is awaited on
+   * all sub-directories, and the same user must be able to read three lines
+   * from the file under partition (month=2, day=2).
+   *
+   * @param stmt statement used to issue the grant
+   * @throws Throwable on any verification failure
+   */
+  private void verifyHDFSandMR(Statement stmt) throws Throwable {
+    // hbase user should not be allowed to read...
+    UserGroupInformation hbaseUgi = UserGroupInformation.createUserForTesting("hbase", new String[] {"hbase"});
+    hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        try {
+          miniDFS.getFileSystem().open(new Path("/user/hive/warehouse/p1/month=1/day=1/f1.txt"));
+          // Assert.fail throws an Error, so it is not swallowed by the catch below.
+          Assert.fail("Should not be allowed !!");
+        } catch (Exception e) {
+          // Guard against exceptions without a message instead of failing with an NPE.
+          String message = String.valueOf(e.getMessage());
+          Assert.assertTrue("Wrong Error : " + message, message.contains("Permission denied: user=hbase"));
+        }
+        return null;
+      }
+    });
+
+    // WordCount should fail..
+    // runWordCount(new JobConf(miniMR.getConfig()), "/user/hive/warehouse/p1/month=1/day=1", "/tmp/wc_out");
+
+    stmt.execute("grant select on table p1 to role p1_admin");
+
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE, "hbase", true);
+    // hbase user should now be allowed to read...
+    hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        Path p = new Path("/user/hive/warehouse/p1/month=2/day=2/f2.txt");
+        BufferedReader in = new BufferedReader(new InputStreamReader(miniDFS.getFileSystem().open(p)));
+        try {
+          List<String> lines = new ArrayList<String>();
+          String line;
+          while ((line = in.readLine()) != null) {
+            lines.add(line);
+          }
+          Assert.assertEquals(3, lines.size());
+        } finally {
+          // Close the reader even when the assertion fails.
+          in.close();
+        }
+        return null;
+      }
+    });
+
+  }
+
+ private void verifyOnAllSubDirs(String path, FsAction fsAction, String group, boolean groupShouldExist) throws Throwable {
+ verifyOnAllSubDirs(path, fsAction, group, groupShouldExist, true);
+ }
+
+ private void verifyOnPath(String path, FsAction fsAction, String group, boolean groupShouldExist) throws Throwable {
+ verifyOnAllSubDirs(path, fsAction, group, groupShouldExist, false);
+ }
+
+  /**
+   * Converts {@code path} to a {@link Path} and runs the verification with the
+   * default retry budget {@code NUM_RETRIES}.
+   */
+  private void verifyOnAllSubDirs(String path, FsAction fsAction, String group, boolean groupShouldExist, boolean recurse) throws Throwable {
+    Path target = new Path(path);
+    verifyOnAllSubDirs(target, fsAction, group, groupShouldExist, recurse, NUM_RETRIES);
+  }
+
+  /**
+   * Verifies the group ACL entry for {@code group} on {@code p}, optionally
+   * descending into its children.
+   *
+   * When {@code groupShouldExist} is true, the action recorded for
+   * {@code group} must equal {@code fsAction}; otherwise no entry for
+   * {@code group} may be present. A failing check (including a failing
+   * {@code getFileStatus}) is retried after sleeping {@code RETRY_WAIT} until
+   * {@code retry} additional attempts are used up, after which the last
+   * failure is rethrown.
+   *
+   * The retry is done in a loop and the descent happens exactly once, after
+   * the check on {@code p} has passed. The descent therefore always sees the
+   * file status obtained by the successful attempt, and children are not
+   * re-verified once per failed attempt.
+   *
+   * @param p                path to check
+   * @param fsAction         expected action when {@code groupShouldExist} is true
+   * @param group            group name to look up among the ACL entries
+   * @param groupShouldExist whether an entry for {@code group} is expected
+   * @param recurse          whether to verify all children of a directory as well
+   * @param retry            number of additional attempts allowed for this path
+   * @throws Throwable the failure of the last attempt
+   */
+  private void verifyOnAllSubDirs(Path p, FsAction fsAction, String group, boolean groupShouldExist, boolean recurse, int retry) throws Throwable {
+    FileStatus fStatus = null;
+    for (int attemptsLeft = retry; ; attemptsLeft--) {
+      try {
+        fStatus = miniDFS.getFileSystem().getFileStatus(p);
+        if (groupShouldExist) {
+          Assert.assertEquals(fsAction, getAcls(p).get(group));
+        } else {
+          Assert.assertFalse(getAcls(p).containsKey(group));
+        }
+        break;
+      } catch (Throwable th) {
+        if (attemptsLeft <= 0) {
+          throw th;
+        }
+        Thread.sleep(RETRY_WAIT);
+      }
+    }
+    if (recurse && fStatus.isDirectory()) {
+      // Every child gets a fresh retry budget.
+      for (FileStatus child : miniDFS.getFileSystem().listStatus(p)) {
+        verifyOnAllSubDirs(child.getPath(), fsAction, group, groupShouldExist, recurse, NUM_RETRIES);
+      }
+    }
+  }
+
+  /**
+   * Reads the ACL status of {@code path} from the mini DFS and returns the
+   * entries of type {@code AclEntryType.GROUP} as a map from entry name to
+   * permission. Entries of other types are ignored.
+   *
+   * @param path path whose ACL is read
+   * @return entry name to permission, for group entries only
+   * @throws Exception if the ACL status cannot be fetched
+   */
+  private Map<String, FsAction> getAcls(Path path) throws Exception {
+    AclStatus status = miniDFS.getFileSystem().getAclStatus(path);
+    Map<String, FsAction> groupActions = new HashMap<String, FsAction>();
+    for (AclEntry entry : status.getEntries()) {
+      if (!entry.getType().equals(AclEntryType.GROUP)) {
+        continue;
+      }
+      groupActions.put(entry.getName(), entry.getPermission());
+    }
+    return groupActions;
+  }
+
+  /**
+   * Configures {@code job} as a word-count job ("TestWC") reading
+   * {@code inPath} and writing {@code outPath}, submits it and waits for it.
+   *
+   * The output path is deleted recursively first. The job uses
+   * {@code WordCountMapper} and {@code SumReducer}, one reduce task, and a
+   * single attempt per map and reduce task.
+   *
+   * @param job     job configuration to populate and submit
+   * @param inPath  input path
+   * @param outPath output path, removed before the job is submitted
+   * @throws Exception if submission fails, or an {@link IOException} when the
+   *         job does not complete successfully
+   */
+  private void runWordCount(JobConf job, String inPath, String outPath) throws Exception {
+    Path input = new Path(inPath);
+    Path output = new Path(outPath);
+    miniDFS.getFileSystem().delete(output, true);
+
+    job.setJobName("TestWC");
+    JobClient client = new JobClient(job);
+
+    // Input and output wiring.
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    job.set("mapreduce.output.textoutputformat.separator", " ");
+    job.setInputFormat(TextInputFormat.class);
+
+    // Key/value types and the map/reduce implementations.
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setMapperClass(WordCountMapper.class);
+    job.setReducerClass(SumReducer.class);
+    job.setOutputFormat(TextOutputFormat.class);
+
+    // One reducer, one attempt per task.
+    job.setNumReduceTasks(1);
+    job.setInt("mapreduce.map.maxattempts", 1);
+    job.setInt("mapreduce.reduce.maxattempts", 1);
+
+    RunningJob running = client.submitJob(job);
+    boolean succeeded = client.monitorAndPrintJob(job, running);
+    if (!succeeded) {
+      throw new IOException("job Failed !!");
+    }
+
+  }
+}