You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2016/03/22 07:23:59 UTC

[02/11] incubator-sentry git commit: SENTRY-1013: Add policy engine for Kafka (Ashish K Singh via Dapeng Sun)

SENTRY-1013: Add policy engine for Kafka (Ashish K Singh via Dapeng Sun)

Change-Id: I157bd09d6c053866e2c655383eeedee0fbc8fd50


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/2575adde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/2575adde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/2575adde

Branch: refs/heads/master
Commit: 2575adde2a1fddc2594ecd0f3cdce7e5866cd228
Parents: ac6c0f0
Author: Sun Dapeng <sd...@apache.org>
Authored: Mon Feb 1 09:19:21 2016 +0800
Committer: hahao <ha...@cloudera.com>
Committed: Mon Mar 21 23:08:11 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 sentry-policy/pom.xml                           |   1 +
 sentry-policy/sentry-policy-kafka/pom.xml       |  80 +++++++
 .../policy/kafka/KafkaModelAuthorizables.java   |  57 +++++
 .../policy/kafka/KafkaPrivilegeValidator.java   |  68 ++++++
 .../policy/kafka/KafkaWildcardPrivilege.java    | 131 ++++++++++++
 .../policy/kafka/SimpleKafkaPolicyEngine.java   |  87 ++++++++
 .../kafka/KafkaPolicyFileProviderBackend.java   |  35 +++
 .../kafka/MockGroupMappingServiceProvider.java  |  39 ++++
 .../kafka/TestKafkaModelAuthorizables.java      |  54 +++++
 .../kafka/TestKafkaPrivilegeValidator.java      | 118 +++++++++++
 .../kafka/TestKafkaWildcardPrivilege.java       | 179 ++++++++++++++++
 .../engine/AbstractTestKafkaPolicyEngine.java   | 163 ++++++++++++++
 .../kafka/engine/TestKafkaPolicyEngineDFS.java  |  76 +++++++
 .../engine/TestKafkaPolicyEngineLocalFS.java    |  47 ++++
 ...tKafkaAuthorizationProviderGeneralCases.java | 212 +++++++++++++++++++
 ...tKafkaAuthorizationProviderSpecialCases.java |  88 ++++++++
 .../kafka/provider/TestKafkaPolicyNegative.java | 105 +++++++++
 .../src/test/resources/log4j.properties         |  31 +++
 .../src/test/resources/test-authz-provider.ini  |  38 ++++
 20 files changed, 1614 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6396853..ac2d596 100644
--- a/pom.xml
+++ b/pom.xml
@@ -486,6 +486,11 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.sentry</groupId>
+        <artifactId>sentry-policy-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.sentry</groupId>
         <artifactId>sentry-dist</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-policy/pom.xml b/sentry-policy/pom.xml
index ef938a6..45dc675 100644
--- a/sentry-policy/pom.xml
+++ b/sentry-policy/pom.xml
@@ -35,6 +35,7 @@ limitations under the License.
     <module>sentry-policy-indexer</module>
     <module>sentry-policy-search</module>
     <module>sentry-policy-sqoop</module>
+    <module>sentry-policy-kafka</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/pom.xml b/sentry-policy/sentry-policy-kafka/pom.xml
new file mode 100644
index 0000000..21d34eb
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sentry</groupId>
+    <artifactId>sentry-policy</artifactId>
+    <version>1.7.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>sentry-policy-kafka</artifactId>
+  <name>Sentry Policy for Kafka</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.shiro</groupId>
+      <artifactId>shiro-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-core-model-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-file</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
new file mode 100644
index 0000000..ba93036
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.provider.common.KeyValue;
+
+public class KafkaModelAuthorizables {
+  public static KafkaAuthorizable from(KeyValue keyValue) {
+    String prefix = keyValue.getKey().toLowerCase();
+    String name = keyValue.getValue().toLowerCase();
+    for (AuthorizableType type : AuthorizableType.values()) {
+      if (prefix.equalsIgnoreCase(type.name())) {
+        return from(type, name);
+      }
+    }
+    return null;
+  }
+
+  public static KafkaAuthorizable from(String keyValue) {
+    return from(new KeyValue(keyValue));
+  }
+
+  public static KafkaAuthorizable from(AuthorizableType type, String name) {
+    switch (type) {
+      case HOST:
+        return new Host(name);
+      case CLUSTER:
+        return new Cluster(name);
+      case TOPIC:
+        return new Topic(name);
+      case CONSUMERGROUP:
+        return new ConsumerGroup(name);
+      default:
+        return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
new file mode 100644
index 0000000..ecad355
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_SPLITTER;
+import static org.apache.sentry.provider.common.ProviderConstants.PRIVILEGE_PREFIX;
+
+import java.util.List;
+
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.policy.common.PrivilegeValidator;
+import org.apache.sentry.policy.common.PrivilegeValidatorContext;
+import org.apache.shiro.config.ConfigurationException;
+
+import com.google.common.collect.Lists;
+
+public class KafkaPrivilegeValidator implements PrivilegeValidator {
+
+  public KafkaPrivilegeValidator() {
+  }
+
+  @Override
+  public void validate(PrivilegeValidatorContext context)
+      throws ConfigurationException {
+    Iterable<KafkaAuthorizable> authorizables = parsePrivilege(context.getPrivilege());
+    boolean hostnameMatched = false;
+    for (KafkaAuthorizable authorizable : authorizables) {
+      if (authorizable instanceof Host) {
+        hostnameMatched = true;
+        break;
+      }
+    }
+    if (!hostnameMatched) {
+      String msg = "host=[name] in " + context.getPrivilege() + " is required.";
+      throw new ConfigurationException(msg);
+    }
+  }
+
+  private Iterable<KafkaAuthorizable> parsePrivilege(String string) {
+    List<KafkaAuthorizable> result = Lists.newArrayList();
+    for(String section : AUTHORIZABLE_SPLITTER.split(string)) {
+      if(!section.toLowerCase().startsWith(PRIVILEGE_PREFIX)) {
+        KafkaAuthorizable authorizable = KafkaModelAuthorizables.from(section);
+        if(authorizable == null) {
+          String msg = "No authorizable found for " + section;
+          throw new ConfigurationException(msg);
+        }
+        result.add(authorizable);
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
new file mode 100644
index 0000000..e04aeb7
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_SPLITTER;
+
+import java.util.List;
+
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.policy.common.Privilege;
+import org.apache.sentry.policy.common.PrivilegeFactory;
+import org.apache.sentry.provider.common.KeyValue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class KafkaWildcardPrivilege implements Privilege {
+
+  public static class Factory implements PrivilegeFactory {
+    @Override
+    public Privilege createPrivilege(String permission) {
+      return new KafkaWildcardPrivilege(permission);
+    }
+  }
+
+  private final ImmutableList<KeyValue> parts;
+
+  public KafkaWildcardPrivilege(String permission) {
+    if (Strings.isNullOrEmpty(permission)) {
+      throw new IllegalArgumentException("Permission string cannot be null or empty.");
+    }
+    List<KeyValue>parts = Lists.newArrayList();
+    for (String authorizable : AUTHORIZABLE_SPLITTER.trimResults().split(permission.trim())) {
+      if (authorizable.isEmpty()) {
+        throw new IllegalArgumentException("Privilege '" + permission + "' has an empty section");
+      }
+      parts.add(new KeyValue(authorizable));
+    }
+    if (parts.isEmpty()) {
+      throw new AssertionError("Privilege,  " + permission + ", did not consist of any valid authorizable.");
+    }
+    this.parts = ImmutableList.copyOf(parts);
+  }
+
+  @Override
+  public boolean implies(Privilege p) {
+    if (!(p instanceof KafkaWildcardPrivilege)) {
+      return false;
+    }
+    KafkaWildcardPrivilege wp = (KafkaWildcardPrivilege)p;
+    List<KeyValue> otherParts = wp.parts;
+    if(equals(wp)) {
+      return true;
+    }
+    int index = 0;
+    for (KeyValue otherPart : otherParts) {
+      // If this privilege has less parts than the other privilege, everything
+      // after the number of parts contained
+      // in this privilege is automatically implied, so return true
+      if (parts.size() - 1 < index) {
+        return true;
+      } else {
+        KeyValue part = parts.get(index);
+        // Support for action inheritance from parent to child
+        if (part.getKey().equalsIgnoreCase(KafkaActionConstant.actionName)
+            && !(otherPart.getKey().equalsIgnoreCase(KafkaActionConstant.actionName))) {
+          continue;
+        }
+        // are the keys even equal
+        if(!part.getKey().equalsIgnoreCase(otherPart.getKey())) {
+          return false;
+        }
+        if (!impliesKeyValue(part, otherPart)) {
+          return false;
+        }
+        index++;
+      }
+    }
+    // If this privilege has more parts than
+    // the other parts, only imply it if
+    // all of the other parts are "*" or "ALL"
+    for (; index < parts.size(); index++) {
+      KeyValue part = parts.get(index);
+      if (!part.getValue().equals(KafkaActionConstant.ALL)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean impliesKeyValue(KeyValue policyPart, KeyValue requestPart) {
+    Preconditions.checkState(policyPart.getKey().equalsIgnoreCase(requestPart.getKey()),
+        "Please report, this method should not be called with two different keys");
+    if(policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
+        policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL_NAME) ||
+        policyPart.equals(requestPart)) {
+      return true;
+    } else if (!KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())
+        && KafkaActionConstant.ALL.equalsIgnoreCase(requestPart.getValue())) {
+      /* privilege request is to match with any object of given type */
+      return true;
+    }
+    return false;
+
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for(KeyValue kv: this.parts) {
+      sb.append(kv.getKey() + "=" + kv.getValue() + "->");
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java
new file mode 100644
index 0000000..7e043e1
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.util.Set;
+
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.SentryConfigurationException;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.common.PrivilegeFactory;
+import org.apache.sentry.policy.common.PrivilegeValidator;
+import org.apache.sentry.provider.common.ProviderBackend;
+import org.apache.sentry.provider.common.ProviderBackendContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class SimpleKafkaPolicyEngine implements PolicyEngine {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaPolicyEngine.class);
+  private final ProviderBackend providerBackend;
+
+  public SimpleKafkaPolicyEngine(ProviderBackend providerBackend) {
+    this.providerBackend = providerBackend;
+    ProviderBackendContext context = new ProviderBackendContext();
+    context.setAllowPerDatabase(false);
+    context.setValidators(ImmutableList.<PrivilegeValidator>of(new KafkaPrivilegeValidator()));
+    this.providerBackend.initialize(context);
+  }
+
+  @Override
+  public PrivilegeFactory getPrivilegeFactory() {
+    return new KafkaWildcardPrivilege.Factory();
+  }
+
+  @Override
+  public ImmutableSet<String> getAllPrivileges(Set<String> groups, ActiveRoleSet roleSet)
+      throws SentryConfigurationException {
+    return getPrivileges(groups, roleSet);
+  }
+
+  @Override
+  public ImmutableSet<String> getPrivileges(Set<String> groups, ActiveRoleSet roleSet,
+                                            Authorizable... authorizableHierarchy)
+      throws SentryConfigurationException {
+    if(LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Getting permissions for {}", groups);
+    }
+    ImmutableSet<String> result = providerBackend.getPrivileges(groups, roleSet);
+    if(LOGGER.isDebugEnabled()) {
+      LOGGER.debug("result = " + result);
+    }
+    return result;
+  }
+
+  @Override
+  public void close() {
+    if (providerBackend != null) {
+      providerBackend.close();
+    }
+  }
+
+  @Override
+  public void validatePolicy(boolean strictValidation)
+      throws SentryConfigurationException {
+    if (providerBackend != null) {
+      providerBackend.validatePolicy(strictValidation);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java
new file mode 100644
index 0000000..47a053d
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine;
+import org.apache.sentry.provider.file.SimpleFileProviderBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaPolicyFileProviderBackend extends SimpleKafkaPolicyEngine {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPolicyFileProviderBackend.class);
+  public KafkaPolicyFileProviderBackend(String resource) throws IOException {
+    super(new SimpleFileProviderBackend(new Configuration(), resource));
+    LOGGER.warn("The DB provider backend is the preferred option over file provider backend as the kafka policy engine");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
new file mode 100644
index 0000000..572c74d
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.util.Set;
+
+import org.apache.sentry.provider.common.GroupMappingService;
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+public class MockGroupMappingServiceProvider implements GroupMappingService {
+  private final Multimap<String, String> userToGroupMap;
+
+  public MockGroupMappingServiceProvider(Multimap<String, String> userToGroupMap) {
+    this.userToGroupMap = userToGroupMap;
+  }
+  @Override
+  public Set<String> getGroups(String user) {
+    return Sets.newHashSet(userToGroupMap.get(user));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
new file mode 100644
index 0000000..46a0078
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+
+import org.apache.sentry.core.model.kafka.Host;
+import org.junit.Test;
+
+public class TestKafkaModelAuthorizables {
+
+  @Test
+  public void testHost() throws Exception {
+    Host host1 = (Host)KafkaModelAuthorizables.from("HOST=host1");
+    assertEquals("host1", host1.getName());
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testNoKV() throws Exception {
+    System.out.println(KafkaModelAuthorizables.from("nonsense"));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyKey() throws Exception {
+    System.out.println(KafkaModelAuthorizables.from("=host1"));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyValue() throws Exception {
+    System.out.println(KafkaModelAuthorizables.from("HOST="));
+  }
+
+  @Test
+  public void testNotAuthorizable() throws Exception {
+    assertNull(KafkaModelAuthorizables.from("k=v"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
new file mode 100644
index 0000000..ba670f7
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import junit.framework.Assert;
+
+import org.apache.sentry.policy.common.PrivilegeValidatorContext;
+import org.apache.shiro.config.ConfigurationException;
+import org.junit.Test;
+
+public class TestKafkaPrivilegeValidator {
+  @Test
+  public void testOnlyHostResource() {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1"));
+    } catch (ConfigurationException ex) {
+      Assert.fail("Unexpected ConfigurationException.");
+    }
+  }
+
+  @Test
+  public void testWithoutHostResource() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("cluster=c1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("topic=t1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("consumergroup=g1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+  }
+
+  @Test
+  public void testValidPrivileges() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->cluster=c1->action=read"));
+    } catch (ConfigurationException ex) {
+      Assert.fail("Not expected ConfigurationException");
+    }
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->action=read"));
+    } catch (ConfigurationException ex) {
+      Assert.fail("Not expected ConfigurationException");
+    }
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->consumergroup=g1->action=read"));
+    } catch (ConfigurationException ex) {
+      Assert.fail("Not expected ConfigurationException");
+    }
+  }
+
+  @Test
+  public void testInvalidHostResource() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("hhost=host1->cluster=c1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+  }
+
+  @Test
+  public void testInvalidClusterResource() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->clluster=c1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+  }
+
+  @Test
+  public void testInvalidTopicResource() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->ttopic=t1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+  }
+
+  @Test
+  public void testInvalidConsumerGroupResource() throws Exception {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->coonsumergroup=g1->action=read"));
+      Assert.fail("Expected ConfigurationException");
+    } catch (ConfigurationException ex) {
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
new file mode 100644
index 0000000..720c98f
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_JOINER;
+import static org.apache.sentry.provider.common.ProviderConstants.KV_JOINER;
+import static org.apache.sentry.provider.common.ProviderConstants.KV_SEPARATOR;
+
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.policy.common.Privilege;
+import org.apache.sentry.policy.kafka.KafkaWildcardPrivilege;
+import org.apache.sentry.provider.common.KeyValue;
+import org.junit.Test;
+
+public class TestKafkaWildcardPrivilege {
+  private static final Privilege KAFKA_HOST1_ALL =
+      create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.ALL));
+  private static final Privilege KAFKA_HOST1_READ =
+      create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.READ));
+  private static final Privilege KAFKA_HOST1_WRITE =
+      create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.WRITE));
+
+  private static final Privilege KAFKA_HOST1_TOPIC1_ALL =
+      create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.ALL));
+  private static final Privilege KAFKA_HOST1_TOPIC1_READ =
+      create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.READ));
+  private static final Privilege KAFKA_HOST1_TOPIC1_WRITE =
+      create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.WRITE));
+
+  private static final Privilege KAFKA_HOST1_CLUSTER1_ALL =
+      create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.ALL));
+  private static final Privilege KAFKA_HOST1_CLUSTER1_READ =
+      create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.READ));
+  private static final Privilege KAFKA_HOST1_CLUSTER1_WRITE =
+      create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.WRITE));
+
+  private static final Privilege KAFKA_HOST1_GROUP1_ALL =
+      create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.ALL));
+  private static final Privilege KAFKA_HOST1_GROUP1_READ =
+      create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.READ));
+  private static final Privilege KAFKA_HOST1_GROUP1_WRITE =
+      create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.WRITE));
+
+
+  @Test
+  public void testSimpleAction() throws Exception {
+    //host
+    assertFalse(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_READ));
+    assertFalse(KAFKA_HOST1_READ.implies(KAFKA_HOST1_WRITE));
+    //consumer group
+    assertFalse(KAFKA_HOST1_GROUP1_WRITE.implies(KAFKA_HOST1_GROUP1_READ));
+    assertFalse(KAFKA_HOST1_GROUP1_READ.implies(KAFKA_HOST1_GROUP1_WRITE));
+    //topic
+    assertFalse(KAFKA_HOST1_TOPIC1_READ.implies(KAFKA_HOST1_TOPIC1_WRITE));
+    assertFalse(KAFKA_HOST1_TOPIC1_WRITE.implies(KAFKA_HOST1_TOPIC1_READ));
+    //cluster
+    assertFalse(KAFKA_HOST1_CLUSTER1_READ.implies(KAFKA_HOST1_CLUSTER1_WRITE));
+    assertFalse(KAFKA_HOST1_CLUSTER1_WRITE.implies(KAFKA_HOST1_CLUSTER1_READ));
+  }
+
+  @Test
+  public void testShorterThanRequest() throws Exception {
+    //topic
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_ALL));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_READ));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_WRITE));
+
+    assertFalse(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_READ));
+    assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_TOPIC1_READ));
+    assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_TOPIC1_WRITE));
+
+    //cluster
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_ALL));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_READ));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_WRITE));
+
+    assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_CLUSTER1_READ));
+    assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_CLUSTER1_WRITE));
+
+    //consumer group
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_ALL));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_READ));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_WRITE));
+
+    assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_GROUP1_READ));
+    assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_GROUP1_WRITE));
+  }
+
+  @Test
+  public void testActionAll() throws Exception {
+    //host
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_READ));
+    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_WRITE));
+
+    //topic
+    assertTrue(KAFKA_HOST1_TOPIC1_ALL.implies(KAFKA_HOST1_TOPIC1_READ));
+    assertTrue(KAFKA_HOST1_TOPIC1_ALL.implies(KAFKA_HOST1_TOPIC1_WRITE));
+
+    //cluster
+    assertTrue(KAFKA_HOST1_CLUSTER1_ALL.implies(KAFKA_HOST1_CLUSTER1_READ));
+    assertTrue(KAFKA_HOST1_CLUSTER1_ALL.implies(KAFKA_HOST1_CLUSTER1_WRITE));
+
+    //consumer group
+    assertTrue(KAFKA_HOST1_GROUP1_ALL.implies(KAFKA_HOST1_GROUP1_READ));
+    assertTrue(KAFKA_HOST1_GROUP1_ALL.implies(KAFKA_HOST1_GROUP1_WRITE));
+  }
+
+  @Test
+  public void testUnexpected() throws Exception {
+    Privilege p = new Privilege() {
+      @Override
+      public boolean implies(Privilege p) {
+        return false;
+      }
+    };
+    Privilege topic1 = create(new KeyValue("HOST", "host"), new KeyValue("TOPIC", "topic1"));
+    assertFalse(topic1.implies(null));
+    assertFalse(topic1.implies(p));
+    assertFalse(topic1.equals(null));
+    assertFalse(topic1.equals(p));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testNullString() throws Exception {
+    System.out.println(create((String)null));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyString() throws Exception {
+    System.out.println(create(""));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyKey() throws Exception {
+    System.out.println(create(KV_JOINER.join("", "host1")));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyValue() throws Exception {
+    System.out.println(create(KV_JOINER.join("HOST", "")));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyPart() throws Exception {
+    System.out.println(create(AUTHORIZABLE_JOINER.
+        join(KV_JOINER.join("HOST", "host1"), "")));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testOnlySeperators() throws Exception {
+    System.out.println(create(AUTHORIZABLE_JOINER.
+        join(KV_SEPARATOR, KV_SEPARATOR, KV_SEPARATOR)));
+  }
+
+  static KafkaWildcardPrivilege create(KeyValue... keyValues) {
+    return create(AUTHORIZABLE_JOINER.join(keyValues));
+
+  }
+  static KafkaWildcardPrivilege create(String s) {
+    return new KafkaWildcardPrivilege(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
new file mode 100644
index 0000000..4da506b
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.policy.kafka.engine;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public abstract class AbstractTestKafkaPolicyEngine {
+
+  private static final String ADMIN = "host=*";
+  private static final String ADMIN_HOST1 = "host=host1";
+  private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read";
+  private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read";
+  private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read";
+  private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write";
+  private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write";
+  private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write";
+  private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all";
+
+  private PolicyEngine policy;
+  private static File baseDir;
+
+  @BeforeClass
+  public static void setupClazz() throws IOException {
+    baseDir = Files.createTempDir();
+  }
+
+  @AfterClass
+  public static void teardownClazz() throws IOException {
+    if (baseDir != null) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+  }
+
+  protected void setPolicy(PolicyEngine policy) {
+    this.policy = policy;
+  }
+
+  protected static File getBaseDir() {
+    return baseDir;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    afterSetup();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    beforeTeardown();
+  }
+
+  protected void afterSetup() throws IOException {}
+
+  protected void beforeTeardown() throws IOException {}
+
+
+  @Test
+  public void testConsumer0() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_ALL));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_group0"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testConsumer1() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_HOST1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_group1"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testConsumer2() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T2_HOST2));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_group2"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testProducer0() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_ALL));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("producer_group0"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testProducer1() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_HOST1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("producer_group1"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+
+  @Test
+  public void testProducer2() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T2_HOST2));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("producer_group2"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testConsumerProducer0() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_producer_group0"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testSubAdmin() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testAdmin() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN));
+    Assert
+        .assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("admin_group"), ActiveRoleSet.ALL))
+                .toString());
+  }
+
+  private static Set<String> set(String... values) {
+    return Sets.newHashSet(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java
new file mode 100644
index 0000000..f2bd3c8
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka.engine;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
+import org.apache.sentry.provider.file.PolicyFiles;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestKafkaPolicyEngineDFS extends AbstractTestKafkaPolicyEngine {
+  private static MiniDFSCluster dfsCluster;
+  private static FileSystem fileSystem;
+  private static Path root;
+  private static Path etc;
+
+  @BeforeClass
+  public static void setupLocalClazz() throws IOException {
+    File baseDir = getBaseDir();
+    Assert.assertNotNull(baseDir);
+    File dfsDir = new File(baseDir, "dfs");
+    Assert.assertTrue(dfsDir.isDirectory() || dfsDir.mkdirs());
+    Configuration conf = new Configuration();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    fileSystem = dfsCluster.getFileSystem();
+    root = new Path(fileSystem.getUri().toString());
+    etc = new Path(root, "/etc");
+    fileSystem.mkdirs(etc);
+  }
+
+  @AfterClass
+  public static void teardownLocalClazz() {
+    if(dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  @Override
+  protected void afterSetup() throws IOException {
+    fileSystem.delete(etc, true);
+    fileSystem.mkdirs(etc);
+    PolicyFiles.copyToDir(fileSystem, etc, "test-authz-provider.ini");
+    setPolicy(new KafkaPolicyFileProviderBackend(new Path(etc,
+        "test-authz-provider.ini").toString()));
+  }
+
+  @Override
+  protected void beforeTeardown() throws IOException {
+    fileSystem.delete(etc, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java
new file mode 100644
index 0000000..4bc061d
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka.engine;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
+import org.apache.sentry.provider.file.PolicyFiles;
+
+public class TestKafkaPolicyEngineLocalFS extends AbstractTestKafkaPolicyEngine {
+
+  @Override
+  protected void  afterSetup() throws IOException {
+    File baseDir = getBaseDir();
+    Assert.assertNotNull(baseDir);
+    Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+    PolicyFiles.copyToDir(baseDir, "test-authz-provider.ini");
+    setPolicy(new KafkaPolicyFileProviderBackend(new File(baseDir, "test-authz-provider.ini").getPath()));
+  }
+
+  @Override
+  protected void beforeTeardown() throws IOException {
+    File baseDir = getBaseDir();
+    Assert.assertNotNull(baseDir);
+    FileUtils.deleteQuietly(baseDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
new file mode 100644
index 0000000..bcc1198
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka.provider;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.Action;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
+import org.apache.sentry.policy.kafka.MockGroupMappingServiceProvider;
+import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFiles;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaAuthorizationProviderGeneralCases {
+  private static final Multimap<String, String> USER_TO_GROUP_MAP = HashMultimap.create();
+
+  private static final Host HOST_1 = new Host("host1");
+  private static final Host HOST_2 = new Host("host2");
+  private static final Cluster cluster1 = new Cluster("kafka-cluster");
+  private static final Topic topic1 = new Topic("t1");
+  private static final Topic topic2 = new Topic("t2");
+  private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
+  private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
+
+  private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
+  private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
+  private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE);
+  private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE);
+  private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
+  private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
+  private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
+  private static final KafkaAction CLUSTER_ACTION = new KafkaAction(
+      KafkaActionConstant.CLUSTER_ACTION);
+
+  private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION);
+
+  private static final Subject ADMIN = new Subject("admin1");
+  private static final Subject SUB_ADMIN = new Subject("subadmin1");
+  private static final Subject CONSUMER0 = new Subject("consumer0");
+  private static final Subject CONSUMER1 = new Subject("consumer1");
+  private static final Subject CONSUMER2 = new Subject("consumer2");
+  private static final Subject PRODUCER0 = new Subject("producer0");
+  private static final Subject PRODUCER1 = new Subject("producer1");
+  private static final Subject PRODUCER2 = new Subject("producer2");
+  private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
+
+  private static final String ADMIN_GROUP  = "admin_group";
+  private static final String SUBADMIN_GROUP  = "subadmin_group";
+  private static final String CONSUMER_GROUP0 = "consumer_group0";
+  private static final String CONSUMER_GROUP1 = "consumer_group1";
+  private static final String CONSUMER_GROUP2 = "consumer_group2";
+  private static final String PRODUCER_GROUP0 = "producer_group0";
+  private static final String PRODUCER_GROUP1 = "producer_group1";
+  private static final String PRODUCER_GROUP2 = "producer_group2";
+  private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
+
+  static {
+    USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP));
+    USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(),  Arrays.asList(SUBADMIN_GROUP ));
+    USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(),  Arrays.asList(CONSUMER_GROUP0));
+    USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(),  Arrays.asList(CONSUMER_GROUP1));
+    USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(),  Arrays.asList(CONSUMER_GROUP2));
+    USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(),  Arrays.asList(PRODUCER_GROUP0));
+    USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(),  Arrays.asList(PRODUCER_GROUP1));
+    USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(),  Arrays.asList(PRODUCER_GROUP2));
+    USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(),  Arrays.asList(CONSUMER_PRODUCER_GROUP0));
+  }
+
+  private final ResourceAuthorizationProvider authzProvider;
+  private File baseDir;
+
+  public TestKafkaAuthorizationProviderGeneralCases() throws IOException {
+    baseDir = Files.createTempDir();
+    PolicyFiles.copyToDir(baseDir, "test-authz-provider.ini");
+    authzProvider = new HadoopGroupResourceAuthorizationProvider(
+        new KafkaPolicyFileProviderBackend(new File(baseDir, "test-authz-provider.ini").getPath()),
+        new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP));
+  }
+
+  @After
+  public void teardown() {
+    if(baseDir != null) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+  }
+
+  private void doTestResourceAuthorizationProvider(Subject subject, List<? extends Authorizable> authorizableHierarchy,
+      Set<? extends Action> actions, boolean expected) throws Exception {
+    Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters");
+    helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions);
+    Assert.assertEquals(helper.toString(), expected,
+        authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL));
+  }
+
+  @Test
+  public void testAdmin() throws Exception {
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true);
+
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false);
+
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true);
+
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true);
+  }
+
+  @Test
+  public void testConsumer() throws Exception {
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2))
+        doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), READ.equals(action));
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2))
+        doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2))
+        doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
+            Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action));
+    }
+  }
+
+  @Test
+  public void testProducer() throws Exception {
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2))
+        doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), WRITE.equals(action));
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2))
+        doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2))
+        doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
+            Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
+    }
+  }
+
+  @Test
+  public void testConsumerProducer() throws Exception {
+    for (KafkaAction action : allActions) {
+      doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1),
+          Sets.newHashSet(action), true);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java
new file mode 100644
index 0000000..0a453ce
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka.provider;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.Action;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
+import org.apache.sentry.provider.common.AuthorizationProvider;
+import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaAuthorizationProviderSpecialCases {
+  private AuthorizationProvider authzProvider;
+  private PolicyFile policyFile;
+  private File baseDir;
+  private File iniFile;
+  private String initResource;
+  @Before
+  public void setup() throws IOException {
+    baseDir = Files.createTempDir();
+    iniFile = new File(baseDir, "policy.ini");
+    initResource = "file://" + iniFile.getPath();
+    policyFile = new PolicyFile();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if(baseDir != null) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+  }
+
+  @Test
+  public void testDuplicateEntries() throws Exception {
+    Subject user1 = new Subject("user1");
+    Host host1 = new Host("host1");
+    Topic topic1 = new Topic("t1");
+    Set<? extends Action> actions = Sets.newHashSet(new KafkaAction(KafkaActionConstant.READ));
+    policyFile.addGroupsToUser(user1.getName(), true, "group1", "group1")
+      .addRolesToGroup("group1",  true, "role1", "role1")
+      .addPermissionsToRole("role1", true, "host=host1->topic=t1->action=read",
+          "host=host1->topic=t1->action=read");
+    policyFile.write(iniFile);
+    KafkaPolicyFileProviderBackend policy = new KafkaPolicyFileProviderBackend(initResource);
+    authzProvider = new LocalGroupResourceAuthorizationProvider(initResource, policy);
+    List<? extends Authorizable> authorizableHierarchy = ImmutableList.of(host1, topic1);
+    Assert.assertTrue(authorizableHierarchy.toString(),
+        authzProvider.hasAccess(user1, authorizableHierarchy, actions, ActiveRoleSet.ALL));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
new file mode 100644
index 0000000..0186cc9
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.policy.kafka.provider;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaPolicyNegative {
+  private File baseDir;
+  private File globalPolicyFile;
+
+  @Before
+  public void setup() {
+    baseDir = Files.createTempDir();
+    globalPolicyFile = new File(baseDir, "global.ini");
+  }
+
+  @After
+  public void teardown() {
+    if(baseDir != null) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+  }
+
+  private void append(String from, File to) throws IOException {
+    Files.append(from + "\n", to, Charsets.UTF_8);
+  }
+
+  @Test
+  public void testauthorizedKafkaInPolicyFile() throws Exception {
+    append("[groups]", globalPolicyFile);
+    append("other_group = other_role", globalPolicyFile);
+    append("[roles]", globalPolicyFile);
+    append("other_role = host=host1->topic=t1->action=read, host=host1->consumergroup=l1->action=read", globalPolicyFile);
+    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
+    //malicious_group has no privilege
+    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("malicious_group"), ActiveRoleSet.ALL);
+    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
+    //other_group has two privileges
+    permissions = policy.getAllPrivileges(Sets.newHashSet("other_group"), ActiveRoleSet.ALL);
+    Assert.assertTrue(permissions.toString(), permissions.size() == 2);
+  }
+
+  @Test
+  public void testNoHostNameConfig() throws Exception {
+    append("[groups]", globalPolicyFile);
+    append("other_group = malicious_role", globalPolicyFile);
+    append("[roles]", globalPolicyFile);
+    append("malicious_role = topic=t1->action=read", globalPolicyFile);
+    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
+    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("other_group"), ActiveRoleSet.ALL);
+    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
+  }
+
+  @Test
+  public void testHostAllName() throws Exception {
+    append("[groups]", globalPolicyFile);
+    append("group = malicious_role", globalPolicyFile);
+    append("[roles]", globalPolicyFile);
+    append("malicious_role = host=*", globalPolicyFile);
+    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
+    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL);
+    Assert.assertTrue(permissions.toString(), permissions.size() == 1);
+  }
+
+  @Test
+  public void testAll() throws Exception {
+    append("[groups]", globalPolicyFile);
+    append("group = malicious_role", globalPolicyFile);
+    append("[roles]", globalPolicyFile);
+    append("malicious_role = *", globalPolicyFile);
+    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
+    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL);
+    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties b/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..7703069
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties.
+#
+# For testing, it may also be convenient to specify
+
+log4j.rootLogger=DEBUG,console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.hadoop.conf.Configuration=INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2575adde/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
new file mode 100644
index 0000000..c533e69
--- /dev/null
+++ b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[groups]
+admin_group = admin_all
+subadmin_group = admin_host1
+consumer_group0 = consumer_t1_all
+consumer_group1 = consumer_t1_host1
+consumer_group2 = consumer_t2_host2
+producer_group0 = producer_t1_all
+producer_group1 = producer_t1_host1
+producer_group2 = producer_t2_host2
+consumer_producer_group0 = consumer_producer_t1
+
+[roles]
+admin_all = host=*
+admin_host1 = host=host1
+consumer_t1_all = host=*->topic=t1->action=read
+consumer_t1_host1 = host=host1->topic=t1->action=read
+consumer_t2_host2 = host=host2->topic=t2->action=read
+producer_t1_all = host=*->topic=t1->action=write
+producer_t1_host1 = host=host1->topic=t1->action=write
+producer_t2_host2 = host=host2->topic=t2->action=write
+consumer_producer_t1 = host=host1->topic=t1->action=all