You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 19:43:40 UTC

[1/9] git commit: FALCON-279 Add ACL for Cluster Entity. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master a13323ef6 -> c4de9eca7


FALCON-279 Add ACL for Cluster Entity. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b1830605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b1830605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b1830605

Branch: refs/heads/master
Commit: b1830605b4f50bb6bb59658118af14001795a348
Parents: a13323e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:11:17 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:11:17 2014 -0700

----------------------------------------------------------------------
 .../falcon/entity/v0/AccessControlList.java     | 40 +++++++++
 client/src/main/resources/cluster-0.1.xsd       | 14 +++
 client/src/main/resources/jaxb-binding.xjb      |  4 +
 .../entity/parser/ClusterEntityParser.java      | 92 ++++++++++++++++++--
 .../falcon/entity/parser/EntityParser.java      | 25 ++++++
 common/src/main/resources/startup.properties    |  7 ++
 .../entity/parser/ClusterEntityParserTest.java  | 36 ++++++++
 .../config/cluster/cluster-no-registry.xml      |  1 +
 src/conf/startup.properties                     | 11 ++-
 .../validation/ClusterEntityValidationIT.java   | 34 ++++++--
 10 files changed, 251 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java b/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java
new file mode 100644
index 0000000..89ce6f9
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity.v0;
+
+/**
+ * Access control list for an Entity.
+ */
+public abstract class AccessControlList {
+
+    public abstract String getOwner();
+
+    public abstract String getGroup();
+
+    public abstract String getPermission();
+
+    @Override
+    public String toString() {
+        return "AccessControlList{"
+                + "owner='" + getOwner() + '\''
+                + ", group='" + getGroup() + '\''
+                + ", permission='" + getPermission() + '\''
+                + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/client/src/main/resources/cluster-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/cluster-0.1.xsd b/client/src/main/resources/cluster-0.1.xsd
index a7b2750..79d9fdd 100644
--- a/client/src/main/resources/cluster-0.1.xsd
+++ b/client/src/main/resources/cluster-0.1.xsd
@@ -69,6 +69,7 @@
             </xs:element>
             <xs:element type="interfaces" name="interfaces"/>
             <xs:element type="locations" name="locations"/>
+            <xs:element type="ACL" name="ACL" minOccurs="0" maxOccurs="1"/>
             <xs:element type="properties" name="properties" minOccurs="0"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
@@ -186,4 +187,17 @@
             <xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
         </xs:restriction>
     </xs:simpleType>
+    <xs:complexType name="ACL">
+        <xs:annotation>
+            <xs:documentation>
+                Access control list for this cluster.
+                owner is the Owner of this entity.
+                group is the one which has access to read - not used at this time.
+                permission is not enforced at this time
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="owner"/>
+        <xs:attribute type="xs:string" name="group"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
+    </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index b44663e..e51ccb7 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb
@@ -28,6 +28,10 @@
         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
     </jaxb:bindings>
 
+    <jaxb:bindings schemaLocation="cluster-0.1.xsd" node="//xs:complexType[@name='ACL']">
+        <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends>
+    </jaxb:bindings>
+
     <jaxb:bindings schemaLocation="feed-0.1.xsd" node="//xs:complexType[@name='feed']">
         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
     </jaxb:bindings>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 584b867..3eabf9e 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -18,10 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.io.IOException;
-
-import javax.jms.ConnectionFactory;
-
 import org.apache.commons.lang.Validate;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.catalog.CatalogServiceFactory;
@@ -29,19 +25,27 @@ import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.ACL;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.ConnectionFactory;
+import java.io.IOException;
+
 /**
  * Parser that parses cluster entity definition.
  */
@@ -75,6 +79,9 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateWorkflowInterface(cluster);
         validateMessagingInterface(cluster);
         validateRegistryInterface(cluster);
+
+        validateACL(cluster);
+        validateLocations(cluster);
     }
 
     private void validateScheme(Cluster cluster, Interfacetype interfacetype)
@@ -197,4 +204,77 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
             throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e);
         }
     }
+
+    /**
+     * Validate ACL if authorization is enabled.
+     *
+     * @param cluster cluster entity
+     * @throws ValidationException
+     */
+    private void validateACL(Cluster cluster) throws ValidationException {
+        if (!isAuthorizationEnabled()) {
+            return;
+        }
+
+        // Validate the entity owner is logged-in, authenticated user if authorization is enabled
+        final ACL clusterACL = cluster.getACL();
+        if (clusterACL == null) {
+            throw new ValidationException("Cluster ACL cannot be empty for:  " + cluster.getName());
+        }
+
+        validateOwner(clusterACL.getOwner());
+    }
+
+    /**
+     * Validate the locations on the cluster is owned by falcon.
+     *
+     * @param cluster cluster entity
+     * @throws ValidationException
+     */
+    private void validateLocations(Cluster cluster) throws ValidationException {
+        try {
+            Configuration conf = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            for (Location location : cluster.getLocations().getLocations()) {
+                if (location.getName().equals("temp")) {
+                    continue;
+                }
+
+                try {
+                    Path locationPath = new Path(location.getPath());
+                    if (fs.exists(locationPath)) {
+                        FileStatus fileStatus = fs.getFileStatus(locationPath);
+                        checkPathPermissions(locationPath, fileStatus);
+                        checkPathOwner(locationPath, fileStatus);
+                    }
+                } catch (IOException e) {
+                    throw new ValidationException("Unable to validate the location " + location
+                            + "for cluster.", e);
+                }
+            }
+        } catch (FalconException e) {
+            throw new ValidationException("Unable to validate the locations for cluster.", e);
+        }
+    }
+
+    private void checkPathPermissions(Path locationPath,
+                                      FileStatus fileStatus) throws ValidationException {
+        if (fileStatus.getPermission().getUserAction() != FsAction.ALL) {
+            LOG.error("Path {} doesn't have rwx permissions {}",
+                    locationPath, fileStatus.getPermission());
+            throw new ValidationException("Path " + locationPath
+                    + " doesn't have rwx permissions: " + fileStatus.getPermission());
+        }
+    }
+
+    private void checkPathOwner(Path locationPath,
+                                FileStatus fileStatus) throws IOException, ValidationException {
+        final String owner = UserGroupInformation.getLoginUser().getShortUserName();
+        if (!fileStatus.getOwner().equals(owner)) {
+            LOG.error("Path {} with owner {} doesn't match the actual path owner {}",
+                    locationPath, owner, fileStatus.getOwner());
+            throw new ValidationException("Path [" + locationPath + "] with owner [" + owner
+                    + "] doesn't match the actual path  owner " + fileStatus.getOwner());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 8da5139..761a9f5 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -24,6 +24,8 @@ import org.apache.falcon.Pair;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,4 +111,27 @@ public abstract class EntityParser<T extends Entity> {
     }
 
     public abstract void validate(T entity) throws FalconException;
+
+    /**
+     * Validate if the entity owner is the logged-in authenticated user.
+     *
+     * @param owner entity owner in ACL
+     * @throws ValidationException
+     */
+    protected void validateOwner(String owner) throws ValidationException {
+        if (!CurrentUser.getUser().equals(owner)) {
+            throw new ValidationException("Entity's owner " + owner
+                    + " is not same as the logged in user " + CurrentUser.getUser());
+        }
+    }
+
+    /**
+     * Checks if the user has enabled authorization in the configuration.
+     *
+     * @return true if falcon.security.authorization.enabled is enabled, false otherwise
+     */
+    protected boolean isAuthorizationEnabled() {
+        return Boolean.valueOf(StartupProperties.get().getProperty(
+                "falcon.security.authorization.enabled", "false"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index ccd969f..dafdea9 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -135,3 +135,10 @@ debug.libext.process.paths=${falcon.libext}
 *.falcon.http.authentication.blacklisted.users=
 
 ######### Authentication Properties #########
+
+
+######### Authorization Properties #########
+
+*.falcon.security.authorization.enabled=false
+
+######### Authorization Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 050efe1..55562d1 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -148,6 +148,42 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         }
     }
 
+    @Test
+    public void testValidateACLWithNoACLAndAuthorizationEnabled() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+
+            Cluster cluster = parser.parse(stream);
+            Assert.assertNotNull(cluster);
+            Assert.assertNull(cluster.getACL());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAuthorizationEnabled() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml");
+
+            Cluster cluster = parser.parse(stream);
+            Assert.assertNotNull(cluster);
+            Assert.assertNotNull(cluster.getACL());
+            Assert.assertNotNull(cluster.getACL().getOwner());
+            Assert.assertNotNull(cluster.getACL().getGroup());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
     @BeforeClass
     public void init() throws Exception {
         this.dfsCluster = EmbeddedCluster.newCluster("testCluster");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/common/src/test/resources/config/cluster/cluster-no-registry.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/cluster/cluster-no-registry.xml b/common/src/test/resources/config/cluster/cluster-no-registry.xml
index 85dfe32..12b2369 100644
--- a/common/src/test/resources/config/cluster/cluster-no-registry.xml
+++ b/common/src/test/resources/config/cluster/cluster-no-registry.xml
@@ -35,6 +35,7 @@
         <location name="temp" path="/tmp"/>
         <location name="working" path="/projects/falcon/working"/>
     </locations>
+    <ACL owner="falcon" group="falcon"/>
     <properties>
         <property name="field1" value="value1"/>
         <property name="field2" value="value2"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index a42e67b..038026d 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -139,4 +139,13 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Comma separated list of black listed users
 *.falcon.http.authentication.blacklisted.users=
 
-######### Authentication Properties #########
\ No newline at end of file
+######### Authentication Properties #########
+
+
+######### Authorization Properties #########
+
+*.falcon.security.authorization.enabled=false
+#*.falcon.security.authorization.admin.users=seetharam
+#*.falcon.security.authorization.admin.groups=seetharam
+
+######### Authorization Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b1830605/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
index ab09474..6a65bf6 100644
--- a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
@@ -18,13 +18,10 @@
 
 package org.apache.falcon.validation;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.ACL;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interface;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -34,7 +31,10 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.sun.jersey.api.client.ClientResponse;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
 
 /**
  * Tests cluster entity validation to verify if each of the specified
@@ -101,4 +101,26 @@ public class ClusterEntityValidationIT {
         ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
         context.assertFailure(response);
     }
+
+    @Test
+    public void testValidateACL() throws Exception {
+        overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        InputStream stream = new FileInputStream(filePath);
+        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(cluster);
+
+        // Adding ACL with authorization disabled must not hurt
+        ACL clusterACL = new ACL();
+        clusterACL.setOwner(TestContext.REMOTE_USER);
+        clusterACL.setGroup(TestContext.REMOTE_USER);
+        cluster.setACL(clusterACL);
+
+        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+
+        File tmpFile = TestContext.getTempFile();
+        EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
+        ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
+        context.assertSuccessful(response);
+    }
 }


[5/9] FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 72669eb..52bb7ae 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,26 +23,18 @@ import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
@@ -136,7 +128,8 @@ public class MetadataMappingServiceTest {
 
     @Test
     public void testOnAddClusterEntity() throws Exception {
-        clusterEntity = buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
         configStore.publish(EntityType.CLUSTER, clusterEntity);
 
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
@@ -148,8 +141,9 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testOnAddClusterEntity")
     public void testOnAddFeedEntity() throws Exception {
-        Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
-                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         configStore.publish(EntityType.FEED, impressionsFeed);
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -157,16 +151,18 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
         Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
 
-        Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, "classified-as=Secure,classified-as=Financial",
-                "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
+        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics");
+        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
         configStore.publish(EntityType.FEED, clicksFeed);
         inputFeeds.add(clicksFeed);
         verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
         Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
         Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
 
-        Feed join1Feed = buildFeed("imp-click-join1", clusterEntity, "classified-as=Financial", "reporting,bi",
-                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi");
+        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join1Feed);
         outputFeeds.add(join1Feed);
         verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
@@ -174,8 +170,9 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
         // Group + 2Tags
 
-        Feed join2Feed = buildFeed("imp-click-join2", clusterEntity, "classified-as=Secure,classified-as=Financial",
-                "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi");
+        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join2Feed);
         outputFeeds.add(join2Feed);
         verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
@@ -187,15 +184,16 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testOnAddFeedEntity")
     public void testOnAddProcessEntity() throws Exception {
-        processEntity = buildProcess(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical");
-        addWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+        processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
 
         for (Feed inputFeed : inputFeeds) {
-            addInput(processEntity, inputFeed);
+            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
         }
 
         for (Feed outputFeed : outputFeeds) {
-            addOutput(processEntity, outputFeed);
+            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
         }
 
         configStore.publish(EntityType.PROCESS, processEntity);
@@ -241,7 +239,8 @@ public class MetadataMappingServiceTest {
         service.init();
 
         // cannot modify cluster, adding a new cluster
-        bcpCluster = buildCluster("bcp-cluster", "east-coast", "classification=bcp");
+        bcpCluster = EntityBuilderTestUtil.buildCluster("bcp-cluster", "east-coast",
+                "classification=bcp");
         configStore.publish(EntityType.CLUSTER, bcpCluster);
         verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
 
@@ -253,9 +252,9 @@ public class MetadataMappingServiceTest {
     @Test(dependsOnMethods = "testOnChange")
     public void testOnFeedEntityChange() throws Exception {
         Feed oldFeed = inputFeeds.get(0);
-        Feed newFeed = buildFeed(oldFeed.getName(), clusterEntity,
-                "classified-as=Secured,source=data-warehouse", "reporting",
-                Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
+        Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity,
+                "classified-as=Secured,source=data-warehouse", "reporting");
+        addStorage(newFeed, Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
 
         try {
             configStore.initiateUpdate(newFeed);
@@ -301,9 +300,10 @@ public class MetadataMappingServiceTest {
     @Test(dependsOnMethods = "testOnFeedEntityChange")
     public void testOnProcessEntityChange() throws Exception {
         Process oldProcess = processEntity;
-        Process newProcess = buildProcess(oldProcess.getName(), bcpCluster, null);
-        addWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
-        addInput(newProcess, inputFeeds.get(0));
+        Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster,
+                null);
+        EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
+        EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
 
         try {
             configStore.initiateUpdate(newProcess);
@@ -348,44 +348,6 @@ public class MetadataMappingServiceTest {
         }
     }
 
-    private static Cluster buildCluster(String name, String colo, String tags) {
-        Cluster cluster = new Cluster();
-        cluster.setName(name);
-        cluster.setColo(colo);
-        cluster.setTags(tags);
-
-        Interfaces interfaces = new Interfaces();
-        cluster.setInterfaces(interfaces);
-
-        Interface storage = new Interface();
-        storage.setEndpoint("jail://global:00");
-        storage.setType(Interfacetype.WRITE);
-        cluster.getInterfaces().getInterfaces().add(storage);
-
-        return cluster;
-    }
-
-    private static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups,
-                                  Storage.TYPE storageType, String uriTemplate) {
-        Feed feed = new Feed();
-        feed.setName(feedName);
-        feed.setTags(tags);
-        feed.setGroups(groups);
-        feed.setFrequency(Frequency.fromString("hours(1)"));
-
-        org.apache.falcon.entity.v0.feed.Clusters
-                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
-        feed.setClusters(clusters);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName(cluster.getName());
-        clusters.getClusters().add(feedCluster);
-
-        addStorage(feed, storageType, uriTemplate);
-
-        return feed;
-    }
-
     private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
         if (storageType == Storage.TYPE.FILESYSTEM) {
             Locations locations = new Locations();
@@ -402,53 +364,6 @@ public class MetadataMappingServiceTest {
         }
     }
 
-    private static Process buildProcess(String processName, Cluster cluster,
-                                        String tags) throws Exception {
-        Process processEntity = new Process();
-        processEntity.setName(processName);
-        processEntity.setTags(tags);
-
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                new org.apache.falcon.entity.v0.process.Cluster();
-        processCluster.setName(cluster.getName());
-        processEntity.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
-        processEntity.getClusters().getClusters().add(processCluster);
-
-        return processEntity;
-    }
-
-    private static void addWorkflow(Process process, String workflowName, String version) {
-        Workflow workflow = new Workflow();
-        workflow.setName(workflowName);
-        workflow.setVersion(version);
-        workflow.setEngine(EngineType.PIG);
-        workflow.setPath("/falcon/test/workflow");
-
-        process.setWorkflow(workflow);
-    }
-
-    private static void addInput(Process process, Feed feed) {
-        if (process.getInputs() == null) {
-            process.setInputs(new Inputs());
-        }
-
-        Inputs inputs = process.getInputs();
-        Input input = new Input();
-        input.setFeed(feed.getName());
-        inputs.getInputs().add(input);
-    }
-
-    private static void addOutput(Process process, Feed feed) {
-        if (process.getOutputs() == null) {
-            process.setOutputs(new Outputs());
-        }
-
-        Outputs outputs = process.getOutputs();
-        Output output = new Output();
-        output.setFeed(feed.getName());
-        outputs.getOutputs().add(output);
-    }
-
     private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) {
         Vertex entityVertex = getEntityVertex(entityName, entityType);
         Assert.assertNotNull(entityVertex);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
index fe7155b..a1861e1 100644
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.security;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -32,4 +33,12 @@ public class CurrentUserTest {
         CurrentUser.authenticate(id);
         Assert.assertEquals(CurrentUser.getUser(), id);
     }
+
+    @Test
+    public void testGetProxyUser() throws Exception {
+        CurrentUser.authenticate("proxy");
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUgi();
+        Assert.assertNotNull(proxyUgi);
+        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
new file mode 100644
index 0000000..0fd869e
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+
+/**
+ * Unit tests for DefaultAuthorizationProvider.
+ */
+public class DefaultAuthorizationProviderTest {
+
+    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String PROCESS_ENTITY_NAME = "sample-process";
+
+    private UserGroupInformation realUser;
+    private ConfigurationStore configStore;
+    private Cluster clusterEntity;
+    private Feed feedEntity;
+    private org.apache.falcon.entity.v0.process.Process processEntity;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        realUser = UserGroupInformation.createUserForTesting("falcon", new String[]{"falcon", });
+
+        CurrentUser.authenticate(EntityBuilderTestUtil.USER);
+        org.testng.Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER);
+
+        configStore = ConfigurationStore.get();
+
+        addClusterEntity();
+        addFeedEntity();
+        addProcessEntity();
+        org.testng.Assert.assertNotNull(processEntity);
+    }
+
+    public void addClusterEntity() throws Exception {
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME);
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+    }
+
+    public void addFeedEntity() throws Exception {
+        feedEntity = EntityBuilderTestUtil.buildFeed("sample-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(feedEntity, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        configStore.publish(EntityType.FEED, feedEntity);
+    }
+
+    private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            feed.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            feed.setTable(table);
+        }
+    }
+
+    public void addProcessEntity() throws Exception {
+        processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+                clusterEntity, "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity);
+        EntityBuilderTestUtil.addProcessACL(processEntity);
+
+        configStore.publish(EntityType.PROCESS, processEntity);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        cleanupStore();
+    }
+
+    protected void cleanupStore() throws FalconException {
+        configStore = ConfigurationStore.get();
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = configStore.getEntities(type);
+            for (String entity : entities) {
+                configStore.remove(type, entity);
+            }
+        }
+    }
+
+    @DataProvider(name = "adminResourceActions")
+    private Object[][] createAdminResourceActions() {
+        return new Object[][] {
+            {"version"},
+            {"stack"},
+            {"config"},
+        };
+    }
+
+    @Test (dataProvider = "adminResourceActions")
+    public void testAuthorizeAdminResourceAdmin(String action) throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", action, null, null, proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeAdminResourceAdminUserBadGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeAdminResourceAdminGroupBadUser() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty(
+                "falcon.security.authorization.admin.groups", "admin-group");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeAdminResourceInvalidUserAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+        Assert.fail("User does not belong to both admin-users not groups");
+    }
+
+    @DataProvider(name = "entityResourceActions")
+    private Object[][] createEntityResourceActions() {
+        return new Object[][] {
+            {"entities", "list", "feed"},
+            {"entities", "list", "process"},
+            {"entities", "list", "cluster"},
+        };
+    }
+
+    @Test (dataProvider = "entityResourceActions")
+    public void testAuthorizeEntitiesInstancesReadOnlyResource(String resource,
+                                                               String action,
+                                                               String entityType) throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(resource, action, entityType, null, proxyUgi);
+    }
+
+    @DataProvider(name = "entityLifecycleResourceActions")
+    private Object[][] createEntityLifecycleResourceActions() {
+        return new Object[][] {
+            {"entities", "status", "cluster", "primary-cluster"},
+            {"entities", "status", "process", "sample-process"},
+            {"entities", "status", "feed", "sample-feed"},
+            {"instance", "status", "process", "sample-process"},
+            {"instance", "running", "process", "sample-process"},
+            {"instance", "running", "feed", "sample-feed"},
+        };
+    }
+
+    @Test(dataProvider = "entityLifecycleResourceActions")
+    public void testAuthorizeEntitiesInstancesLifecycleResource(String resource, String action,
+                                                                String entityType,
+                                                                String entityName) throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                EntityBuilderTestUtil.USER, realUser, new String[]{EntityBuilderTestUtil.USER, });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(resource, action, entityType, entityName, proxyUgi);
+    }
+
+    @Test(dataProvider = "entityLifecycleResourceActions",
+            expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeEntitiesInstancesLifecycleResourceBadUGI(String resource,
+                                                                      String action,
+                                                                      String entityType,
+                                                                      String entityName)
+        throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(resource, action, entityType, entityName, proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeBadResource() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("invalid", "version", null, null, proxyUgi);
+        Assert.fail("Bad resource");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeNullResource() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(null, "version", null, null, proxyUgi);
+        Assert.fail("Bad resource");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeBadAction() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", null, "feedz", null, proxyUgi);
+        Assert.fail("Bad action");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeNullEntityType() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "list", null, "primary-cluster", proxyUgi);
+        Assert.fail("Bad entity type");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeBadEntityType() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "list", "clusterz", "primary-cluster", proxyUgi);
+        Assert.fail("Bad entity type");
+    }
+
+    @Test
+    public void testAuthorizeValidatePOSTOperations() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        EntityBuilderTestUtil.addProcessACL(processEntity, "admin", "admin");
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeEntity(processEntity.getName(), "process",
+                processEntity.getACL(), "submit", proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeResourceOperationsBadEntity() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "status", "feed", processEntity.getName(), proxyUgi);
+        Assert.fail("Bad entity");
+    }
+
+    @Test
+    public void testAuthorizeValidatePOSTOperationsGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        EntityBuilderTestUtil.addProcessACL(processEntity, "admin-user", "admin");
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeEntity(processEntity.getName(), "process",
+                processEntity.getACL(), "submit", proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeValidatePOSTOperationsBadUserAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        EntityBuilderTestUtil.addProcessACL(processEntity, "admin-user", "admin-group");
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeEntity(processEntity.getName(), "process",
+                processEntity.getACL(), "submit", proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeLineageResource() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("lineage", "vertices", null, null, proxyUgi);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
index 630aa4b..a36d916 100644
--- a/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.security;
 
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -61,9 +60,25 @@ public class SecurityUtilTest {
     }
 
     @Test
-    public void testGetProxyUser() throws Exception {
-        UserGroupInformation proxyUgi = SecurityUtil.getProxyUser("proxy");
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+    public void testIsAuthorizationEnabledByDefault() throws Exception {
+        Assert.assertFalse(SecurityUtil.isAuthorizationEnabled());
+    }
+
+    @Test
+    public void testIsAuthorizationEnabled() throws Exception {
+        try {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+            Assert.assertTrue(SecurityUtil.isAuthorizationEnabled());
+        } finally {
+            // reset
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testGetAuthorizationProviderByDefault() throws Exception {
+        Assert.assertNotNull(SecurityUtil.getAuthorizationProvider());
+        Assert.assertEquals(SecurityUtil.getAuthorizationProvider().getClass(),
+                DefaultAuthorizationProvider.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/FalconWebException.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/FalconWebException.java b/prism/src/main/java/org/apache/falcon/FalconWebException.java
index 736a8b5..251a1c1 100644
--- a/prism/src/main/java/org/apache/falcon/FalconWebException.java
+++ b/prism/src/main/java/org/apache/falcon/FalconWebException.java
@@ -21,6 +21,7 @@ package org.apache.falcon;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,8 +38,13 @@ public class FalconWebException extends WebApplicationException {
 
     private static final Logger LOG = LoggerFactory.getLogger(FalconWebException.class);
 
-    public static FalconWebException newException(Throwable e, Response.Status status) {
-        return newException(getMessage(e), status);
+    public static FalconWebException newException(Throwable e,
+                                                  Response.Status status) {
+        if (e instanceof AuthorizationException) {
+            status = Response.Status.FORBIDDEN;
+        }
+
+        return newException(e.getMessage(), status);
     }
 
     public static FalconWebException newInstanceException(Throwable e, Response.Status status) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 3d9078e..c775510 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -460,8 +460,6 @@ public abstract class AbstractEntityManager {
      *
      * @param type entity type
      * @param fieldStr fields that the query is interested in, separated by comma
-     *
-     * @param type entity type
      * @return String
      */
     public EntityList getEntityList(String type, String fieldStr) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index f98aece..f5329ef 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.UnschedulableEntityException;
 import org.apache.falcon.monitors.Dimension;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +45,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Schedules an submitted entity immediately.
      *
-     * @param type
-     * @param entity
+     * @param type   entity type
+     * @param entity entity name
      * @return APIResult
      */
     public APIResult schedule(
@@ -63,7 +64,9 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
         }
     }
 
-    private synchronized void scheduleInternal(String type, String entity) throws FalconException {
+    private synchronized void scheduleInternal(String type, String entity)
+        throws FalconException, AuthorizationException {
+
         checkSchedulableEntity(type);
         Entity entityObj = EntityUtil.getEntity(type, entity);
         getWorkflowEngine().schedule(entityObj);
@@ -72,8 +75,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Submits a new entity and schedules it immediately.
      *
-     * @param type
-     * @return
+     * @param type   entity type
+     * @return APIResult
      */
     public APIResult submitAndSchedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@@ -95,8 +98,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Suspends a running entity.
      *
-     * @param type
-     * @param entity
+     * @param type   entity type
+     * @param entity entity name
      * @return APIResult
      */
     public APIResult suspend(
@@ -123,8 +126,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Resumes a suspended entity.
      *
-     * @param type
-     * @param entity
+     * @param type   entity type
+     * @param entity entity name
      * @return APIResult
      */
     public APIResult resume(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
deleted file mode 100644
index 5a56b9a..0000000
--- a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
-import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import org.apache.log4j.NDC;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * This enforces authentication as part of the filter before processing the request.
- * Subclass of {@link AuthenticationFilter}.
- */
-public class BasicAuthFilter extends AuthenticationFilter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BasicAuthFilter.class);
-
-    /**
-     * Constant for the configuration property that indicates the prefix.
-     */
-    protected static final String FALCON_PREFIX = "falcon.http.authentication.";
-    protected static final String KERBEROS_PRINCIPAL = FALCON_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
-
-    /**
-     * Constant for the configuration property that indicates the blacklisted super users for falcon.
-     */
-    private static final String BLACK_LISTED_USERS_KEY = FALCON_PREFIX + "blacklisted.users";
-
-    /**
-     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
-     * before invoking the actual resource.
-     */
-    private HttpServlet optionsServlet;
-    private Set<String> blackListedUsers;
-
-    /**
-     * Initialize the filter.
-     *
-     * @param filterConfig filter configuration.
-     * @throws ServletException thrown if the filter could not be initialized.
-     */
-    @Override
-    public void init(FilterConfig filterConfig) throws ServletException {
-        LOG.info("BasicAuthFilter initialization started");
-        super.init(filterConfig);
-
-        optionsServlet = new HttpServlet() {};
-        optionsServlet.init();
-
-        initializeBlackListedUsers();
-    }
-
-    private void initializeBlackListedUsers() {
-        blackListedUsers = new HashSet<String>();
-        String blackListedUserConfig = StartupProperties.get().getProperty(BLACK_LISTED_USERS_KEY);
-        if (!StringUtils.isEmpty(blackListedUserConfig)) {
-            blackListedUsers.addAll(Arrays.asList(blackListedUserConfig.split(",")));
-        }
-    }
-
-    /**
-     * Returns the configuration from Oozie configuration to be used by the authentication filter.
-     * <p/>
-     * All properties from Oozie configuration which name starts with {@link #FALCON_PREFIX} will
-     * be returned. The keys of the returned properties are trimmed from the {@link #FALCON_PREFIX}
-     * prefix, for example the Oozie configuration property name 'oozie.authentication.type' will
-     * be just 'type'.
-     *
-     * @param configPrefix configuration prefix, this parameter is ignored by this implementation.
-     * @param filterConfig filter configuration, this parameter is ignored by this implementation.
-     * @return all Oozie configuration properties prefixed with {@link #FALCON_PREFIX}, without the
-     * prefix.
-     */
-    @Override
-    protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) {
-        Properties authProperties = new Properties();
-        Properties configProperties = StartupProperties.get();
-
-        // setting the cookie path to root '/' so it is used for all resources.
-        authProperties.setProperty(AuthenticationFilter.COOKIE_PATH, "/");
-
-        for (Map.Entry entry : configProperties.entrySet()) {
-            String name = (String) entry.getKey();
-            if (name.startsWith(FALCON_PREFIX)) {
-                String value = (String) entry.getValue();
-                name = name.substring(FALCON_PREFIX.length());
-                authProperties.setProperty(name, value);
-            }
-        }
-
-        if (UserGroupInformation.isSecurityEnabled()) { // replace _HOST in principal
-            String principal = getKerberosPrincipalWithSubstitutedHost(configProperties);
-            // principal cannot be null in secure mode, is validated in submission
-            authProperties.setProperty(KerberosAuthenticationHandler.PRINCIPAL, principal);
-        }
-
-        return authProperties;
-    }
-
-    /**
-     * Replaces _HOST in the principal with the actual hostname.
-     *
-     * @param configProperties Falcon config properties
-     * @return principal with _HOST substituted
-     */
-    private String getKerberosPrincipalWithSubstitutedHost(Properties configProperties) {
-        String principal = configProperties.getProperty(KERBEROS_PRINCIPAL);
-        try {
-            principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
-                    principal, SecurityUtil.getLocalHostName());
-        } catch (IOException ignored) {
-            // do nothing
-        }
-
-        return principal;
-    }
-
-    @Override
-    public void doFilter(final ServletRequest request, final ServletResponse response,
-                         final FilterChain filterChain) throws IOException, ServletException {
-
-        FilterChain filterChainWrapper = new FilterChain() {
-
-            @Override
-            public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
-                throws IOException, ServletException {
-                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
-
-                if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
-                    optionsServlet.service(request, response);
-                } else {
-                    final String user = getUserFromRequest(httpRequest);
-                    if (StringUtils.isEmpty(user)) {
-                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                                "User can't be empty");
-                    } else if (blackListedUsers.contains(user)) {
-                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                                "User can't be a superuser:" + BLACK_LISTED_USERS_KEY);
-                    } else {
-                        try {
-                            String requestId = UUID.randomUUID().toString();
-                            NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
-                            NDC.push(requestId);
-                            CurrentUser.authenticate(user);
-                            LOG.info("Request from user: {}, URL={}", user, getRequestUrl(httpRequest));
-
-                            filterChain.doFilter(servletRequest, servletResponse);
-                        } finally {
-                            NDC.pop();
-                            NDC.pop();
-                        }
-                    }
-                }
-            }
-
-            private String getUserFromRequest(HttpServletRequest httpRequest) {
-                String user = httpRequest.getRemoteUser(); // this is available from wrapper in super class
-                if (!StringUtils.isEmpty(user)) {
-                    return user;
-                }
-
-                user = httpRequest.getParameter("user.name"); // available in query-param
-                if (!StringUtils.isEmpty(user)) {
-                    return user;
-                }
-
-                user = httpRequest.getHeader("Remote-User"); // backwards-compatibility
-                if (!StringUtils.isEmpty(user)) {
-                    return user;
-                }
-
-                return null;
-            }
-
-            private String getRequestUrl(HttpServletRequest request) {
-                StringBuffer url = request.getRequestURL();
-                if (request.getQueryString() != null) {
-                    url.append("?").append(request.getQueryString());
-                }
-
-                return url.toString();
-            }
-        };
-
-        super.doFilter(request, response, filterChainWrapper);
-    }
-
-    @Override
-    public void destroy() {
-        if (optionsServlet != null) {
-            optionsServlet.destroy();
-        }
-
-        super.destroy();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
new file mode 100644
index 0000000..54023d1
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.NDC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * This enforces authentication as part of the filter before processing the request.
+ * Subclass of {@link org.apache.hadoop.security.authentication.server.AuthenticationFilter}.
+ */
+public class FalconAuthenticationFilter
+        extends org.apache.hadoop.security.authentication.server.AuthenticationFilter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconAuthenticationFilter.class);
+
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    protected static final String FALCON_PREFIX = "falcon.http.authentication.";
+    protected static final String KERBEROS_PRINCIPAL = FALCON_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
+
+    /**
+     * Constant for the configuration property that indicates the blacklisted super users for falcon.
+     */
+    private static final String BLACK_LISTED_USERS_KEY = FALCON_PREFIX + "blacklisted.users";
+
+    /**
+     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
+     * before invoking the actual resource.
+     */
+    private HttpServlet optionsServlet;
+    private Set<String> blackListedUsers;
+
+    /**
+     * Initialize the filter.
+     *
+     * @param filterConfig filter configuration.
+     * @throws ServletException thrown if the filter could not be initialized.
+     */
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        LOG.info("FalconAuthenticationFilter initialization started");
+        super.init(filterConfig);
+
+        optionsServlet = new HttpServlet() {};
+        optionsServlet.init();
+
+        initializeBlackListedUsers();
+    }
+
+    private void initializeBlackListedUsers() {
+        blackListedUsers = new HashSet<String>();
+        String blackListedUserConfig = StartupProperties.get().getProperty(BLACK_LISTED_USERS_KEY);
+        if (!StringUtils.isEmpty(blackListedUserConfig)) {
+            blackListedUsers.addAll(Arrays.asList(blackListedUserConfig.split(",")));
+        }
+    }
+
+    /**
+     * Returns the configuration from Oozie configuration to be used by the authentication filter.
+     * <p/>
+     * All properties from Oozie configuration which name starts with {@link #FALCON_PREFIX} will
+     * be returned. The keys of the returned properties are trimmed from the {@link #FALCON_PREFIX}
+     * prefix, for example the Oozie configuration property name 'oozie.authentication.type' will
+     * be just 'type'.
+     *
+     * @param configPrefix configuration prefix, this parameter is ignored by this implementation.
+     * @param filterConfig filter configuration, this parameter is ignored by this implementation.
+     * @return all Oozie configuration properties prefixed with {@link #FALCON_PREFIX}, without the
+     * prefix.
+     */
+    @Override
+    protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) {
+        Properties authProperties = new Properties();
+        Properties configProperties = StartupProperties.get();
+
+        // setting the cookie path to root '/' so it is used for all resources.
+        authProperties.setProperty(
+                org.apache.hadoop.security.authentication.server.AuthenticationFilter.COOKIE_PATH, "/");
+
+        for (Map.Entry entry : configProperties.entrySet()) {
+            String name = (String) entry.getKey();
+            if (name.startsWith(FALCON_PREFIX)) {
+                String value = (String) entry.getValue();
+                name = name.substring(FALCON_PREFIX.length());
+                authProperties.setProperty(name, value);
+            }
+        }
+
+        if (UserGroupInformation.isSecurityEnabled()) { // replace _HOST in principal
+            String principal = getKerberosPrincipalWithSubstitutedHost(configProperties);
+            // principal cannot be null in secure mode, is validated in submission
+            authProperties.setProperty(KerberosAuthenticationHandler.PRINCIPAL, principal);
+        }
+
+        return authProperties;
+    }
+
+    /**
+     * Replaces _HOST in the principal with the actual hostname.
+     *
+     * @param configProperties Falcon config properties
+     * @return principal with _HOST substituted
+     */
+    private String getKerberosPrincipalWithSubstitutedHost(Properties configProperties) {
+        String principal = configProperties.getProperty(KERBEROS_PRINCIPAL);
+        try {
+            principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
+                    principal, SecurityUtil.getLocalHostName());
+        } catch (IOException ignored) {
+            // do nothing
+        }
+
+        return principal;
+    }
+
+    @Override
+    public void doFilter(final ServletRequest request, final ServletResponse response,
+                         final FilterChain filterChain) throws IOException, ServletException {
+
+        FilterChain filterChainWrapper = new FilterChain() {
+
+            @Override
+            public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+                throws IOException, ServletException {
+                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+
+                if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+                    optionsServlet.service(request, response);
+                } else {
+                    final String user = getUserFromRequest(httpRequest);
+                    if (StringUtils.isEmpty(user)) {
+                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                                "User can't be empty");
+                    } else if (blackListedUsers.contains(user)) {
+                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                                "User can't be a superuser:" + BLACK_LISTED_USERS_KEY);
+                    } else {
+                        try {
+                            String requestId = UUID.randomUUID().toString();
+                            NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
+                            NDC.push(requestId);
+                            CurrentUser.authenticate(user);
+                            LOG.info("Request from user: {}, URL={}", user, getRequestUrl(httpRequest));
+
+                            filterChain.doFilter(servletRequest, servletResponse);
+                        } finally {
+                            NDC.pop();
+                            NDC.pop();
+                        }
+                    }
+                }
+            }
+
+            private String getUserFromRequest(HttpServletRequest httpRequest) {
+                String user = httpRequest.getRemoteUser(); // this is available from wrapper in super class
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                user = httpRequest.getParameter("user.name"); // available in query-param
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                user = httpRequest.getHeader("Remote-User"); // backwards-compatibility
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                return null;
+            }
+
+            private String getRequestUrl(HttpServletRequest request) {
+                StringBuffer url = request.getRequestURL();
+                if (request.getQueryString() != null) {
+                    url.append("?").append(request.getQueryString());
+                }
+
+                return url.toString();
+            }
+        };
+
+        super.doFilter(request, response, filterChainWrapper);
+    }
+
+    @Override
+    public void destroy() {
+        if (optionsServlet != null) {
+            optionsServlet.destroy();
+        }
+
+        super.destroy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
new file mode 100644
index 0000000..ceb13d0
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.FalconException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This enforces authorization as part of the filter before processing the request.
+ */
+public class FalconAuthorizationFilter implements Filter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconAuthorizationFilter.class);
+
+    private boolean isAuthorizationEnabled;
+    private AuthorizationProvider authorizationProvider;
+
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        try {
+            isAuthorizationEnabled = SecurityUtil.isAuthorizationEnabled();
+            if (isAuthorizationEnabled) {
+                LOG.info("Falcon is running with authorization enabled");
+            }
+
+            authorizationProvider = SecurityUtil.getAuthorizationProvider();
+        } catch (FalconException e) {
+            throw new ServletException(e);
+        }
+    }
+
+    @Override
+    public void doFilter(ServletRequest request,
+                         ServletResponse response,
+                         FilterChain filterChain) throws IOException, ServletException {
+        HttpServletRequest httpRequest = (HttpServletRequest) request;
+
+        String pathInfo = httpRequest.getPathInfo();
+        String[] paths = getResourcesAndActions(pathInfo);
+        final String resource = paths[0];
+        final String action = paths[1];
+        final String entityType = paths.length > 2 ? paths[2] : null;
+        final String entityName = paths.length > 3 ? paths[3] : null;
+
+        if (isAuthorizationEnabled) {
+            LOG.info("Authorizing user={} against resource={}, action={}, entity name={}, "
+                + "entity type={}", CurrentUser.getUser(), resource, action, entityName, entityType);
+            authorizationProvider.authorizeResource(resource, action,
+                    entityType, entityName, CurrentUser.getProxyUgi());
+        }
+
+        filterChain.doFilter(request, response);
+    }
+
+    private static String[] getResourcesAndActions(String pathInfo) {
+        List<String> splits = new ArrayList<String>();
+        final String[] pathSplits = pathInfo.substring(1).split("/");
+        final String resource = pathSplits[0];
+        if (resource.equals("graphs")) {
+            splits.add(pathSplits[1]);  // resource
+            splits.add(pathSplits[2]);  // action
+        } else {
+            splits.add(pathSplits[0]);  // resource
+            splits.add(pathSplits[1]);  // action
+            if (pathSplits.length > 2) {  // entity type
+                splits.add(pathSplits[2]);
+            }
+            if (pathSplits.length > 3) {  // entity name
+                splits.add(pathSplits[3]);
+            }
+        }
+
+        return splits.toArray(new String[splits.size()]);
+    }
+
+    @Override
+    public void destroy() {
+        authorizationProvider = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml
index e7c6234..ba139aa 100644
--- a/prism/src/main/webapp/WEB-INF/web.xml
+++ b/prism/src/main/webapp/WEB-INF/web.xml
@@ -26,12 +26,22 @@
     <description>Apache Falcon Prism</description>
 
     <filter>
-        <filter-name>auth</filter-name>
-        <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
+        <filter-name>authentication</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthenticationFilter</filter-class>
     </filter>
 
+    <filter>
+        <filter-name>authorization</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthorizationFilter</filter-class>
+    </filter>
+
+    <filter-mapping>
+        <filter-name>authentication</filter-name>
+        <servlet-name>FalconProxyAPI</servlet-name>
+    </filter-mapping>
+
     <filter-mapping>
-        <filter-name>auth</filter-name>
+        <filter-name>authorization</filter-name>
         <servlet-name>FalconProxyAPI</servlet-name>
     </filter-mapping>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index 26c24c7..d664782 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -22,27 +22,18 @@ import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.process.Clusters;
 import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.metadata.LineageArgs;
 import org.apache.falcon.metadata.LineageRecorder;
 import org.apache.falcon.metadata.MetadataMappingService;
@@ -463,73 +454,36 @@ public class LineageMetadataResourceTest {
     }
 
     public void addClusterEntity() throws Exception {
-        clusterEntity = buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME,
+                COLO_NAME, "classification=production");
         configStore.publish(EntityType.CLUSTER, clusterEntity);
     }
 
     public void addFeedEntity() throws Exception {
-        Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
-                        Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, impressionsFeed);
         inputFeeds.add(impressionsFeed);
 
-        Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, null, null,
-                Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, null, null);
+        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, clicksFeed);
         inputFeeds.add(clicksFeed);
 
-        Feed join1Feed = buildFeed("imp-click-join1", clusterEntity, "classified-as=Financial",
-                        "reporting,bi",
-                        Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi");
+        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join1Feed);
         outputFeeds.add(join1Feed);
 
-        Feed join2Feed = buildFeed("imp-click-join2", clusterEntity,
-                "classified-as=Secure,classified-as=Financial",
-                "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi");
+        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join2Feed);
         outputFeeds.add(join2Feed);
     }
 
-    public static Cluster buildCluster(String name, String colo, String tags) {
-        Cluster cluster = new Cluster();
-        cluster.setName(name);
-        cluster.setColo(colo);
-        cluster.setTags(tags);
-
-        Interfaces interfaces = new Interfaces();
-        cluster.setInterfaces(interfaces);
-
-        Interface storage = new Interface();
-        storage.setEndpoint("jail://global:00");
-        storage.setType(Interfacetype.WRITE);
-        cluster.getInterfaces().getInterfaces().add(storage);
-
-        return cluster;
-    }
-
-    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups,
-                                 Storage.TYPE storageType, String uriTemplate) {
-        Feed feed = new Feed();
-        feed.setName(feedName);
-        feed.setTags(tags);
-        feed.setGroups(groups);
-        feed.setFrequency(Frequency.fromString("hours(1)"));
-
-        org.apache.falcon.entity.v0.feed.Clusters
-                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
-        feed.setClusters(clusters);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName(cluster.getName());
-        clusters.getClusters().add(feedCluster);
-
-        addStorage(feed, storageType, uriTemplate);
-
-        return feed;
-    }
-
     public static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
         if (storageType == Storage.TYPE.FILESYSTEM) {
             Locations locations = new Locations();
@@ -546,63 +500,17 @@ public class LineageMetadataResourceTest {
         }
     }
 
-    public static Process buildProcess(String processName, Cluster cluster,
-                                       String tags) throws Exception {
-        Process processEntity = new Process();
-        processEntity.setName(processName);
-        processEntity.setTags(tags);
-
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                new org.apache.falcon.entity.v0.process.Cluster();
-        processCluster.setName(cluster.getName());
-        processEntity.setClusters(new Clusters());
-        processEntity.getClusters().getClusters().add(processCluster);
-
-        return processEntity;
-    }
-
-    public static void addWorkflow(Process process, String workflowName, String version) {
-        Workflow workflow = new Workflow();
-        workflow.setName(workflowName);
-        workflow.setVersion(version);
-        workflow.setEngine(EngineType.PIG);
-        workflow.setPath("/falcon/test/workflow");
-
-        process.setWorkflow(workflow);
-    }
-
-    public static void addInput(Process process, Feed feed) {
-        if (process.getInputs() == null) {
-            process.setInputs(new Inputs());
-        }
-
-        Inputs inputs = process.getInputs();
-        Input input = new Input();
-        input.setFeed(feed.getName());
-        inputs.getInputs().add(input);
-    }
-
-    public static void addOutput(Process process, Feed feed) {
-        if (process.getOutputs() == null) {
-            process.setOutputs(new Outputs());
-        }
-
-        Outputs outputs = process.getOutputs();
-        Output output = new Output();
-        output.setFeed(feed.getName());
-        outputs.getOutputs().add(output);
-    }
-
     public void addProcessEntity() throws Exception {
-        Process processEntity = buildProcess(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical");
-        addWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+        Process processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+                clusterEntity, "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
 
         for (Feed inputFeed : inputFeeds) {
-            addInput(processEntity, inputFeed);
+            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
         }
 
         for (Feed outputFeed : outputFeeds) {
-            addOutput(processEntity, outputFeed);
+            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
         }
 
         configStore.publish(EntityType.PROCESS, processEntity);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
new file mode 100644
index 0000000..787e528
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Test for FalconAuthenticationFilter using mock objects.
+ */
+public class FalconAuthenticationFilterTest {
+
+    @Mock
+    private HttpServletRequest mockRequest;
+
+    @Mock
+    private HttpServletResponse mockResponse;
+
+    @Mock
+    private FilterChain mockChain;
+
+    @Mock
+    private FilterConfig mockConfig;
+
+    @Mock
+    private UserGroupInformation mockUgi;
+
+    @BeforeClass
+    public void init() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @BeforeMethod
+    private void initAuthType() {
+        ConcurrentHashMap<String, String> conf = new ConcurrentHashMap<String, String>();
+        conf.put("type", "simple");
+        conf.put("config.prefix.type", "");
+        conf.put("anonymous.allowed", "true");
+        Mockito.when(mockConfig.getInitParameterNames()).thenReturn(conf.keys());
+
+        for (Map.Entry<String, String> entry : conf.entrySet()) {
+            Mockito.when(mockConfig.getInitParameter(entry.getKey())).thenReturn(entry.getValue());
+        }
+
+        Mockito.when(mockRequest.getMethod()).thenReturn("OPTIONS");
+
+        StringBuffer requestUrl = new StringBuffer("http://localhost");
+        Mockito.when(mockRequest.getRequestURL()).thenReturn(requestUrl);
+    }
+
+    @Test
+    public void testDoFilter() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        CurrentUser.authenticate("nouser");
+        Assert.assertEquals(CurrentUser.getUser(), "nouser");
+
+        CurrentUser.authenticate("guest");
+        Mockito.when(mockRequest.getQueryString()).thenReturn("user.name=guest");
+        filter.doFilter(mockRequest, mockResponse, mockChain);
+        Assert.assertEquals(CurrentUser.getUser(), "guest");
+
+        CurrentUser.authenticate("nouser");
+        Assert.assertEquals(CurrentUser.getUser(), "nouser");
+        CurrentUser.authenticate("testuser");
+        Mockito.when(mockRequest.getRemoteUser()).thenReturn("testuser");
+        filter.doFilter(mockRequest, mockResponse, mockChain);
+        Assert.assertEquals(CurrentUser.getUser(), "testuser");
+    }
+
+    @Test
+    public void testAnonymous() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        CurrentUser.authenticate("nouser");
+        Assert.assertEquals(CurrentUser.getUser(), "nouser");
+
+        CurrentUser.authenticate("testuser");
+        Mockito.when(mockRequest.getRemoteUser()).thenReturn("testuser");
+        filter.doFilter(mockRequest, mockResponse, mockChain);
+        Assert.assertEquals(CurrentUser.getUser(), "testuser");
+    }
+
+    @Test
+    public void testEmptyUser() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        final String userName = System.getProperty("user.name");
+        try {
+            System.setProperty("user.name", "");
+
+            Mockito.when(mockRequest.getMethod()).thenReturn("POST");
+            Mockito.when(mockRequest.getQueryString()).thenReturn("");
+            Mockito.when(mockRequest.getRemoteUser()).thenReturn(null);
+
+            HttpServletResponse errorResponse = Mockito.mock(HttpServletResponse.class);
+            filter.doFilter(mockRequest, errorResponse, mockChain);
+        } finally {
+            System.setProperty("user.name", userName);
+        }
+    }
+
+    @Test
+    public void testDoFilterForClientBackwardsCompatibility() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+
+        final String userName = System.getProperty("user.name");
+        final String httpAuthType =
+                StartupProperties.get().getProperty("falcon.http.authentication.type", "simple");
+        try {
+            System.setProperty("user.name", "");
+            StartupProperties.get().setProperty("falcon.http.authentication.type",
+                    "org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler");
+
+            synchronized (StartupProperties.get()) {
+                filter.init(mockConfig);
+            }
+
+            Mockito.when(mockRequest.getMethod()).thenReturn("POST");
+            Mockito.when(mockRequest.getQueryString()).thenReturn("");
+            Mockito.when(mockRequest.getRemoteUser()).thenReturn(null);
+            Mockito.when(mockRequest.getHeader("Remote-User")).thenReturn("remote-user");
+
+            filter.doFilter(mockRequest, mockResponse, mockChain);
+
+            Assert.assertEquals(CurrentUser.getUser(), "remote-user");
+
+        } finally {
+            System.setProperty("user.name", userName);
+            StartupProperties.get().setProperty("falcon.http.authentication.type", httpAuthType);
+        }
+    }
+
+    @Test
+    public void testGetKerberosPrincipalWithSubstitutedHostSecure() throws Exception {
+        String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL);
+
+        String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName() + "@Example.com";
+        try {
+            Configuration conf = new Configuration(false);
+            conf.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(conf);
+            Assert.assertTrue(UserGroupInformation.isSecurityEnabled());
+
+            StartupProperties.get().setProperty(
+                    FalconAuthenticationFilter.KERBEROS_PRINCIPAL, "falcon/_HOST@Example.com");
+            FalconAuthenticationFilter filter = new FalconAuthenticationFilter();
+            Properties properties = filter.getConfiguration(FalconAuthenticationFilter.FALCON_PREFIX, null);
+            Assert.assertEquals(
+                    properties.get(KerberosAuthenticationHandler.PRINCIPAL), expectedPrincipal);
+        } finally {
+            StartupProperties.get().setProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL, principal);
+        }
+    }
+
+    @Test
+    public void testGetKerberosPrincipalWithSubstitutedHostNonSecure() throws Exception {
+        String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL);
+        Configuration conf = new Configuration(false);
+        conf.set("hadoop.security.authentication", "simple");
+        UserGroupInformation.setConfiguration(conf);
+        Assert.assertFalse(UserGroupInformation.isSecurityEnabled());
+
+        FalconAuthenticationFilter filter = new FalconAuthenticationFilter();
+        Properties properties = filter.getConfiguration(FalconAuthenticationFilter.FALCON_PREFIX, null);
+        Assert.assertEquals(properties.get(KerberosAuthenticationHandler.PRINCIPAL), principal);
+    }
+}


[8/9] git commit: FALCON-557 Add super-user who is authorized for all. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-557 Add super-user who is authorized for all. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/84cc3684
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/84cc3684
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/84cc3684

Branch: refs/heads/master
Commit: 84cc3684de2b29bfd50fcaba755692dbf03cf478
Parents: 7b3e510
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:20:47 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:22:56 2014 -0700

----------------------------------------------------------------------
 .../security/DefaultAuthorizationProvider.java  | 106 ++++++++++++++-----
 common/src/main/resources/startup.properties    |   7 +-
 .../DefaultAuthorizationProviderTest.java       |  35 +++++-
 docs/src/site/twiki/Security.twiki              |  25 ++++-
 src/conf/startup.properties                     |   7 +-
 5 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/84cc3684/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index 5561429..c7e87f4 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -70,10 +70,26 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
     private static final String ADMIN_USERS_KEY = FALCON_PREFIX + "admin.users";
     private static final String ADMIN_GROUPS_KEY = FALCON_PREFIX + "admin.groups";
 
+    /**
+     * The super-user is the user with the same identity as falcon process itself.
+     * Loosely, if you started falcon, then you are the super-user.
+     */
+    protected static final String SUPER_USER = System.getProperty("user.name");
+
+    /**
+     * Constant for the configuration property that indicates the super user group.
+     */
+    private static final String SUPER_USER_GROUP_KEY = FALCON_PREFIX + "superusergroup";
+
+    /**
+     * Super ser group.
+     */
+    private String superUserGroup;
     private Set<String> adminUsers;
     private Set<String> adminGroups;
 
     public DefaultAuthorizationProvider() {
+        superUserGroup = StartupProperties.get().getProperty(SUPER_USER_GROUP_KEY);
         adminUsers = getAdminNamesFromConfig(ADMIN_USERS_KEY);
         adminGroups = getAdminNamesFromConfig(ADMIN_GROUPS_KEY);
     }
@@ -104,25 +120,42 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
                                   String entityType, String entityName,
                                   UserGroupInformation proxyUgi) throws AuthorizationException {
         Validate.notEmpty(resource, "Resource cannot be empty or null");
+        Validate.isTrue(RESOURCES.contains(resource), "Illegal resource: " + resource);
         Validate.notEmpty(action, "Action cannot be empty or null");
 
-        Set<String> groups = getGroupNames(proxyUgi);
-        String authenticatedUser = proxyUgi.getShortUserName();
-        LOG.info("Authorizing authenticatedUser={}, groups={} against resource={}, action={}, entity name={}, "
-                + "entity type={}", authenticatedUser, groups, resource, action, entityName, entityType);
+        LOG.info("Authorizing authenticatedUser={}, against resource={}, action={}, entity name={}, "
+                + "entity type={}", proxyUgi.getShortUserName(), resource, action, entityName, entityType);
+
+        if (isSuperUser(proxyUgi)) {
+            return;
+        }
 
         if ("admin".equals(resource)) {
-            authorizeAdminResource(authenticatedUser, groups, action);
+            if (!"version".equals(action)) {
+                authorizeAdminResource(proxyUgi, action);
+            }
         } else if ("entities".equals(resource) || "instance".equals(resource)) {
-            authorizeEntityResource(authenticatedUser, proxyUgi, entityName, entityType, action);
+            authorizeEntityResource(proxyUgi, entityName, entityType, action);
         } else if ("lineage".equals(resource)) {
-            authorizeLineageResource(authenticatedUser, action);
-        } else {
-            throw new AuthorizationException("Unknown resource: " + resource);
+            authorizeLineageResource(proxyUgi.getShortUserName(), action);
         }
     }
 
-    private Set<String> getGroupNames(UserGroupInformation proxyUgi) {
+    /**
+     * Determines if the authenticated user is the user who started this process
+     * or belongs to the super user group.
+     *
+     * @param authenticatedUGI UGI
+     * @return true if super user else false.
+     */
+    protected boolean isSuperUser(UserGroupInformation authenticatedUGI) {
+        final String authenticatedUser = authenticatedUGI.getShortUserName();
+        return SUPER_USER.equals(authenticatedUser)
+                || (!StringUtils.isEmpty(superUserGroup)
+                    && isUserInGroup(superUserGroup, authenticatedUGI));
+    }
+
+    protected Set<String> getGroupNames(UserGroupInformation proxyUgi) {
         HashSet<String> s = new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames()));
         return Collections.unmodifiableSet(s);
     }
@@ -146,6 +179,10 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}",
                 authenticatedUser, action, entityName, entityType);
 
+        if (isSuperUser(proxyUgi)) {
+            return;
+        }
+        
         checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUser, proxyUgi);
     }
 
@@ -164,7 +201,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
                              String action, String authenticatedUser,
                              UserGroupInformation proxyUgi) throws AuthorizationException {
         if (isUserACLOwner(authenticatedUser, aclOwner)
-                || isUserInAclGroup(aclGroup, proxyUgi)) {
+                || isUserInGroup(aclGroup, proxyUgi)) {
             return;
         }
 
@@ -179,37 +216,52 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         throw new AuthorizationException(message.toString());
     }
 
+    /**
+     * Determines if the authenticated user is the entity ACL owner.
+     *
+     * @param authenticatedUser authenticated user
+     * @param aclOwner          entity ACL owner
+     * @return true if authenticated user is the entity acl owner, false otherwise.
+     */
     protected boolean isUserACLOwner(String authenticatedUser, String aclOwner) {
         return authenticatedUser.equals(aclOwner);
     }
 
-    protected boolean isUserInAclGroup(String aclGroup, UserGroupInformation proxyUgi) {
+    /**
+     * Checks if the user's group matches the entity ACL group.
+     *
+     * @param group    Entity ACL group.
+     * @param proxyUgi proxy ugi for the authenticated user.
+     * @return true if user groups contains entity acl group.
+     */
+    protected boolean isUserInGroup(String group, UserGroupInformation proxyUgi) {
         Set<String> groups = getGroupNames(proxyUgi);
-        return groups.contains(aclGroup);
+        return groups.contains(group);
     }
 
     /**
      * Check if the user has admin privileges.
      *
-     * @param user   user name.
-     * @param groups groups that the user belongs to.
-     * @param action admin action on the resource
+     * @param proxyUgi proxy ugi for the authenticated user.
+     * @param action   admin action on the resource.
      * @throws AuthorizationException if the user does not have admin privileges.
      */
-    protected void authorizeAdminResource(String user, Set<String> groups,
+    protected void authorizeAdminResource(UserGroupInformation proxyUgi,
                                           String action) throws AuthorizationException {
-        LOG.debug("Authorizing user={} for admin, action={}", user, action);
-        if (adminUsers.contains(user) || isUserInAdminGroups(groups)) {
+        final String authenticatedUser = proxyUgi.getShortUserName();
+        LOG.debug("Authorizing user={} for admin, action={}", authenticatedUser, action);
+        if (adminUsers.contains(authenticatedUser) || isUserInAdminGroups(proxyUgi)) {
             return;
         }
 
         LOG.error("Permission denied: user {} does not have admin privilege for action={}",
-                user, action);
-        throw new AuthorizationException("Permission denied: user=" + user
+                authenticatedUser, action);
+        throw new AuthorizationException("Permission denied: user=" + authenticatedUser
                 + " does not have admin privilege for action=" + action);
     }
 
-    protected boolean isUserInAdminGroups(Set<String> groups) {
+    protected boolean isUserInAdminGroups(UserGroupInformation proxyUgi) {
+        Set<String> groups = getGroupNames(proxyUgi);
         boolean isUserGroupInAdmin = false;
         for (String group : groups) {
             if (adminGroups.contains(group)) {
@@ -221,13 +273,13 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         return isUserGroupInAdmin;
     }
 
-    protected void authorizeEntityResource(String authenticatedUser, UserGroupInformation proxyUgi,
+    protected void authorizeEntityResource(UserGroupInformation proxyUgi,
                                            String entityName, String entityType,
                                            String action) throws AuthorizationException {
         Validate.notEmpty(entityType, "Entity type cannot be empty or null");
         LOG.debug("Authorizing authenticatedUser={} against entity/instance action={}, "
-                + "entity name={}, entity type={}", authenticatedUser, action, entityName,
-                entityType);
+                + "entity name={}, entity type={}",
+                proxyUgi.getShortUserName(), action, entityName, entityType);
 
         if (entityName != null) { // lifecycle actions
             Entity entity = getEntity(entityName, entityType);
@@ -260,8 +312,10 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
             return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
 
         default:
-            throw new AuthorizationException("Cannot get owner for entity: " + entity.getName());
+            break;
         }
+
+        throw new AuthorizationException("Cannot get owner for entity: " + entity.getName());
     }
 
     protected void authorizeLineageResource(String authenticatedUser, String action) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/84cc3684/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 808fed6..0d49b4b 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -142,11 +142,14 @@ debug.libext.process.paths=${falcon.libext}
 # Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
 
+# The name of the group of super-users
+*.falcon.security.authorization.superusergroup=falcon
+
 # Admin Users, comma separated users
-*.falcon.security.authorization.admin.users=falcon,ambari-qa,seetharam
+*.falcon.security.authorization.admin.users=falcon,ambari-qa
 
 # Admin Group Membership, comma separated users
-*.falcon.security.authorization.admin.groups=falcon,testgroup,staff
+*.falcon.security.authorization.admin.groups=falcon,staff
 
 # Authorization Provider Implementation Fully Qualified Class Name
 *.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/84cc3684/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
index 0fd869e..125aa85 100644
--- a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
+++ b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
@@ -121,6 +121,35 @@ public class DefaultAuthorizationProviderTest {
         }
     }
 
+    @Test
+    public void testAuthorizeAdminResourceVersionAction() throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "blah", realUser, new String[]{"blah-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeSuperUser() throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                EntityBuilderTestUtil.USER, realUser, new String[]{"group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "schedule", "feed", feedEntity.getName(), proxyUgi);
+        provider.authorizeResource("instance", "status", "feed", feedEntity.getName(), proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeSuperUserGroup() throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "blah", realUser, new String[]{"falcon", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "schedule", "feed", feedEntity.getName(), proxyUgi);
+        provider.authorizeResource("instance", "status", "feed", feedEntity.getName(), proxyUgi);
+    }
+
     @DataProvider(name = "adminResourceActions")
     private Object[][] createAdminResourceActions() {
         return new Object[][] {
@@ -175,7 +204,7 @@ public class DefaultAuthorizationProviderTest {
                 "admin-user", realUser, new String[]{"admin-group", });
 
         DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
-        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+        provider.authorizeResource("admin", "stack", null, null, proxyUgi);
         Assert.fail("User does not belong to both admin-users not groups");
     }
 
@@ -236,7 +265,7 @@ public class DefaultAuthorizationProviderTest {
         provider.authorizeResource(resource, action, entityType, entityName, proxyUgi);
     }
 
-    @Test (expectedExceptions = AuthorizationException.class)
+    @Test (expectedExceptions = IllegalArgumentException.class)
     public void testAuthorizeBadResource() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
         StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
@@ -324,7 +353,7 @@ public class DefaultAuthorizationProviderTest {
                 "admin", realUser, new String[]{"admin", });
 
         DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
-        provider.authorizeResource("entities", "status", "feed", processEntity.getName(), proxyUgi);
+        provider.authorizeResource("entities", "status", "process", feedEntity.getName(), proxyUgi);
         Assert.fail("Bad entity");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/84cc3684/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
index 4dc0f4d..f91c350 100644
--- a/docs/src/site/twiki/Security.twiki
+++ b/docs/src/site/twiki/Security.twiki
@@ -55,6 +55,19 @@ owner is the Owner of this entity.
 group is the one which has access to read.
 permission indicates the rwx is not enforced at this time.
 
+---+++ Super-User
+
+The super-user is the user with the same identity as falcon process itself. Loosely, if you
+started the falcon, then you are the super-user. The super-user can do anything in that
+permissions checks never fail for the super-user. There is no persistent notion of who was the
+super-user; when the falcon is started the process identity determines who is the super-user
+for now. The Falcon super-user does not have to be the super-user of the falcon host, nor is it
+necessary that all clusters have the same super-user. Also, an experimenter running Falcon on a
+personal workstation, conveniently becomes that installation's super-user without any configuration.
+
+Falcon also allows users to configure a super user group and allows users belonging to this
+group to be a super user.
+
 ---+++ Group Memberships
 
 Once a user has been authenticated and a username has been determined, the list of groups is
@@ -126,8 +139,8 @@ Only users belonging to admin users or groups have access to this resource. Admi
 determined by a static configuration parameter.
 
 | *Resource*                                             | *Description*                               | *Authorization*  |
+| [[restapi/AdminVersion][api/admin/version]]            | Get version of the server                   | No restriction   |
 | [[restapi/AdminStack][api/admin/stack]]                | Get stack of the server                     | Admin User/Group |
-| [[restapi/AdminVersion][api/admin/version]]            | Get version of the server                   | Admin User/Group |
 | [[restapi/AdminConfig][api/admin/config/:config-type]] | Get configuration information of the server | Admin User/Group |
 
 
@@ -230,7 +243,6 @@ startup configuration.
 *.falcon.security.authorization.enabled=true
 </verbatim>
 
-
 ---+++ Authorization Provider
 
 Falcon provides a basic implementation for Authorization bundled, org.apache.falcon.security .DefaultFalconAuthorizationProvider.
@@ -241,6 +253,15 @@ This can be overridden by custom implementations in the startup configuration.
 *.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
 </verbatim>
 
+---+++ Super User Group
+
+Super user group is determined by the configuration:
+
+<verbatim>
+# The name of the group of super-users
+*.falcon.security.authorization.superusergroup=falcon
+</verbatim>
+
 ---+++ Admin Membership
 
 Administrative users are determined by the configuration:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/84cc3684/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 526656f..2c0bbbc 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -147,11 +147,14 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
 
+# The name of the group of super-users
+*.falcon.security.authorization.superusergroup=falcon
+
 # Admin Users, comma separated users
-*.falcon.security.authorization.admin.users=falcon,ambari-qa,seetharam
+*.falcon.security.authorization.admin.users=falcon,ambari-qa
 
 # Admin Group Membership, comma separated users
-*.falcon.security.authorization.admin.groups=falcon,testgroup,staff
+*.falcon.security.authorization.admin.groups=falcon,staff
 
 # Authorization Provider Implementation Fully Qualified Class Name
 *.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider


[3/9] git commit: FALCON-463 Validate Feed ACL only if authorization is enabled. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-463 Validate Feed ACL only if authorization is enabled. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a3c1ffe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a3c1ffe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a3c1ffe8

Branch: refs/heads/master
Commit: a3c1ffe844d1c8214ab9289e33e7a1312c9949f1
Parents: 909888f
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:13:43 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:13:43 2014 -0700

----------------------------------------------------------------------
 client/src/main/resources/jaxb-binding.xjb      |   4 +
 .../apache/falcon/entity/FileSystemStorage.java |   8 +-
 .../falcon/entity/parser/FeedEntityParser.java  |  28 +--
 .../falcon/entity/FileSystemStorageTest.java    |   6 +-
 .../entity/parser/FeedEntityParserTest.java     | 179 +++++++++++++++++++
 5 files changed, 204 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a3c1ffe8/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index b7d5359..f644f40 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb
@@ -36,6 +36,10 @@
         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
     </jaxb:bindings>
 
+    <jaxb:bindings schemaLocation="feed-0.1.xsd" node="//xs:complexType[@name='ACL']">
+        <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends>
+    </jaxb:bindings>
+
     <jaxb:bindings schemaLocation="process-0.1.xsd" node="//xs:complexType[@name='process']">
         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
     </jaxb:bindings>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a3c1ffe8/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 0d17f3d..326fccd 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -247,10 +247,10 @@ public class FileSystemStorage implements Storage {
                 if (fileSystem.exists(path)) {
                     FileStatus fileStatus = fileSystem.getFileStatus(path);
                     if (!fileStatus.getOwner().equals(owner)) {
-                        LOG.error("Feed ACL owner {} doesn't match the actual file owner {}",
-                                owner, fileStatus.getOwner());
-                        throw new FalconException("Feed ACL owner " + owner + " doesn't match the actual file owner "
-                                + fileStatus.getOwner());
+                        LOG.error("Feed ACL owner {} doesn't match the actual owner {} for file {}",
+                                owner, fileStatus.getOwner(), path);
+                        throw new FalconException("Feed ACL owner " + owner
+                                + " doesn't match the actual file owner " + fileStatus.getOwner());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a3c1ffe8/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index e517e39..0bc5c8f 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -40,7 +40,6 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +67,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
 
         if (feed.getClusters() == null) {
-            throw new ValidationException("Feed should have atleast one cluster");
+            throw new ValidationException("Feed should have at least one cluster");
         }
 
         for (Cluster cluster : feed.getClusters().getClusters()) {
@@ -82,7 +81,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
         validateFeedStorage(feed);
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
-        validateUser(feed);
+        validateACL(feed);
 
         // Seems like a good enough entity object for a new one
         // But is this an update ?
@@ -105,14 +104,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
         ensureValidityFor(feed, processes);
     }
 
-    protected void validateUser(Feed feed) throws ValidationException {
-        String owner = feed.getACL().getOwner();
-        if (!owner.equals(CurrentUser.getUser())) {
-            throw new ValidationException("Entity's owner " + owner + " is not same as current user "
-                + CurrentUser.getUser());
-        }
-    }
-
     private Set<Process> findProcesses(Set<Entity> referenced) {
         Set<Process> processes = new HashSet<Process>();
         for (Entity entity : referenced) {
@@ -314,9 +305,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
      * Does not matter for FileSystem storage.
      */
     private void validateFeedStorage(Feed feed) throws FalconException {
-        validateUser(feed);
-        validateACL(feed);
-
         final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
         validateMultipleSourcesExist(feed, baseFeedStorageType);
         validateUniformStorageType(feed, baseFeedStorageType);
@@ -409,7 +397,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
     }
 
+    /**
+     * Validate ACL if authorization is enabled.
+     *
+     * @param feed Feed entity
+     * @throws ValidationException
+     */
     private void validateACL(Feed feed) throws FalconException {
+        if (!isAuthorizationEnabled()) {
+            return;
+        }
+
+        validateOwner(feed.getACL().getOwner());
+
         for (Cluster cluster : feed.getClusters().getClusters()) {
             org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
                     EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a3c1ffe8/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 06f660d..47f63bc 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -167,11 +167,11 @@ public class FileSystemStorageTest {
         fs.mkdirs(path);
 
         FileSystemStorage storage = new FileSystemStorage(cluster.getConf().get("fs.default.name"), locations);
-        storage.validateACL(user, user, "rrr");
+        storage.validateACL(user, user, "0x755");
 
         //-ve case
         try {
-            storage.validateACL("random", user, "rrr");
+            storage.validateACL("random", user, "0x755");
             Assert.fail("Validation should have failed");
         } catch(FalconException e) {
             //expected exception
@@ -183,7 +183,7 @@ public class FileSystemStorageTest {
 
         //-ve case
         try {
-            storage.validateACL("random", user, "rrr");
+            storage.validateACL("random", user, "0x755");
             Assert.fail("Validation should have failed");
         } catch(FalconException e) {
             //expected exception

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a3c1ffe8/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index e2da982..e669f44 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.entity.parser;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -32,6 +33,8 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.group.FeedGroupMapTest;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -50,6 +53,8 @@ import static org.testng.AssertJUnit.assertEquals;
  */
 public class FeedEntityParserTest extends AbstractTestBase {
 
+    public static final String USER = System.getProperty("user.name");
+
     private final FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
             .getParser(EntityType.FEED);
 
@@ -60,6 +65,9 @@ public class FeedEntityParserTest extends AbstractTestBase {
         cleanupStore();
         ConfigurationStore store = ConfigurationStore.get();
 
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+
         Unmarshaller unmarshaller = EntityType.CLUSTER.getUnmarshaller();
         Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
                 .getResourceAsStream(CLUSTER_XML));
@@ -85,10 +93,14 @@ public class FeedEntityParserTest extends AbstractTestBase {
     @Test(expectedExceptions = ValidationException.class)
     public void testValidateUser() throws Exception {
         CurrentUser.authenticate("unknown");
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
         try {
             parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
         } finally {
             CurrentUser.authenticate("testuser");
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
         }
     }
 
@@ -535,4 +547,171 @@ public class FeedEntityParserTest extends AbstractTestBase {
         Assert.fail("An exception should have been thrown: Cluster should have registry interface defined with table"
                 + " storage");
     }
+
+    @Test
+    public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+        Feed feed = parser.parse(stream);
+        Assert.assertNotNull(feed);
+        Assert.assertNotNull(feed.getACL());
+        Assert.assertNotNull(feed.getACL().getOwner());
+        Assert.assertNotNull(feed.getACL().getGroup());
+        Assert.assertNotNull(feed.getACL().getPermission());
+
+        parser.validate(feed);
+    }
+
+    @Test
+    public void testValidateACLAndOwner() throws Exception {
+        CurrentUser.authenticate("testuser");
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            Feed feed = parser.parseAndValidate(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLBadOwner() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("blah");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            Feed feed = feedEntityParser.parse(stream);
+
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            feedEntityParser.validate(feed);
+            Assert.fail("Validation exception should have been thrown for invalid owner");
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLBadOwnerAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("blah");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            Feed feed = parser.parse(stream);
+
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            parser.validate(feed);
+            Assert.fail("Validation exception should have been thrown for invalid owner");
+        } finally {
+            CurrentUser.authenticate("testuser");
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAndStorageBadOwner() throws Exception {
+        CurrentUser.authenticate("testuser");
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            feed = parser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            // create locations
+            createLocations(feed);
+            parser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAndStorage() throws Exception {
+        CurrentUser.authenticate(USER);
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            feed = parser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            feed.getACL().setOwner(USER);
+
+            // create locations
+            createLocations(feed);
+            parser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    private void createLocations(Feed feed) throws IOException {
+        for (Location location : feed.getLocations().getLocations()) {
+            if (location.getType() == LocationType.DATA) {
+                dfsCluster.getFileSystem().create(new Path(location.getPath()));
+                break;
+            }
+        }
+    }
+
+    private void deleteLocations(Feed feed) throws IOException {
+        for (Location location : feed.getLocations().getLocations()) {
+            if (location.getType() == LocationType.DATA) {
+                dfsCluster.getFileSystem().delete(new Path(location.getPath()), true);
+                break;
+            }
+        }
+    }
 }


[7/9] git commit: FALCON-468 Add User Documentation for authorization feature. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-468 Add User Documentation for authorization feature. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7b3e5107
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7b3e5107
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7b3e5107

Branch: refs/heads/master
Commit: 7b3e51079c6ac39808130e0a1df2e6adeb55db07
Parents: adca005
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:17:35 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:22:40 2014 -0700

----------------------------------------------------------------------
 docs/src/site/twiki/EntitySpecification.twiki |  82 +++++++---
 docs/src/site/twiki/FalconDocumentation.twiki |   5 +
 docs/src/site/twiki/Security.twiki            | 166 ++++++++++++++++++++-
 docs/src/site/twiki/index.twiki               |   2 +-
 4 files changed, 231 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7b3e5107/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index d387c9c..4eb4b17 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -17,7 +17,9 @@ The colo specifies the colo to which this cluster belongs to and name is the nam
 be unique.
 
 
-A cluster has varies interfaces as described below:
+---+++ Interfaces
+
+A cluster has various interfaces as described below:
 <verbatim>
     <interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2" />
 </verbatim>
@@ -56,15 +58,32 @@ Although Hive metastore supports both RPC and HTTP, Falcon comes with an impleme
 </verbatim>
 A messaging interface specifies the interface for sending feed availability messages, it's endpoint is broker url with tcp address.
 
+---+++ Locations
+
 A cluster has a list of locations defined:
 <verbatim>
 <location name="staging" path="/projects/falcon/staging" />
+<location name="working" path="/projects/falcon/working" />
 </verbatim>
 Location has the name and the path, name is the type of locations like staging, temp and working.
 and path is the hdfs path for each location.
 Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon
 should have read/write/execute permission on these locations.
 
+---+++ ACL
+
+A cluster has ACL (Access Control List) useful for implementing permission requirements
+and provide a way to set different permissions for specific users or named groups.
+<verbatim>
+    <ACL owner="test-user" group="test-group" permission="*"/>
+</verbatim>
+ACL indicates the Access control list for this cluster.
+owner is the Owner of this entity.
+group is the one which has access to read.
+permission indicates the permission.
+
+---+++ Custom Properties
+
 A cluster has a list of properties:
 A key-value pair, which are propagated to the workflow engine.
 <verbatim>
@@ -217,7 +236,19 @@ upto 8 hours then late-arrival's cut-off="hours(8)"
 
 *Note:* This will only apply for !FileSystem storage but not Table storage until a future time.
 
----++++ Custom Properties
+---+++ ACL
+
+A feed has ACL (Access Control List) useful for implementing permission requirements
+and provide a way to set different permissions for specific users or named groups.
+<verbatim>
+    <ACL owner="test-user" group="test-group" permission="*"/>
+</verbatim>
+ACL indicates the Access control list for this cluster.
+owner is the Owner of this entity.
+group is the one which has access to read.
+permission indicates the permission.
+
+---+++ Custom Properties
 
 <verbatim>
     <properties>
@@ -240,7 +271,7 @@ waiting for the feed instance and parallel decides the concurrent replication in
 A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines  the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.  
 
 The different details of process are:
----++++ Name
+---+++ Name
 Each process is identified with a unique name.
 Syntax:
 <verbatim>
@@ -249,7 +280,7 @@ Syntax:
 </process>
 </verbatim>
 
----++++ Cluster
+---+++ Cluster
 The cluster on which the workflow should run. A process should contain one or more clusters. Cluster definition for the cluster name gives the end points for workflow execution, name node, job tracker, messaging and so on. Each cluster inturn has validity mentioned, which tell the times between which the job should run on that specified cluster. 
 Syntax:
 <verbatim>
@@ -270,7 +301,7 @@ Syntax:
 </process>
 </verbatim>
 
----++++ Parallel
+---+++ Parallel
 Parallel defines how many instances of the workflow can run concurrently. It should be a positive integer > 0.
 For example, parallel of 1 ensures that only one instance of the workflow can run at a time. The next instance will start only after the running instance completes.
 Syntax:
@@ -282,7 +313,7 @@ Syntax:
 </process>
 </verbatim>
 
----++++ Order
+---+++ Order
 Order defines the order in which the ready instances are picked up. The possible values are FIFO(First In First Out), LIFO(Last In First Out), and ONLYLAST(Last Only).
 Syntax:
 <verbatim>
@@ -293,7 +324,7 @@ Syntax:
 </process>
 </verbatim>
 
----++++ Timeout
+---+++ Timeout
 A optional Timeout specifies the maximum time an instance waits for a dataset before being killed by the workflow engine, a time out is specified like frequency.
 If timeout is not specified, falcon computes a default timeout for a process based on its frequency, which is six times of the frequency of process or 30 minutes if computed timeout is less than 30 minutes.
 <verbatim>
@@ -304,7 +335,7 @@ If timeout is not specified, falcon computes a default timeout for a process bas
 </process>
 </verbatim>
 
----++++ Frequency
+---+++ Frequency
 Frequency defines how frequently the workflow job should run. For example, hours(1) defines the frequency as hourly, days(7) defines weekly frequency. The values for timeunit can be minutes/hours/days/months and the frequency number should be a positive integer > 0. 
 Syntax:
 <verbatim>
@@ -315,7 +346,7 @@ Syntax:
 </process>
 </verbatim>
 
----++++ Validity
+---+++ Validity
 Validity defines how long the workflow should run. It has 3 components - start time, end time and timezone. Start time and end time are timestamps defined in yyyy-MM-dd'T'HH:mm'Z' format and should always be in UTC. Timezone is used to compute the next instances starting from start time. The workflow will start at start time and end before end time specified on a given cluster. So, there will not be a workflow instance at end time.
 Syntax:
 <verbatim>
@@ -347,7 +378,7 @@ The daily workflow will start on Jan 1st 2012 at 00:40 UTC, it will run at 40th
 </verbatim>
 The hourly workflow will start on March 11th 2012 at 00:40 PST, the next instances will be at 01:40 PST, 03:40 PDT, 04:40 PDT and so on till 23:40 PDT. So, there will be just 23 instances of the workflow for March 11th 2012 because of DST switch.
 
----++++ Inputs
+---+++ Inputs
 Inputs define the input data for the workflow. The workflow job will start executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs and each of the input maps to a feed. The path and frequency of input data is picked up from feed definition. Each input should also define start and end instances in terms of [[FalconDocumentation][EL expressions]] and can optionally specify specific partition of input that the workflow requires. The components in partition should be subset of partitions defined in the feed.
 
 For each input, Falcon will create a property with the input name that contains the comma separated list of input paths. This property can be used in workflow actions like pig scripts and so on.
@@ -447,7 +478,7 @@ Example workflow configuration:
 </verbatim>
 
 
----++++ Optional Inputs
+---+++ Optional Inputs
 User can mention one or more inputs as optional inputs. In such cases the job does not wait on those inputs which are
 mentioned as optional. If they are present it considers them otherwise continue with the compulsory ones.
 Example:
@@ -477,7 +508,7 @@ Example:
 *Note:* This is only supported for !FileSystem storage but not Table storage at this point.
 
 
----++++ Outputs
+---+++ Outputs
 Outputs define the output data that is generated by the workflow. A process can define 0 or more outputs. Each output is mapped to a feed and the output path is picked up from feed definition. The output instance that should be generated is specified in terms of [[FalconDocumentation][EL expression]].
 
 For each output, Falcon creates a property with output name that contains the path of output data. This can be used in workflows to store in the path.
@@ -561,7 +592,7 @@ Example workflow configuration:
 </configuration>
 </verbatim>
 
----++++ Properties
+---+++ Custom Properties
 The properties are key value pairs that are passed to the workflow. These properties are optional and can be used
 in workflow to parameterize the workflow.
 Syntax:
@@ -582,7 +613,7 @@ queueName and jobPriority are special properties, which when present are used by
         <property name="jobPriority" value="VERY_HIGH"/>
 </verbatim>
 
----++++ Workflow
+---+++ Workflow
 
 The workflow defines the workflow engine that should be used and the path to the workflow on hdfs.
 The workflow definition on hdfs contains the actual job that should run and it should confirm to
@@ -594,7 +625,7 @@ be available for the workflow.
 
 There are 2 engines supported today.
 
----+++++ Oozie
+---++++ Oozie
 
 As part of oozie workflow engine support, users can embed a oozie workflow.
 Refer to oozie [[http://oozie.apache.org/docs/4.0.0/DG_Overview.html][workflow overview]] and
@@ -621,7 +652,7 @@ Example:
 This defines the workflow engine to be oozie and the workflow xml is defined at
 /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
 
----+++++ Pig
+---++++ Pig
 
 Falcon also adds the Pig engine which enables users to embed a Pig script as a process.
 
@@ -640,7 +671,7 @@ This defines the workflow engine to be pig and the pig script is defined at
 Feeds with Hive table storage will send one more parameter apart from the general ones:
 <verbatim>$input_filter</verbatim>
 
----+++++ Hive
+---++++ Hive
 
 Falcon also adds the Hive engine as part of Hive Integration which enables users to embed a Hive script as a process.
 This would enable users to create materialized queries in a declarative way.
@@ -660,7 +691,7 @@ This defines the workflow engine to be hive and the hive script is defined at
 Feeds with Hive table storage will send one more parameter apart from the general ones:
 <verbatim>$input_filter</verbatim>
 
----++++ Retry
+---+++ Retry
 Retry policy defines how the workflow failures should be handled. Two retry policies are defined: backoff and exp-backoff(exponential backoff). Depending on the delay and number of attempts, the workflow is re-tried after specific intervals.
 Syntax:
 <verbatim>
@@ -681,7 +712,7 @@ Examples:
 </verbatim>
 The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.
 
----++++ Late data
+---+++ Late data
 Late data handling defines how the late data should be handled. Each feed is defined with a late cut-off value which specifies the time till which late data is valid. For example, late cut-off of hours(6) means that data for nth hour can get delayed by upto 6 hours. Late data specification in process defines how this late data is handled.
 
 Late data policy defines how frequently check is done to detect late data. The policies supported are: backoff, exp-backoff(exponention backoff) and final(at feed's late cut-off). The policy along with delay defines the interval at which late data check is done.
@@ -724,3 +755,16 @@ Example:
 This late handling specifies that late data detection should run at feed's late cut-off which is 6 hours in this case. If there is late data, Falcon should run the workflow specified at /projects/bootcamp/workflow/lateinput1/workflow.xml
 
 *Note:* This is only supported for !FileSystem storage but not Table storage at this point.
+
+---+++ ACL
+
+A process has ACL (Access Control List) useful for implementing permission requirements
+and provide a way to set different permissions for specific users or named groups.
+<verbatim>
+    <ACL owner="test-user" group="test-group" permission="*"/>
+</verbatim>
+ACL indicates the Access control list for this cluster.
+owner is the Owner of this entity.
+group is the one which has access to read.
+permission indicates the permission.
+

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7b3e5107/docs/src/site/twiki/FalconDocumentation.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki
index 267e6d1..8d2e10e 100644
--- a/docs/src/site/twiki/FalconDocumentation.twiki
+++ b/docs/src/site/twiki/FalconDocumentation.twiki
@@ -13,6 +13,7 @@
    * <a href="#Alerting_and_Monitoring">Alerting and Monitoring</a>
    * <a href="#Falcon_EL_Expressions">Falcon EL Expressions</a>
    * <a href="#Lineage">Lineage</a>
+   * <a href="#Security">Security</a>
 
 ---++ Architecture
 ---+++ Introduction
@@ -743,3 +744,7 @@ config value: org.apache.falcon.metadata.MetadataMappingService
 
 Lineage is only captured for Process executions. A future release will capture lineage for
 lifecycle policies such as replication and retention.
+
+--++ Security
+
+Security is detailed in [[Security][Security]].

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7b3e5107/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
index c1f7656..4dc0f4d 100644
--- a/docs/src/site/twiki/Security.twiki
+++ b/docs/src/site/twiki/Security.twiki
@@ -2,6 +2,12 @@
 
 ---++ Overview
 
+Apache Falcon enforces authentication and authorization which are detailed below. Falcon also
+provides transport level security ensuring data confidentiality and integrity.
+
+
+---++ Authentication (User Identity)
+
 Apache Falcon enforces authentication on protected resources. Once authentication has been established it sets a
 signed HTTP Cookie that contains an authentication token with the user name, user principal,
 authentication type and expiration time.
@@ -12,20 +18,127 @@ for HTTP. Hadoop Auth also supports additional authentication mechanisms on the
 simple interfaces.
 
 
----++ Authentication Methods
+---+++ Authentication Methods
 
 It supports 2 authentication methods, simple and kerberos out of the box.
 
----+++ Pseudo/Simple Authentication
+---++++ Pseudo/Simple Authentication
 
 Falcon authenticates the user by simply trusting the value of the query string parameter 'user.name'. This is the
 default mode Falcon is configured with.
 
----+++ Kerberos Authentication
+---++++ Kerberos Authentication
 
 Falcon uses HTTP Kerberos SPNEGO to authenticate the user.
 
----++ Server Side Configuration Setup
+
+---++ Authorization
+
+Falcon also enforces authorization on Entities using ACLs (Access Control Lists). ACLs are useful
+for implementing permission requirements and provide a way to set different permissions for
+specific users or named groups.
+
+By default, support for authorization is disabled and can be enabled in startup.properties.
+
+---+++ ACLs in Entity
+
+All Entities now have ACL which needs to be present if authorization is enabled. Only owners who
+own or created the entity will be allowed to update or delete their entities.
+
+An entity has ACLs (Access Control Lists) that are useful for implementing permission requirements
+and provide a way to set different permissions for specific users or named groups.
+<verbatim>
+    <ACL owner="test-user" group="test-group" permission="*"/>
+</verbatim>
+ACL indicates the Access control list for this cluster.
+owner is the Owner of this entity.
+group is the one which has access to read.
+permission indicates the rwx is not enforced at this time.
+
+---+++ Group Memberships
+
+Once a user has been authenticated and a username has been determined, the list of groups is
+determined by a group mapping service, configured by the hadoop.security.group.mapping property
+in Hadoop. The default implementation, org.apache.hadoop.security.ShellBasedUnixGroupsMapping,
+will shell out to the Unix bash -c groups command to resolve a list of groups for a user.
+
+Note that Falcon stores the user and group of an Entity as strings; there is no
+conversion from user and group identity numbers as is conventional in Unix.
+
+---+++ Authorization Provider
+
+Falcon provides a plugin-able provider interface for Authorization. It also ships with a default
+implementation that enforces the following authorization policy.
+
+---++++ Entity and Instance Management Operations Policy
+
+* All Entity and Instance operations are authorized for users who created them, Owners and users
+with group memberships
+* Reference to entities with in a feed or process is allowed with out enforcing permissions
+Any Feed or Process can refer to a Cluster entity not owned by the Feed or Process owner
+Any Process can refer to a Feed entity not owned by the Process owner
+
+The authorization is enforced in the following way:
+
+if admin resource,
+     if authenticated user name matches the admin users configuration
+     Else if groups of the authenticated user matches the admin groups configuration
+     Else authorization exception is thrown
+Else if entities or instance resource
+     if the authenticated user matches the owner in ACL for the entity
+     Else if the groups of the authenticated user matches the group in ACL for the entity
+     Else authorization exception is thrown
+Else if lineage resource
+     All have read-only permissions, reason being folks should be able to examine the dependency
+     and allow reuse
+
+
+*operations on Entity Resource*
+
+| *Resource*                                                                          | *Description*                      | *Authorization* |
+| [[restapi/EntityValidate][api/entities/validate/:entity-type]]                      | Validate the entity                | Owner/Group     |
+| [[restapi/EntitySubmit][api/entities/submit/:entity-type]]                          | Submit the entity                  | Owner/Group     |
+| [[restapi/EntityUpdate][api/entities/update/:entity-type/:entity-name]]             | Update the entity                  | Owner/Group     |
+| [[restapi/EntitySubmitAndSchedule][api/entities/submitAndSchedule/:entity-type]]    | Submit & Schedule the entity       | Owner/Group     |
+| [[restapi/EntitySchedule][api/entities/schedule/:entity-type/:entity-name]]         | Schedule the entity                | Owner/Group     |
+| [[restapi/EntitySuspend][api/entities/suspend/:entity-type/:entity-name]]           | Suspend the entity                 | Owner/Group     |
+| [[restapi/EntityResume][api/entities/resume/:entity-type/:entity-name]]             | Resume the entity                  | Owner/Group     |
+| [[restapi/EntityDelete][api/entities/delete/:entity-type/:entity-name]]             | Delete the entity                  | Owner/Group     |
+| [[restapi/EntityStatus][api/entities/status/:entity-type/:entity-name]]             | Get the status of the entity       | Owner/Group     |
+| [[restapi/EntityDefinition][api/entities/definition/:entity-type/:entity-name]]     | Get the definition of the entity   | Owner/Group     |
+| [[restapi/EntityList][api/entities/list/:entity-type?fields=:fields]]               | Get the list of entities           | Owner/Group     |
+| [[restapi/EntityDependencies][api/entities/dependencies/:entity-type/:entity-name]] | Get the dependencies of the entity | Owner/Group     |
+
+*REST Call on Feed and Process Instances*
+
+| *Resource*                                                                  | *Description*                | *Authorization* |
+| [[restapi/InstanceRunning][api/instance/running/:entity-type/:entity-name]] | List of running instances.   | Owner/Group     |
+| [[restapi/InstanceStatus][api/instance/status/:entity-type/:entity-name]]   | Status of a given instance   | Owner/Group     |
+| [[restapi/InstanceKill][api/instance/kill/:entity-type/:entity-name]]       | Kill a given instance        | Owner/Group     |
+| [[restapi/InstanceSuspend][api/instance/suspend/:entity-type/:entity-name]] | Suspend a running instance   | Owner/Group     |
+| [[restapi/InstanceResume][api/instance/resume/:entity-type/:entity-name]]   | Resume a given instance      | Owner/Group     |
+| [[restapi/InstanceRerun][api/instance/rerun/:entity-type/:entity-name]]     | Rerun a given instance       | Owner/Group     |
+| [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]]               | Get logs of a given instance | Owner/Group     |
+
+---++++ Admin Resources Policy
+
+Only users belonging to admin users or groups have access to this resource. Admin membership is
+determined by a static configuration parameter.
+
+| *Resource*                                             | *Description*                               | *Authorization*  |
+| [[restapi/AdminStack][api/admin/stack]]                | Get stack of the server                     | Admin User/Group |
+| [[restapi/AdminVersion][api/admin/version]]            | Get version of the server                   | Admin User/Group |
+| [[restapi/AdminConfig][api/admin/config/:config-type]] | Get configuration information of the server | Admin User/Group |
+
+
+---++++ Lineage Resource Policy
+
+Lineage is read-only and hence all users can look at lineage for their respective entities.
+
+
+---++ Authentication Configuration
+
+Following is the Server Side Configuration Setup for Authentication.
 
 ---+++ Common Configuration Parameters
 
@@ -105,6 +218,51 @@ Falcon uses HTTP Kerberos SPNEGO to authenticate the user.
 *.falcon.http.authentication.blacklisted.users=
 </verbatim>
 
+---++ Authorization Configuration
+
+---+++ Enabling Authorization
+By default, support for authorization is disabled and specifying ACLs in entities are optional.
+To enable support for authorization, set falcon.security.authorization.enabled to true in the
+startup configuration.
+
+<verbatim>
+# Authorization Enabled flag: false|true
+*.falcon.security.authorization.enabled=true
+</verbatim>
+
+
+---+++ Authorization Provider
+
+Falcon provides a basic implementation for Authorization bundled, org.apache.falcon.security .DefaultFalconAuthorizationProvider.
+This can be overridden by custom implementations in the startup configuration.
+
+<verbatim>
+# Authorization Provider Fully Qualified Class Name
+*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
+</verbatim>
+
+---+++ Admin Membership
+
+Administrative users are determined by the configuration:
+
+<verbatim>
+# Admin Users, comma separated users
+*.falcon.security.authorization.admin.users=falcon,ambari-qa,seetharam
+</verbatim>
+
+Administrative groups are determined by the configuration:
+
+<verbatim>
+# Admin Group Membership, comma separated users
+*.falcon.security.authorization.admin.groups=falcon,testgroup,staff
+</verbatim>
+
+
+---++ SSL
+
+Falcon provides transport level security ensuring data confidentiality and integrity. This is
+enabled by default for communicating over HTTP between the client and the server.
+
 ---+++ SSL Configuration
 
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7b3e5107/docs/src/site/twiki/index.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki
index e7917c5..7437280 100644
--- a/docs/src/site/twiki/index.twiki
+++ b/docs/src/site/twiki/index.twiki
@@ -33,7 +33,7 @@ describes various options for the command line utility provided by Falcon.
 
 Falcon provides OOTB [[HiveIntegration][lifecycle management for Tables in Hive (HCatalog)]]
 such as table replication for BCP and table eviction. Falcon also enforces
-[[Security][kerberos authentication]] on protected resources and enables SSL.
+[[Security][Security]] on protected resources and enables SSL.
 
 #LicenseInfo
 ---+ Licensing Information


[2/9] git commit: FALCON-462 Add ACL for process entity. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-462 Add ACL for process entity. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/909888f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/909888f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/909888f9

Branch: refs/heads/master
Commit: 909888f9b6703ac4daffeac1605eb78408ccf586
Parents: b183060
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:13:05 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:13:05 2014 -0700

----------------------------------------------------------------------
 client/src/main/resources/jaxb-binding.xjb      |  4 +
 client/src/main/resources/process-0.1.xsd       | 15 ++++
 .../entity/parser/ProcessEntityParser.java      | 57 +++++++-----
 .../entity/parser/ProcessEntityParserTest.java  | 95 ++++++++++++++++++++
 .../resources/config/process/process-table.xml  |  3 +
 5 files changed, 154 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index e51ccb7..b7d5359 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb
@@ -40,6 +40,10 @@
         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
     </jaxb:bindings>
 
+    <jaxb:bindings schemaLocation="process-0.1.xsd" node="//xs:complexType[@name='ACL']">
+        <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends>
+    </jaxb:bindings>
+
     <jaxb:globalBindings>
         <xjc:simple/>
     </jaxb:globalBindings>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 0d0f0e9..b1bd426 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -155,6 +155,7 @@
                     </xs:documentation>
                 </xs:annotation>
             </xs:element>
+            <xs:element type="ACL" name="ACL" minOccurs="0"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
     </xs:complexType>
@@ -352,4 +353,18 @@
             <xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
         </xs:restriction>
     </xs:simpleType>
+
+    <xs:complexType name="ACL">
+        <xs:annotation>
+            <xs:documentation>
+                Access control list for this process.
+                owner is the Owner of this entity.
+                group is the one which has access to read - not used at this time.
+                permission is not enforced at this time
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="owner"/>
+        <xs:attribute type="xs:string" name="group"/>
+        <xs:attribute type="xs:string" name="permission"/>
+    </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 837b86a..7da86a7 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -18,14 +18,6 @@
 
 package org.apache.falcon.entity.parser;
 
-import java.net.ConnectException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -35,6 +27,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.ACL;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.LateInput;
@@ -46,6 +39,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
 /**
  * Concrete Parser which has XML parsing and validation logic for Process XML.
  */
@@ -96,6 +97,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
         }
         validateDatasetName(process.getInputs(), process.getOutputs());
         validateLateInputs(process);
+        validateACL(process);
     }
 
     /**
@@ -106,8 +108,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
      * @throws FalconException
      */
     private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
-        org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
-                clusterName);
+        org.apache.falcon.entity.v0.cluster.Cluster cluster =
+                ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
 
         if (!EntityUtil.responsibleFor(cluster.getColo())) {
             return;
@@ -115,7 +117,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
 
         String workflowPath = process.getWorkflow().getPath();
         String libPath = process.getWorkflow().getLib();
-        String nameNode = getNameNode(cluster, clusterName);
+        String nameNode = getNameNode(cluster);
         try {
             Configuration configuration = ClusterHelper.getConfiguration(cluster);
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
@@ -127,21 +129,16 @@ public class ProcessEntityParser extends EntityParser<Process> {
             if (libPath != null && !fs.exists(new Path(libPath))) {
                 throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode);
             }
-        } catch (ValidationException e) {
-            throw new ValidationException(e);
-        } catch (ConnectException e) {
-            throw new ValidationException(
-                    "Unable to connect to Namenode: " + nameNode + " referenced in cluster: " + clusterName);
-        } catch (Exception e) {
-            throw new FalconException(e);
+        } catch (IOException e) {
+            throw new FalconException("Error validating workflow path " + workflowPath, e);
         }
     }
 
-    private String getNameNode(Cluster cluster, String clusterName) throws ValidationException {
+    private String getNameNode(Cluster cluster) throws ValidationException {
         // cluster should never be null as it is validated while submitting feeds.
         if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
             throw new ValidationException(
-                    "Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
+                    "Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName());
         }
         return ClusterHelper.getStorageUrl(cluster);
     }
@@ -226,4 +223,24 @@ public class ProcessEntityParser extends EntityParser<Process> {
                     + input.getName());
         }
     }
+
+    /**
+     * Validate ACL if authorization is enabled.
+     *
+     * @param process process entity
+     * @throws ValidationException
+     */
+    private void validateACL(Process process) throws FalconException {
+        if (!isAuthorizationEnabled()) {
+            return;
+        }
+
+        // Validate the entity owner is logged-in, authenticated user if authorization is enabled
+        ACL processACL = process.getACL();
+        if (processACL == null) {
+            throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
+        }
+
+        validateOwner(processACL.getOwner());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index cd6c713..93c34d2 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -28,6 +28,8 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -39,6 +41,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
@@ -330,4 +333,96 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         parser.validate(process);
         Assert.fail("An exception should have been thrown since Input partitions are not supported for table storage");
     }
+
+    @Test
+    public void testValidateACLWithNoACLAndAuthorizationDisabled() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
+
+        Process process = parser.parse(stream);
+        Assert.assertNotNull(process);
+        Assert.assertNull(process.getACL());
+
+        parser.validate(process);
+    }
+
+    @Test
+    public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+        Process process = parser.parse(stream);
+        Assert.assertNotNull(process);
+        Assert.assertNotNull(process.getACL());
+        Assert.assertNotNull(process.getACL().getOwner());
+        Assert.assertNotNull(process.getACL().getGroup());
+        Assert.assertNotNull(process.getACL().getPermission());
+
+        parser.validate(process);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLWithNoACLAndAuthorizationEnabled() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("falcon");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
+
+            Process process = parser.parse(stream);
+            Assert.assertNotNull(process);
+            Assert.assertNull(process.getACL());
+
+            parser.validate(process);
+            Assert.fail("Validation exception should have been thrown for empty ACL");
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAuthorizationEnabled() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("falcon");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+            Process process = parser.parseAndValidate(stream);
+            Assert.assertNotNull(process);
+            Assert.assertNotNull(process.getACL());
+            Assert.assertNotNull(process.getACL().getOwner());
+            Assert.assertNotNull(process.getACL().getGroup());
+            Assert.assertNotNull(process.getACL().getPermission());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAuthorizationEnabledBadOwner() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate("blah");
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+            Process process = parser.parse(stream);
+
+            Assert.assertNotNull(process);
+            Assert.assertNotNull(process.getACL());
+            Assert.assertNotNull(process.getACL().getOwner());
+            Assert.assertNotNull(process.getACL().getGroup());
+            Assert.assertNotNull(process.getACL().getPermission());
+
+            parser.validate(process);
+            Assert.fail("Validation exception should have been thrown for invalid owner");
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/909888f9/common/src/test/resources/config/process/process-table.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-table.xml b/common/src/test/resources/config/process/process-table.xml
index 9408973..bcaf28e 100644
--- a/common/src/test/resources/config/process/process-table.xml
+++ b/common/src/test/resources/config/process/process-table.xml
@@ -43,4 +43,7 @@
     <workflow engine="oozie" path="/falcon/test/workflow"/>
 
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+    <!-- ACL -->
+    <ACL owner="falcon" group="falcon" permission="*"/>
 </process>


[6/9] git commit: FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/adca0057
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/adca0057
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/adca0057

Branch: refs/heads/master
Commit: adca0057d279022a9f32900c64ecca8d9f70f8a6
Parents: a3c1ffe
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:17:08 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:17:08 2014 -0700

----------------------------------------------------------------------
 client/src/main/resources/feed-0.1.xsd          |   2 +-
 client/src/main/resources/process-0.1.xsd       |   2 +-
 .../apache/falcon/entity/CatalogStorage.java    |   3 +-
 .../apache/falcon/entity/FileSystemStorage.java |  22 +-
 .../java/org/apache/falcon/entity/Storage.java  |   7 +-
 .../entity/parser/ClusterEntityParser.java      |   9 +-
 .../falcon/entity/parser/EntityParser.java      |  46 +--
 .../falcon/entity/parser/FeedEntityParser.java  |  13 +-
 .../entity/parser/ProcessEntityParser.java      |   9 +-
 .../falcon/entity/store/ConfigurationStore.java |   2 +-
 .../falcon/hadoop/HadoopClientFactory.java      |  32 +-
 .../falcon/security/AuthorizationProvider.java  |  62 ++++
 .../org/apache/falcon/security/CurrentUser.java |  37 ++
 .../security/DefaultAuthorizationProvider.java  | 271 ++++++++++++++
 .../apache/falcon/security/SecurityUtil.java    |  41 +-
 common/src/main/resources/startup.properties    |  10 +
 .../apache/falcon/entity/AbstractTestBase.java  |  15 +
 .../falcon/entity/FileSystemStorageTest.java    |  48 ++-
 .../entity/parser/ClusterEntityParserTest.java  |  10 +-
 .../entity/parser/FeedEntityParserTest.java     | 191 ++++++++--
 .../entity/parser/ProcessEntityParserTest.java  |  52 ++-
 .../falcon/hadoop/HadoopClientFactoryTest.java  |  10 +-
 .../metadata/MetadataMappingServiceTest.java    | 143 ++-----
 .../apache/falcon/security/CurrentUserTest.java |   9 +
 .../DefaultAuthorizationProviderTest.java       | 372 +++++++++++++++++++
 .../falcon/security/SecurityUtilTest.java       |  25 +-
 .../org/apache/falcon/FalconWebException.java   |  10 +-
 .../falcon/resource/AbstractEntityManager.java  |   2 -
 .../AbstractSchedulableEntityManager.java       |  21 +-
 .../apache/falcon/security/BasicAuthFilter.java | 233 ------------
 .../security/FalconAuthenticationFilter.java    | 234 ++++++++++++
 .../security/FalconAuthorizationFilter.java     | 108 ++++++
 prism/src/main/webapp/WEB-INF/web.xml           |  16 +-
 .../metadata/LineageMetadataResourceTest.java   | 130 +------
 .../FalconAuthenticationFilterTest.java         | 214 +++++++++++
 .../security/FalconAuthorizationFilterTest.java | 168 +++++++++
 src/conf/startup.properties                     |  12 +-
 .../cluster/util/EntityBuilderTestUtil.java     | 167 +++++++++
 .../src/main/webapp/WEB-INF/distributed/web.xml |  18 +-
 webapp/src/main/webapp/WEB-INF/embedded/web.xml |  16 +-
 webapp/src/main/webapp/WEB-INF/web.xml          |  16 +-
 .../falcon/security/BasicAuthFilterTest.java    | 214 -----------
 42 files changed, 2184 insertions(+), 838 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 00b5172..3c96d80 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -280,7 +280,7 @@
         </xs:annotation>
         <xs:attribute type="xs:string" name="owner"/>
         <xs:attribute type="xs:string" name="group"/>
-        <xs:attribute type="xs:string" name="permission"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
     </xs:complexType>
     <xs:simpleType name="action-type">
         <xs:restriction base="xs:string">

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index b1bd426..cd4f5d2 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -365,6 +365,6 @@
         </xs:annotation>
         <xs:attribute type="xs:string" name="owner"/>
         <xs:attribute type="xs:string" name="group"/>
-        <xs:attribute type="xs:string" name="permission"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
     </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 89e5b3e..7ad0716 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
@@ -339,7 +340,7 @@ public class CatalogStorage implements Storage {
     }
 
     @Override
-    public void validateACL(String owner, String group, String permissions) throws FalconException {
+    public void validateACL(AccessControlList acl) throws FalconException {
         // This is not supported in Hive today as authorization is not enforced on table and
         // partition listing
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 326fccd..4eb3d60 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
@@ -38,6 +39,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Matcher;
 
 /**
@@ -238,7 +240,7 @@ public class FileSystemStorage implements Storage {
     }
 
     @Override
-    public void validateACL(String owner, String group, String permissions) throws FalconException {
+    public void validateACL(AccessControlList acl) throws FalconException {
         try {
             FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
             for (Location location : getLocations()) {
@@ -246,12 +248,20 @@ public class FileSystemStorage implements Storage {
                 Path path = new Path(pathString);
                 if (fileSystem.exists(path)) {
                     FileStatus fileStatus = fileSystem.getFileStatus(path);
-                    if (!fileStatus.getOwner().equals(owner)) {
-                        LOG.error("Feed ACL owner {} doesn't match the actual owner {} for file {}",
-                                owner, fileStatus.getOwner(), path);
-                        throw new FalconException("Feed ACL owner " + owner
-                                + " doesn't match the actual file owner " + fileStatus.getOwner());
+                    Set<String> groups = CurrentUser.getGroupNames();
+
+                    if (fileStatus.getOwner().equals(acl.getOwner())
+                            || groups.contains(acl.getGroup())) {
+                        return;
                     }
+
+                    LOG.error("Permission denied: Either Feed ACL owner {} or group {} doesn't "
+                                    + "match the actual file owner {} or group {} for file {}",
+                            acl, acl.getGroup(), fileStatus.getOwner(), fileStatus.getGroup(), path);
+                    throw new FalconException("Permission denied: Either Feed ACL owner "
+                            + acl + " or group " + acl.getGroup() + " doesn't match the actual "
+                            + "file owner " + fileStatus.getOwner() + " or group "
+                            + fileStatus.getGroup() + "  for file " + path);
                 }
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index b49410e..f88e139 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.LocationType;
 
 /**
@@ -76,10 +77,8 @@ public interface Storage {
     /**
      * Check the permission on the storage, regarding owner/group/permission coming from ACL.
      *
-     * @param owner the owner defined in the ACL.
-     * @param group the group defined in the ACL.
-     * @param permissions the permissions defined in the ACL.
+     * @param acl the ACL defined in the entity.
      * @throws FalconException if the permissions are not valid.
      */
-    void validateACL(String owner, String group, String permissions) throws FalconException;
+    void validateACL(AccessControlList acl) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 3eabf9e..fbbdbcb 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,7 +213,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
      * @throws ValidationException
      */
     private void validateACL(Cluster cluster) throws ValidationException {
-        if (!isAuthorizationEnabled()) {
+        if (isAuthorizationDisabled) {
             return;
         }
 
@@ -222,7 +223,11 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
             throw new ValidationException("Cluster ACL cannot be empty for:  " + cluster.getName());
         }
 
-        validateOwner(clusterACL.getOwner());
+        try {
+            authorize(cluster.getName(), clusterACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 761a9f5..8a3f669 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -20,19 +20,20 @@ package org.apache.falcon.entity.parser;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.Unmarshaller;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
 
 /**
  * Generic Abstract Entity Parser, the concrete FEED, PROCESS and CLUSTER should extend this parser
@@ -45,9 +46,11 @@ public abstract class EntityParser<T extends Entity> {
     private static final Logger LOG = LoggerFactory.getLogger(EntityParser.class);
 
     private final EntityType entityType;
+    protected final boolean isAuthorizationDisabled;
 
     protected EntityParser(EntityType entityType) {
         this.entityType = entityType;
+        isAuthorizationDisabled = !SecurityUtil.isAuthorizationEnabled();
     }
 
     public EntityType getEntityType() {
@@ -78,6 +81,7 @@ public abstract class EntityParser<T extends Entity> {
      * @return entity
      * @throws FalconException
      */
+    @SuppressWarnings("unchecked")
     public T parse(InputStream xmlStream) throws FalconException {
         try {
             // parse against schema
@@ -102,36 +106,24 @@ public abstract class EntityParser<T extends Entity> {
         }
     }
 
-    protected void validateEntitiesExist(List<Pair<EntityType, String>> entities) throws FalconException {
-        if (entities != null) {
-            for (Pair<EntityType, String> entity : entities) {
-                validateEntityExists(entity.first, entity.second);
-            }
-        }
-    }
-
     public abstract void validate(T entity) throws FalconException;
 
     /**
      * Validate if the entity owner is the logged-in authenticated user.
      *
-     * @param owner entity owner in ACL
-     * @throws ValidationException
+     * @param entityName  entity name
+     * @param acl         entity ACL
+     * @throws AuthorizationException
      */
-    protected void validateOwner(String owner) throws ValidationException {
-        if (!CurrentUser.getUser().equals(owner)) {
-            throw new ValidationException("Entity's owner " + owner
-                    + " is not same as the logged in user " + CurrentUser.getUser());
+    protected void authorize(String entityName,
+                             AccessControlList acl) throws AuthorizationException {
+        try {
+            SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
+                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUgi());
+        } catch (FalconException e) {
+            throw new AuthorizationException(e);
+        } catch (IOException e) {
+            throw new AuthorizationException(e);
         }
     }
-
-    /**
-     * Checks if the user has enabled authorization in the configuration.
-     *
-     * @return true if falcon.security.authorization.enabled is enabled, false otherwise
-     */
-    protected boolean isAuthorizationEnabled() {
-        return Boolean.valueOf(StartupProperties.get().getProperty(
-                "falcon.security.authorization.enabled", "false"));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 0bc5c8f..8fd56e1 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ACL;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -41,6 +42,7 @@ import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
 import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -404,11 +406,16 @@ public class FeedEntityParser extends EntityParser<Feed> {
      * @throws ValidationException
      */
     private void validateACL(Feed feed) throws FalconException {
-        if (!isAuthorizationEnabled()) {
+        if (isAuthorizationDisabled) {
             return;
         }
 
-        validateOwner(feed.getACL().getOwner());
+        final ACL feedACL = feed.getACL();
+        try {
+            authorize(feed.getName(), feedACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
 
         for (Cluster cluster : feed.getClusters().getClusters()) {
             org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
@@ -419,7 +426,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
             final Storage storage = FeedHelper.createStorage(cluster, feed);
             try {
-                storage.validateACL(feed.getACL().getOwner(), feed.getACL().getGroup(), feed.getACL().getPermission());
+                storage.validateACL(feedACL);
             } catch(FalconException e) {
                 throw new ValidationException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 7da86a7..c2be6bd 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -38,6 +38,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 
 import java.io.IOException;
 import java.util.Date;
@@ -231,7 +232,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
      * @throws ValidationException
      */
     private void validateACL(Process process) throws FalconException {
-        if (!isAuthorizationEnabled()) {
+        if (isAuthorizationDisabled) {
             return;
         }
 
@@ -241,6 +242,10 @@ public class ProcessEntityParser extends EntityParser<Process> {
             throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
         }
 
-        validateOwner(processACL.getOwner());
+        try {
+            authorize(process.getName(), processACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index cb594d1..bbb2b12 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -108,7 +108,7 @@ public final class ConfigurationStore implements FalconService {
 
             return fileSystem;
         } catch (Exception e) {
-            throw new RuntimeException("Unable to bring up config store", e);
+            throw new RuntimeException("Unable to bring up config store for path: " + storePath, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index d5fbda8..48b45a2 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -100,37 +100,27 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createProxiedFileSystem(final Configuration conf)
-        throws FalconException {
-        Validate.notNull(conf, "configuration cannot be null");
-
-        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
-        try {
-            return createProxiedFileSystem(CurrentUser.getUser(), new URI(nameNode), conf);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
-        }
-    }
-
     /**
-     * Return a FileSystem created with the provided proxyUser for the specified URI.
+     * Return a FileSystem created with the authenticated proxy user for the specified conf.
      *
-     * @param proxyUser proxyUser
-     * @param uri  file system URI.
      * @param conf Configuration with all necessary information to create the FileSystem.
      * @return FileSystem created with the provided proxyUser/group.
      * @throws org.apache.falcon.FalconException
      *          if the filesystem could not be created.
      */
-    public FileSystem createProxiedFileSystem(String proxyUser, final URI uri, final Configuration conf)
+    public FileSystem createProxiedFileSystem(final Configuration conf)
         throws FalconException {
-        Validate.notEmpty(proxyUser, "proxyUser cannot be null");
+        Validate.notNull(conf, "configuration cannot be null");
 
+        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
         try {
-            UserGroupInformation proxyUgi = SecurityUtil.getProxyUser(proxyUser);
-            return createFileSystem(proxyUgi, uri, conf);
-        } catch (IOException ex) {
-            throw new FalconException("Exception while getting FileSystem: " + ex.getMessage(), ex);
+            return createFileSystem(CurrentUser.getProxyUgi(), new URI(nameNode), conf);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
new file mode 100644
index 0000000..3133d91
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+
+/**
+ * An interface for authorizing user against an entity operation.
+ */
+public interface AuthorizationProvider {
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the resource,
+     * which is typically a REST resource path.
+     * Throws an exception if not authorized.
+     *
+     * @param resource   api resource, admin, entities or instance
+     * @param action     action being authorized on resource and entity if applicable
+     * @param entityType entity type in question, not for admin resource
+     * @param entityName entity name in question, not for admin resource
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws AuthorizationException
+     */
+    void authorizeResource(String resource,
+                           String action,
+                           String entityType,
+                           String entityName,
+                           UserGroupInformation proxyUgi) throws AuthorizationException;
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the entity.
+     * Throws an exception if not authorized.
+     *
+     * @param entityName entity in question, applicable for entities and instance resource
+     * @param entityType entity in question, applicable for entities and instance resource
+     * @param acl        entity ACL
+     * @param action     action being authorized on resource and entity if applicable
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws AuthorizationException
+     */
+    void authorizeEntity(String entityName, String entityType,
+                         AccessControlList acl, String action,
+                         UserGroupInformation proxyUgi) throws AuthorizationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 6fccd1b..b7d2c66 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -18,13 +18,22 @@
 
 package org.apache.falcon.security;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * Current authenticated user via REST.
+ * Also doles out proxied UserGroupInformation. Caches proxied users.
  */
 public final class CurrentUser {
 
@@ -79,4 +88,32 @@ public final class CurrentUser {
             return null;
         }
     }
+
+    private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
+            new ConcurrentHashMap<String, UserGroupInformation>();
+
+    /**
+     * Dole out a proxy UGI object for the current authenticated user.
+     *
+     * @return UGI object
+     * @throws java.io.IOException
+     */
+    public static UserGroupInformation getProxyUgi() throws IOException {
+        String proxyUser = getUser();
+
+        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+        if (proxyUgi == null) {
+            // taking care of a race condition, the latest UGI will be discarded
+            proxyUgi = UserGroupInformation
+                    .createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+        }
+
+        return proxyUgi;
+    }
+
+    public static Set<String> getGroupNames() throws IOException {
+        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUgi().getGroupNames()));
+        return Collections.unmodifiableSet(s);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
new file mode 100644
index 0000000..5561429
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default implementation of AuthorizationProvider in Falcon.
+ *
+ * The authorization is enforced in the following way:
+ *
+ * if admin resource,
+ *      if authenticated user name matches the admin users configuration
+ *      Else if groups of the authenticated user matches the admin groups configuration
+ * Else if entities or instance resource
+ *      if the authenticated user matches the owner in ACL for the entity
+ *      Else if the groups of the authenticated user matches the group in ACL for the entity
+ * Else if lineage resource
+ *      All have read-only permissions
+ * Else bad resource
+ */
+public class DefaultAuthorizationProvider implements AuthorizationProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultAuthorizationProvider.class);
+
+    private static final Set<String> RESOURCES = new HashSet<String>(
+            Arrays.asList(new String[]{"admin", "entities", "instance", "lineage", }));
+
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    protected static final String FALCON_PREFIX = "falcon.security.authorization.";
+
+    /**
+     * Constant for the configuration property that indicates the blacklisted super users for falcon.
+     */
+    private static final String ADMIN_USERS_KEY = FALCON_PREFIX + "admin.users";
+    private static final String ADMIN_GROUPS_KEY = FALCON_PREFIX + "admin.groups";
+
+    private Set<String> adminUsers;
+    private Set<String> adminGroups;
+
+    public DefaultAuthorizationProvider() {
+        adminUsers = getAdminNamesFromConfig(ADMIN_USERS_KEY);
+        adminGroups = getAdminNamesFromConfig(ADMIN_GROUPS_KEY);
+    }
+
+    private HashSet<String> getAdminNamesFromConfig(String key) {
+        HashSet<String> adminNames = new HashSet<String>();
+        String adminNamesConfig = StartupProperties.get().getProperty(key);
+        if (!StringUtils.isEmpty(adminNamesConfig)) {
+            adminNames.addAll(Arrays.asList(adminNamesConfig.split(",")));
+        }
+
+        return adminNames;
+    }
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the resource.
+     * Throws an exception if not authorized.
+     *
+     * @param resource   api resource, admin, entities or instance
+     * @param action     action being authorized on resource and entity if applicable
+     * @param entityType entity type in question, not for admin resource
+     * @param entityName entity name in question, not for admin resource
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws org.apache.hadoop.security.authorize.AuthorizationException
+     */
+    @Override
+    public void authorizeResource(String resource, String action,
+                                  String entityType, String entityName,
+                                  UserGroupInformation proxyUgi) throws AuthorizationException {
+        Validate.notEmpty(resource, "Resource cannot be empty or null");
+        Validate.notEmpty(action, "Action cannot be empty or null");
+
+        Set<String> groups = getGroupNames(proxyUgi);
+        String authenticatedUser = proxyUgi.getShortUserName();
+        LOG.info("Authorizing authenticatedUser={}, groups={} against resource={}, action={}, entity name={}, "
+                + "entity type={}", authenticatedUser, groups, resource, action, entityName, entityType);
+
+        if ("admin".equals(resource)) {
+            authorizeAdminResource(authenticatedUser, groups, action);
+        } else if ("entities".equals(resource) || "instance".equals(resource)) {
+            authorizeEntityResource(authenticatedUser, proxyUgi, entityName, entityType, action);
+        } else if ("lineage".equals(resource)) {
+            authorizeLineageResource(authenticatedUser, action);
+        } else {
+            throw new AuthorizationException("Unknown resource: " + resource);
+        }
+    }
+
+    private Set<String> getGroupNames(UserGroupInformation proxyUgi) {
+        HashSet<String> s = new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames()));
+        return Collections.unmodifiableSet(s);
+    }
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the entity.
+     * Throws an exception if not authorized.
+     *
+     * @param entityName entity in question, applicable for entities and instance resource
+     * @param entityType entity in question, applicable for entities and instance resource
+     * @param acl        entity ACL
+     * @param action     action being authorized on resource and entity if applicable
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws org.apache.hadoop.security.authorize.AuthorizationException
+     */
+    @Override
+    public void authorizeEntity(String entityName, String entityType,
+                                AccessControlList acl, String action,
+                                UserGroupInformation proxyUgi) throws AuthorizationException {
+        String authenticatedUser = proxyUgi.getShortUserName();
+        LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}",
+                authenticatedUser, action, entityName, entityType);
+
+        checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUser, proxyUgi);
+    }
+
+    /**
+     * Validate if the entity owner is the logged-in authenticated user.
+     *
+     * @param entityName        entity name.
+     * @param aclOwner          entity ACL Owner.
+     * @param aclGroup          entity ACL group.
+     * @param action            action being authorized on resource and entity if applicable.
+     * @param authenticatedUser authenticated user name.
+     * @param proxyUgi          proxy ugi for the authenticated user.
+     * @throws AuthorizationException
+     */
+    protected void checkUser(String entityName, String aclOwner, String aclGroup,
+                             String action, String authenticatedUser,
+                             UserGroupInformation proxyUgi) throws AuthorizationException {
+        if (isUserACLOwner(authenticatedUser, aclOwner)
+                || isUserInAclGroup(aclGroup, proxyUgi)) {
+            return;
+        }
+
+        StringBuilder message = new StringBuilder("Permission denied: authenticatedUser=");
+        message.append(authenticatedUser);
+        message.append(!authenticatedUser.equals(aclOwner)
+                ? " not entity owner=" + aclOwner
+                : " not in group=" + aclGroup);
+        message.append(", entity=").append(entityName).append(", action=").append(action);
+
+        LOG.error(message.toString());
+        throw new AuthorizationException(message.toString());
+    }
+
+    protected boolean isUserACLOwner(String authenticatedUser, String aclOwner) {
+        return authenticatedUser.equals(aclOwner);
+    }
+
+    protected boolean isUserInAclGroup(String aclGroup, UserGroupInformation proxyUgi) {
+        Set<String> groups = getGroupNames(proxyUgi);
+        return groups.contains(aclGroup);
+    }
+
+    /**
+     * Check if the user has admin privileges.
+     *
+     * @param user   user name.
+     * @param groups groups that the user belongs to.
+     * @param action admin action on the resource
+     * @throws AuthorizationException if the user does not have admin privileges.
+     */
+    protected void authorizeAdminResource(String user, Set<String> groups,
+                                          String action) throws AuthorizationException {
+        LOG.debug("Authorizing user={} for admin, action={}", user, action);
+        if (adminUsers.contains(user) || isUserInAdminGroups(groups)) {
+            return;
+        }
+
+        LOG.error("Permission denied: user {} does not have admin privilege for action={}",
+                user, action);
+        throw new AuthorizationException("Permission denied: user=" + user
+                + " does not have admin privilege for action=" + action);
+    }
+
+    protected boolean isUserInAdminGroups(Set<String> groups) {
+        boolean isUserGroupInAdmin = false;
+        for (String group : groups) {
+            if (adminGroups.contains(group)) {
+                isUserGroupInAdmin = true;
+                break;
+            }
+        }
+
+        return isUserGroupInAdmin;
+    }
+
+    protected void authorizeEntityResource(String authenticatedUser, UserGroupInformation proxyUgi,
+                                           String entityName, String entityType,
+                                           String action) throws AuthorizationException {
+        Validate.notEmpty(entityType, "Entity type cannot be empty or null");
+        LOG.debug("Authorizing authenticatedUser={} against entity/instance action={}, "
+                + "entity name={}, entity type={}", authenticatedUser, action, entityName,
+                entityType);
+
+        if (entityName != null) { // lifecycle actions
+            Entity entity = getEntity(entityName, entityType);
+            authorizeEntity(entity.getName(), entity.getEntityType().name(),
+                    getACL(entity), action, proxyUgi);
+        } else {
+            // non lifecycle actions, lifecycle actions with null entity will validate later
+            LOG.info("Authorization for action={} will be done in the API", action);
+        }
+    }
+
+    private Entity getEntity(String entityName, String entityType) throws AuthorizationException {
+        try {
+            EntityType type = EntityType.valueOf(entityType.toUpperCase());
+            return EntityUtil.getEntity(type, entityName);
+        } catch (FalconException e) {
+            throw new AuthorizationException(e);
+        }
+    }
+
+    protected AccessControlList getACL(Entity entity) throws AuthorizationException {
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            return ((Cluster) entity).getACL();
+
+        case FEED:
+            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
+
+        case PROCESS:
+            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
+
+        default:
+            throw new AuthorizationException("Cannot get owner for entity: " + entity.getName());
+        }
+    }
+
+    protected void authorizeLineageResource(String authenticatedUser, String action) {
+        LOG.debug("User {} authorized for action {} ", authenticatedUser, action);
+        // todo - do nothing for now, read-only for all
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
index f78043f..b9fd37e 100644
--- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -18,20 +18,17 @@
 
 package org.apache.falcon.security;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * Security Util - bunch of security related helper methods.
- * Also doles out proxied UserGroupInformation. Caches proxied users.
  */
 public final class SecurityUtil {
 
@@ -57,9 +54,6 @@ public final class SecurityUtil {
     public static final String HIVE_METASTORE_PRINCIPAL = "hive.metastore.kerberos.principal";
 
 
-    private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
-            new ConcurrentHashMap<String, UserGroupInformation>();
-
     private SecurityUtil() {
     }
 
@@ -68,6 +62,11 @@ public final class SecurityUtil {
                 AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
     }
 
+    /**
+     * Checks if kerberos authentication is enabled in the configuration.
+     *
+     * @return true if falcon.authentication.type is kerberos, false otherwise
+     */
     public static boolean isSecurityEnabled() {
         String authenticationType = StartupProperties.get().getProperty(
                 AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
@@ -85,18 +84,24 @@ public final class SecurityUtil {
         return useKerberos;
     }
 
-    public static UserGroupInformation getProxyUser(String proxyUser) throws IOException {
-        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
-        if (proxyUgi == null) {
-            // taking care of a race condition, the latest UGI will be discarded
-            proxyUgi = UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
-            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
-        }
+    public static String getLocalHostName() throws UnknownHostException {
+        return InetAddress.getLocalHost().getCanonicalHostName();
+    }
 
-        return proxyUgi;
+    /**
+     * Checks if authorization is enabled in the configuration.
+     *
+     * @return true if falcon.security.authorization.enabled is enabled, false otherwise
+     */
+    public static boolean isAuthorizationEnabled() {
+        return Boolean.valueOf(StartupProperties.get().getProperty(
+                "falcon.security.authorization.enabled", "false"));
     }
 
-    public static String getLocalHostName() throws UnknownHostException {
-        return InetAddress.getLocalHost().getCanonicalHostName();
+    public static AuthorizationProvider getAuthorizationProvider() throws FalconException {
+        String providerClassName = StartupProperties.get().getProperty(
+                "falcon.security.authorization.provider",
+                "org.apache.falcon.security.DefaultAuthorizationProvider");
+        return ReflectionUtils.getInstanceByClassName(providerClassName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index dafdea9..808fed6 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -139,6 +139,16 @@ debug.libext.process.paths=${falcon.libext}
 
 ######### Authorization Properties #########
 
+# Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
 
+# Admin Users, comma separated users
+*.falcon.security.authorization.admin.users=falcon,ambari-qa,seetharam
+
+# Admin Group Membership, comma separated users
+*.falcon.security.authorization.admin.groups=falcon,testgroup,staff
+
+# Authorization Provider Implementation Fully Qualified Class Name
+*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
+
 ######### Authorization Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 3c2b63b..c35d1a4 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -39,14 +39,18 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import java.io.File;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collection;
 
 /**
  * Base class for config store test.
  */
 public class AbstractTestBase {
+    protected static final String USER = System.getProperty("user.name");
+
     protected static final String PROCESS_XML = "/config/process/process-0.1.xml";
     protected static final String FEED_XML = "/config/feed/feed-0.1.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
@@ -138,4 +142,15 @@ public class AbstractTestBase {
         marshaller.marshal(entity, stringWriter);
         return stringWriter.toString();
     }
+
+
+    protected String getGroupName() throws IOException {
+        return getGroupName(true);
+    }
+
+    protected String getGroupName(boolean first) throws IOException {
+        String[] groupNames = CurrentUser.getProxyUgi().getGroupNames();
+        System.out.println("groupNames = " + Arrays.asList(groupNames));
+        return groupNames.length > 1 ? groupNames[first ? 0 : groupNames.length - 1] : "admin";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 47f63bc..a78c678 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.security.CurrentUser;
@@ -167,11 +168,11 @@ public class FileSystemStorageTest {
         fs.mkdirs(path);
 
         FileSystemStorage storage = new FileSystemStorage(cluster.getConf().get("fs.default.name"), locations);
-        storage.validateACL(user, user, "0x755");
+        storage.validateACL(new TestACL(user, user, "0x755"));
 
         //-ve case
         try {
-            storage.validateACL("random", user, "0x755");
+            storage.validateACL(new TestACL("random", user, "0x755"));
             Assert.fail("Validation should have failed");
         } catch(FalconException e) {
             //expected exception
@@ -179,11 +180,11 @@ public class FileSystemStorageTest {
 
         //Timed path
         location.setPath("/foo/bar/${YEAR}/${MONTH}/${DAY}");
-        storage.validateACL(user, user, "rrr");
+        storage.validateACL(new TestACL(user, user, "rrr"));
 
         //-ve case
         try {
-            storage.validateACL("random", user, "0x755");
+            storage.validateACL(new TestACL("random", user, "0x755"));
             Assert.fail("Validation should have failed");
         } catch(FalconException e) {
             //expected exception
@@ -347,4 +348,43 @@ public class FileSystemStorageTest {
 
         Assert.assertFalse(storage1.isIdentical(storage2));
     }
+
+    private class TestACL extends AccessControlList {
+
+        /**
+         * owner is the Owner of this entity.
+         */
+        private String owner;
+
+        /**
+         * group is the one which has access to read - not used at this time.
+         */
+        private String group;
+
+        /**
+         * permission is not enforced at this time.
+         */
+        private String permission;
+
+        TestACL(String owner, String group, String permission) {
+            this.owner = owner;
+            this.group = group;
+            this.permission = permission;
+        }
+
+        @Override
+        public String getOwner() {
+            return owner;
+        }
+
+        @Override
+        public String getGroup() {
+            return group;
+        }
+
+        @Override
+        public String getPermission() {
+            return permission;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 55562d1..41f041a 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -157,7 +157,10 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
 
-            Cluster cluster = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            ClusterEntityParser clusterEntityParser =
+                    (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+            Cluster cluster = clusterEntityParser.parse(stream);
             Assert.assertNotNull(cluster);
             Assert.assertNull(cluster.getACL());
         } finally {
@@ -174,7 +177,10 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml");
 
-            Cluster cluster = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            ClusterEntityParser clusterEntityParser =
+                    (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+            Cluster cluster = clusterEntityParser.parse(stream);
             Assert.assertNotNull(cluster);
             Assert.assertNotNull(cluster.getACL());
             Assert.assertNotNull(cluster.getACL().getOwner());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index e669f44..98cda04 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -53,8 +53,6 @@ import static org.testng.AssertJUnit.assertEquals;
  */
 public class FeedEntityParserTest extends AbstractTestBase {
 
-    public static final String USER = System.getProperty("user.name");
-
     private final FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
             .getParser(EntityType.FEED);
 
@@ -79,6 +77,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
         cluster.setName("backupCluster");
         store.publish(EntityType.CLUSTER, cluster);
 
+        CurrentUser.authenticate("testuser");
         modifiableFeed = parser.parseAndValidate(this.getClass()
                 .getResourceAsStream(FEED_XML));
     }
@@ -89,21 +88,6 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
     }
 
-
-    @Test(expectedExceptions = ValidationException.class)
-    public void testValidateUser() throws Exception {
-        CurrentUser.authenticate("unknown");
-        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
-        Assert.assertTrue(Boolean.valueOf(
-                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
-        try {
-            parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
-        } finally {
-            CurrentUser.authenticate("testuser");
-            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
-        }
-    }
-
     @Test
     public void testParse() throws IOException, FalconException, JAXBException {
 
@@ -171,7 +155,6 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.validate(feed);
     }
 
-
     @Test
     public void testPartitionExpression() throws FalconException {
         Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class
@@ -548,6 +531,22 @@ public class FeedEntityParserTest extends AbstractTestBase {
                 + " storage");
     }
 
+    @Test(expectedExceptions = ValidationException.class)
+    public void testValidateOwner() throws Exception {
+        CurrentUser.authenticate("unknown");
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        try {
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feedEntityParser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
     @Test
     public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
         InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
@@ -563,8 +562,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
     }
 
     @Test
-    public void testValidateACLAndOwner() throws Exception {
-        CurrentUser.authenticate("testuser");
+    public void testValidateACLOwner() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -572,7 +570,10 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            Feed feed = parser.parseAndValidate(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            Feed feed = feedEntityParser.parseAndValidate(stream);
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
             Assert.assertNotNull(feed.getACL().getOwner());
@@ -621,7 +622,10 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            Feed feed = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            Feed feed = feedEntityParser.parse(stream);
 
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
@@ -629,17 +633,15 @@ public class FeedEntityParserTest extends AbstractTestBase {
             Assert.assertNotNull(feed.getACL().getGroup());
             Assert.assertNotNull(feed.getACL().getPermission());
 
-            parser.validate(feed);
+            feedEntityParser.validate(feed);
             Assert.fail("Validation exception should have been thrown for invalid owner");
         } finally {
-            CurrentUser.authenticate("testuser");
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
         }
     }
 
     @Test (expectedExceptions = ValidationException.class)
     public void testValidateACLAndStorageBadOwner() throws Exception {
-        CurrentUser.authenticate("testuser");
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -648,7 +650,10 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            feed = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
             Assert.assertNotNull(feed.getACL().getOwner());
@@ -657,7 +662,38 @@ public class FeedEntityParserTest extends AbstractTestBase {
 
             // create locations
             createLocations(feed);
-            parser.validate(feed);
+            feedEntityParser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAndStorageBadOwnerAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            // create locations
+            createLocations(feed);
+            feedEntityParser.validate(feed);
         } finally {
             if (feed != null) {
                 deleteLocations(feed);
@@ -667,7 +703,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
     }
 
     @Test
-    public void testValidateACLAndStorage() throws Exception {
+    public void testValidateACLAndStorageForValidOwnerBadGroup() throws Exception {
         CurrentUser.authenticate(USER);
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
@@ -677,7 +713,9 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            feed = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
             Assert.assertNotNull(feed.getACL().getOwner());
@@ -688,7 +726,100 @@ public class FeedEntityParserTest extends AbstractTestBase {
 
             // create locations
             createLocations(feed);
-            parser.validate(feed);
+            feedEntityParser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLValidGroupBadOwner() throws Exception {
+        CurrentUser.authenticate(USER);
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(
+                    EntityType.FEED);
+            Feed feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            feed.getACL().setGroup(getGroupName());
+
+            feedEntityParser.validate(feed);
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAndStorageForInvalidOwnerAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(
+                    EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            // create locations
+            createLocations(feed);
+            feedEntityParser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAndStorageForValidGroupBadOwner() throws Exception {
+        CurrentUser.authenticate(USER);
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(
+                    EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            feed.getACL().setGroup(getGroupName());
+
+            // create locations
+            createLocations(feed);
+            feedEntityParser.validate(feed);
         } finally {
             if (feed != null) {
                 deleteLocations(feed);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 93c34d2..3513dab 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -367,13 +367,16 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         CurrentUser.authenticate("falcon");
 
         try {
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
             InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
 
-            Process process = parser.parse(stream);
+            Process process = processEntityParser.parse(stream);
             Assert.assertNotNull(process);
             Assert.assertNull(process.getACL());
 
-            parser.validate(process);
+            processEntityParser.validate(process);
             Assert.fail("Validation exception should have been thrown for empty ACL");
         } finally {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
@@ -381,7 +384,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
     }
 
     @Test
-    public void testValidateACLAuthorizationEnabled() throws Exception {
+    public void testValidateACLAuthorizationEnabledValidOwnerBadGroup() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -390,19 +393,51 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
 
-            Process process = parser.parseAndValidate(stream);
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+            Process process = processEntityParser.parseAndValidate(stream);
+            Assert.assertNotNull(process);
+            Assert.assertNotNull(process.getACL());
+            Assert.assertNotNull(process.getACL().getOwner());
+            Assert.assertNotNull(process.getACL().getGroup());
+            Assert.assertNotNull(process.getACL().getPermission());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAuthorizationEnabledValidGroupBadOwner() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate(USER); // valid user but acl owner is falcon
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+            Process process = processEntityParser.parse(stream);
             Assert.assertNotNull(process);
             Assert.assertNotNull(process.getACL());
             Assert.assertNotNull(process.getACL().getOwner());
             Assert.assertNotNull(process.getACL().getGroup());
             Assert.assertNotNull(process.getACL().getPermission());
+
+            process.getACL().setOwner(USER);
+            process.getACL().setGroup(getGroupName());
+
+            processEntityParser.validate(process);
         } finally {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
         }
     }
 
     @Test (expectedExceptions = ValidationException.class)
-    public void testValidateACLAuthorizationEnabledBadOwner() throws Exception {
+    public void testValidateACLAuthorizationEnabledBadOwnerAndGroup() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -411,7 +446,10 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
 
-            Process process = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+            Process process = processEntityParser.parse(stream);
 
             Assert.assertNotNull(process);
             Assert.assertNotNull(process.getACL());
@@ -419,7 +457,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
             Assert.assertNotNull(process.getACL().getGroup());
             Assert.assertNotNull(process.getACL().getPermission());
 
-            parser.validate(process);
+            processEntityParser.validate(process);
             Assert.fail("Validation exception should have been thrown for invalid owner");
         } finally {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
index 3b4e7f0..8f25f57 100644
--- a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.hadoop;
 
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.ipc.RemoteException;
@@ -58,11 +59,12 @@ public class HadoopClientFactoryTest {
     @Test (enabled = false) // todo: cheated the conf to impersonate as same user
     public void testCreateFileSystemWithSameUser() {
         String user = System.getProperty("user.name");
+        CurrentUser.authenticate(user);
         try {
             Configuration conf = embeddedCluster.getConf();
             URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
             Assert.assertNotNull(uri);
-            HadoopClientFactory.get().createProxiedFileSystem(user, uri, conf);
+            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
             Assert.fail("Impersonation should have failed.");
         } catch (Exception e) {
             Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
@@ -80,7 +82,7 @@ public class HadoopClientFactoryTest {
 
         URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
         Assert.assertNotNull(uri);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("testuser", uri, conf);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(realUser, uri, conf);
         Assert.assertNotNull(fs);
     }
 
@@ -95,7 +97,9 @@ public class HadoopClientFactoryTest {
 
         URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
         Assert.assertNotNull(uri);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("seetharam", uri, conf);
+
+        CurrentUser.authenticate(System.getProperty("user.name"));
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
         Assert.assertNotNull(fs);
     }
 }


[4/9] FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
new file mode 100644
index 0000000..289e232
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthorizationFilterTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Test for FalconAuthorizationFilter using mock objects.
+ */
+public class FalconAuthorizationFilterTest {
+
+    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String PROCESS_ENTITY_NAME = "sample-process";
+
+    @Mock
+    private HttpServletRequest mockRequest;
+
+    @Mock
+    private HttpServletResponse mockResponse;
+
+    @Mock
+    private FilterChain mockChain;
+
+    @Mock
+    private FilterConfig mockConfig;
+
+    @Mock
+    private UserGroupInformation mockUgi;
+
+    private ConfigurationStore configStore;
+    private Cluster clusterEntity;
+    private org.apache.falcon.entity.v0.process.Process processEntity;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        CurrentUser.authenticate(EntityBuilderTestUtil.USER);
+        Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER);
+
+        configStore = ConfigurationStore.get();
+
+        addClusterEntity();
+        addProcessEntity();
+        Assert.assertNotNull(processEntity);
+    }
+
+    @DataProvider(name = "resourceWithNoEntity")
+    private Object[][] createOptions() {
+        return new Object[][] {
+            {"/admin/version"},
+            {"/entities/list/feed"},
+            {"/entities/list/process"},
+            {"/entities/list/cluster"},
+            {"/graphs/lineage/vertices/all"},
+            {"/graphs/lineage/vertices/_1"},
+            {"/graphs/lineage/vertices/properties/_1"},
+        };
+    }
+
+    @Test (dataProvider = "resourceWithNoEntity")
+    public void testDoFilter(String resource) throws Exception {
+        Filter filter = new FalconAuthorizationFilter();
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        try {
+            boolean[] enabledFlags = {false, true};
+            for (boolean enabled : enabledFlags) {
+                StartupProperties.get().setProperty(
+                        "falcon.security.authorization.enabled", String.valueOf(enabled));
+
+                StringBuffer requestUrl = new StringBuffer("http://localhost" + resource);
+                Mockito.when(mockRequest.getRequestURL()).thenReturn(requestUrl);
+                Mockito.when(mockRequest.getRequestURI()).thenReturn("/api" + resource);
+                Mockito.when(mockRequest.getPathInfo()).thenReturn(resource);
+
+                filter.doFilter(mockRequest, mockResponse, mockChain);
+            }
+        } finally {
+            filter.destroy();
+        }
+    }
+
+    @DataProvider(name = "resourceWithEntity")
+    private Object[][] createOptionsForResourceWithEntity() {
+        return new Object[][] {
+            {"/entities/status/process/"},
+            {"/entities/suspend/process/"},
+            {"/instance/running/process/"},
+        };
+    }
+
+    @Test (dataProvider = "resourceWithEntity")
+    public void testDoFilterForEntity(String resource) throws Exception {
+        Filter filter = new FalconAuthorizationFilter();
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        try {
+            boolean[] enabledFlags = {false, true};
+            for (boolean enabled : enabledFlags) {
+                StartupProperties.get().setProperty(
+                        "falcon.security.authorization.enabled", String.valueOf(enabled));
+
+                String uri = resource + processEntity.getName();
+                StringBuffer requestUrl = new StringBuffer("http://localhost" + uri);
+                Mockito.when(mockRequest.getRequestURL()).thenReturn(requestUrl);
+                Mockito.when(mockRequest.getRequestURI()).thenReturn("/api" + uri);
+                Mockito.when(mockRequest.getPathInfo()).thenReturn(uri);
+
+                filter.doFilter(mockRequest, mockResponse, mockChain);
+            }
+        } finally {
+            filter.destroy();
+        }
+    }
+
+    public void addClusterEntity() throws Exception {
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME);
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+    }
+
+    public void addProcessEntity() throws Exception {
+        processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+                clusterEntity, "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity);
+        EntityBuilderTestUtil.addProcessACL(processEntity);
+
+        configStore.publish(EntityType.PROCESS, processEntity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 038026d..526656f 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -144,8 +144,16 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 ######### Authorization Properties #########
 
+# Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
-#*.falcon.security.authorization.admin.users=seetharam
-#*.falcon.security.authorization.admin.groups=seetharam
+
+# Admin Users, comma separated users
+*.falcon.security.authorization.admin.users=falcon,ambari-qa,seetharam
+
+# Admin Group Membership, comma separated users
+*.falcon.security.authorization.admin.groups=falcon,testgroup,staff
+
+# Authorization Provider Implementation Fully Qualified Class Name
+*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
 
 ######### Authorization Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
new file mode 100644
index 0000000..edcc728
--- /dev/null
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cluster.util;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Clusters;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
+
+/**
+ * Utility class to build entity objects.
+ */
+public final class EntityBuilderTestUtil {
+
+    public static final String USER = System.getProperty("user.name");
+    public static final String COLO_NAME = "west-coast";
+    public static final String WORKFLOW_NAME = "imp-click-join-workflow";
+    public static final String WORKFLOW_VERSION = "1.0.9";
+
+    private EntityBuilderTestUtil() {
+    }
+
+    public static Cluster buildCluster(String name) {
+        return buildCluster(name, COLO_NAME, "classification=production");
+    }
+
+    public static Cluster buildCluster(String name, String colo, String tags) {
+        Cluster cluster = new Cluster();
+        cluster.setName(name);
+        cluster.setColo(colo);
+        cluster.setTags(tags);
+
+        Interfaces interfaces = new Interfaces();
+        cluster.setInterfaces(interfaces);
+
+        Interface storage = new Interface();
+        storage.setEndpoint("jail://global:00");
+        storage.setType(Interfacetype.WRITE);
+        cluster.getInterfaces().getInterfaces().add(storage);
+
+        org.apache.falcon.entity.v0.cluster.ACL clusterACL = new org.apache.falcon.entity.v0
+                .cluster.ACL();
+        clusterACL.setOwner(USER);
+        clusterACL.setGroup(USER);
+        clusterACL.setPermission("*");
+        cluster.setACL(clusterACL);
+
+        return cluster;
+    }
+
+    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups) {
+        Feed feed = new Feed();
+        feed.setName(feedName);
+        feed.setTags(tags);
+        feed.setGroups(groups);
+        feed.setFrequency(Frequency.fromString("hours(1)"));
+
+        org.apache.falcon.entity.v0.feed.Clusters
+                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
+        feed.setClusters(clusters);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName(cluster.getName());
+        clusters.getClusters().add(feedCluster);
+
+        org.apache.falcon.entity.v0.feed.ACL feedACL = new org.apache.falcon.entity.v0.feed.ACL();
+        feedACL.setOwner(USER);
+        feedACL.setGroup(USER);
+        feedACL.setPermission("*");
+        feed.setACL(feedACL);
+
+        return feed;
+    }
+
+    public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
+                                                                           Cluster cluster,
+                                                                           String tags) throws Exception {
+        org.apache.falcon.entity.v0.process.Process processEntity = new Process();
+        processEntity.setName(processName);
+        processEntity.setTags(tags);
+
+        org.apache.falcon.entity.v0.process.Cluster processCluster =
+                new org.apache.falcon.entity.v0.process.Cluster();
+        processCluster.setName(cluster.getName());
+        processEntity.setClusters(new Clusters());
+        processEntity.getClusters().getClusters().add(processCluster);
+
+        addProcessACL(processEntity);
+
+        return processEntity;
+    }
+
+    public static void addProcessWorkflow(Process process) {
+        addProcessWorkflow(process, WORKFLOW_NAME, WORKFLOW_VERSION);
+    }
+
+    public static void addProcessWorkflow(Process process, String workflowName, String version) {
+        Workflow workflow = new Workflow();
+        workflow.setName(workflowName);
+        workflow.setVersion(version);
+        workflow.setEngine(EngineType.PIG);
+        workflow.setPath("/falcon/test/workflow");
+
+        process.setWorkflow(workflow);
+    }
+
+    public static void addProcessACL(Process processEntity) throws Exception {
+        addProcessACL(processEntity, USER, USER);
+    }
+
+    public static void addProcessACL(Process processEntity, String user,
+                                     String group) throws Exception {
+        org.apache.falcon.entity.v0.process.ACL processACL = new org.apache.falcon.entity.v0.process.ACL();
+        processACL.setOwner(user);
+        processACL.setGroup(group);
+        processACL.setPermission("*");
+        processEntity.setACL(processACL);
+    }
+
+    public static void addInput(Process process, Feed feed) {
+        if (process.getInputs() == null) {
+            process.setInputs(new Inputs());
+        }
+
+        Inputs inputs = process.getInputs();
+        Input input = new Input();
+        input.setFeed(feed.getName());
+        inputs.getInputs().add(input);
+    }
+
+    public static void addOutput(Process process, Feed feed) {
+        if (process.getOutputs() == null) {
+            process.setOutputs(new Outputs());
+        }
+
+        Outputs outputs = process.getOutputs();
+        Output output = new Output();
+        output.setFeed(feed.getName());
+        outputs.getOutputs().add(output);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/webapp/src/main/webapp/WEB-INF/distributed/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/distributed/web.xml b/webapp/src/main/webapp/WEB-INF/distributed/web.xml
index a5e1161..7a4de55 100644
--- a/webapp/src/main/webapp/WEB-INF/distributed/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/distributed/web.xml
@@ -26,8 +26,13 @@
     <description>Apache Falcon Distributed Server</description>
 
     <filter>
-        <filter-name>auth</filter-name>
-        <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
+        <filter-name>authentication</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthenticationFilter</filter-class>
+    </filter>
+
+    <filter>
+        <filter-name>authorization</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthorizationFilter</filter-class>
     </filter>
 
     <filter>
@@ -36,12 +41,17 @@
     </filter>
 
     <filter-mapping>
-        <filter-name>auth</filter-name>
+        <filter-name>authentication</filter-name>
+        <servlet-name>FalconRESTApi</servlet-name>
+    </filter-mapping>
+
+    <filter-mapping>
+        <filter-name>authorization</filter-name>
         <servlet-name>FalconRESTApi</servlet-name>
     </filter-mapping>
 
     <filter-mapping>
-        <filter-name>auth</filter-name>
+        <filter-name>authentication</filter-name>
         <servlet-name>SecureApi</servlet-name>
     </filter-mapping>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/webapp/src/main/webapp/WEB-INF/embedded/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/embedded/web.xml b/webapp/src/main/webapp/WEB-INF/embedded/web.xml
index 9dc371f..7d0cb08 100644
--- a/webapp/src/main/webapp/WEB-INF/embedded/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/embedded/web.xml
@@ -26,12 +26,22 @@
     <description>Apache Falcon Embedded Server</description>
 
     <filter>
-        <filter-name>auth</filter-name>
-        <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
+        <filter-name>authentication</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthenticationFilter</filter-class>
     </filter>
 
+    <filter>
+        <filter-name>authorization</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthorizationFilter</filter-class>
+    </filter>
+
+    <filter-mapping>
+        <filter-name>authentication</filter-name>
+        <servlet-name>FalconRESTApi</servlet-name>
+    </filter-mapping>
+
     <filter-mapping>
-        <filter-name>auth</filter-name>
+        <filter-name>authorization</filter-name>
         <servlet-name>FalconRESTApi</servlet-name>
     </filter-mapping>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/webapp/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml
index 971fcdd..08c30cb 100644
--- a/webapp/src/main/webapp/WEB-INF/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/web.xml
@@ -26,12 +26,22 @@
     <description>Apache Falcon Placeholder</description>
 
     <filter>
-        <filter-name>auth</filter-name>
-        <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
+        <filter-name>authentication</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthenticationFilter</filter-class>
     </filter>
 
+    <filter>
+        <filter-name>authorization</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthorizationFilter</filter-class>
+    </filter>
+
+    <filter-mapping>
+        <filter-name>authentication</filter-name>
+        <servlet-name>FalconRESTApi</servlet-name>
+    </filter-mapping>
+
     <filter-mapping>
-        <filter-name>auth</filter-name>
+        <filter-name>authorization</filter-name>
         <servlet-name>FalconRESTApi</servlet-name>
     </filter-mapping>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/webapp/src/test/java/org/apache/falcon/security/BasicAuthFilterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/security/BasicAuthFilterTest.java b/webapp/src/test/java/org/apache/falcon/security/BasicAuthFilterTest.java
deleted file mode 100644
index 62e889a..0000000
--- a/webapp/src/test/java/org/apache/falcon/security/BasicAuthFilterTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.servlet.Filter;
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Test for BasicAuthFilter using mock objects.
- */
-public class BasicAuthFilterTest {
-
-    @Mock
-    private HttpServletRequest mockRequest;
-
-    @Mock
-    private HttpServletResponse mockResponse;
-
-    @Mock
-    private FilterChain mockChain;
-
-    @Mock
-    private FilterConfig mockConfig;
-
-    @Mock
-    private UserGroupInformation mockUgi;
-
-    @BeforeClass
-    public void init() {
-        MockitoAnnotations.initMocks(this);
-    }
-
-    @BeforeMethod
-    private void initAuthType() {
-        ConcurrentHashMap<String, String> conf = new ConcurrentHashMap<String, String>();
-        conf.put("type", "simple");
-        conf.put("config.prefix.type", "");
-        conf.put("anonymous.allowed", "true");
-        Mockito.when(mockConfig.getInitParameterNames()).thenReturn(conf.keys());
-
-        for (Map.Entry<String, String> entry : conf.entrySet()) {
-            Mockito.when(mockConfig.getInitParameter(entry.getKey())).thenReturn(entry.getValue());
-        }
-
-        Mockito.when(mockRequest.getMethod()).thenReturn("OPTIONS");
-
-        StringBuffer requestUrl = new StringBuffer("http://localhost");
-        Mockito.when(mockRequest.getRequestURL()).thenReturn(requestUrl);
-    }
-
-    @Test
-    public void testDoFilter() throws Exception {
-        Filter filter = new BasicAuthFilter();
-        synchronized (StartupProperties.get()) {
-            filter.init(mockConfig);
-        }
-
-        CurrentUser.authenticate("nouser");
-        Assert.assertEquals(CurrentUser.getUser(), "nouser");
-
-        CurrentUser.authenticate("guest");
-        Mockito.when(mockRequest.getQueryString()).thenReturn("user.name=guest");
-        filter.doFilter(mockRequest, mockResponse, mockChain);
-        Assert.assertEquals(CurrentUser.getUser(), "guest");
-
-        CurrentUser.authenticate("nouser");
-        Assert.assertEquals(CurrentUser.getUser(), "nouser");
-        CurrentUser.authenticate("testuser");
-        Mockito.when(mockRequest.getRemoteUser()).thenReturn("testuser");
-        filter.doFilter(mockRequest, mockResponse, mockChain);
-        Assert.assertEquals(CurrentUser.getUser(), "testuser");
-    }
-
-    @Test
-    public void testAnonymous() throws Exception {
-        Filter filter = new BasicAuthFilter();
-
-        synchronized (StartupProperties.get()) {
-            filter.init(mockConfig);
-        }
-
-        CurrentUser.authenticate("nouser");
-        Assert.assertEquals(CurrentUser.getUser(), "nouser");
-
-        CurrentUser.authenticate("testuser");
-        Mockito.when(mockRequest.getRemoteUser()).thenReturn("testuser");
-        filter.doFilter(mockRequest, mockResponse, mockChain);
-        Assert.assertEquals(CurrentUser.getUser(), "testuser");
-    }
-
-    @Test
-    public void testEmptyUser() throws Exception {
-        Filter filter = new BasicAuthFilter();
-
-        synchronized (StartupProperties.get()) {
-            filter.init(mockConfig);
-        }
-
-        final String userName = System.getProperty("user.name");
-        try {
-            System.setProperty("user.name", "");
-
-            Mockito.when(mockRequest.getMethod()).thenReturn("POST");
-            Mockito.when(mockRequest.getQueryString()).thenReturn("");
-            Mockito.when(mockRequest.getRemoteUser()).thenReturn(null);
-
-            HttpServletResponse errorResponse = Mockito.mock(HttpServletResponse.class);
-            filter.doFilter(mockRequest, errorResponse, mockChain);
-        } finally {
-            System.setProperty("user.name", userName);
-        }
-    }
-
-    @Test
-    public void testDoFilterForClientBackwardsCompatibility() throws Exception {
-        Filter filter = new BasicAuthFilter();
-
-        final String userName = System.getProperty("user.name");
-        final String httpAuthType =
-                StartupProperties.get().getProperty("falcon.http.authentication.type", "simple");
-        try {
-            System.setProperty("user.name", "");
-            StartupProperties.get().setProperty("falcon.http.authentication.type",
-                    "org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler");
-
-            synchronized (StartupProperties.get()) {
-                filter.init(mockConfig);
-            }
-
-            Mockito.when(mockRequest.getMethod()).thenReturn("POST");
-            Mockito.when(mockRequest.getQueryString()).thenReturn("");
-            Mockito.when(mockRequest.getRemoteUser()).thenReturn(null);
-            Mockito.when(mockRequest.getHeader("Remote-User")).thenReturn("remote-user");
-
-            filter.doFilter(mockRequest, mockResponse, mockChain);
-
-            Assert.assertEquals(CurrentUser.getUser(), "remote-user");
-
-        } finally {
-            System.setProperty("user.name", userName);
-            StartupProperties.get().setProperty("falcon.http.authentication.type", httpAuthType);
-        }
-    }
-
-    @Test
-    public void testGetKerberosPrincipalWithSubstitutedHostSecure() throws Exception {
-        String principal = StartupProperties.get().getProperty(BasicAuthFilter.KERBEROS_PRINCIPAL);
-
-        String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName() + "@Example.com";
-        try {
-            Configuration conf = new Configuration(false);
-            conf.set("hadoop.security.authentication", "kerberos");
-            UserGroupInformation.setConfiguration(conf);
-            Assert.assertTrue(UserGroupInformation.isSecurityEnabled());
-
-            StartupProperties.get().setProperty(
-                    BasicAuthFilter.KERBEROS_PRINCIPAL, "falcon/_HOST@Example.com");
-            BasicAuthFilter filter = new BasicAuthFilter();
-            Properties properties = filter.getConfiguration(BasicAuthFilter.FALCON_PREFIX, null);
-            Assert.assertEquals(
-                    properties.get(KerberosAuthenticationHandler.PRINCIPAL), expectedPrincipal);
-        } finally {
-            StartupProperties.get().setProperty(BasicAuthFilter.KERBEROS_PRINCIPAL, principal);
-        }
-    }
-
-    @Test
-    public void testGetKerberosPrincipalWithSubstitutedHostNonSecure() throws Exception {
-        String principal = StartupProperties.get().getProperty(BasicAuthFilter.KERBEROS_PRINCIPAL);
-        Configuration conf = new Configuration(false);
-        conf.set("hadoop.security.authentication", "simple");
-        UserGroupInformation.setConfiguration(conf);
-        Assert.assertFalse(UserGroupInformation.isSecurityEnabled());
-
-        BasicAuthFilter filter = new BasicAuthFilter();
-        Properties properties = filter.getConfiguration(BasicAuthFilter.FALCON_PREFIX, null);
-        Assert.assertEquals(properties.get(KerberosAuthenticationHandler.PRINCIPAL), principal);
-    }
-}


[9/9] git commit: FALCON-400 Add Authorization for Entities. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-400 Add Authorization for Entities. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c4de9eca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c4de9eca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c4de9eca

Branch: refs/heads/master
Commit: c4de9eca72c621f544e5cf39861ff08d1ad83cc0
Parents: 84cc368
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:43:13 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:43:13 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                                  | 8 ++++++++
 .../apache/falcon/security/DefaultAuthorizationProvider.java | 2 +-
 .../oozie/process/OozieProcessWorkflowBuilderTest.java       | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c4de9eca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d090789..a0f9275 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,14 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-400 Add Authorization for Entities (Venkatesh Seetharam)
+      FALCON-279 Add ACL for Cluster Entity
+      FALCON-462 Add ACL for process entity
+      FALCON-463 Validate Feed ACL only if authorization is enabled
+      FALCON-464 Enforce Authorization for REST API
+      FALCON-468 Add User Documentation for authorization feature
+      FALCON-557 Add super-user who is authorized for all
+
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c4de9eca/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index c7e87f4..e90518d 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -182,7 +182,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         if (isSuperUser(proxyUgi)) {
             return;
         }
-        
+
         checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUser, proxyUgi);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c4de9eca/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 14759e0..ffcc88a 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -632,7 +632,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
         List<CONFIGURATION.Property> props = bundle.getCoordinator().get(0).getConfiguration().getProperty();
         for (CONFIGURATION.Property prop : props) {
-            if(prop.getName().equals("oozie.libpath")) {
+            if (prop.getName().equals("oozie.libpath")) {
                 Assert.assertEquals(prop.getValue().replace("${nameNode}", ""), new Path(bundlePath,
                     "userlib").toString());
             }