You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/17 01:27:13 UTC

[1/2] FALCON-753 Change the ownership for staging dir to user submitting the feed. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 1caadaf20 -> 15b89bc31


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index d5432eb..5c4fea3 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -21,6 +21,7 @@ package org.apache.falcon.workflow.engine;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
@@ -30,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -63,6 +65,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -146,6 +149,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
             for (String clusterName: schedClusters) {
                 Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
+                prepareEntityBuildPath(entity, cluster);
                 Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
                 Properties properties = builder.build(cluster, buildPath);
                 if (properties == null) {
@@ -160,6 +164,29 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    /**
+     * Prepare the staging and logs dir for this entity with default permissions.
+     *
+     * @param entity  entity
+     * @param cluster cluster entity
+     * @throws FalconException
+     */
+    private void prepareEntityBuildPath(Entity entity, Cluster cluster) throws FalconException {
+        Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity);
+        Path logPath = EntityUtil.getLogPath(cluster, entity);
+
+        try {
+            HadoopClientFactory.mkdirsWithDefaultPerms(
+                    HadoopClientFactory.get().createProxiedFileSystem(
+                            ClusterHelper.getConfiguration(cluster)), stagingPath);
+            HadoopClientFactory.mkdirsWithDefaultPerms(
+                    HadoopClientFactory.get().createProxiedFileSystem(
+                            ClusterHelper.getConfiguration(cluster)), logPath);
+        } catch (IOException e) {
+            throw new FalconException("Error preparing base staging dirs: " + stagingPath, e);
+        }
+    }
+
     @Override
     public void dryRun(Entity entity, String clusterName) throws FalconException {
         OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
@@ -434,7 +461,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             for (String cluster : clusters) {
                 ProxyOozieClient client = OozieClientFactory.get(cluster);
-                List<String> wfNames = EntityUtil.getWorkflowNames(entity, cluster);
+                List<String> wfNames = EntityUtil.getWorkflowNames(entity);
                 List<WorkflowJob> wfs = getRunningWorkflows(cluster, wfNames);
                 if (wfs != null) {
                     for (WorkflowJob job : wfs) {
@@ -812,6 +839,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    @SuppressWarnings("MagicConstant")
     protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end,
                                                                    List<LifeCycle> lifeCycles) throws FalconException {
         Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
@@ -1071,6 +1099,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return new Date(1000L * 60 * minute + date.getTime());
     }
 
+    @SuppressWarnings("MagicConstant")
     private Date getCoordLastActionTime(CoordinatorJob coord) {
         if (coord.getNextMaterializedTime() != null) {
             Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 68911ce..74cc509 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -51,8 +51,11 @@ import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -92,7 +95,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
     @BeforeClass
     public void setUpDFS() throws Exception {
-        CurrentUser.authenticate("falcon");
+        CurrentUser.authenticate(System.getProperty("user.name"));
 
         srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
         String srcHdfsUrl = srcMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
@@ -240,7 +243,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     private COORDINATORAPP getCoordinator(EmbeddedCluster cluster, String appPath) throws Exception {
-        return getCoordinator(cluster.getFileSystem(), new Path(StringUtils.removeStart(appPath, "${nameNode}")));
+        return getCoordinator(cluster.getFileSystem(),
+                new Path(StringUtils.removeStart(appPath, "${nameNode}")));
     }
 
     private String getWorkflowAppPath() {
@@ -270,7 +274,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
         OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeed, Tag.REPLICATION);
 
-        List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon/"));
+        List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster,
+                new Path("/alpha/falcon/"));
         final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS,
             alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
         Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
@@ -339,9 +344,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), workflow.getName());
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(),
+                workflow.getName());
 
-        boolean preProcess = RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase("true");
+        boolean preProcess = RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase(
+                "true");
         if (preProcess) {
             assertAction(workflow, "pre-processing", true);
         }
@@ -421,8 +428,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
-        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster,
+                tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster,
+                tableFeed);
 
         // verify the replication param that feed replicator depends on
         Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
@@ -516,18 +525,40 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitions('input', 'hive-export')}");
     }
 
-    @Test
-    public void testRetentionCoords() throws Exception {
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+    @DataProvider(name = "uMaskOptions")
+    private Object[][] createUMaskOptions() {
+        return new Object[][] {
+            {"000"}, // {FsAction.ALL, FsAction.ALL, FsAction.ALL},
+            {"077"}, // {FsAction.ALL, FsAction.NONE, FsAction.NONE}
+            {"027"}, // {FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE}
+            {"017"}, // {FsAction.ALL, FsAction.READ_WRITE, FsAction.NONE}
+            {"012"}, // {FsAction.ALL, FsAction.READ_WRITE, FsAction.READ_EXECUTE}
+            {"022"}, // {FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE}
+        };
+    }
+
+    @Test(dataProvider = "uMaskOptions")
+    public void testRetentionCoords(String umask) throws Exception {
+        FileSystem fs = srcMiniDFS.getFileSystem();
+        Configuration conf = fs.getConf();
+        conf.set("fs.permissions.umask-mode", umask);
+
+        // ClusterHelper constructs new fs Conf. Add it to cluster properties so that it gets added to FS conf
+        setUmaskInFsConf(srcCluster, umask);
+
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed,
+                srcCluster.getName());
         final Calendar instance = Calendar.getInstance();
         instance.roll(Calendar.YEAR, 1);
         cluster.getValidity().setEnd(instance.getTime());
 
         OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION);
-        List<Properties> coords = builder.buildCoords(srcCluster, new Path("/projects/falcon/"));
+        List<Properties> coords = builder.buildCoords(
+                srcCluster, new Path("/projects/falcon/" + umask));
         COORDINATORAPP coord = getCoordinator(srcMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
 
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
+                "${nameNode}/projects/falcon/" + umask + "/RETENTION");
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
@@ -553,16 +584,30 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("feedNames"), feed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
 
-        assertWorkflowRetries(coord);
-        verifyEntityProperties(feed, srcCluster,
-                WorkflowExecutionContext.EntityOperations.DELETE, props);
-        verifyBrokerProperties(srcCluster, props);
+        assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord));
+
+        try {
+            verifyClusterLocationsUMask(srcCluster, fs);
+            verifyWorkflowUMask(fs, coord, umask);
+        } finally {
+            cleanupWorkflowState(fs, coord);
+            FileSystem.closeAll();
+        }
     }
 
     @Test (dataProvider = "secureOptions")
     public void testRetentionCoordsForTable(String secureOption) throws Exception {
         StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
 
+        final String umask = "000";
+
+        FileSystem fs = trgMiniDFS.getFileSystem();
+        Configuration conf = fs.getConf();
+        conf.set("fs.permissions.umask-mode", umask);
+
+        // ClusterHelper constructs new fs Conf. Add it to cluster properties so that it gets added to FS conf
+        setUmaskInFsConf(trgCluster, umask);
+
         org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(tableFeed, trgCluster.getName());
         final Calendar instance = Calendar.getInstance();
         instance.roll(Calendar.YEAR, 1);
@@ -604,9 +649,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
                 WorkflowExecutionContext.EntityOperations.DELETE, props);
 
         Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
-        assertHCatCredentials(
-            getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
-            new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "")).toString());
+        assertHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
+                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+
+        try {
+            verifyClusterLocationsUMask(trgCluster, fs);
+            verifyWorkflowUMask(fs, coord, umask);
+        } finally {
+            cleanupWorkflowState(fs, coord);
+            FileSystem.closeAll();
+        }
     }
 
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -636,4 +688,53 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
             }
         }
     }
+
+    private void verifyClusterLocationsUMask(Cluster aCluster, FileSystem fs) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(aCluster, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (fs.exists(stagingPath)) {
+            FileStatus fileStatus = fs.getFileStatus(stagingPath);
+            Assert.assertEquals(fileStatus.getPermission().toShort(), 511);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(aCluster, "working");
+        Path workingPath = new Path(workingLocation);
+        if (fs.exists(workingPath)) {
+            FileStatus fileStatus = fs.getFileStatus(workingPath);
+            Assert.assertEquals(fileStatus.getPermission().toShort(), 493);
+        }
+    }
+
+    private void verifyWorkflowUMask(FileSystem fs, COORDINATORAPP coord,
+                                     String defaultUMask) throws IOException {
+        Assert.assertEquals(fs.getConf().get("fs.permissions.umask-mode"), defaultUMask);
+
+        String appPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        Path wfPath = new Path(appPath);
+        FileStatus[] fileStatuses = fs.listStatus(wfPath);
+        for (FileStatus fileStatus : fileStatuses) {
+            Assert.assertEquals(fileStatus.getOwner(), CurrentUser.getProxyUGI().getShortUserName());
+
+            final FsPermission permission = fileStatus.getPermission();
+            if (!fileStatus.isDirectory()) {
+                Assert.assertEquals(permission.toString(),
+                        HadoopClientFactory.getFileDefaultPermission(fs.getConf()).toString());
+            }
+        }
+    }
+
+    private void cleanupWorkflowState(FileSystem fs, COORDINATORAPP coord) throws Exception {
+        String appPath = coord.getAction().getWorkflow().getAppPath();
+        Path wfPath = new Path(appPath.replace("${nameNode}", ""));
+        fs.delete(wfPath, true);
+    }
+
+    private static void setUmaskInFsConf(Cluster cluster, String umask) {
+        // ClusterHelper constructs new fs Conf. Add it to cluster properties so that it gets added to FS conf
+        org.apache.falcon.entity.v0.cluster.Property property =
+                new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName("fs.permissions.umask-mode");
+        property.setValue(umask);
+        cluster.getProperties().getProperties().add(property);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index e50da2d..23e0535 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -232,13 +232,15 @@ public abstract class AbstractEntityManager {
 
     // Parallel update can get very clumsy if two feeds are updated which
     // are referred by a single process. Sequencing them.
-    public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo,
-                                         String effectiveTimeStr) {
+    public synchronized APIResult update(HttpServletRequest request, String type, String entityName,
+                                         String colo, String effectiveTimeStr) {
         checkColo(colo);
         try {
             EntityType entityType = EntityType.valueOf(type.toUpperCase());
             Entity oldEntity = EntityUtil.getEntity(type, entityName);
             Entity newEntity = deserializeEntity(request, entityType);
+            // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
+            decorateEntityWithACL(newEntity);
             validate(newEntity);
 
             validateUpdate(oldEntity, newEntity);
@@ -326,6 +328,8 @@ public abstract class AbstractEntityManager {
 
         EntityType entityType = EntityType.valueOf(type.toUpperCase());
         Entity entity = deserializeEntity(request, entityType);
+        // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
+        decorateEntityWithACL(entity);
 
         Entity existingEntity = configStore.get(entityType, entity.getName());
         if (existingEntity != null) {
@@ -344,6 +348,50 @@ public abstract class AbstractEntityManager {
         return entity;
     }
 
+    /**
+     * KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass.
+     * existingEntity in config store will have the decoration and equals check fails
+     * if entity passed is not decorated for checking if entity already exists.
+     *
+     * @param entity entity
+     */
+    private void decorateEntityWithACL(Entity entity) {
+        if (SecurityUtil.isAuthorizationEnabled() || EntityUtil.getACL(entity) != null) {
+            return; // not necessary to decorate
+        }
+
+        final String proxyUser = CurrentUser.getUser();
+        final String defaultGroupName = CurrentUser.getPrimaryGroupName();
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            org.apache.falcon.entity.v0.cluster.ACL clusterACL =
+                    new org.apache.falcon.entity.v0.cluster.ACL();
+            clusterACL.setOwner(proxyUser);
+            clusterACL.setGroup(defaultGroupName);
+            ((org.apache.falcon.entity.v0.cluster.Cluster) entity).setACL(clusterACL);
+            break;
+
+        case FEED:
+            org.apache.falcon.entity.v0.feed.ACL feedACL =
+                    new org.apache.falcon.entity.v0.feed.ACL();
+            feedACL.setOwner(proxyUser);
+            feedACL.setGroup(defaultGroupName);
+            ((org.apache.falcon.entity.v0.feed.Feed) entity).setACL(feedACL);
+            break;
+
+        case PROCESS:
+            org.apache.falcon.entity.v0.process.ACL processACL =
+                    new org.apache.falcon.entity.v0.process.ACL();
+            processACL.setOwner(proxyUser);
+            processACL.setGroup(defaultGroupName);
+            ((org.apache.falcon.entity.v0.process.Process) entity).setACL(processACL);
+            break;
+
+        default:
+            break;
+        }
+    }
+
     protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType)
         throws IOException, FalconException {
 
@@ -612,7 +660,7 @@ public abstract class AbstractEntityManager {
     protected boolean isEntityAuthorized(Entity entity) {
         try {
             SecurityUtil.getAuthorizationProvider().authorizeResource("entities", "list",
-                    entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUgi());
+                    entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUGI());
         } catch (Exception e) {
             LOG.error("Authorization failed for entity=" + entity.getName()
                     + " for user=" + CurrentUser.getUser(), e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
index b61360b..3daa419 100644
--- a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
@@ -67,7 +67,7 @@ public class FalconAuthorizationFilter implements Filter {
             LOG.info("Authorizing user={} against request={}", CurrentUser.getUser(), requestParts);
             authorizationProvider.authorizeResource(requestParts.getResource(),
                     requestParts.getAction(), requestParts.getEntityType(),
-                    requestParts.getEntityName(), CurrentUser.getProxyUgi());
+                    requestParts.getEntityName(), CurrentUser.getProxyUGI());
         }
 
         filterChain.doFilter(request, response);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 211c8bf..2bf2a98 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -19,7 +19,9 @@ package org.apache.falcon.replication;
 
 import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -129,9 +131,10 @@ public class FeedReplicator extends Configured implements Tool {
         return listPaths;
     }
 
-    private void executePostProcessing(DistCpOptions options) throws IOException {
+    private void executePostProcessing(DistCpOptions options) throws IOException, FalconException {
         Path targetPath = options.getTargetPath();
-        FileSystem fs = targetPath.getFileSystem(getConf());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                targetPath.toUri(), getConf());
         List<Path> inPaths = options.getSourcePaths();
         assert inPaths.size() == 1 : "Source paths more than 1 can't be handled";
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index d854bdd..4e8e1cd 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
@@ -130,16 +128,13 @@ public class LateDataHandler extends Configured implements Tool {
         return computedMetrics;
     }
 
-    private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException, FalconException {
+    private void persistMetrics(Map<String, Long> metrics,
+                                Path file) throws IOException, FalconException {
         OutputStream out = null;
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
+        try {  // created in a map job
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
             out = fs.create(file);
 
-            // making sure falcon can read this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(file, permission);
-
             for (Map.Entry<String, Long> entry : metrics.entrySet()) {
                 out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
             }
@@ -212,7 +207,7 @@ public class LateDataHandler extends Configured implements Tool {
     }
 
     private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(inPath.toUri(), conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(inPath.toUri(), conf);
         FileStatus[] fileStatuses = fs.globStatus(inPath);
         if (fileStatuses == null || fileStatuses.length == 0) {
             return 0;
@@ -261,7 +256,7 @@ public class LateDataHandler extends Configured implements Tool {
         throws Exception {
 
         StringBuilder buffer = new StringBuilder();
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), conf);
         BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));
         String line;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 459da84..9ee94c5 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -66,6 +66,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
                     continue;
                 }
 
+                // Login the user to access WfEngine as this user
                 CurrentUser.authenticate(message.getWorkflowUser());
                 String jobStatus = handler.getWfEngine().getWorkflowStatus(
                         message.getClusterName(), message.getWfId());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 80a3b83..9ba632e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -99,7 +99,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
         final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
         Configuration conf = LateRerunHandler.getConfiguration(storageEndpoint);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(lateLogPath.toUri(), conf);
         if (!fs.exists(lateLogPath)) {
             LOG.warn("Late log file: {} not found", lateLogPath);
             return "";

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index bacc20f..c2cb09e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -37,7 +37,6 @@ import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
@@ -66,7 +65,6 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
 
-                CurrentUser.authenticate(workflowUser);
                 java.util.Properties properties =
                         this.getWfEngine().getWorkflowProperties(cluster, wfId);
                 String logDir = properties.getProperty("logDir");
@@ -77,7 +75,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                 LOG.info("Going to delete path: {}", lateLogPath);
                 final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
                 Configuration conf = getConfiguration(storageEndpoint);
-                FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
                     if (deleted) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index f7a4493..67ea181 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -35,6 +35,7 @@ import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.common.FeedDataPath.VARS;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -127,7 +128,8 @@ public class FeedEvictor extends Configured implements Tool {
 
         Path path = new Path(logFile);
         EvictedInstanceSerDe.serializeEvictedInstancePaths(
-                path.getFileSystem(getConf()), path, instancePaths);
+                HadoopClientFactory.get().createProxiedFileSystem(
+                        path.toUri(), getConf()), path, instancePaths);
 
         int len = buffer.length();
         if (len > 0) {
@@ -157,11 +159,10 @@ public class FeedEvictor extends Configured implements Tool {
         }
     }
 
-    private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone)
-        throws IOException, ELException {
-
+    private void fileSystemEvictor(String feedPath, String retentionLimit,
+                                   String timeZone) throws IOException, ELException, FalconException {
         Path normalizedPath = new Path(feedPath);
-        FileSystem fs = normalizedPath.getFileSystem(getConf());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
         feedPath = normalizedPath.toUri().getPath();
         LOG.info("Normalized path: {}", feedPath);
 
@@ -526,7 +527,8 @@ public class FeedEvictor extends Configured implements Tool {
             if (isTableExternal) { // nuke the dirs if an external table
                 final String location = partitionToDrop.getLocation();
                 final Path path = new Path(location);
-                deleted = path.getFileSystem(new Configuration()).delete(path, true);
+                deleted = HadoopClientFactory.get()
+                        .createProxiedFileSystem(path.toUri()).delete(path, true);
             }
             if (!isTableExternal || deleted) {
                 // replace ',' with ';' since message producer splits instancePaths string by ','

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index b1e518d..c67b8fc 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -127,7 +127,7 @@ public class EmbeddedCluster {
         locs.getLocations().add(location);
         location = new Location();
         location.setName("working");
-        location.setPath("/project/falcon/working");
+        location.setPath("/projects/falcon/working");
         locs.getLocations().add(location);
         clusterEntity.setLocations(locs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 6694af1..8cd86cc 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -1014,6 +1014,7 @@ public class FalconCLIIT {
         OutputStream out = new FileOutputStream(file);
         props.setProperty("falcon.recipe.processName", context.getProcessName());
         props.setProperty("falcon.recipe.src.cluster.name", context.getClusterName());
+        props.setProperty("falcon.recipe.processEndDate", context.getProcessEndTime());
         props.setProperty("falcon.recipe.inputFeedName", context.getInputFeedName());
         props.setProperty("falcon.recipe.outputFeedName", context.getOutputFeedName());
         props.setProperty("falcon.recipe.workflow.path", TestContext.class.getResource("/fs-workflow.xml").getPath());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index 96c99c5..cdeba63 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.latedata.LateDataHandler;
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.FSUtils;
 import org.apache.falcon.util.HiveTestUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,7 @@ public class LateDataHandlerIT {
 
     @BeforeClass
     public void prepare() throws Exception {
+        CurrentUser.authenticate(TestContext.REMOTE_USER);
         TestContext.cleanupStore();
 
         String filePath = TestContext.overlayParametersOverTemplate(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index ed70a0b..f7e6bdb 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -109,8 +109,8 @@ public class EntityManagerJerseyIT {
         ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
         context.assertSuccessful(response);
         FileSystem fs = context.getCluster().getFileSystem();
-        assertLibs(fs, new Path("/project/falcon/working/libext/FEED/retention"));
-        assertLibs(fs, new Path("/project/falcon/working/libext/PROCESS"));
+        assertLibs(fs, new Path("/projects/falcon/working/libext/FEED/retention"));
+        assertLibs(fs, new Path("/projects/falcon/working/libext/PROCESS"));
 
         String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index e9545d1..64f98d4 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -32,11 +32,13 @@ import org.apache.falcon.cli.FalconCLI;
 import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.client.FalconClient;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
@@ -103,6 +105,7 @@ public class TestContext {
 
     protected String clusterName;
     protected String processName;
+    protected String processEndTime;
     protected String inputFeedName;
     protected String outputFeedName;
 
@@ -189,6 +192,10 @@ public class TestContext {
         return processName;
     }
 
+    public String getProcessEndTime() {
+        return processEndTime;
+    }
+
     public String getInputFeedName() {
         return inputFeedName;
     }
@@ -279,6 +286,8 @@ public class TestContext {
             try {
                 cluster = EmbeddedCluster.newCluster(overlay.get("cluster"), true);
                 clusterName = cluster.getCluster().getName();
+                deleteClusterLocations(cluster.getCluster(), cluster.getFileSystem());
+                createClusterLocations(cluster.getCluster(), cluster.getFileSystem());
             } catch (Exception e) {
                 throw new IOException("Unable to setup cluster info", e);
             }
@@ -286,6 +295,37 @@ public class TestContext {
         return submitFileToFalcon(entityType, tmpFile);
     }
 
+    public static void deleteClusterLocations(Cluster clusterEntity,
+                                              FileSystem fs) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (fs.exists(stagingPath)) {
+            fs.delete(stagingPath, true);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+        Path workingPath = new Path(workingLocation);
+        if (fs.exists(workingPath)) {
+            fs.delete(workingPath, true);
+        }
+    }
+
+    public static void createClusterLocations(Cluster clusterEntity,
+                                              FileSystem fs) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (!fs.exists(stagingPath)) {
+            HadoopClientFactory.mkdirs(fs, stagingPath, HadoopClientFactory.ALL_PERMISSION);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+        Path workingPath = new Path(workingLocation);
+        if (!fs.exists(workingPath)) {
+            HadoopClientFactory
+                    .mkdirs(fs, workingPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+        }
+    }
+
     public ClientResponse submitFileToFalcon(EntityType entityType, String tmpFile) throws IOException {
 
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
@@ -376,6 +416,8 @@ public class TestContext {
         //only feeds with future dates can be scheduled
         Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000);
         overlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate));
+        processEndTime = SchemaHelper.formatDateUTC(endDate);
+        overlay.put("processEndDate", processEndTime);
         outputFeedName = "out" + time;
         overlay.put("outputFeedName", outputFeedName);
         processName = "p" + time;
@@ -436,6 +478,18 @@ public class TestContext {
         mkdir(fs, new Path(wfParent, "input/2012/04/20/00"));
         Path outPath = new Path(wfParent, "output");
         mkdir(fs, outPath, new FsPermission((short) 511));
+
+        // init cluster locations
+        initClusterLocations(cluster, fs);
+    }
+
+    private static void initClusterLocations(EmbeddedCluster cluster,
+                                             FileSystem fs) throws Exception {
+        String stagingPath = ClusterHelper.getLocation(cluster.getCluster(), "staging");
+        mkdir(fs, new Path(stagingPath), HadoopClientFactory.ALL_PERMISSION);
+
+        String workingPath = ClusterHelper.getLocation(cluster.getCluster(), "working");
+        mkdir(fs, new Path(workingPath), HadoopClientFactory.READ_EXECUTE_PERMISSION);
     }
 
     public static void cleanupStore() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
index 6a65bf6..cbbf90a 100644
--- a/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/validation/ClusterEntityValidationIT.java
@@ -20,12 +20,19 @@ package org.apache.falcon.validation;
 
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.ACL;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interface;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.resource.TestContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -33,6 +40,7 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
@@ -41,13 +49,31 @@ import java.util.Map;
  * interface endpoints are valid.
  */
 public class ClusterEntityValidationIT {
+    private static final FsPermission OWNER_ONLY_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
     private final TestContext context = new TestContext();
     private Map<String, String> overlay;
 
+    private final ClusterEntityParser parser =
+            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+    private Cluster cluster;
+    private FileSystem fs;
+
 
     @BeforeClass
     public void setup() throws Exception {
         TestContext.prepare();
+
+        overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(
+                TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        InputStream stream = new FileInputStream(filePath);
+        cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(cluster);
+
+        fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
     }
 
     /**
@@ -89,15 +115,15 @@ public class ClusterEntityValidationIT {
         overlay = context.getUniqueOverlay();
         String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         InputStream stream = new FileInputStream(filePath);
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
-        Assert.assertNotNull(cluster);
-        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+        Cluster clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(clusterEntity);
+        clusterEntity.setColo("default");  // validations will be ignored if not default & tests fail
 
-        Interface anInterface = ClusterHelper.getInterface(cluster, interfacetype);
+        Interface anInterface = ClusterHelper.getInterface(clusterEntity, interfacetype);
         anInterface.setEndpoint(endpoint);
 
         File tmpFile = TestContext.getTempFile();
-        EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
+        EntityType.CLUSTER.getMarshaller().marshal(clusterEntity, tmpFile);
         ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
         context.assertFailure(response);
     }
@@ -107,20 +133,55 @@ public class ClusterEntityValidationIT {
         overlay = context.getUniqueOverlay();
         String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         InputStream stream = new FileInputStream(filePath);
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
-        Assert.assertNotNull(cluster);
+        Cluster clusterEntity = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
+        Assert.assertNotNull(clusterEntity);
 
         // Adding ACL with authorization disabled must not hurt
         ACL clusterACL = new ACL();
         clusterACL.setOwner(TestContext.REMOTE_USER);
         clusterACL.setGroup(TestContext.REMOTE_USER);
-        cluster.setACL(clusterACL);
+        clusterEntity.setACL(clusterACL);
 
-        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+        clusterEntity.setColo("default");  // validations will be ignored if not default & tests fail
 
         File tmpFile = TestContext.getTempFile();
-        EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
+        EntityType.CLUSTER.getMarshaller().marshal(clusterEntity, tmpFile);
         ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
         context.assertSuccessful(response);
     }
+
+    @Test
+    public void testValidateClusterLocations() throws Exception {
+        TestContext.createClusterLocations(cluster, fs);
+        parser.validate(cluster);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateClusterLocationsThatDontExist() throws Exception {
+        TestContext.deleteClusterLocations(cluster, fs);
+        parser.validate(cluster);
+        Assert.fail("Should have thrown a validation exception");
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testValidateClusterLocationsThatExistWithBadOwner() throws Exception {
+        TestContext.deleteClusterLocations(cluster, fs);
+        createClusterLocationsBadPermissions(cluster);
+        parser.validate(cluster);
+        Assert.fail("Should have thrown a validation exception");
+    }
+
+    private void createClusterLocationsBadPermissions(Cluster clusterEntity) throws IOException {
+        String stagingLocation = ClusterHelper.getLocation(clusterEntity, "staging");
+        Path stagingPath = new Path(stagingLocation);
+        if (!fs.exists(stagingPath)) {
+            FileSystem.mkdirs(fs, stagingPath, OWNER_ONLY_PERMISSION);
+        }
+
+        String workingLocation = ClusterHelper.getLocation(clusterEntity, "working");
+        Path workingPath = new Path(workingLocation);
+        if (!fs.exists(workingPath)) {
+            FileSystem.mkdirs(fs, workingPath, OWNER_ONLY_PERMISSION);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/resources/cluster-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/cluster-template.xml b/webapp/src/test/resources/cluster-template.xml
index bc17fb9..16b7c8c 100644
--- a/webapp/src/test/resources/cluster-template.xml
+++ b/webapp/src/test/resources/cluster-template.xml
@@ -34,7 +34,7 @@
     <locations>
         <location name="staging" path="/projects/falcon/staging"/>
         <location name="temp" path="/tmp"/>
-        <location name="working" path="/project/falcon/working"/>
+        <location name="working" path="/projects/falcon/working"/>
     </locations>
     <properties>
     </properties>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/webapp/src/test/resources/process-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/process-template.xml b/webapp/src/test/resources/process-template.xml
index 0add06a..cdce1b9 100644
--- a/webapp/src/test/resources/process-template.xml
+++ b/webapp/src/test/resources/process-template.xml
@@ -22,7 +22,7 @@
     <pipelines>testPipeline,dataReplicationPipeline</pipelines>
     <clusters>
         <cluster name="##src.cluster.name##">
-            <validity end="2012-04-21T00:00Z" start="2012-04-20T00:00Z"/>
+            <validity end="##processEndDate##" start="2012-04-20T00:00Z"/>
         </cluster>
     </clusters>
 


[2/2] git commit: FALCON-753 Change the ownership for staging dir to user submitting the feed. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-753 Change the ownership for staging dir to user submitting the feed. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/15b89bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/15b89bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/15b89bc3

Branch: refs/heads/master
Commit: 15b89bc31438fd76071585aacefd9a549d99ac38
Parents: 1caadaf
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Oct 16 16:27:23 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Oct 16 16:27:23 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/cleanup/AbstractCleanupHandler.java  |  53 ++++---
 .../org/apache/falcon/entity/EntityUtil.java    |  34 +++--
 .../entity/parser/ClusterEntityParser.java      |  98 +++++++------
 .../falcon/entity/parser/EntityParser.java      |   2 +-
 .../falcon/entity/store/ConfigurationStore.java |  28 ++--
 .../falcon/hadoop/HadoopClientFactory.java      |  76 +++++++---
 .../org/apache/falcon/security/CurrentUser.java |  46 +++++-
 .../security/DefaultAuthorizationProvider.java  |  21 +--
 .../workflow/WorkflowExecutionContext.java      |  12 +-
 .../falcon/cleanup/LogCleanupServiceTest.java   |   3 +-
 .../apache/falcon/entity/AbstractTestBase.java  |  49 ++++++-
 .../falcon/hadoop/HadoopClientFactoryTest.java  |   4 +-
 .../apache/falcon/security/CurrentUserTest.java |   2 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   5 +
 docs/src/site/twiki/InstallationSteps.twiki     |   3 +
 docs/src/site/twiki/OnBoarding.twiki            |   4 +
 docs/src/site/twiki/Security.twiki              |  16 ++-
 .../apache/falcon/hadoop/JailedFileSystem.java  |   5 +
 .../falcon/messaging/JMSMessageConsumer.java    |   4 +
 .../falcon/messaging/JMSMessageProducer.java    |   9 +-
 .../org/apache/falcon/logging/JobLogMover.java  |   3 +-
 .../org/apache/falcon/logging/LogProvider.java  |   2 +-
 .../apache/falcon/oozie/OozieBundleBuilder.java |  34 +----
 .../falcon/oozie/OozieCoordinatorBuilder.java   |   3 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |  22 ++-
 .../OozieOrchestrationWorkflowBuilder.java      |  12 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |   7 +-
 .../oozie/process/ProcessBundleBuilder.java     |   3 +-
 .../ProcessExecutionWorkflowBuilder.java        |   4 +-
 .../service/SharedLibraryHostingService.java    |  60 ++++----
 .../engine/OozieHouseKeepingService.java        |   2 +-
 .../workflow/engine/OozieWorkflowEngine.java    |  31 ++++-
 .../feed/OozieFeedWorkflowBuilderTest.java      | 139 ++++++++++++++++---
 .../falcon/resource/AbstractEntityManager.java  |  54 ++++++-
 .../security/FalconAuthorizationFilter.java     |   2 +-
 .../falcon/replication/FeedReplicator.java      |   7 +-
 .../apache/falcon/latedata/LateDataHandler.java |  17 +--
 .../rerun/handler/AbstractRerunConsumer.java    |   1 +
 .../falcon/rerun/handler/LateRerunConsumer.java |   2 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   4 +-
 .../apache/falcon/retention/FeedEvictor.java    |  14 +-
 .../falcon/cluster/util/EmbeddedCluster.java    |   2 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |   1 +
 .../apache/falcon/late/LateDataHandlerIT.java   |   2 +
 .../falcon/resource/EntityManagerJerseyIT.java  |   4 +-
 .../org/apache/falcon/resource/TestContext.java |  54 +++++++
 .../validation/ClusterEntityValidationIT.java   |  81 +++++++++--
 webapp/src/test/resources/cluster-template.xml  |   2 +-
 webapp/src/test/resources/process-template.xml  |   2 +-
 50 files changed, 744 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d53017f..7af3263 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@ Apache Falcon (incubating) Change log
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES
+   FALCON-753 Change the ownership for staging dir to user submitting the feed
+   (Venkatesh Seetharam)
 
   NEW FEATURES
    FALCON-687 Add hooks for extensions in Audit (Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index c315c25..cd088b2 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -29,6 +30,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
@@ -72,9 +74,8 @@ public abstract class AbstractCleanupHandler {
                 "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
     }
 
-    protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+    protected FileStatus[] getAllLogs(FileSystem fs, Cluster cluster,
                                       Entity entity) throws FalconException {
-        FileSystem fs = getFileSystem(cluster);
         FileStatus[] paths;
         try {
             Path logPath = getLogPath(cluster, entity);
@@ -91,27 +92,42 @@ public abstract class AbstractCleanupHandler {
         return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
     }
 
-    protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
-        throws FalconException {
+    private FileSystem getFileSystemAsEntityOwner(Cluster cluster,
+                                                  Entity entity) throws FalconException {
+        try {
+            final AccessControlList acl = EntityUtil.getACL(entity);
+            if (acl == null) {
+                throw new FalconException("ACL for entity " + entity.getName() + " is empty");
+            }
 
-        return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            final String proxyUser = acl.getOwner();
+            // user for proxying
+            CurrentUser.authenticate(proxyUser);
+            return HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
     }
 
     protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
         Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
-        if (isClusterInCurrentColo(currentCluster.getColo())) {
-            LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
-                    entity.getEntityType(), entity.getName(), clusterName, retention);
-            FileStatus[] logs = getAllLogs(currentCluster, entity);
-            deleteInternal(currentCluster, entity, retention, logs);
-        } else {
+        if (!isClusterInCurrentColo(currentCluster.getColo())) {
             LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
                     entity.getEntityType(), entity.getName(), clusterName);
+            return;
         }
+
+        LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
+                entity.getEntityType(), entity.getName(), clusterName, retention);
+
+        FileSystem fs = getFileSystemAsEntityOwner(currentCluster, entity);
+        FileStatus[] logs = getAllLogs(fs, currentCluster, entity);
+        deleteInternal(fs, currentCluster, entity, retention, logs);
     }
 
-    protected void deleteInternal(Cluster cluster, Entity entity,
-                                  long retention, FileStatus[] logs) throws FalconException {
+    private void deleteInternal(FileSystem fs, Cluster cluster, Entity entity,
+                                long retention, FileStatus[] logs) throws FalconException {
         if (logs == null || logs.length == 0) {
             LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
                     entity.getName());
@@ -123,13 +139,10 @@ public abstract class AbstractCleanupHandler {
         for (FileStatus log : logs) {
             if (now - log.getModificationTime() > retention) {
                 try {
-                    boolean isDeleted = getFileSystem(cluster).delete(log.getPath(), true);
-                    if (!isDeleted) {
-                        LOG.error("Unable to delete path: {}", log.getPath());
-                    } else {
-                        LOG.info("Deleted path: {}", log.getPath());
-                    }
-                    deleteParentIfEmpty(getFileSystem(cluster), log.getPath().getParent());
+                    boolean isDeleted = fs.delete(log.getPath(), true);
+                    LOG.error(isDeleted ? "Deleted path: {}" : "Unable to delete path: {}",
+                            log.getPath());
+                    deleteParentIfEmpty(fs, log.getPath().getParent());
                 } catch (IOException e) {
                     throw new FalconException(" Unable to delete log file : "
                             + log.getPath() + " for entity " + entity.getName()

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b8f2d7d..1a10986 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -27,6 +27,7 @@ import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
 import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -197,6 +198,7 @@ public final class EntityUtil {
     }
 
     public static int getParallel(Feed feed) {
+        // todo - how is this supposed to work?
         return 1;
     }
 
@@ -442,7 +444,7 @@ public final class EntityUtil {
         return builder.getWorkflowTag(workflowName);
     }
 
-    public static List<String> getWorkflowNames(Entity entity, String cluster) {
+    public static List<String> getWorkflowNames(Entity entity) {
         switch(entity.getEntityType()) {
         case FEED:
             return Arrays.asList(getWorkflowName(Tag.RETENTION, entity).toString(),
@@ -581,20 +583,16 @@ public final class EntityUtil {
         Entity entity)
         throws FalconException {
         Path basePath = getBaseStagingPath(cluster, entity);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         try {
-            FileStatus[] filesArray = fs.listStatus(basePath, new PathFilter() {
+            return fs.listStatus(basePath, new PathFilter() {
                 @Override
                 public boolean accept(Path path) {
-                    if (path.getName().equals("logs")) {
-                        return false;
-                    }
-                    return true;
+                    return !path.getName().equals("logs");
                 }
             });
 
-            return filesArray;
-
         } catch (FileNotFoundException e) {
             LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
             //Staging path doesn't exist if entity is not scheduled
@@ -755,4 +753,22 @@ public final class EntityUtil {
         return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first);
     }
 
+    public static AccessControlList getACL(Entity entity) {
+        switch (entity.getEntityType()) {
+        case CLUSTER:
+            return ((org.apache.falcon.entity.v0.cluster.Cluster) entity).getACL();
+
+        case FEED:
+            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
+
+        case PROCESS:
+            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
+
+        default:
+            break;
+        }
+
+        throw new IllegalArgumentException("Unknown entity type: " + entity.getEntityType()
+                + " for: " + entity.getName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index fbbdbcb..7c4b99d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -52,7 +52,7 @@ import java.io.IOException;
  */
 public class ClusterEntityParser extends EntityParser<Cluster> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterEntityParser.class);
 
     public ClusterEntityParser() {
         super(EntityType.CLUSTER);
@@ -122,9 +122,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
                 conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
             }
 
-            // todo: ideally check if the end user has access using createProxiedFileSystem
-            // hftp won't work and bug is logged at HADOOP-10215
-            HadoopClientFactory.get().createFileSystem(conf);
+            HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
             throw new ValidationException("Invalid storage server or port: " + storageUrl, e);
         }
@@ -135,7 +133,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         LOG.info("Validating execute interface: {}", executeUrl);
 
         try {
-            HadoopClientFactory.validateJobClient(executeUrl);
+            HadoopClientFactory.get().validateJobClient(executeUrl);
         } catch (IOException e) {
             throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
         }
@@ -231,55 +229,69 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     /**
-     * Validate the locations on the cluster is owned by falcon.
+     * Validate the locations on the cluster exists with appropriate permissions
+     * for the user to write to this directory.
      *
      * @param cluster cluster entity
      * @throws ValidationException
      */
     private void validateLocations(Cluster cluster) throws ValidationException {
+        Configuration conf = ClusterHelper.getConfiguration(cluster);
+        FileSystem fs;
         try {
-            Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
-            for (Location location : cluster.getLocations().getLocations()) {
-                if (location.getName().equals("temp")) {
-                    continue;
-                }
-
-                try {
-                    Path locationPath = new Path(location.getPath());
-                    if (fs.exists(locationPath)) {
-                        FileStatus fileStatus = fs.getFileStatus(locationPath);
-                        checkPathPermissions(locationPath, fileStatus);
-                        checkPathOwner(locationPath, fileStatus);
-                    }
-                } catch (IOException e) {
-                    throw new ValidationException("Unable to validate the location " + location
-                            + "for cluster.", e);
-                }
-            }
+            fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
         } catch (FalconException e) {
-            throw new ValidationException("Unable to validate the locations for cluster.", e);
+            throw new ValidationException(
+                    "Unable to get file system handle for cluster " + cluster.getName(), e);
         }
-    }
 
-    private void checkPathPermissions(Path locationPath,
-                                      FileStatus fileStatus) throws ValidationException {
-        if (fileStatus.getPermission().getUserAction() != FsAction.ALL) {
-            LOG.error("Path {} doesn't have rwx permissions {}",
-                    locationPath, fileStatus.getPermission());
-            throw new ValidationException("Path " + locationPath
-                    + " doesn't have rwx permissions: " + fileStatus.getPermission());
+        for (Location location : cluster.getLocations().getLocations()) {
+            final String locationName = location.getName();
+            if (locationName.equals("temp")) {
+                continue;
+            }
+
+            try {
+                checkPathOwnerAndPermission(cluster.getName(), location.getPath(), fs,
+                        "staging".equals(locationName)
+                                ? HadoopClientFactory.ALL_PERMISSION
+                                : HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            } catch (IOException e) {
+                throw new ValidationException("Unable to validate the location " + location
+                        + " for cluster " + cluster.getName(), e);
+            }
         }
     }
 
-    private void checkPathOwner(Path locationPath,
-                                FileStatus fileStatus) throws IOException, ValidationException {
-        final String owner = UserGroupInformation.getLoginUser().getShortUserName();
-        if (!fileStatus.getOwner().equals(owner)) {
-            LOG.error("Path {} with owner {} doesn't match the actual path owner {}",
-                    locationPath, owner, fileStatus.getOwner());
-            throw new ValidationException("Path [" + locationPath + "] with owner [" + owner
-                    + "] doesn't match the actual path  owner " + fileStatus.getOwner());
+    private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs,
+                                             FsPermission expectedPermission)
+        throws IOException, ValidationException {
+
+        Path locationPath = new Path(location);
+        FileStatus fileStatus = fs.getFileStatus(locationPath);
+        if (!fs.exists(locationPath)) {
+            throw new ValidationException("Location " + location
+                    + " for cluster " + clusterName + " must exist.");
         }
+
+        // falcon owns this path on each cluster
+        final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
+        final String locationOwner = fileStatus.getOwner();
+        if (!locationOwner.equals(loginUser)) {
+            LOG.error("Location {} has owner {}, should be the process user {}",
+                    locationPath, locationOwner, loginUser);
+            throw new ValidationException("Path [" + locationPath + "] has owner [" + locationOwner
+                    + "], should be the process user " + loginUser);
+        }
+
+        if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
+            LOG.error("Location {} has permissions {}, should be {}",
+                    locationPath, fileStatus.getPermission(), expectedPermission);
+            throw new ValidationException("Path " + locationPath + " has permissions: "
+                    + fileStatus.getPermission() + ", should be " + expectedPermission);
+        }
+
+        // try to list to see if the user is able to write to this folder
+        fs.listStatus(locationPath);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index 8a3f669..ac58280 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -119,7 +119,7 @@ public abstract class EntityParser<T extends Entity> {
                              AccessControlList acl) throws AuthorizationException {
         try {
             SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
-                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUgi());
+                    getEntityType().name(), acl, "validate", CurrentUser.getProxyUGI());
         } catch (FalconException e) {
             throw new AuthorizationException(e);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 25fc21b..62a3e5b 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -58,22 +58,16 @@ public final class ConfigurationStore implements FalconService {
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
     private static final String UTF_8 = CharEncoding.UTF_8;
 
-    private static final ConfigurationStore STORE = new ConfigurationStore();
+    private static final FsPermission STORE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 
     private Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>();
 
     private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>();
 
-    public static ConfigurationStore get() {
-        return STORE;
-    }
-
     private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary
         = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
 
-    private final FileSystem fs;
-    private final Path storePath;
-
     private static final Entity NULL = new Entity() {
         @Override
         public String getName() {
@@ -81,6 +75,15 @@ public final class ConfigurationStore implements FalconService {
         }
     };
 
+    private static final ConfigurationStore STORE = new ConfigurationStore();
+
+    public static ConfigurationStore get() {
+        return STORE;
+    }
+
+    private final FileSystem fs;
+    private final Path storePath;
+
     private ConfigurationStore() {
         for (EntityType type : EntityType.values()) {
             dictionary.put(type, new ConcurrentHashMap<String, Entity>());
@@ -98,13 +101,12 @@ public final class ConfigurationStore implements FalconService {
      */
     private FileSystem initializeFileSystem() {
         try {
-            FileSystem fileSystem = HadoopClientFactory.get().createFileSystem(storePath.toUri());
+            FileSystem fileSystem =
+                    HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
             if (!fileSystem.exists(storePath)) {
                 LOG.info("Creating configuration store directory: {}", storePath);
-                fileSystem.mkdirs(storePath);
                 // set permissions so config store dir is owned by falcon alone
-                FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-                fileSystem.setPermission(storePath, permission);
+                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
             }
 
             return fileSystem;
@@ -331,7 +333,7 @@ public final class ConfigurationStore implements FalconService {
      */
     private void archive(EntityType type, String name) throws IOException {
         Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
-        fs.mkdirs(archivePath);
+        HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);
         fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
                 new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
         LOG.info("Archived configuration {}/{}", type, name);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index ecdbf14..3011b65 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -26,6 +26,9 @@ import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,6 +47,11 @@ public final class HadoopClientFactory {
     public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address";
     public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
 
+    public static final FsPermission READ_EXECUTE_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
+    public static final FsPermission ALL_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
     private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
 
     private HadoopClientFactory() {
@@ -61,7 +69,7 @@ public final class HadoopClientFactory {
      * @throws org.apache.falcon.FalconException
      *          if the filesystem could not be created.
      */
-    public FileSystem createFileSystem(final URI uri) throws FalconException {
+    public FileSystem createFalconFileSystem(final URI uri) throws FalconException {
         Validate.notNull(uri, "uri cannot be null");
 
         try {
@@ -76,7 +84,7 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createFileSystem(final Configuration conf)
+    public FileSystem createFalconFileSystem(final Configuration conf)
         throws FalconException {
         Validate.notNull(conf, "configuration cannot be null");
 
@@ -90,17 +98,6 @@ public final class HadoopClientFactory {
         }
     }
 
-    public FileSystem createFileSystem(final URI uri, final Configuration conf)
-        throws FalconException {
-        Validate.notNull(uri, "uri cannot be null");
-
-        try {
-            return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + uri, e);
-        }
-    }
-
     /**
      * Return a FileSystem created with the authenticated proxy user for the specified conf.
      *
@@ -115,7 +112,7 @@ public final class HadoopClientFactory {
 
         String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
         try {
-            return createFileSystem(CurrentUser.getProxyUgi(), new URI(nameNode), conf);
+            return createFileSystem(getProxyUGI(), new URI(nameNode), conf);
         } catch (URISyntaxException e) {
             throw new FalconException("Exception while getting FileSystem for proxy: "
                     + CurrentUser.getUser(), e);
@@ -125,6 +122,32 @@ public final class HadoopClientFactory {
         }
     }
 
+    public FileSystem createProxiedFileSystem(final URI uri) throws FalconException {
+        return createProxiedFileSystem(uri, new Configuration());
+    }
+
+    public FileSystem createProxiedFileSystem(final URI uri,
+                                              final Configuration conf) throws FalconException {
+        Validate.notNull(uri, "uri cannot be null");
+
+        try {
+            return createFileSystem(getProxyUGI(), uri, conf);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
+        }
+    }
+
+    private UserGroupInformation getProxyUGI() throws IOException {
+        try { // get the authenticated user
+            return CurrentUser.getProxyUGI();
+        } catch (Exception ignore) {
+            // ignore since the user authentication might have failed or in oozie
+        }
+
+        return UserGroupInformation.getCurrentUser();
+    }
+
     /**
      * Return a FileSystem created with the provided user for the specified URI.
      *
@@ -136,8 +159,8 @@ public final class HadoopClientFactory {
      *          if the filesystem could not be created.
      */
     @SuppressWarnings("ResultOfMethodCallIgnored")
-    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, final Configuration conf)
-        throws FalconException {
+    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
+                                       final Configuration conf) throws FalconException {
         Validate.notNull(ugi, "ugi cannot be null");
         Validate.notNull(conf, "configuration cannot be null");
 
@@ -172,7 +195,7 @@ public final class HadoopClientFactory {
      * @param executeUrl jt url or RM url
      * @throws IOException
      */
-    public static void validateJobClient(String executeUrl) throws IOException {
+    public void validateJobClient(String executeUrl) throws IOException {
         final JobConf jobConf = new JobConf();
         jobConf.set(MR_JT_ADDRESS_KEY, executeUrl);
         jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
@@ -190,4 +213,23 @@ public final class HadoopClientFactory {
             throw new IOException("Exception creating job client:" + e.getMessage(), e);
         }
     }
+
+    public static FsPermission getDirDefaultPermission(Configuration conf) {
+        return FsPermission.getDirDefault().applyUMask(FsPermission.getUMask(conf));
+    }
+
+    public static FsPermission getFileDefaultPermission(Configuration conf) {
+        return FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf));
+    }
+
+    public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException {
+        mkdirs(fs, path, getDirDefaultPermission(fs.getConf()));
+    }
+
+    public static void mkdirs(FileSystem fs, Path path,
+                              FsPermission permission) throws IOException {
+        if (!FileSystem.mkdirs(fs, path, permission)) {
+            throw new IOException("mkdir failed for " + path);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index b7d2c66..cfea143 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -49,7 +49,7 @@ public final class CurrentUser {
 
     private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>();
 
-    public static void authenticate(String user) {
+    public static void authenticate(final String user) {
         if (user == null || user.isEmpty()) {
             throw new IllegalStateException("Bad user name sent for authentication");
         }
@@ -59,6 +59,13 @@ public final class CurrentUser {
 
         Subject subject = new Subject();
         subject.getPrincipals().add(new FalconPrincipal(user));
+
+        try {  // initialize proxy user
+            createProxyUGI(user);
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to create a proxy user");
+        }
+
         LOG.info("Logging in {}", user);
         INSTANCE.currentSubject.set(subject);
     }
@@ -93,19 +100,38 @@ public final class CurrentUser {
             new ConcurrentHashMap<String, UserGroupInformation>();
 
     /**
+     * Create a proxy UGI object for the current authenticated user.
+     *
+     * @param proxyUser logged in user
+     * @return UGI object
+     * @throws IOException
+     */
+    public static UserGroupInformation createProxyUGI(String proxyUser) throws IOException {
+        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+        if (proxyUgi == null) {
+            // taking care of a race condition, the latest UGI will be discarded
+            proxyUgi = UserGroupInformation.createProxyUser(
+                    proxyUser, UserGroupInformation.getLoginUser());
+            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+        }
+
+        return proxyUgi;
+    }
+
+    /**
      * Dole out a proxy UGI object for the current authenticated user.
      *
      * @return UGI object
      * @throws java.io.IOException
      */
-    public static UserGroupInformation getProxyUgi() throws IOException {
+    public static UserGroupInformation getProxyUGI() throws IOException {
         String proxyUser = getUser();
 
         UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
         if (proxyUgi == null) {
             // taking care of a race condition, the latest UGI will be discarded
-            proxyUgi = UserGroupInformation
-                    .createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+            proxyUgi = UserGroupInformation.createProxyUser(
+                    proxyUser, UserGroupInformation.getLoginUser());
             userUgiMap.putIfAbsent(proxyUser, proxyUgi);
         }
 
@@ -113,7 +139,17 @@ public final class CurrentUser {
     }
 
     public static Set<String> getGroupNames() throws IOException {
-        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUgi().getGroupNames()));
+        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames()));
         return Collections.unmodifiableSet(s);
     }
+
+    public static String getPrimaryGroupName() {
+        try {
+            return getProxyUGI().getPrimaryGroupName();
+        } catch (IOException ignore) {
+            // ignored
+        }
+
+        return "unknown"; // this can only happen in tests
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
index e90518d..4b7c4a9 100644
--- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
@@ -25,7 +25,6 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -284,7 +283,7 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         if (entityName != null) { // lifecycle actions
             Entity entity = getEntity(entityName, entityType);
             authorizeEntity(entity.getName(), entity.getEntityType().name(),
-                    getACL(entity), action, proxyUgi);
+                    EntityUtil.getACL(entity), action, proxyUgi);
         } else {
             // non lifecycle actions, lifecycle actions with null entity will validate later
             LOG.info("Authorization for action={} will be done in the API", action);
@@ -300,24 +299,6 @@ public class DefaultAuthorizationProvider implements AuthorizationProvider {
         }
     }
 
-    protected AccessControlList getACL(Entity entity) throws AuthorizationException {
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            return ((Cluster) entity).getACL();
-
-        case FEED:
-            return ((org.apache.falcon.entity.v0.feed.Feed) entity).getACL();
-
-        case PROCESS:
-            return ((org.apache.falcon.entity.v0.process.Process) entity).getACL();
-
-        default:
-            break;
-        }
-
-        throw new AuthorizationException("Cannot get owner for entity: " + entity.getName());
-    }
-
     protected void authorizeLineageResource(String authenticatedUser, String action) {
         LOG.debug("User {} authorized for action {} ", authenticatedUser, action);
         // todo - do nothing for now, read-only for all

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index f3e643e..ebd4dd4 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -30,8 +30,6 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -285,13 +283,8 @@ public class WorkflowExecutionContext {
         OutputStream out = null;
         Path file = new Path(contextFile);
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
             out = fs.create(file);
-
-            // making sure falcon can read this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(file, permission);
-
             out.write(JSONValue.toJSONString(context).getBytes());
         } catch (IOException e) {
             throw new FalconException("Error serializing context to: " + contextFile,  e);
@@ -315,7 +308,8 @@ public class WorkflowExecutionContext {
     public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
         try {
             Path lineageDataPath = new Path(contextFile); // file has 777 permissions
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(lineageDataPath.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    lineageDataPath.toUri());
 
             BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
             return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>) JSONValue.parse(in));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 6ad742e..8e2e544 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -69,7 +70,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
     @Override
     @BeforeClass
     public void setup() throws Exception {
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster", CurrentUser.getUser());
         conf = dfsCluster.getConf();
         fs = dfsCluster.getFileSystem();
         fs.delete(new Path("/"), true);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 2d41661..a1319b8 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -91,6 +91,9 @@ public class AbstractTestBase {
     }
 
     protected void storeEntity(EntityType type, String name) throws Exception {
+        final String proxyUser = CurrentUser.getUser();
+        final String defaultGroupName = CurrentUser.getPrimaryGroupName();
+
         Unmarshaller unmarshaller = type.getUnmarshaller();
         store = ConfigurationStore.get();
         store.remove(type, name);
@@ -100,12 +103,16 @@ public class AbstractTestBase {
             cluster.setName(name);
             ClusterHelper.getInterface(cluster, Interfacetype.WRITE)
                     .setEndpoint(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+            decorateACL(proxyUser, defaultGroupName, cluster);
+
             store.publish(type, cluster);
             break;
 
         case FEED:
             Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
             feed.setName(name);
+            decorateACL(proxyUser, defaultGroupName, feed);
+
             store.publish(type, feed);
             break;
 
@@ -117,12 +124,52 @@ public class AbstractTestBase {
             if (!fs.exists(new Path(process.getWorkflow() + "/lib"))) {
                 fs.mkdirs(new Path(process.getWorkflow() + "/lib"));
             }
+
+            decorateACL(proxyUser, defaultGroupName, process);
+
             store.publish(type, process);
             break;
         default:
         }
     }
 
+    private void decorateACL(String proxyUser, String defaultGroupName, Cluster cluster) {
+        if (cluster.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.cluster.ACL clusterACL =
+                new org.apache.falcon.entity.v0.cluster.ACL();
+        clusterACL.setOwner(proxyUser);
+        clusterACL.setGroup(defaultGroupName);
+        cluster.setACL(clusterACL);
+    }
+
+    private void decorateACL(String proxyUser, String defaultGroupName, Feed feed) {
+        if (feed.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.feed.ACL feedACL =
+                new org.apache.falcon.entity.v0.feed.ACL();
+        feedACL.setOwner(proxyUser);
+        feedACL.setGroup(defaultGroupName);
+        feed.setACL(feedACL);
+    }
+
+    private void decorateACL(String proxyUser, String defaultGroupName,
+                             Process process) {
+        if (process.getACL() != null) {
+            return;
+        }
+
+        org.apache.falcon.entity.v0.process.ACL processACL =
+                new org.apache.falcon.entity.v0.process.ACL();
+        processACL.setOwner(proxyUser);
+        processACL.setGroup(defaultGroupName);
+        process.setACL(processACL);
+    }
+
     public void setup() throws Exception {
         store = ConfigurationStore.get();
         for (EntityType type : EntityType.values()) {
@@ -147,7 +194,7 @@ public class AbstractTestBase {
 
     // assumes there will always be at least one group for a logged in user
     protected String getGroupName() throws IOException {
-        String[] groupNames = CurrentUser.getProxyUgi().getGroupNames();
+        String[] groupNames = CurrentUser.getProxyUGI().getGroupNames();
         System.out.println("groupNames = " + Arrays.asList(groupNames));
         return groupNames[0];
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
index 8f25f57..1cb08ac 100644
--- a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -64,7 +64,7 @@ public class HadoopClientFactoryTest {
             Configuration conf = embeddedCluster.getConf();
             URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
             Assert.assertNotNull(uri);
-            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
+            HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
             Assert.fail("Impersonation should have failed.");
         } catch (Exception e) {
             Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
@@ -99,7 +99,7 @@ public class HadoopClientFactoryTest {
         Assert.assertNotNull(uri);
 
         CurrentUser.authenticate(System.getProperty("user.name"));
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUgi(), uri, conf);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
         Assert.assertNotNull(fs);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
index a1861e1..187d85e 100644
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
@@ -37,7 +37,7 @@ public class CurrentUserTest {
     @Test
     public void testGetProxyUser() throws Exception {
         CurrentUser.authenticate("proxy");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUgi();
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
         Assert.assertNotNull(proxyUgi);
         Assert.assertEquals(proxyUgi.getUserName(), "proxy");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index a710dfd..b0dfb7f 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -69,6 +69,11 @@ Location has the name and the path, name is the type of locations like staging,
 and path is the hdfs path for each location.
 Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon
 should have read/write/execute permission on these locations.
+These locations MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions so multiple
+users can write to this location.
+*working* must have 755 permissions and the parent dirs must have execute permissions so multiple
+users can read from this location.
 
 ---+++ ACL
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 9cddcbb..88f3226 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -261,6 +261,9 @@ src/bin/package.sh <<hadoop-version>> <<oozie-version>>
 bin/falcon-start
 </verbatim>
 Make sure the hadoop and oozie endpoints are according to your setup in examples/entity/filesystem/standalone-cluster.xml
+The cluster locations, staging and working dirs, MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions.
+*working* must have 755 permissions and the parent dirs must have execute permissions.
 <verbatim>
 bin/falcon entity -submit -type cluster -file examples/entity/filesystem/standalone-cluster.xml
 </verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/OnBoarding.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/OnBoarding.twiki b/docs/src/site/twiki/OnBoarding.twiki
index 75d0d7e..4f49c5a 100644
--- a/docs/src/site/twiki/OnBoarding.twiki
+++ b/docs/src/site/twiki/OnBoarding.twiki
@@ -16,6 +16,10 @@
 ---+++ Sample Pipeline
 ---++++ Cluster   
 Cluster definition that contains end points for name node, job tracker, oozie and jms server:
+The cluster locations MUST be created prior to submitting a cluster entity to Falcon.
+*staging* must have 777 permissions and the parent dirs must have execute permissions
+*working* must have 755 permissions and the parent dirs must have execute permissions
+
 <verbatim>
 <?xml version="1.0"?>
 <!--

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
index 4e33182..f4db28b 100644
--- a/docs/src/site/twiki/Security.twiki
+++ b/docs/src/site/twiki/Security.twiki
@@ -301,11 +301,17 @@ Falcon should be configured to communicate with Prism over TLS in secure mode. I
 
 ---++ Changes to ownership and permissions of directories managed by Falcon
 
-| *Directory*             | *Location*                                                        | *Owner* | *Permissions* |
-| Configuration Store     | ${config.store.uri}                                               | falcon  | 750           |
-| Oozie coord/bundle XMLs | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | falcon  | 644           |
-| Shared libs             | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
-| App logs                | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | falcon  | 777           |
+| *Directory*              | *Location*                                                        | *Owner* | *Permissions* |
+| Configuration Store      | ${config.store.uri}                                               | falcon  | 700           |
+| Cluster Staging Location | ${cluster.staging-location}                                       | falcon  | 777           |
+| Cluster Working Location | ${cluster.working-location}                                       | falcon  | 755           |
+| Shared libs              | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
+| Oozie coord/bundle XMLs  | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | $user   | cluster umask |
+| App logs                 | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | $user   | cluster umask |
+
+*Note:* Please note that the cluster staging and working locations MUST be created prior to
+submitting a cluster entity to Falcon. Also, note that the parent dirs must have execute
+permissions.
 
 
 ---++ Backwards compatibility

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
index 72e390e..7156bbd 100644
--- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -142,6 +142,11 @@ public class JailedFileSystem extends FileSystem {
     }
 
     @Override
+    public void setPermission(Path p, FsPermission permission) throws IOException {
+        localFS.setPermission(toLocalPath(p), permission);
+    }
+
+    @Override
     public FileChecksum getFileChecksum(Path f) throws IOException {
         final byte[] md5 = DigestUtils.md5(FileUtils.readFileToByteArray(new File(toLocalPath(f).toString())));
         return new FileChecksum() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 4a0bc2a..300fecf 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -21,6 +21,7 @@ package org.apache.falcon.messaging;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
@@ -99,6 +100,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
             WorkflowExecutionContext context = createContext(mapMessage);
             LOG.info("Created context from JMS message {}", context);
 
+            // Login the user so listeners can access FS and WfEngine as this user
+            CurrentUser.authenticate(context.getWorkflowUser());
+
             if (context.hasWorkflowFailed()) {
                 onFailure(context);
             } else if (context.hasWorkflowSucceeded()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 629e6a5..dece932 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,10 +18,10 @@
 
 package org.apache.falcon.messaging;
 
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -35,7 +35,6 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.Topic;
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -184,7 +183,7 @@ public class JMSMessageProducer {
         String[] feedPaths;
         try {
             feedPaths = getFeedPaths();
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Error getting instance paths: ", e);
             throw new RuntimeException(e);
         }
@@ -248,7 +247,7 @@ public class JMSMessageProducer {
         message.put(key.getName(), value);
     }
 
-    private String[] getFeedPaths() throws IOException {
+    private String[] getFeedPaths() throws Exception {
         WorkflowExecutionContext.EntityOperations operation = context.getOperation();
         if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
                 || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {
@@ -258,7 +257,7 @@ public class JMSMessageProducer {
 
         // else case of feed retention
         Path logFile = new Path(context.getLogFile());
-        FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(logFile.toUri());
 
         if (!fs.exists(logFile)) {
             // Evictor Failed without deleting a single path

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 1a08ada..4a22ff2 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -20,6 +20,7 @@ package org.apache.falcon.logging;
 
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +72,7 @@ public class JobLogMover {
 
             Path path = new Path(context.getLogDir() + "/"
                     + String.format("%03d", context.getWorkflowRunId()));
-            FileSystem fs = path.getFileSystem(getConf());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
 
             if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
                     || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 4ed8f52..2e5dffb 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -53,7 +53,7 @@ public final class LogProvider {
         try {
             Configuration conf = ClusterHelper.getConfiguration(clusterObj);
             // fs on behalf of the end user.
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId);
             // if runId param is not resolved, i.e job is killed or not started or running
             if (resolvedRunId.equals("-")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 82f7251..957300a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.oozie;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -34,8 +33,6 @@ import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.oozie.client.OozieClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +42,6 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -77,8 +73,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         bundle.setName(EntityUtil.getWorkflowName(entity).toString());
         // all the properties are set prior to bundle and coordinators creation
 
-        createLogsDir(cluster, buildPath); //create logs dir
-
         for (Properties coordProps : coords) {
             // add the coordinator to the bundle
             COORDINATOR coord = new COORDINATOR();
@@ -133,23 +127,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         return properties;
     }
 
-    private void createLogsDir(Cluster cluster, Path buildPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(buildPath.toUri(),
-                ClusterHelper.getConfiguration(cluster));
-            Path logsDir = new Path(buildPath.getParent(), "logs");
-            if (!fs.mkdirs(logsDir)) {
-                throw new FalconException("Failed to create " + logsDir);
-            }
-
-            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(logsDir, permission);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
     protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
         return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
             OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
@@ -160,20 +137,17 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
     protected abstract List<Properties> buildCoords(Cluster cluster, Path bundlePath) throws FalconException;
 
     public static BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException {
-        InputStream resourceAsStream = null;
         try {
-            FileSystem fs =
-                HadoopClientFactory.get().createFileSystem(path.toUri(), ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                        path.toUri(), ClusterHelper.getConfiguration(cluster));
             Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement = (JAXBElement<BUNDLEAPP>)
-                unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
+            @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement =
+                    unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
             return jaxbElement.getValue();
         } catch (JAXBException e) {
             throw new FalconException(e);
         } catch (IOException e) {
             throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index fe2136b..2ceb91e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -120,7 +120,8 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
         props.put("falconDataOperation", getOperation().name());
 
-        props.put(WorkflowExecutionArgs.LOG_DIR.getName(), getLogDirectory(cluster));
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 1c3085c..e341fb8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -127,8 +126,8 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
     }
 
-    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
-        throws FalconException {
+    protected Path marshal(Cluster cluster, JAXBElement<?> jaxbElement,
+                           JAXBContext jaxbContext, Path outPath) throws FalconException {
         try {
             Marshaller marshaller = jaxbContext.createMarshaller();
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
@@ -140,8 +139,8 @@ public abstract class OozieEntityBuilder<T extends Entity> {
                 LOG.debug(writer.getBuffer().toString());
             }
 
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
             try {
                 marshaller.marshal(jaxbElement, out);
@@ -261,8 +260,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
 
     protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
         try {
-            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
-                libPath, cluster, FALCON_JAR_FILTER);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    libPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            SharedLibraryHostingService.pushLibsToHDFS(
+                    fs, StartupProperties.get().getProperty("system.lib.location"),
+                    libPath, FALCON_JAR_FILTER);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
         }
@@ -279,16 +281,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         return prop;
     }
 
-    protected String getLogDirectory(Cluster cluster) {
-        return getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs"));
-    }
-
     protected <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
         InputStream resourceAsStream = null;
         try {
             resourceAsStream = OozieEntityBuilder.class.getResourceAsStream(template);
             Unmarshaller unmarshaller = context.createUnmarshaller();
-            @SuppressWarnings("unchecked")
             JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
             return jaxbElement.getValue();
         } catch (JAXBException e) {
@@ -310,5 +307,4 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         }
         throw new IllegalArgumentException("Unhandled type " + entity.getEntityType());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 2339284..3a3e26e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -210,7 +210,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
 
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         try {
             addExtensionJars(fs, new Path(libext), wf);
             addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
@@ -261,12 +262,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     }
 
     // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
-    protected void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException {
+    protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
+                                           String prefix) throws FalconException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
 
         try {
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
             // create hive conf to stagingDir
             Path confPath = new Path(workflowPath + "/conf");
@@ -277,8 +279,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         }
     }
 
-    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf, String prefix)
-        throws IOException {
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
         OutputStream out = null;
         try {
             out = fs.create(new Path(confPath, prefix + "hive-site.xml"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 801d733..c5366dc 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -281,7 +281,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
                                         Path buildPath) throws FalconException {
         Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
         try {
             // copy import export scripts to stagingDir
@@ -298,7 +298,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         }
     }
 
-    private void copyHiveScript(FileSystem fs, Path scriptPath, String resource) throws IOException {
+    private void copyHiveScript(FileSystem fs, Path scriptPath,
+                                String resource) throws IOException {
         OutputStream out = null;
         InputStream in = null;
         try {
@@ -312,7 +313,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
     }
 
     protected void persistHiveConfiguration(FileSystem fs, Path confPath,
-        Cluster cluster, String prefix) throws IOException {
+                                            Cluster cluster, String prefix) throws IOException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
         OutputStream out = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 8f97ffa..a38fdf6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -120,7 +120,8 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
 
     private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
         try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
 
             //Copy user workflow and lib to staging dir
             Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 865beaf..2700802 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
@@ -221,7 +222,8 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         }
 
         try {
-            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
                 archiveList.add(libPath.toString());
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index d273d61..9567c5f 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -28,7 +28,6 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -73,40 +72,35 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         Path lib = new Path(ClusterHelper.getLocation(cluster, "working"), "lib");
         Path libext = new Path(ClusterHelper.getLocation(cluster, "working"), "libext");
         try {
+            FileSystem fs = HadoopClientFactory.get().createFalconFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+
             Properties properties = StartupProperties.get();
-            pushLibsToHDFS(properties.getProperty("system.lib.location"), lib, cluster, NON_FALCON_JAR_FILTER);
-            pushLibsToHDFS(properties.getProperty("libext.paths"), libext, cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.paths"),
-                    new Path(libext, EntityType.FEED.name()) , cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.replication.paths"),
-                    new Path(libext, EntityType.FEED.name() + "/replication"), cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.feed.retention.paths"),
-                    new Path(libext, EntityType.FEED.name() + "/retention"), cluster, null);
-            pushLibsToHDFS(properties.getProperty("libext.process.paths"),
-                    new Path(libext, EntityType.PROCESS.name()) , cluster, null);
+            pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), lib,
+                    NON_FALCON_JAR_FILTER);
+            pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
+                    new Path(libext, EntityType.FEED.name()) , null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.replication.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/replication"), null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.feed.retention.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/retention"), null);
+            pushLibsToHDFS(fs, properties.getProperty("libext.process.paths"),
+                    new Path(libext, EntityType.PROCESS.name()) , null);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs to cluster" + cluster.getName(), e);
         }
     }
 
-    public static void pushLibsToHDFS(String src, Path target, Cluster cluster, FalconPathFilter pathFilter)
-        throws IOException, FalconException {
+    @SuppressWarnings("ConstantConditions")
+    public static void pushLibsToHDFS(FileSystem fs, String src, Path target,
+                                      FalconPathFilter pathFilter) throws IOException, FalconException {
         if (StringUtils.isEmpty(src)) {
             return;
         }
 
         LOG.debug("Copying libs from {}", src);
-        FileSystem fs;
-        try {
-            fs = getFileSystem(cluster);
-            fs.getConf().set("dfs.umaskmode", "022");  // drwxr-xr-x
-        } catch (Exception e) {
-            throw new FalconException("Unable to connect to HDFS: "
-                    + ClusterHelper.getStorageUrl(cluster), e);
-        }
-        if (!fs.exists(target) && !fs.mkdirs(target)) {
-            throw new FalconException("mkdir " + target + " failed");
-        }
+        createTargetPath(fs, target);
 
         for(String srcPaths : src.split(",")) {
             File srcFile = new File(srcPaths);
@@ -133,18 +127,20 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
                     }
                 }
                 fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), targetFile);
-                LOG.info("Copied {} to {} in {}", file.getAbsolutePath(), targetFile.toString(), fs.getUri());
+                fs.setPermission(targetFile, HadoopClientFactory.READ_EXECUTE_PERMISSION);
+                LOG.info("Copied {} to {} in {}",
+                        file.getAbsolutePath(), targetFile.toString(), fs.getUri());
             }
         }
     }
 
-    // the dir is owned by Falcon but world-readable
-    private static FileSystem getFileSystem(Cluster cluster)
-        throws FalconException, IOException {
-        Configuration conf = ClusterHelper.getConfiguration(cluster);
-        conf.setInt("ipc.client.connect.max.retries", 10);
-
-        return HadoopClientFactory.get().createFileSystem(conf);
+    private static void createTargetPath(FileSystem fs,
+                                         Path target) throws IOException, FalconException {
+        // the dir and files MUST be readable by all users
+        if (!fs.exists(target)
+                && !FileSystem.mkdirs(fs, target, HadoopClientFactory.READ_EXECUTE_PERMISSION)) {
+            throw new FalconException("mkdir " + target + " failed");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/15b89bc3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index bbed949..54cab51 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -58,7 +58,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
             LOG.info("Deleting entity path {} on cluster {}", entityPath, clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
                 throw new FalconException("Unable to cleanup entity path: " + entityPath);
             }