You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 19:43:45 UTC

[6/9] git commit: FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam

FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/adca0057
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/adca0057
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/adca0057

Branch: refs/heads/master
Commit: adca0057d279022a9f32900c64ecca8d9f70f8a6
Parents: a3c1ffe
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 10:17:08 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 10:17:08 2014 -0700

----------------------------------------------------------------------
 client/src/main/resources/feed-0.1.xsd          |   2 +-
 client/src/main/resources/process-0.1.xsd       |   2 +-
 .../apache/falcon/entity/CatalogStorage.java    |   3 +-
 .../apache/falcon/entity/FileSystemStorage.java |  22 +-
 .../java/org/apache/falcon/entity/Storage.java  |   7 +-
 .../entity/parser/ClusterEntityParser.java      |   9 +-
 .../falcon/entity/parser/EntityParser.java      |  46 +--
 .../falcon/entity/parser/FeedEntityParser.java  |  13 +-
 .../entity/parser/ProcessEntityParser.java      |   9 +-
 .../falcon/entity/store/ConfigurationStore.java |   2 +-
 .../falcon/hadoop/HadoopClientFactory.java      |  32 +-
 .../falcon/security/AuthorizationProvider.java  |  62 ++++
 .../org/apache/falcon/security/CurrentUser.java |  37 ++
 .../security/DefaultAuthorizationProvider.java  | 271 ++++++++++++++
 .../apache/falcon/security/SecurityUtil.java    |  41 +-
 common/src/main/resources/startup.properties    |  10 +
 .../apache/falcon/entity/AbstractTestBase.java  |  15 +
 .../falcon/entity/FileSystemStorageTest.java    |  48 ++-
 .../entity/parser/ClusterEntityParserTest.java  |  10 +-
 .../entity/parser/FeedEntityParserTest.java     | 191 ++++++++--
 .../entity/parser/ProcessEntityParserTest.java  |  52 ++-
 .../falcon/hadoop/HadoopClientFactoryTest.java  |  10 +-
 .../metadata/MetadataMappingServiceTest.java    | 143 ++-----
 .../apache/falcon/security/CurrentUserTest.java |   9 +
 .../DefaultAuthorizationProviderTest.java       | 372 +++++++++++++++++++
 .../falcon/security/SecurityUtilTest.java       |  25 +-
 .../org/apache/falcon/FalconWebException.java   |  10 +-
 .../falcon/resource/AbstractEntityManager.java  |   2 -
 .../AbstractSchedulableEntityManager.java       |  21 +-
 .../apache/falcon/security/BasicAuthFilter.java | 233 ------------
 .../security/FalconAuthenticationFilter.java    | 234 ++++++++++++
 .../security/FalconAuthorizationFilter.java     | 108 ++++++
 prism/src/main/webapp/WEB-INF/web.xml           |  16 +-
 .../metadata/LineageMetadataResourceTest.java   | 130 +------
 .../FalconAuthenticationFilterTest.java         | 214 +++++++++++
 .../security/FalconAuthorizationFilterTest.java | 168 +++++++++
 src/conf/startup.properties                     |  12 +-
 .../cluster/util/EntityBuilderTestUtil.java     | 167 +++++++++
 .../src/main/webapp/WEB-INF/distributed/web.xml |  18 +-
 webapp/src/main/webapp/WEB-INF/embedded/web.xml |  16 +-
 webapp/src/main/webapp/WEB-INF/web.xml          |  16 +-
 .../falcon/security/BasicAuthFilterTest.java    | 214 -----------
 42 files changed, 2184 insertions(+), 838 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 00b5172..3c96d80 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -280,7 +280,7 @@
         </xs:annotation>
         <xs:attribute type="xs:string" name="owner"/>
         <xs:attribute type="xs:string" name="group"/>
-        <xs:attribute type="xs:string" name="permission"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
     </xs:complexType>
     <xs:simpleType name="action-type">
         <xs:restriction base="xs:string">

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index b1bd426..cd4f5d2 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -365,6 +365,6 @@
         </xs:annotation>
         <xs:attribute type="xs:string" name="owner"/>
         <xs:attribute type="xs:string" name="group"/>
-        <xs:attribute type="xs:string" name="permission"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
     </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 89e5b3e..7ad0716 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
@@ -339,7 +340,7 @@ public class CatalogStorage implements Storage {
     }
 
     @Override
-    public void validateACL(String owner, String group, String permissions) throws FalconException {
+    public void validateACL(AccessControlList acl) throws FalconException {
         // This is not supported in Hive today as authorization is not enforced on table and
         // partition listing
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 326fccd..4eb3d60 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
@@ -38,6 +39,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Matcher;
 
 /**
@@ -238,7 +240,7 @@ public class FileSystemStorage implements Storage {
     }
 
     @Override
-    public void validateACL(String owner, String group, String permissions) throws FalconException {
+    public void validateACL(AccessControlList acl) throws FalconException {
         try {
             FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
             for (Location location : getLocations()) {
@@ -246,12 +248,20 @@ public class FileSystemStorage implements Storage {
                 Path path = new Path(pathString);
                 if (fileSystem.exists(path)) {
                     FileStatus fileStatus = fileSystem.getFileStatus(path);
-                    if (!fileStatus.getOwner().equals(owner)) {
-                        LOG.error("Feed ACL owner {} doesn't match the actual owner {} for file {}",
-                                owner, fileStatus.getOwner(), path);
-                        throw new FalconException("Feed ACL owner " + owner
-                                + " doesn't match the actual file owner " + fileStatus.getOwner());
+                    Set<String> groups = CurrentUser.getGroupNames();
+
+                    if (fileStatus.getOwner().equals(acl.getOwner())
+                            || groups.contains(acl.getGroup())) {
+                        return;
                     }
+
+                    LOG.error("Permission denied: Either Feed ACL owner {} or group {} doesn't "
+                                    + "match the actual file owner {} or group {} for file {}",
+                            acl, acl.getGroup(), fileStatus.getOwner(), fileStatus.getGroup(), path);
+                    throw new FalconException("Permission denied: Either Feed ACL owner "
+                            + acl + " or group " + acl.getGroup() + " doesn't match the actual "
+                            + "file owner " + fileStatus.getOwner() + " or group "
+                            + fileStatus.getGroup() + "  for file " + path);
                 }
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index b49410e..f88e139 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.LocationType;
 
 /**
@@ -76,10 +77,8 @@ public interface Storage {
     /**
      * Check the permission on the storage, regarding owner/group/permission coming from ACL.
      *
-     * @param owner the owner defined in the ACL.
-     * @param group the group defined in the ACL.
-     * @param permissions the permissions defined in the ACL.
+     * @param acl the ACL defined in the entity.
      * @throws FalconException if the permissions are not valid.
      */
-    void validateACL(String owner, String group, String permissions) throws FalconException;
+    void validateACL(AccessControlList acl) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 3eabf9e..fbbdbcb 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,7 +213,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
      * @throws ValidationException
      */
     private void validateACL(Cluster cluster) throws ValidationException {
-        if (!isAuthorizationEnabled()) {
+        if (isAuthorizationDisabled) {
             return;
         }
 
@@ -222,7 +223,11 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
             throw new ValidationException("Cluster ACL cannot be empty for:  " + cluster.getName());
         }
 
-        validateOwner(clusterACL.getOwner());
+        try {
+            authorize(cluster.getName(), clusterACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 761a9f5..8a3f669 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -20,19 +20,20 @@ package org.apache.falcon.entity.parser;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.Unmarshaller;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
 
 /**
  * Generic Abstract Entity Parser, the concrete FEED, PROCESS and CLUSTER should extend this parser
@@ -45,9 +46,11 @@ public abstract class EntityParser<T extends Entity> {
     private static final Logger LOG = LoggerFactory.getLogger(EntityParser.class);
 
     private final EntityType entityType;
+    protected final boolean isAuthorizationDisabled;
 
     protected EntityParser(EntityType entityType) {
         this.entityType = entityType;
+        isAuthorizationDisabled = !SecurityUtil.isAuthorizationEnabled();
     }
 
     public EntityType getEntityType() {
@@ -78,6 +81,7 @@ public abstract class EntityParser<T extends Entity> {
      * @return entity
      * @throws FalconException
      */
+    @SuppressWarnings("unchecked")
     public T parse(InputStream xmlStream) throws FalconException {
         try {
             // parse against schema
@@ -102,36 +106,24 @@ public abstract class EntityParser<T extends Entity> {
         }
     }
 
-    protected void validateEntitiesExist(List<Pair<EntityType, String>> entities) throws FalconException {
-        if (entities != null) {
-            for (Pair<EntityType, String> entity : entities) {
-                validateEntityExists(entity.first, entity.second);
-            }
-        }
-    }
-
     public abstract void validate(T entity) throws FalconException;
 
     /**
      * Validate if the entity owner is the logged-in authenticated user.
      *
-     * @param owner entity owner in ACL
-     * @throws ValidationException
+     * @param entityName  entity name
+     * @param acl         entity ACL
+     * @throws AuthorizationException
      */
-    protected void validateOwner(String owner) throws ValidationException {
-        if (!CurrentUser.getUser().equals(owner)) {
-            throw new ValidationException("Entity's owner " + owner
-                    + " is not same as the logged in user " + CurrentUser.getUser());
+    protected void authorize(String entityName,
+                             AccessControlList acl) throws AuthorizationException {
+        try {
+            SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
+                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUgi());
+        } catch (FalconException e) {
+            throw new AuthorizationException(e);
+        } catch (IOException e) {
+            throw new AuthorizationException(e);
         }
     }
-
-    /**
-     * Checks if the user has enabled authorization in the configuration.
-     *
-     * @return true if falcon.security.authorization.enabled is enabled, false otherwise
-     */
-    protected boolean isAuthorizationEnabled() {
-        return Boolean.valueOf(StartupProperties.get().getProperty(
-                "falcon.security.authorization.enabled", "false"));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 0bc5c8f..8fd56e1 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.ACL;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -41,6 +42,7 @@ import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
 import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -404,11 +406,16 @@ public class FeedEntityParser extends EntityParser<Feed> {
      * @throws ValidationException
      */
     private void validateACL(Feed feed) throws FalconException {
-        if (!isAuthorizationEnabled()) {
+        if (isAuthorizationDisabled) {
             return;
         }
 
-        validateOwner(feed.getACL().getOwner());
+        final ACL feedACL = feed.getACL();
+        try {
+            authorize(feed.getName(), feedACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
 
         for (Cluster cluster : feed.getClusters().getClusters()) {
             org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
@@ -419,7 +426,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
             final Storage storage = FeedHelper.createStorage(cluster, feed);
             try {
-                storage.validateACL(feed.getACL().getOwner(), feed.getACL().getGroup(), feed.getACL().getPermission());
+                storage.validateACL(feedACL);
             } catch(FalconException e) {
                 throw new ValidationException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 7da86a7..c2be6bd 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -38,6 +38,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 
 import java.io.IOException;
 import java.util.Date;
@@ -231,7 +232,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
      * @throws ValidationException
      */
     private void validateACL(Process process) throws FalconException {
-        if (!isAuthorizationEnabled()) {
+        if (isAuthorizationDisabled) {
             return;
         }
 
@@ -241,6 +242,10 @@ public class ProcessEntityParser extends EntityParser<Process> {
             throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
         }
 
-        validateOwner(processACL.getOwner());
+        try {
+            authorize(process.getName(), processACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index cb594d1..bbb2b12 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -108,7 +108,7 @@ public final class ConfigurationStore implements FalconService {
 
             return fileSystem;
         } catch (Exception e) {
-            throw new RuntimeException("Unable to bring up config store", e);
+            throw new RuntimeException("Unable to bring up config store for path: " + storePath, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index d5fbda8..48b45a2 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -100,37 +100,27 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createProxiedFileSystem(final Configuration conf)
-        throws FalconException {
-        Validate.notNull(conf, "configuration cannot be null");
-
-        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
-        try {
-            return createProxiedFileSystem(CurrentUser.getUser(), new URI(nameNode), conf);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
-        }
-    }
-
     /**
-     * Return a FileSystem created with the provided proxyUser for the specified URI.
+     * Return a FileSystem created with the authenticated proxy user for the specified conf.
      *
-     * @param proxyUser proxyUser
-     * @param uri  file system URI.
      * @param conf Configuration with all necessary information to create the FileSystem.
      * @return FileSystem created with the provided proxyUser/group.
      * @throws org.apache.falcon.FalconException
      *          if the filesystem could not be created.
      */
-    public FileSystem createProxiedFileSystem(String proxyUser, final URI uri, final Configuration conf)
+    public FileSystem createProxiedFileSystem(final Configuration conf)
         throws FalconException {
-        Validate.notEmpty(proxyUser, "proxyUser cannot be null");
+        Validate.notNull(conf, "configuration cannot be null");
 
+        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
         try {
-            UserGroupInformation proxyUgi = SecurityUtil.getProxyUser(proxyUser);
-            return createFileSystem(proxyUgi, uri, conf);
-        } catch (IOException ex) {
-            throw new FalconException("Exception while getting FileSystem: " + ex.getMessage(), ex);
+            return createFileSystem(CurrentUser.getProxyUgi(), new URI(nameNode), conf);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
new file mode 100644
index 0000000..3133d91
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+
+/**
+ * An interface for authorizing user against an entity operation.
+ */
+public interface AuthorizationProvider {
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the resource,
+     * which is typically a REST resource path.
+     * Throws an exception if not authorized.
+     *
+     * @param resource   api resource, admin, entities or instance
+     * @param action     action being authorized on resource and entity if applicable
+     * @param entityType entity type in question, not for admin resource
+     * @param entityName entity name in question, not for admin resource
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws AuthorizationException
+     */
+    void authorizeResource(String resource,
+                           String action,
+                           String entityType,
+                           String entityName,
+                           UserGroupInformation proxyUgi) throws AuthorizationException;
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the entity.
+     * Throws an exception if not authorized.
+     *
+     * @param entityName entity in question, applicable for entities and instance resource
+     * @param entityType entity in question, applicable for entities and instance resource
+     * @param acl        entity ACL
+     * @param action     action being authorized on resource and entity if applicable
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws AuthorizationException
+     */
+    void authorizeEntity(String entityName, String entityType,
+                         AccessControlList acl, String action,
+                         UserGroupInformation proxyUgi) throws AuthorizationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 6fccd1b..b7d2c66 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -18,13 +18,22 @@
 
 package org.apache.falcon.security;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * Current authenticated user via REST.
+ * Also doles out proxied UserGroupInformation. Caches proxied users.
  */
 public final class CurrentUser {
 
@@ -79,4 +88,32 @@ public final class CurrentUser {
             return null;
         }
     }
+
+    private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
+            new ConcurrentHashMap<String, UserGroupInformation>();
+
+    /**
+     * Dole out a proxy UGI object for the current authenticated user.
+     *
+     * @return UGI object
+     * @throws java.io.IOException
+     */
+    public static UserGroupInformation getProxyUgi() throws IOException {
+        String proxyUser = getUser();
+
+        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+        if (proxyUgi == null) {
+            // taking care of a race condition, the latest UGI will be discarded
+            proxyUgi = UserGroupInformation
+                    .createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+        }
+
+        return proxyUgi;
+    }
+
+    public static Set<String> getGroupNames() throws IOException {
+        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUgi().getGroupNames()));
+        return Collections.unmodifiableSet(s);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
new file mode 100644
index 0000000..5561429
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default implementation of AuthorizationProvider in Falcon.
+ *
+ * The authorization is enforced in the following way:
+ *
+ * if admin resource,
+ *      if authenticated user name matches the admin users configuration
+ *      Else if groups of the authenticated user matches the admin groups configuration
+ * Else if entities or instance resource
+ *      if the authenticated user matches the owner in ACL for the entity
+ *      Else if the groups of the authenticated user matches the group in ACL for the entity
+ * Else if lineage resource
+ *      All have read-only permissions
+ * Else bad resource
+ */
+public class DefaultAuthorizationProvider implements AuthorizationProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultAuthorizationProvider.class);
+
+    private static final Set<String> RESOURCES = new HashSet<String>(
+            Arrays.asList(new String[]{"admin", "entities", "instance", "lineage", }));
+
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    protected static final String FALCON_PREFIX = "falcon.security.authorization.";
+
+    /**
+     * Constant for the configuration property that indicates the blacklisted super users for falcon.
+     */
+    private static final String ADMIN_USERS_KEY = FALCON_PREFIX + "admin.users";
+    private static final String ADMIN_GROUPS_KEY = FALCON_PREFIX + "admin.groups";
+
+    private Set<String> adminUsers;
+    private Set<String> adminGroups;
+
+    public DefaultAuthorizationProvider() {
+        adminUsers = getAdminNamesFromConfig(ADMIN_USERS_KEY);
+        adminGroups = getAdminNamesFromConfig(ADMIN_GROUPS_KEY);
+    }
+
+    private HashSet<String> getAdminNamesFromConfig(String key) {
+        HashSet<String> adminNames = new HashSet<String>();
+        String adminNamesConfig = StartupProperties.get().getProperty(key);
+        if (!StringUtils.isEmpty(adminNamesConfig)) {
+            adminNames.addAll(Arrays.asList(adminNamesConfig.split(",")));
+        }
+
+        return adminNames;
+    }
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the resource.
+     * Throws an exception if not authorized.
+     *
+     * @param resource   api resource, admin, entities or instance
+     * @param action     action being authorized on resource and entity if applicable
+     * @param entityType entity type in question, not for admin resource
+     * @param entityName entity name in question, not for admin resource
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws org.apache.hadoop.security.authorize.AuthorizationException
+     */
+    @Override
+    public void authorizeResource(String resource, String action,
+                                  String entityType, String entityName,
+                                  UserGroupInformation proxyUgi) throws AuthorizationException {
+        Validate.notEmpty(resource, "Resource cannot be empty or null");
+        Validate.notEmpty(action, "Action cannot be empty or null");
+
+        Set<String> groups = getGroupNames(proxyUgi);
+        String authenticatedUser = proxyUgi.getShortUserName();
+        LOG.info("Authorizing authenticatedUser={}, groups={} against resource={}, action={}, entity name={}, "
+                + "entity type={}", authenticatedUser, groups, resource, action, entityName, entityType);
+
+        if ("admin".equals(resource)) {
+            authorizeAdminResource(authenticatedUser, groups, action);
+        } else if ("entities".equals(resource) || "instance".equals(resource)) {
+            authorizeEntityResource(authenticatedUser, proxyUgi, entityName, entityType, action);
+        } else if ("lineage".equals(resource)) {
+            authorizeLineageResource(authenticatedUser, action);
+        } else {
+            throw new AuthorizationException("Unknown resource: " + resource);
+        }
+    }
+
+    private Set<String> getGroupNames(UserGroupInformation proxyUgi) {
+        HashSet<String> s = new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames()));
+        return Collections.unmodifiableSet(s);
+    }
+
+    /**
+     * Determines if the authenticated user is authorized to execute the action on the entity.
+     * Throws an exception if not authorized.
+     *
+     * @param entityName entity in question, applicable for entities and instance resource
+     * @param entityType entity in question, applicable for entities and instance resource
+     * @param acl        entity ACL
+     * @param action     action being authorized on resource and entity if applicable
+     * @param proxyUgi   proxy ugi for the authenticated user
+     * @throws org.apache.hadoop.security.authorize.AuthorizationException
+     */
+    @Override
+    public void authorizeEntity(String entityName, String entityType,
+                                AccessControlList acl, String action,
+                                UserGroupInformation proxyUgi) throws AuthorizationException {
+        String authenticatedUser = proxyUgi.getShortUserName();
+        LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}",
+                authenticatedUser, action, entityName, entityType);
+
+        checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUser, proxyUgi);
+    }
+
+    /**
+     * Validate if the entity owner is the logged-in authenticated user.
+     *
+     * @param entityName        entity name.
+     * @param aclOwner          entity ACL Owner.
+     * @param aclGroup          entity ACL group.
+     * @param action            action being authorized on resource and entity if applicable.
+     * @param authenticatedUser authenticated user name.
+     * @param proxyUgi          proxy ugi for the authenticated user.
+     * @throws AuthorizationException
+     */
+    protected void checkUser(String entityName, String aclOwner, String aclGroup,
+                             String action, String authenticatedUser,
+                             UserGroupInformation proxyUgi) throws AuthorizationException {
+        if (isUserACLOwner(authenticatedUser, aclOwner)
+                || isUserInAclGroup(aclGroup, proxyUgi)) {
+            return;
+        }
+
+        StringBuilder message = new StringBuilder("Permission denied: authenticatedUser=");
+        message.append(authenticatedUser);
+        message.append(!authenticatedUser.equals(aclOwner)
+                ? " not entity owner=" + aclOwner
+                : " not in group=" + aclGroup);
+        message.append(", entity=").append(entityName).append(", action=").append(action);
+
+        LOG.error(message.toString());
+        throw new AuthorizationException(message.toString());
+    }
+
+    protected boolean isUserACLOwner(String authenticatedUser, String aclOwner) {
+        return authenticatedUser.equals(aclOwner);
+    }
+
+    protected boolean isUserInAclGroup(String aclGroup, UserGroupInformation proxyUgi) {
+        Set<String> groups = getGroupNames(proxyUgi);
+        return groups.contains(aclGroup);
+    }
+
+    /**
+     * Check if the user has admin privileges.
+     *
+     * @param user   user name.
+     * @param groups groups that the user belongs to.
+     * @param action admin action on the resource
+     * @throws AuthorizationException if the user does not have admin privileges.
+     */
+    protected void authorizeAdminResource(String user, Set<String> groups,
+                                          String action) throws AuthorizationException {
+        LOG.debug("Authorizing user={} for admin, action={}", user, action);
+        if (adminUsers.contains(user) || isUserInAdminGroups(groups)) {
+            return;
+        }
+
+        LOG.error("Permission denied: user {} does not have admin privilege for action={}",
+                user, action);
+        throw new AuthorizationException("Permission denied: user=" + user
+                + " does not have admin privilege for action=" + action);
+    }
+
+    protected boolean isUserInAdminGroups(Set<String> groups) {
+        boolean isUserGroupInAdmin = false;
+        for (String group : groups) {
+            if (adminGroups.contains(group)) {
+                isUserGroupInAdmin = true;
+                break;
+            }
+        }
+
+        return isUserGroupInAdmin;
+    }
+
+    protected void authorizeEntityResource(String authenticatedUser, UserGroupInformation proxyUgi,
+                                           String entityName, String entityType,
+                                           String action) throws AuthorizationException {
+        Validate.notEmpty(entityType, "Entity type cannot be empty or null");
+        LOG.debug("Authorizing authenticatedUser={} against entity/instance action={}, "
+                + "entity name={}, entity type={}", authenticatedUser, action, entityName,
+                entityType);
+
+        if (entityName != null) { // lifecycle actions
+            Entity entity = getEntity(entityName, entityType);
+            authorizeEntity(entity.getName(), entity.getEntityType().name(),
+                    getACL(entity), action, proxyUgi);
+        } else {
+            // non lifecycle actions, lifecycle actions with null entity will validate later
+            LOG.info("Authorization for action={} will be done in the API", action);
+        }
+    }
+
+    private Entity getEntity(String entityName, String entityType) throws AuthorizationException {
+        try {
+            EntityType type = EntityType.valueOf(entityType.toUpperCase());
+            return EntityUtil.getEntity(type, entityName);
+        } catch (FalconException e) {
+            throw new AuthorizationException(e);
+        }
+    }
+
+    protected AccessControlList getACL(Entity entity) throws AuthorizationException {
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            return ((Cluster) entity).getACL();
+
+        case FEED:
+            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
+
+        case PROCESS:
+            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
+
+        default:
+            throw new AuthorizationException("Cannot get owner for entity: " + entity.getName());
+        }
+    }
+
+    protected void authorizeLineageResource(String authenticatedUser, String action) {
+        LOG.debug("User {} authorized for action {} ", authenticatedUser, action);
+        // todo - do nothing for now, read-only for all
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
index f78043f..b9fd37e 100644
--- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -18,20 +18,17 @@
 
 package org.apache.falcon.security;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * Security Util - bunch of security related helper methods.
- * Also doles out proxied UserGroupInformation. Caches proxied users.
  */
 public final class SecurityUtil {
 
@@ -57,9 +54,6 @@ public final class SecurityUtil {
     public static final String HIVE_METASTORE_PRINCIPAL = "hive.metastore.kerberos.principal";
 
 
-    private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
-            new ConcurrentHashMap<String, UserGroupInformation>();
-
     private SecurityUtil() {
     }
 
@@ -68,6 +62,11 @@ public final class SecurityUtil {
                 AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
     }
 
+    /**
+     * Checks if kerberos authentication is enabled in the configuration.
+     *
+     * @return true if falcon.authentication.type is kerberos, false otherwise
+     */
     public static boolean isSecurityEnabled() {
         String authenticationType = StartupProperties.get().getProperty(
                 AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
@@ -85,18 +84,24 @@ public final class SecurityUtil {
         return useKerberos;
     }
 
-    public static UserGroupInformation getProxyUser(String proxyUser) throws IOException {
-        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
-        if (proxyUgi == null) {
-            // taking care of a race condition, the latest UGI will be discarded
-            proxyUgi = UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
-            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
-        }
+    public static String getLocalHostName() throws UnknownHostException {
+        return InetAddress.getLocalHost().getCanonicalHostName();
+    }
 
-        return proxyUgi;
+    /**
+     * Checks if authorization is enabled in the configuration.
+     *
+     * @return true if falcon.security.authorization.enabled is enabled, false otherwise
+     */
+    public static boolean isAuthorizationEnabled() {
+        return Boolean.valueOf(StartupProperties.get().getProperty(
+                "falcon.security.authorization.enabled", "false"));
     }
 
-    public static String getLocalHostName() throws UnknownHostException {
-        return InetAddress.getLocalHost().getCanonicalHostName();
+    public static AuthorizationProvider getAuthorizationProvider() throws FalconException {
+        String providerClassName = StartupProperties.get().getProperty(
+                "falcon.security.authorization.provider",
+                "org.apache.falcon.security.DefaultAuthorizationProvider");
+        return ReflectionUtils.getInstanceByClassName(providerClassName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index dafdea9..808fed6 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -139,6 +139,16 @@ debug.libext.process.paths=${falcon.libext}
 
 ######### Authorization Properties #########
 
+# Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
 
+# Admin Users, comma separated users
+*.falcon.security.authorization.admin.users=falcon,ambari-qa,seetharam
+
+# Admin Group Membership, comma separated users
+*.falcon.security.authorization.admin.groups=falcon,testgroup,staff
+
+# Authorization Provider Implementation Fully Qualified Class Name
+*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
+
 ######### Authorization Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 3c2b63b..c35d1a4 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -39,14 +39,18 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import java.io.File;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collection;
 
 /**
  * Base class for config store test.
  */
 public class AbstractTestBase {
+    protected static final String USER = System.getProperty("user.name");
+
     protected static final String PROCESS_XML = "/config/process/process-0.1.xml";
     protected static final String FEED_XML = "/config/feed/feed-0.1.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
@@ -138,4 +142,15 @@ public class AbstractTestBase {
         marshaller.marshal(entity, stringWriter);
         return stringWriter.toString();
     }
+
+
+    protected String getGroupName() throws IOException {
+        return getGroupName(true);
+    }
+
+    protected String getGroupName(boolean first) throws IOException {
+        String[] groupNames = CurrentUser.getProxyUgi().getGroupNames();
+        System.out.println("groupNames = " + Arrays.asList(groupNames));
+        return groupNames.length > 1 ? groupNames[first ? 0 : groupNames.length - 1] : "admin";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 47f63bc..a78c678 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.security.CurrentUser;
@@ -167,11 +168,11 @@ public class FileSystemStorageTest {
         fs.mkdirs(path);
 
         FileSystemStorage storage = new FileSystemStorage(cluster.getConf().get("fs.default.name"), locations);
-        storage.validateACL(user, user, "0x755");
+        storage.validateACL(new TestACL(user, user, "0x755"));
 
         //-ve case
         try {
-            storage.validateACL("random", user, "0x755");
+            storage.validateACL(new TestACL("random", user, "0x755"));
             Assert.fail("Validation should have failed");
         } catch(FalconException e) {
             //expected exception
@@ -179,11 +180,11 @@ public class FileSystemStorageTest {
 
         //Timed path
         location.setPath("/foo/bar/${YEAR}/${MONTH}/${DAY}");
-        storage.validateACL(user, user, "rrr");
+        storage.validateACL(new TestACL(user, user, "rrr"));
 
         //-ve case
         try {
-            storage.validateACL("random", user, "0x755");
+            storage.validateACL(new TestACL("random", user, "0x755"));
             Assert.fail("Validation should have failed");
         } catch(FalconException e) {
             //expected exception
@@ -347,4 +348,43 @@ public class FileSystemStorageTest {
 
         Assert.assertFalse(storage1.isIdentical(storage2));
     }
+
+    private class TestACL extends AccessControlList {
+
+        /**
+         * owner is the Owner of this entity.
+         */
+        private String owner;
+
+        /**
+         * group is the one which has access to read - not used at this time.
+         */
+        private String group;
+
+        /**
+         * permission is not enforced at this time.
+         */
+        private String permission;
+
+        TestACL(String owner, String group, String permission) {
+            this.owner = owner;
+            this.group = group;
+            this.permission = permission;
+        }
+
+        @Override
+        public String getOwner() {
+            return owner;
+        }
+
+        @Override
+        public String getGroup() {
+            return group;
+        }
+
+        @Override
+        public String getPermission() {
+            return permission;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 55562d1..41f041a 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -157,7 +157,10 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
 
-            Cluster cluster = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            ClusterEntityParser clusterEntityParser =
+                    (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+            Cluster cluster = clusterEntityParser.parse(stream);
             Assert.assertNotNull(cluster);
             Assert.assertNull(cluster.getACL());
         } finally {
@@ -174,7 +177,10 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml");
 
-            Cluster cluster = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            ClusterEntityParser clusterEntityParser =
+                    (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+            Cluster cluster = clusterEntityParser.parse(stream);
             Assert.assertNotNull(cluster);
             Assert.assertNotNull(cluster.getACL());
             Assert.assertNotNull(cluster.getACL().getOwner());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index e669f44..98cda04 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -53,8 +53,6 @@ import static org.testng.AssertJUnit.assertEquals;
  */
 public class FeedEntityParserTest extends AbstractTestBase {
 
-    public static final String USER = System.getProperty("user.name");
-
     private final FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
             .getParser(EntityType.FEED);
 
@@ -79,6 +77,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
         cluster.setName("backupCluster");
         store.publish(EntityType.CLUSTER, cluster);
 
+        CurrentUser.authenticate("testuser");
         modifiableFeed = parser.parseAndValidate(this.getClass()
                 .getResourceAsStream(FEED_XML));
     }
@@ -89,21 +88,6 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
     }
 
-
-    @Test(expectedExceptions = ValidationException.class)
-    public void testValidateUser() throws Exception {
-        CurrentUser.authenticate("unknown");
-        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
-        Assert.assertTrue(Boolean.valueOf(
-                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
-        try {
-            parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
-        } finally {
-            CurrentUser.authenticate("testuser");
-            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
-        }
-    }
-
     @Test
     public void testParse() throws IOException, FalconException, JAXBException {
 
@@ -171,7 +155,6 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.validate(feed);
     }
 
-
     @Test
     public void testPartitionExpression() throws FalconException {
         Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class
@@ -548,6 +531,22 @@ public class FeedEntityParserTest extends AbstractTestBase {
                 + " storage");
     }
 
+    @Test(expectedExceptions = ValidationException.class)
+    public void testValidateOwner() throws Exception {
+        CurrentUser.authenticate("unknown");
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        try {
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feedEntityParser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
     @Test
     public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
         InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
@@ -563,8 +562,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
     }
 
     @Test
-    public void testValidateACLAndOwner() throws Exception {
-        CurrentUser.authenticate("testuser");
+    public void testValidateACLOwner() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -572,7 +570,10 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            Feed feed = parser.parseAndValidate(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            Feed feed = feedEntityParser.parseAndValidate(stream);
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
             Assert.assertNotNull(feed.getACL().getOwner());
@@ -621,7 +622,10 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            Feed feed = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            Feed feed = feedEntityParser.parse(stream);
 
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
@@ -629,17 +633,15 @@ public class FeedEntityParserTest extends AbstractTestBase {
             Assert.assertNotNull(feed.getACL().getGroup());
             Assert.assertNotNull(feed.getACL().getPermission());
 
-            parser.validate(feed);
+            feedEntityParser.validate(feed);
             Assert.fail("Validation exception should have been thrown for invalid owner");
         } finally {
-            CurrentUser.authenticate("testuser");
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
         }
     }
 
     @Test (expectedExceptions = ValidationException.class)
     public void testValidateACLAndStorageBadOwner() throws Exception {
-        CurrentUser.authenticate("testuser");
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -648,7 +650,10 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            feed = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
             Assert.assertNotNull(feed.getACL().getOwner());
@@ -657,7 +662,38 @@ public class FeedEntityParserTest extends AbstractTestBase {
 
             // create locations
             createLocations(feed);
-            parser.validate(feed);
+            feedEntityParser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAndStorageBadOwnerAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser =
+                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            // create locations
+            createLocations(feed);
+            feedEntityParser.validate(feed);
         } finally {
             if (feed != null) {
                 deleteLocations(feed);
@@ -667,7 +703,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
     }
 
     @Test
-    public void testValidateACLAndStorage() throws Exception {
+    public void testValidateACLAndStorageForValidOwnerBadGroup() throws Exception {
         CurrentUser.authenticate(USER);
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
@@ -677,7 +713,9 @@ public class FeedEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
 
-            feed = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
             Assert.assertNotNull(feed);
             Assert.assertNotNull(feed.getACL());
             Assert.assertNotNull(feed.getACL().getOwner());
@@ -688,7 +726,100 @@ public class FeedEntityParserTest extends AbstractTestBase {
 
             // create locations
             createLocations(feed);
-            parser.validate(feed);
+            feedEntityParser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLValidGroupBadOwner() throws Exception {
+        CurrentUser.authenticate(USER);
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(
+                    EntityType.FEED);
+            Feed feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            feed.getACL().setGroup(getGroupName());
+
+            feedEntityParser.validate(feed);
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateACLAndStorageForInvalidOwnerAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(
+                    EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            // create locations
+            createLocations(feed);
+            feedEntityParser.validate(feed);
+        } finally {
+            if (feed != null) {
+                deleteLocations(feed);
+            }
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAndStorageForValidGroupBadOwner() throws Exception {
+        CurrentUser.authenticate(USER);
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+
+        Feed feed = null;
+        try {
+            InputStream stream = this.getClass().getResourceAsStream(FEED_XML);
+
+            // need a new parser since it caches authorization enabled flag
+            FeedEntityParser feedEntityParser = (FeedEntityParser) EntityParserFactory.getParser(
+                    EntityType.FEED);
+            feed = feedEntityParser.parse(stream);
+            Assert.assertNotNull(feed);
+            Assert.assertNotNull(feed.getACL());
+            Assert.assertNotNull(feed.getACL().getOwner());
+            Assert.assertNotNull(feed.getACL().getGroup());
+            Assert.assertNotNull(feed.getACL().getPermission());
+
+            feed.getACL().setGroup(getGroupName());
+
+            // create locations
+            createLocations(feed);
+            feedEntityParser.validate(feed);
         } finally {
             if (feed != null) {
                 deleteLocations(feed);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 93c34d2..3513dab 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -367,13 +367,16 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         CurrentUser.authenticate("falcon");
 
         try {
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
             InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
 
-            Process process = parser.parse(stream);
+            Process process = processEntityParser.parse(stream);
             Assert.assertNotNull(process);
             Assert.assertNull(process.getACL());
 
-            parser.validate(process);
+            processEntityParser.validate(process);
             Assert.fail("Validation exception should have been thrown for empty ACL");
         } finally {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
@@ -381,7 +384,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
     }
 
     @Test
-    public void testValidateACLAuthorizationEnabled() throws Exception {
+    public void testValidateACLAuthorizationEnabledValidOwnerBadGroup() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -390,19 +393,51 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
 
-            Process process = parser.parseAndValidate(stream);
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+            Process process = processEntityParser.parseAndValidate(stream);
+            Assert.assertNotNull(process);
+            Assert.assertNotNull(process.getACL());
+            Assert.assertNotNull(process.getACL().getOwner());
+            Assert.assertNotNull(process.getACL().getGroup());
+            Assert.assertNotNull(process.getACL().getPermission());
+        } finally {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testValidateACLAuthorizationEnabledValidGroupBadOwner() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        Assert.assertTrue(Boolean.valueOf(
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+        CurrentUser.authenticate(USER); // valid user but acl owner is falcon
+
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
+
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+            Process process = processEntityParser.parse(stream);
             Assert.assertNotNull(process);
             Assert.assertNotNull(process.getACL());
             Assert.assertNotNull(process.getACL().getOwner());
             Assert.assertNotNull(process.getACL().getGroup());
             Assert.assertNotNull(process.getACL().getPermission());
+
+            process.getACL().setOwner(USER);
+            process.getACL().setGroup(getGroupName());
+
+            processEntityParser.validate(process);
         } finally {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
         }
     }
 
     @Test (expectedExceptions = ValidationException.class)
-    public void testValidateACLAuthorizationEnabledBadOwner() throws Exception {
+    public void testValidateACLAuthorizationEnabledBadOwnerAndGroup() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
                 StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
@@ -411,7 +446,10 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         try {
             InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
 
-            Process process = parser.parse(stream);
+            // need a new parser since it caches authorization enabled flag
+            ProcessEntityParser processEntityParser =
+                    (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+            Process process = processEntityParser.parse(stream);
 
             Assert.assertNotNull(process);
             Assert.assertNotNull(process.getACL());
@@ -419,7 +457,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
             Assert.assertNotNull(process.getACL().getGroup());
             Assert.assertNotNull(process.getACL().getPermission());
 
-            parser.validate(process);
+            processEntityParser.validate(process);
             Assert.fail("Validation exception should have been thrown for invalid owner");
         } finally {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
index 3b4e7f0..8f25f57 100644
--- a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.hadoop;
 
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.ipc.RemoteException;
@@ -58,11 +59,12 @@ public class HadoopClientFactoryTest {
     @Test (enabled = false) // todo: cheated the conf to impersonate as same user
     public void testCreateFileSystemWithSameUser() {
         String user = System.getProperty("user.name");
+        CurrentUser.authenticate(user);
         try {
             Configuration conf = embeddedCluster.getConf();
             URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
             Assert.assertNotNull(uri);
-            HadoopClientFactory.get().createProxiedFileSystem(user, uri, conf);
+            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
             Assert.fail("Impersonation should have failed.");
         } catch (Exception e) {
             Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
@@ -80,7 +82,7 @@ public class HadoopClientFactoryTest {
 
         URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
         Assert.assertNotNull(uri);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("testuser", uri, conf);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(realUser, uri, conf);
         Assert.assertNotNull(fs);
     }
 
@@ -95,7 +97,9 @@ public class HadoopClientFactoryTest {
 
         URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
         Assert.assertNotNull(uri);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("seetharam", uri, conf);
+
+        CurrentUser.authenticate(System.getProperty("user.name"));
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
         Assert.assertNotNull(fs);
     }
 }