You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sd...@apache.org on 2016/01/26 02:22:26 UTC

incubator-sentry git commit: SENTRY-1012: Add core model for Kafka (Ashish K Singh via Dapeng Sun)

Repository: incubator-sentry
Updated Branches:
  refs/heads/kafka ef9b73884 -> d114d2d5b


SENTRY-1012: Add core model for Kafka (Ashish K Singh via Dapeng Sun)


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/d114d2d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/d114d2d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/d114d2d5

Branch: refs/heads/kafka
Commit: d114d2d5b96dbaf1a073a56cd3cce6d43f41fed7
Parents: ef9b738
Author: Sun Dapeng <sd...@apache.org>
Authored: Tue Jan 26 09:11:07 2016 +0800
Committer: Sun Dapeng <sd...@apache.org>
Committed: Tue Jan 26 09:11:32 2016 +0800

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 sentry-core/pom.xml                             |   1 +
 sentry-core/sentry-core-model-kafka/pom.xml     |  43 +++++
 .../apache/sentry/core/model/kafka/Cluster.java |  48 +++++
 .../sentry/core/model/kafka/ConsumerGroup.java  |  46 +++++
 .../apache/sentry/core/model/kafka/Host.java    |  48 +++++
 .../core/model/kafka/KafkaActionConstant.java   |  32 ++++
 .../core/model/kafka/KafkaActionFactory.java    | 105 +++++++++++
 .../core/model/kafka/KafkaAuthorizable.java     |  35 ++++
 .../apache/sentry/core/model/kafka/Topic.java   |  46 +++++
 .../core/model/kafka/TestKafkaAction.java       | 180 +++++++++++++++++++
 .../core/model/kafka/TestKafkaAuthorizable.java |  61 +++++++
 12 files changed, 650 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0475f69..aa4a021 100644
--- a/pom.xml
+++ b/pom.xml
@@ -338,6 +338,11 @@ limitations under the License.
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.sentry</groupId>
+        <artifactId>sentry-core-model-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hive</groupId>
         <artifactId>hive-jdbc</artifactId>
         <version>${hive.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-core/pom.xml b/sentry-core/pom.xml
index 59d32c4..06d92de 100644
--- a/sentry-core/pom.xml
+++ b/sentry-core/pom.xml
@@ -34,6 +34,7 @@ limitations under the License.
     <module>sentry-core-model-indexer</module>
     <module>sentry-core-model-search</module>
     <module>sentry-core-model-sqoop</module>
+    <module>sentry-core-model-kafka</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/pom.xml b/sentry-core/sentry-core-model-kafka/pom.xml
new file mode 100644
index 0000000..cadd4ac
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sentry</groupId>
+    <artifactId>sentry-core</artifactId>
+    <version>1.7.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>sentry-core-model-kafka</artifactId>
+  <name>Sentry Core Model Kafka</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-core-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
new file mode 100644
index 0000000..b1fc063
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+/**
+ * Represents the Cluster authorizable in the Kafka model
+ */
+public class Cluster implements KafkaAuthorizable {
+  /**
+   * Represents all clusters
+   */
+  public static final Cluster ALL = new Cluster(KafkaAuthorizable.ALL);
+
+  private String name;
+  public Cluster(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public AuthorizableType getAuthzType() {
+    return AuthorizableType.CLUSTER;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getTypeName() {
+    return getAuthzType().name();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
new file mode 100644
index 0000000..9525aaf
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+/**
+ * Represents the ConsumerGroup authorizable in the Kafka model
+ */
+public class ConsumerGroup implements KafkaAuthorizable {
+  /**
+   * Represents all consumer groups
+   */
+  public static ConsumerGroup ALL = new ConsumerGroup(KafkaAuthorizable.ALL);
+
+  private String name;
+  public ConsumerGroup(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public AuthorizableType getAuthzType() {
+    return AuthorizableType.CONSUMERGROUP;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getTypeName() {
+    return getAuthzType().name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
new file mode 100644
index 0000000..e0f4160
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+/**
+ * Represents the Host authorizable in the Kafka model
+ */
+public class Host implements KafkaAuthorizable {
+  /**
+   * Represents all hosts
+   */
+  public static Host ALL = new Host(KafkaAuthorizable.ALL);
+
+  private String name;
+  public Host(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public AuthorizableType getAuthzType() {
+    return AuthorizableType.HOST;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getTypeName() {
+    return getAuthzType().name();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
new file mode 100644
index 0000000..13421f9
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+public class KafkaActionConstant {
+
+  public static final String ALL = "*";
+  public static final String ALL_NAME = "ALL";
+  public static final String READ = "read";
+  public static final String WRITE = "write";
+  public static final String CREATE = "create";
+  public static final String DELETE = "delete";
+  public static final String ALTER = "alter";
+  public static final String DESCRIBE = "describe";
+  public static final String CLUSTER_ACTION = "clusteraction";
+
+  public static final String actionName = "action";
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
new file mode 100644
index 0000000..2577406
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+import java.util.List;
+
+import org.apache.sentry.core.common.BitFieldAction;
+import org.apache.sentry.core.common.BitFieldActionFactory;
+
+import com.google.common.collect.Lists;
+
+public class KafkaActionFactory extends BitFieldActionFactory {
+
+  enum KafkaActionType {
+    READ(KafkaActionConstant.READ, 1),
+    WRITE(KafkaActionConstant.WRITE, 2),
+    CREATE(KafkaActionConstant.CREATE, 4),
+    DELETE(KafkaActionConstant.DELETE, 8),
+    ALTER(KafkaActionConstant.ALTER, 16),
+    DESCRIBE(KafkaActionConstant.DESCRIBE, 32),
+    ADMIN(KafkaActionConstant.CLUSTER_ACTION, 64),
+    ALL(KafkaActionConstant.ALL, READ.getCode() | WRITE.getCode() | CREATE.getCode()
+        | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | ADMIN.getCode());
+
+    private String name;
+    private int code;
+
+    KafkaActionType(String name, int code) {
+      this.name = name;
+      this.code = code;
+    }
+
+    public int getCode() {
+      return code;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    static KafkaActionType getActionByName(String name) {
+      for (KafkaActionType action : KafkaActionType.values()) {
+        if (action.name.equalsIgnoreCase(name)) {
+          return action;
+        }
+      }
+      throw new RuntimeException("can't get ActionType by name:" + name);
+    }
+
+    static List<KafkaActionType> getActionByCode(int code) {
+      List<KafkaActionType> actions = Lists.newArrayList();
+      for (KafkaActionType action : KafkaActionType.values()) {
+        if (((action.code & code) == action.code) && (action != KafkaActionType.ALL)) {
+          // KafkaActionType.ALL action should not return in the list
+          actions.add(action);
+        }
+      }
+      if (actions.isEmpty()) {
+        throw new RuntimeException("can't get ActionType by code:" + code);
+      }
+      return actions;
+    }
+  }
+
+  public static class KafkaAction extends BitFieldAction {
+    public KafkaAction(String name) {
+      this(KafkaActionType.getActionByName(name));
+    }
+
+    public KafkaAction(KafkaActionType actionType) {
+      super(actionType.name, actionType.code);
+    }
+  }
+
+  @Override
+  public List<KafkaAction> getActionsByCode(int actionCode) {
+    List<KafkaAction> actions = Lists.newArrayList();
+    for (KafkaActionType action : KafkaActionType.getActionByCode(actionCode)) {
+      actions.add(new KafkaAction(action));
+    }
+    return actions;
+  }
+
+  @Override
+  public KafkaAction getActionByName(String name) {
+    // Check the name is All
+    if (KafkaActionConstant.ALL_NAME.equalsIgnoreCase(name)) {
+      return new KafkaAction(KafkaActionType.ALL);
+    }
+    return new KafkaAction(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
new file mode 100644
index 0000000..0d2155e
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+import org.apache.sentry.core.common.Authorizable;
+
+/**
+ * This interface represents authorizable resource in the Kafka component.
+ * It used conjunction with the generic authorization model(SENTRY-398).
+ */
+public interface KafkaAuthorizable extends Authorizable {
+  public static final String ALL = "*"; // NOPMD - TODO(sdp) Remove before merge
+  public enum AuthorizableType {
+    CLUSTER,
+    HOST,
+    TOPIC,
+    CONSUMERGROUP
+  };
+
+  public AuthorizableType getAuthzType(); // NOPMD - TODO(sdp) Remove before merge
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
new file mode 100644
index 0000000..9e288b0
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.model.kafka;
+/**
+ * Represents the Topic authorizable in the Kafka model
+ */
+public class Topic implements KafkaAuthorizable {
+  /**
+   * Represents all topics
+   */
+  public static Topic ALL = new Topic(KafkaAuthorizable.ALL);
+
+  private String name;
+  public Topic(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public AuthorizableType getAuthzType() {
+    return AuthorizableType.TOPIC;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getTypeName() {
+    return getAuthzType().name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
new file mode 100644
index 0000000..f22ebc0
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.core.model.kafka;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestKafkaAction {
+  private KafkaActionFactory factory = new KafkaActionFactory();
+
+  @Test
+  public void testImpliesAction() {
+    KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ);
+    KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE);
+    KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
+    KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
+    KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
+    KafkaAction describeAction =
+        (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
+    KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+    KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
+    KafkaAction allNameAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL_NAME);
+
+    assertTrue(allAction.implies(readAction));
+    assertTrue(allAction.implies(writeAction));
+    assertTrue(allAction.implies(createAction));
+    assertTrue(allAction.implies(deleteAction));
+    assertTrue(allAction.implies(alterAction));
+    assertTrue(allAction.implies(describeAction));
+    assertTrue(allAction.implies(adminAction));
+    assertTrue(allAction.implies(allAction));
+
+    assertTrue(readAction.implies(readAction));
+    assertFalse(readAction.implies(writeAction));
+    assertFalse(readAction.implies(createAction));
+    assertFalse(readAction.implies(deleteAction));
+    assertFalse(readAction.implies(alterAction));
+    assertFalse(readAction.implies(describeAction));
+    assertFalse(readAction.implies(adminAction));
+    assertFalse(readAction.implies(allAction));
+
+    assertFalse(writeAction.implies(readAction));
+    assertTrue(writeAction.implies(writeAction));
+    assertFalse(writeAction.implies(createAction));
+    assertFalse(writeAction.implies(deleteAction));
+    assertFalse(writeAction.implies(alterAction));
+    assertFalse(writeAction.implies(describeAction));
+    assertFalse(writeAction.implies(adminAction));
+    assertFalse(writeAction.implies(allAction));
+
+    assertFalse(createAction.implies(readAction));
+    assertFalse(createAction.implies(writeAction));
+    assertTrue(createAction.implies(createAction));
+    assertFalse(createAction.implies(deleteAction));
+    assertFalse(createAction.implies(alterAction));
+    assertFalse(createAction.implies(describeAction));
+    assertFalse(createAction.implies(adminAction));
+    assertFalse(createAction.implies(allAction));
+
+    assertFalse(deleteAction.implies(readAction));
+    assertFalse(deleteAction.implies(writeAction));
+    assertFalse(deleteAction.implies(createAction));
+    assertTrue(deleteAction.implies(deleteAction));
+    assertFalse(deleteAction.implies(alterAction));
+    assertFalse(deleteAction.implies(describeAction));
+    assertFalse(deleteAction.implies(adminAction));
+    assertFalse(deleteAction.implies(allAction));
+
+    assertFalse(alterAction.implies(readAction));
+    assertFalse(alterAction.implies(writeAction));
+    assertFalse(alterAction.implies(createAction));
+    assertFalse(alterAction.implies(deleteAction));
+    assertTrue(alterAction.implies(alterAction));
+    assertFalse(alterAction.implies(describeAction));
+    assertFalse(alterAction.implies(adminAction));
+    assertFalse(alterAction.implies(allAction));
+
+    assertFalse(describeAction.implies(readAction));
+    assertFalse(describeAction.implies(writeAction));
+    assertFalse(describeAction.implies(createAction));
+    assertFalse(describeAction.implies(deleteAction));
+    assertFalse(describeAction.implies(alterAction));
+    assertTrue(describeAction.implies(describeAction));
+    assertFalse(describeAction.implies(adminAction));
+    assertFalse(describeAction.implies(allAction));
+
+    assertFalse(adminAction.implies(readAction));
+    assertFalse(adminAction.implies(writeAction));
+    assertFalse(adminAction.implies(createAction));
+    assertFalse(adminAction.implies(deleteAction));
+    assertFalse(adminAction.implies(alterAction));
+    assertFalse(adminAction.implies(describeAction));
+    assertTrue(adminAction.implies(adminAction));
+    assertFalse(adminAction.implies(allAction));
+
+    assertTrue(allNameAction.implies(readAction));
+    assertTrue(allNameAction.implies(writeAction));
+    assertTrue(allNameAction.implies(createAction));
+    assertTrue(allNameAction.implies(deleteAction));
+    assertTrue(allNameAction.implies(alterAction));
+    assertTrue(allNameAction.implies(describeAction));
+    assertTrue(allNameAction.implies(adminAction));
+    assertTrue(allNameAction.implies(allAction));
+  }
+
+  @Test
+  public void testGetActionByName() throws Exception {
+    KafkaAction readAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.READ);
+    KafkaAction writeAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.WRITE);
+    KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
+    KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
+    KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
+    KafkaAction describeAction =
+        (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
+    KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+    KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
+    KafkaAction allNameAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL_NAME);
+
+    assertTrue(readAction.equals(new KafkaAction(KafkaActionConstant.READ)));
+    assertTrue(writeAction.equals(new KafkaAction(KafkaActionConstant.WRITE)));
+    assertTrue(createAction.equals(new KafkaAction(KafkaActionConstant.CREATE)));
+    assertTrue(deleteAction.equals(new KafkaAction(KafkaActionConstant.DELETE)));
+    assertTrue(alterAction.equals(new KafkaAction(KafkaActionConstant.ALTER)));
+    assertTrue(describeAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE)));
+    assertTrue(adminAction.equals(new KafkaAction(KafkaActionConstant.CLUSTER_ACTION)));
+    assertTrue(allAction.equals(new KafkaAction(KafkaActionConstant.ALL)));
+    assertTrue(allNameAction.equals(new KafkaAction(KafkaActionConstant.ALL)));
+  }
+
+  @Test
+  public void testGetActionsByCode() throws Exception {
+    KafkaAction readAction = new KafkaAction(KafkaActionConstant.READ);
+    KafkaAction writeAction = new KafkaAction(KafkaActionConstant.WRITE);
+    KafkaAction createAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CREATE);
+    KafkaAction deleteAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.DELETE);
+    KafkaAction alterAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALTER);
+    KafkaAction describeAction =
+        (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
+    KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
+    KafkaAction allAction = new KafkaAction(KafkaActionConstant.ALL);
+
+    assertEquals(Lists.newArrayList(readAction),
+        factory.getActionsByCode(readAction.getActionCode()));
+    assertEquals(Lists.newArrayList(writeAction),
+        factory.getActionsByCode(writeAction.getActionCode()));
+    assertEquals(Lists.newArrayList(createAction),
+        factory.getActionsByCode(createAction.getActionCode()));
+    assertEquals(Lists.newArrayList(deleteAction),
+        factory.getActionsByCode(deleteAction.getActionCode()));
+    assertEquals(Lists.newArrayList(alterAction),
+        factory.getActionsByCode(alterAction.getActionCode()));
+    assertEquals(Lists.newArrayList(describeAction),
+        factory.getActionsByCode(describeAction.getActionCode()));
+    assertEquals(Lists.newArrayList(adminAction),
+        factory.getActionsByCode(adminAction.getActionCode()));
+    assertEquals(Lists.newArrayList(readAction, writeAction, createAction, deleteAction,
+        alterAction, describeAction, adminAction), factory.getActionsByCode(allAction
+        .getActionCode()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d114d2d5/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
new file mode 100644
index 0000000..1abb116
--- /dev/null
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.model.kafka;
+
+import junit.framework.Assert;
+
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.junit.Test;
+
+public class TestKafkaAuthorizable {
+
+  @Test
+  public void testSimpleName() throws Exception {
+    String name = "simple";
+    Host host = new Host(name);
+    Assert.assertEquals(host.getName(), name);
+
+    Cluster cluster = new Cluster(name);
+    Assert.assertEquals(cluster.getName(), name);
+
+    Topic topic = new Topic(name);
+    Assert.assertEquals(topic.getName(), name);
+
+    ConsumerGroup consumerGroup = new ConsumerGroup(name);
+    Assert.assertEquals(consumerGroup.getName(), name);
+  }
+
+  @Test
+  public void testAuthType() throws Exception {
+    Host host = new Host("host1");
+    Assert.assertEquals(host.getAuthzType(), AuthorizableType.HOST);
+
+    Cluster cluster = new Cluster("cluster1");
+    Assert.assertEquals(cluster.getAuthzType(), AuthorizableType.CLUSTER);
+
+    Topic topic = new Topic("topic1");
+    Assert.assertEquals(topic.getAuthzType(), AuthorizableType.TOPIC);
+
+    ConsumerGroup consumerGroup = new ConsumerGroup("consumerGroup1");
+    Assert.assertEquals(consumerGroup.getAuthzType(), AuthorizableType.CONSUMERGROUP);
+  }
+}