You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/02/16 04:31:23 UTC

[2/5] FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 87be709..6d0297e 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -41,6 +41,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.coordinator.CONTROLS;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -84,20 +85,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         super(entity);
     }
 
-    private void mkdir(FileSystem fs, Path path) throws FalconException {
-        try {
-            if (!fs.exists(path) && !fs.mkdirs(path)) {
-                throw new FalconException("mkdir failed for " + path);
-            }
-        } catch (IOException e) {
-            throw new FalconException("mkdir failed for " + path, e);
-        }
-    }
-
     @Override
     protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
         try {
-            FileSystem fs = ClusterHelper.getFileSystem(cluster);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
             Process process = getEntity();
 
             //Copy user workflow and lib to staging dir
@@ -136,11 +127,11 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
     private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
         try {
-            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
             Process process = getEntity();
             Path wfPath = new Path(process.getWorkflow().getPath());
             if (fs.isFile(wfPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName().toString());
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
             } else {
                 return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
             }
@@ -151,14 +142,15 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
     private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
         try {
-            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
             Process process = getEntity();
             if (process.getWorkflow().getLib() == null) {
                 return null;
             }
             Path libPath = new Path(process.getWorkflow().getLib());
+
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName().toString());
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
             } else {
                 return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
             }
@@ -516,9 +508,9 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
                 action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
             } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, processWorkflow, action.getPig(), parentWfPath);
+                decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
             } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
-                decorateHiveAction(cluster, process, processWorkflow, action, parentWfPath);
+                decorateHiveAction(cluster, process, action, parentWfPath);
             } else if (FALCON_ACTIONS.contains(actionName)) {
                 decorateWithOozieRetries(action);
             }
@@ -529,7 +521,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
     }
 
     private void decoratePIGAction(Cluster cluster, Process process,
-                                   Workflow processWorkflow, PIG pigAction, Path parentWfPath) throws FalconException {
+                                   PIG pigAction, Path parentWfPath) throws FalconException {
         Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
         pigAction.setScript("${nameNode}" + userWfPath.toString());
 
@@ -548,11 +540,11 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 
-        addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive(),
+        addArchiveForCustomJars(cluster, pigAction.getArchive(),
                 getUserLibPath(cluster, parentWfPath.getParent()));
     }
 
-    private void decorateHiveAction(Cluster cluster, Process process, Workflow processWorkflow, ACTION wfAction,
+    private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
                                     Path parentWfPath) throws FalconException {
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
@@ -571,7 +563,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         setupHiveConfiguration(cluster, parentWfPath, "falcon-");
 
-        addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive(),
+        addArchiveForCustomJars(cluster, hiveAction.getArchive(),
                 getUserLibPath(cluster, parentWfPath.getParent()));
 
         marshalHiveAction(wfAction, actionJaxbElement);
@@ -750,16 +742,16 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
                                         String prefix) throws FalconException {
         String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
         try {
-            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
             Path confPath = new Path(wfPath, "conf");
-            createHiveConf(fs, confPath, catalogUrl, prefix);
+            createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
         } catch (IOException e) {
             throw new FalconException(e);
         }
     }
 
-    private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
-                                         List<String> archiveList, Path libPath) throws FalconException {
+    private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+                                         Path libPath) throws FalconException {
         if (libPath == null) {
             return;
         }
@@ -814,5 +806,4 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             throw new RuntimeException("Unable to marshall hive action.", e);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 494bf20..f53c1e7 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -189,6 +189,8 @@
             <arg>${userWorkflowEngine}</arg>
             <arg>-logDir</arg>
             <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>
@@ -256,6 +258,8 @@
             <arg>${userWorkflowEngine}</arg>
             <arg>-logDir</arg>
             <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 61ddbdc..b4c059a 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -46,6 +46,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.DECISION;
 import org.apache.falcon.oozie.workflow.PIG;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +83,8 @@ public class OozieProcessMapperTest extends AbstractTestBase {
 
     @BeforeClass
     public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
         EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
         Configuration conf = cluster.getConf();
         hdfsUrl = conf.get("fs.default.name");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 4b35760..f204b15 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -25,11 +25,14 @@ import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
@@ -125,10 +128,15 @@ public class LateDataHandler extends Configured implements Tool {
         return computedMetrics;
     }
 
-    private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException {
+    private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException, FalconException {
         OutputStream out = null;
         try {
-            out = file.getFileSystem(getConf()).create(file);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
+            out = fs.create(file);
+
+            // making sure falcon can read this file
+            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+            fs.setPermission(file, permission);
 
             for (Map.Entry<String, Long> entry : metrics.entrySet()) {
                 out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
@@ -191,7 +199,7 @@ public class LateDataHandler extends Configured implements Tool {
      * @throws IOException
      */
     private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
-        throws IOException {
+        throws IOException, FalconException {
         long usage = 0;
         for (String pathElement : pathGroup.split(",")) {
             Path inPath = new Path(pathElement);
@@ -201,8 +209,8 @@ public class LateDataHandler extends Configured implements Tool {
         return usage;
     }
 
-    private long usage(Path inPath, Configuration conf) throws IOException {
-        FileSystem fs = inPath.getFileSystem(conf);
+    private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(inPath.toUri(), conf);
         FileStatus[] fileStatuses = fs.globStatus(inPath);
         if (fileStatuses == null || fileStatuses.length == 0) {
             return 0;
@@ -251,8 +259,8 @@ public class LateDataHandler extends Configured implements Tool {
         throws Exception {
 
         StringBuilder buffer = new StringBuilder();
-        BufferedReader in = new BufferedReader(new InputStreamReader(
-                file.getFileSystem(conf).open(file)));
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), conf);
+        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));
         String line;
         try {
             Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index b5ac121..2b52762 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -23,10 +23,11 @@ package org.apache.falcon.rerun.event;
 public class LaterunEvent extends RerunEvent {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public LaterunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
-                        String entityType, String entityName, String instance, int runId) {
+    public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
+                        long delay, String entityType, String entityName,
+                        String instance, int runId, String workflowUser) {
         super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
-                instance, runId);
+                instance, runId, workflowUser);
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
@@ -37,6 +38,6 @@ public class LaterunEvent extends RerunEvent {
                 + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
                 + delayInMilliSec + SEP + "entityType=" + entityType + SEP
                 + "entityName=" + entityName + SEP + "instance=" + instance
-                + SEP + "runId=" + runId;
+                + SEP + "runId=" + runId + SEP + "workflowUser=" + workflowUser;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index baf4601..254f285 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -38,6 +38,7 @@ public class RerunEvent implements Delayed {
 
     protected String clusterName;
     protected String wfId;
+    protected String workflowUser;
     protected long msgInsertTime;
     protected long delayInMilliSec;
     protected String entityType;
@@ -47,9 +48,10 @@ public class RerunEvent implements Delayed {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
-                      String entityType, String entityName, String instance, int runId) {
+                      String entityType, String entityName, String instance, int runId, String workflowUser) {
         this.clusterName = clusterName;
         this.wfId = wfId;
+        this.workflowUser = workflowUser;
         this.msgInsertTime = msgInsertTime;
         this.delayInMilliSec = delay;
         this.entityName = entityName;
@@ -67,6 +69,10 @@ public class RerunEvent implements Delayed {
         return wfId;
     }
 
+    public String getWorkflowUser() {
+        return workflowUser;
+    }
+
     public long getDelayInMilliSec() {
         return delayInMilliSec;
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index 03230f9..c2a8fe2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -45,7 +45,7 @@ public class RerunEventFactory<T extends RerunEvent> {
         return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
                 Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
-                Integer.parseInt(map.get("runId")));
+                Integer.parseInt(map.get("runId")), map.get("workflowUser"));
     }
 
     @SuppressWarnings("unchecked")
@@ -55,7 +55,7 @@ public class RerunEventFactory<T extends RerunEvent> {
                 Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
                 Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")),
-                Integer.parseInt(map.get("failRetryCount")));
+                Integer.parseInt(map.get("failRetryCount")), map.get("workflowUser"));
     }
 
     private Map<String, String> getMap(String message) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 1396f19..b5312a6 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -28,9 +28,9 @@ public class RetryEvent extends RerunEvent {
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public RetryEvent(String clusterName, String wfId, long msgInsertTime,
                       long delay, String entityType, String entityName, String instance,
-                      int runId, int attempts, int failRetryCount) {
+                      int runId, int attempts, int failRetryCount, String workflowUser) {
         super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
-                instance, runId);
+                instance, runId, workflowUser);
         this.attempts = attempts;
         this.failRetryCount = failRetryCount;
     }
@@ -56,7 +56,7 @@ public class RetryEvent extends RerunEvent {
                 + delayInMilliSec + SEP + "entityType=" + entityType + SEP
                 + "entityName=" + entityName + SEP + "instance=" + instance
                 + SEP + "runId=" + runId + SEP + "attempts=" + attempts + SEP
-                + "failRetryCount=" + failRetryCount;
+                + "failRetryCount=" + failRetryCount + SEP + "workflowUser=" + workflowUser;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index b073117..ca2304e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -24,6 +24,7 @@ import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
 import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.log4j.Logger;
 
 /**
@@ -51,7 +52,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
         Frequency frequency = new Frequency("minutes(1)");
         while (true) {
             try {
-                T message = null;
+                T message;
                 try {
                     message = handler.takeFromQueue();
                     attempt = 1;
@@ -64,6 +65,8 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
                     attempt++;
                     continue;
                 }
+
+                CurrentUser.authenticate(message.getWorkflowUser());
                 String jobStatus = handler.getWfEngine().getWorkflowStatus(
                         message.getClusterName(), message.getWfId());
                 handleRerun(message.getClusterName(), jobStatus, message);
@@ -72,8 +75,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
                 LOG.error("Error in rerun consumer:", e);
             }
         }
-
     }
 
-    protected abstract void handleRerun(String cluster, String jobStatus, T message);
+    protected abstract void handleRerun(String clusterName, String jobStatus, T message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index ab7f472..0333918 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -46,8 +46,11 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
         this.delayQueue.init();
     }
 
-    public abstract void handleRerun(String cluster, String entityType, String entityName,
-                                     String nominalTime, String runId, String wfId, long msgReceivedTime);
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public abstract void handleRerun(String clusterName, String entityType,
+                                     String entityName, String nominalTime, String runId,
+                                     String wfId, String workflowUser, long msgReceivedTime);
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public AbstractWorkflowEngine getWfEngine() {
         return wfEngine;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index fffd5cd..17f4337 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -22,6 +22,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.LateInput;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.latedata.LateDataHandler;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
@@ -45,7 +46,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
     }
 
     @Override
-    protected void handleRerun(String cluster, String jobStatus,
+    protected void handleRerun(String clusterName, String jobStatus,
                                LaterunEvent message) {
         try {
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
@@ -65,10 +66,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                         + message.getWfId()
                         + " at "
                         + SchemaHelper.formatDateUTC(new Date()));
-                handler.handleRerun(cluster, message.getEntityType(),
-                        message.getEntityName(), message.getInstance(),
-                        Integer.toString(message.getRunId()),
-                        message.getWfId(), System.currentTimeMillis());
+                handler.handleRerun(clusterName, message.getEntityType(), message.getEntityName(),
+                        message.getInstance(), Integer.toString(message.getRunId()),
+                        message.getWfId(), message.getWorkflowUser(), System.currentTimeMillis());
                 return;
             }
 
@@ -78,18 +78,14 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
                     + " on cluster: " + message.getClusterName());
         } catch (Exception e) {
-            LOG.warn(
-                    "Late Re-run failed for instance "
+            LOG.warn("Late Re-run failed for instance "
                             + message.getEntityName() + ":"
                             + message.getInstance() + " after "
-                            + message.getDelayInMilliSec() + " with message:",
-                    e);
-            GenericAlert.alertLateRerunFailed(message.getEntityType(),
-                    message.getEntityName(), message.getInstance(),
-                    message.getWfId(), Integer.toString(message.getRunId()),
-                    e.getMessage());
+                            + message.getDelayInMilliSec() + " with message:", e);
+            GenericAlert.alertLateRerunFailed(message.getEntityType(), message.getEntityName(),
+                    message.getInstance(), message.getWfId(), message.getWorkflowUser(),
+                    Integer.toString(message.getRunId()), e.getMessage());
         }
-
     }
 
     public String detectLate(LaterunEvent message) throws Exception {
@@ -106,7 +102,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
         final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
         Configuration conf = LateRerunHandler.getConfiguration(storageEndpoint);
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
         if (!fs.exists(lateLogPath)) {
             LOG.warn("Late log file:" + lateLogPath + " not found:");
             return "";

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 897e7ab..72f93cb 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -29,13 +29,13 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -50,8 +50,9 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         AbstractRerunHandler<LaterunEvent, M> {
 
     @Override
-    public void handleRerun(String cluster, String entityType, String entityName,
-                            String nominalTime, String runId, String wfId, long msgReceivedTime) {
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public void handleRerun(String cluster, String entityType, String entityName, String nominalTime,
+                            String runId, String wfId, String workflowUser, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
             try {
@@ -66,6 +67,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                 LOG.error("Unable to get Late Process for entity:" + entityName);
                 return;
             }
+
             int intRunId = Integer.parseInt(runId);
             Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
             Long wait = getEventDelay(entity, nominalTime);
@@ -81,7 +83,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
 
                 LOG.info("Going to delete path:" + lateLogPath);
                 final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
-                FileSystem fs = FileSystem.get(getConfiguration(storageEndpoint));
+                Configuration conf = getConfiguration(storageEndpoint);
+                FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
                     if (deleted) {
@@ -95,16 +98,17 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                     + entityType + "(" + entityName + ")" + ":" + nominalTime
                     + " And WorkflowId: " + wfId);
             LaterunEvent event = new LaterunEvent(cluster, wfId, msgInsertTime.getTime(),
-                    wait, entityType, entityName, nominalTime, intRunId);
+                    wait, entityType, entityName, nominalTime, intRunId, workflowUser);
             offerToQueue(event);
         } catch (Exception e) {
             LOG.error("Unable to schedule late rerun for entity instance : "
                     + entityType + "(" + entityName + ")" + ":" + nominalTime
                     + " And WorkflowId: " + wfId, e);
             GenericAlert.alertLateRerunFailed(entityType, entityName,
-                    nominalTime, wfId, runId, e.getMessage());
+                    nominalTime, wfId, workflowUser, runId, e.getMessage());
         }
     }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
 
@@ -217,7 +221,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
 
     public static Configuration getConfiguration(String storageEndpoint) throws FalconException {
         Configuration conf = new Configuration();
-        conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, storageEndpoint);
+        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageEndpoint);
         return conf;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 63dade8..bb0b34a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -38,7 +38,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
     }
 
     @Override
-    protected void handleRerun(String cluster, String jobStatus,
+    protected void handleRerun(String clusterName, String jobStatus,
                                RetryEvent message) {
         try {
             if (!jobStatus.equals("KILLED")) {
@@ -80,7 +80,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                     LOG.error("Unable to re-offer to queue:", ex);
                     GenericAlert.alertRetryFailed(message.getEntityType(),
                             message.getEntityName(), message.getInstance(),
-                            message.getWfId(),
+                            message.getWfId(), message.getWorkflowUser(),
                             Integer.toString(message.getRunId()),
                             ex.getMessage());
                 }
@@ -91,7 +91,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                                 + message.getInstance(), e);
                 GenericAlert.alertRetryFailed(message.getEntityType(),
                         message.getEntityName(), message.getInstance(),
-                        message.getWfId(),
+                        message.getWfId(), message.getWorkflowUser(),
                         Integer.toString(message.getRunId()),
                         "Failure retry attempts exhausted");
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 2b41a7c..ef49c3a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,8 +38,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
         AbstractRerunHandler<RetryEvent, M> {
 
     @Override
-    public void handleRerun(String cluster, String entityType, String entityName,
-                            String nominalTime, String runId, String wfId, long msgReceivedTime) {
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public void handleRerun(String clusterName, String entityType, String entityName, String nominalTime,
+                            String runId, String wfId, String workflowUser, long msgReceivedTime) {
         try {
             Entity entity = getEntity(entityType, entityName);
             Retry retry = getRetry(entity);
@@ -58,9 +59,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
             if (attempts > intRunId) {
                 AbstractRerunPolicy rerunPolicy = RerunPolicyFactory.getRetryPolicy(policy);
                 long delayTime = rerunPolicy.getDelay(delay, Integer.parseInt(runId));
-                RetryEvent event = new RetryEvent(cluster, wfId,
+                RetryEvent event = new RetryEvent(clusterName, wfId,
                         msgReceivedTime, delayTime, entityType, entityName,
-                        nominalTime, intRunId, attempts, 0);
+                        nominalTime, intRunId, attempts, 0, workflowUser);
                 offerToQueue(event);
             } else {
                 LOG.warn("All retry attempt failed out of configured: "
@@ -69,15 +70,17 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
                         + wfId);
 
                 GenericAlert.alertRetryFailed(entityType, entityName,
-                        nominalTime, wfId, runId,
+                        nominalTime, wfId, workflowUser, runId,
                         "All retry attempt failed out of configured: "
                                 + attempts + " attempt for entity instance::");
             }
         } catch (FalconException e) {
             LOG.error("Error during retry of entity instance " + entityName + ":" + nominalTime, e);
-            GenericAlert.alertRetryFailed(entityType, entityName, nominalTime, wfId, runId, e.getMessage());
+            GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,
+                    wfId, workflowUser, runId, e.getMessage());
         }
     }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     @Override
     public void init(M aDelayQueue) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index 8234d8a..bc7c999 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -109,9 +109,8 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
         if (!retryFile.exists()) {
             LOG.warn("Rerun file deleted or renamed for process-instance: "
                     + event.getEntityName() + ":" + event.getInstance());
-            GenericAlert.alertRetryFailed(event.getEntityType(),
-                    event.getEntityName(), event.getInstance(),
-                    event.getWfId(), Integer.toString(event.getRunId()),
+            GenericAlert.alertRetryFailed(event.getEntityType(), event.getEntityName(), event.getInstance(),
+                    event.getWfId(), event.getWorkflowUser(), Integer.toString(event.getRunId()),
                     "Rerun file deleted or renamed for process-instance:");
         } else {
             if (!retryFile.delete()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
index e02b495..8137f60 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
@@ -18,12 +18,12 @@
 
 package org.apache.falcon.rerun.handler;
 
-import junit.framework.Assert;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LateArrival;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.Date;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 01d0415..6b6b834 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -51,7 +51,7 @@ public class ActiveMQTest {
 
         RerunEvent event = new LaterunEvent("clusterName", "wfId",
                 System.currentTimeMillis(), 60 * 1000, "entityType",
-                "entityName", "instance", 0);
+                "entityName", "instance", 0, "falcon");
 
         try {
             activeMQueue.offer(event);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index 6aafaa5..8508d37 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -45,7 +45,7 @@ public class InMemoryQueueTest {
             long time = System.currentTimeMillis();
             int delay = ((5 - index) / 2) * 50;
             MyEvent event = new MyEvent("someCluster", Integer.toString(index),
-                    time, delay, "someType", "someName", "someInstance", 0);
+                    time, delay, "someType", "someName", "someInstance", 0, "falcon");
             queue.offer(event);
             boolean inserted = false;
             for (int posn = 0; posn < events.size(); posn++) {
@@ -73,9 +73,9 @@ public class InMemoryQueueTest {
         //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
         public MyEvent(String clusterName, String wfId,
                        long msgInsertTime, long delay, String entityType,
-                       String entityName, String instance, int runId) {
+                       String entityName, String instance, int runId, String workflowUser) {
             super(clusterName, wfId, msgInsertTime, delay,
-                    entityType, entityName, instance, runId);
+                    entityType, entityName, instance, runId, workflowUser);
         }
         //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/src/bin/falcon
----------------------------------------------------------------------
diff --git a/src/bin/falcon b/src/bin/falcon
index d196a5d..113c9a7 100755
--- a/src/bin/falcon
+++ b/src/bin/falcon
@@ -30,4 +30,4 @@ BASEDIR=`dirname ${PRG}`
 BASEDIR=`cd ${BASEDIR}/..;pwd`
 . ${BASEDIR}/bin/falcon-config.sh 'client'
 
-${JAVA_BIN} -cp ${FALCONCPPATH} org.apache.falcon.cli.FalconCLI "${@}"
+${JAVA_BIN} -cp ${FALCONCPPATH} -Dfalcon.log.dir=$HOME -Dfalcon.app.type=client org.apache.falcon.cli.FalconCLI "${@}"

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 58ebd80..90abe26 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -51,6 +51,15 @@
         </layout>
     </appender>
 
+    <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.security.audit.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %x %m%n"/>
+        </layout>
+    </appender>
+
     <logger name="org.apache.falcon" additivity="false">
         <level value="debug"/>
         <appender-ref ref="FILE"/>
@@ -66,6 +75,26 @@
         <appender-ref ref="METRIC"/>
     </logger>
 
+    <logger name="org.apache.hadoop.security" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="SECURITY"/>
+    </logger>
+
+    <logger name="org.apache.hadoop" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.apache.oozie" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.apache.hadoop.hive" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
     <root>
         <priority value="info"/>
         <appender-ref ref="FILE"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 79cd211..0d0ab41 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -32,8 +32,9 @@
 *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
 *.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
 
-*.application.services=org.apache.falcon.entity.store.ConfigurationStore,\
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
 						org.apache.falcon.rerun.service.LateRunService,\
 						org.apache.falcon.service.SLAMonitoringService,\
@@ -51,18 +52,79 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 ######### Implementation classes #########
 
+
+######### System startup parameters #########
+
+# Location to store user entity configurations
 *.config.store.uri=file://${falcon.home}/store
+
+# Location of libraries that are shipped to Hadoop
 *.system.lib.location=${falcon.home}/server/webapp/falcon/WEB-INF/lib
 prism.system.lib.location=${falcon.home}/server/webapp/prism/WEB-INF/lib
-*.broker.url=tcp://localhost:61616
+
 *.retry.recorder.path=${falcon.log.dir}/retry
 
 *.falcon.cleanup.service.frequency=days(1)
 
-#default time-to-live for a JMS message 3 days (time in minutes)
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
+*.broker.url=tcp://localhost:61616
+
+# Default time-to-live for a JMS message: 3 days (time in minutes)
 *.broker.ttlInMins=4320
 *.entity.topic=FALCON.ENTITY.TOPIC
 *.max.retry.failure.count=1
 
 ######### Properties for configuring iMon client and metric #########
 *.internal.queue.size=1000
+
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The kerberos name rules are used to resolve kerberos principal names; refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+######### Authentication Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index 2b55407..af29f93 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -43,7 +43,6 @@ public class EmbeddedCluster {
     protected EmbeddedCluster() {
     }
 
-    //private MiniDFSCluster dfsCluster;
     protected Configuration conf = newConfiguration();
     protected Cluster clusterEntity;
 
@@ -81,6 +80,7 @@ public class EmbeddedCluster {
         cluster.conf.set("jail.base", System.getProperty("hadoop.tmp.dir",
                 cluster.conf.get("hadoop.tmp.dir", "/tmp")));
         cluster.conf.set("fs.default.name", "jail://" + (global ? "global" : name) + ":00");
+
         String hdfsUrl = cluster.conf.get("fs.default.name");
         LOG.info("Cluster Namenode = " + hdfsUrl);
         cluster.buildClusterObject(name);
@@ -133,17 +133,9 @@ public class EmbeddedCluster {
     }
 
     public void shutdown() {
-        //dfsCluster.shutdown();
     }
 
     public Cluster getCluster() {
         return clusterEntity;
     }
-
-    public Cluster clone(String cloneName) {
-        EmbeddedCluster clone = new EmbeddedCluster();
-        clone.conf = this.conf;
-        clone.buildClusterObject(cloneName);
-        return clone.clusterEntity;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 8c37409..0c9e601 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -207,6 +207,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hcatalog</groupId>
+            <artifactId>webhcat-java-client</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-hadoop-webapp</artifactId>
             <type>war</type>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/conf/oozie/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/oozie-site.xml b/webapp/src/conf/oozie/conf/oozie-site.xml
index e5f404a..5f644a2 100644
--- a/webapp/src/conf/oozie/conf/oozie-site.xml
+++ b/webapp/src/conf/oozie/conf/oozie-site.xml
@@ -304,7 +304,7 @@
 
     <property>
         <name>oozie.authentication.simple.anonymous.allowed</name>
-        <value>true</value>
+        <value>false</value>
         <description>
             Indicates if anonymous requests are allowed.
             This setting is meaningful only when using 'simple' authentication.
@@ -485,51 +485,11 @@
     <!-- Proxyuser Configuration -->
     <property>
         <name>oozie.service.ProxyUserService.proxyuser.${user.name}.hosts</name>
-        <value>localhost</value>
-        <description></description>
-    </property>
-    <property>
-        <name>oozie.service.ProxyUserService.proxyuser.${user.name}.groups</name>
-        <value>users</value>
-        <description></description>
-    </property>
-
-    <!--
-
-    <property>
-        <name>oozie.service.ProxyUserService.proxyuser.#USER#.hosts</name>
         <value>*</value>
-        <description>
-            List of hosts the '#USER#' user is allowed to perform 'doAs'
-            operations.
-
-            The '#USER#' must be replaced with the username o the user who is
-            allowed to perform 'doAs' operations.
-
-            The value can be the '*' wildcard or a list of hostnames.
-
-            For multiple users copy this property and replace the user name
-            in the property name.
-        </description>
     </property>
-
     <property>
-        <name>oozie.service.ProxyUserService.proxyuser.#USER#.groups</name>
+        <name>oozie.service.ProxyUserService.proxyuser.${user.name}.groups</name>
         <value>*</value>
-        <description>
-            List of groups the '#USER#' user is allowed to impersonate users
-            from to perform 'doAs' operations.
-
-            The '#USER#' must be replaced with the username o the user who is
-            allowed to perform 'doAs' operations.
-
-            The value can be the '*' wildcard or a list of groups.
-
-            For multiple users copy this property and replace the user name
-            in the property name.
-        </description>
     </property>
 
-    -->
-
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index d133b8e..5ba6f16 100644
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
         </layout>
     </appender>
 
+    <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${user.dir}/target/logs/security.audit.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %x %m%n"/>
+        </layout>
+    </appender>
+
     <logger name="org.apache.falcon" additivity="false">
         <level value="debug"/>
         <appender-ref ref="FILE"/>
@@ -69,6 +78,26 @@
         <appender-ref ref="METRIC"/>
     </logger>
 
+    <logger name="org.apache.hadoop.security" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="SECURITY"/>
+    </logger>
+
+    <logger name="org.apache.hadoop" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.apache.oozie" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.apache.hadoop.hive" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
     <root>
         <priority value="info"/>
         <appender-ref ref="FILE"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index 9909140..fd004a1 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -19,6 +19,8 @@
 package org.apache.falcon.catalog;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hcatalog.api.HCatAddPartitionDesc;
 import org.apache.hcatalog.api.HCatClient;
 import org.apache.hcatalog.api.HCatCreateDBDesc;
@@ -55,6 +57,9 @@ public class HiveCatalogServiceIT {
 
     @BeforeClass
     public void setUp() throws Exception {
+        // setup a logged in user
+        CurrentUser.authenticate(TestContext.REMOTE_USER);
+
         hiveCatalogService = new HiveCatalogService();
         client = HiveCatalogService.get(METASTORE_URL);
 
@@ -168,22 +173,23 @@ public class HiveCatalogServiceIT {
 
     @Test
     public void testIsAlive() throws Exception {
-        Assert.assertTrue(hiveCatalogService.isAlive(METASTORE_URL));
+        Assert.assertTrue(hiveCatalogService.isAlive(METASTORE_URL, "metaStorePrincipal"));
     }
 
-    @Test (expectedExceptions = FalconException.class)
+    @Test (expectedExceptions = Exception.class)
     public void testIsAliveNegative() throws Exception {
-        hiveCatalogService.isAlive("thrift://localhost:9999");
+        hiveCatalogService.isAlive("thrift://localhost:9999", "metaStorePrincipal");
     }
 
     @Test (expectedExceptions = FalconException.class)
     public void testTableExistsNegative() throws Exception {
-        hiveCatalogService.tableExists(METASTORE_URL, DATABASE_NAME, "blah");
+        hiveCatalogService.tableExists(METASTORE_URL, DATABASE_NAME, "blah", "metaStorePrincipal");
     }
 
     @Test
     public void testTableExists() throws Exception {
-        Assert.assertTrue(hiveCatalogService.tableExists(METASTORE_URL, DATABASE_NAME, TABLE_NAME));
+        Assert.assertTrue(hiveCatalogService.tableExists(
+                METASTORE_URL, DATABASE_NAME, TABLE_NAME, "metaStorePrincipal"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 72369c0..5cd7beb 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.cli;
 
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.OozieTestUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -39,9 +40,6 @@ import java.util.Map;
 public class FalconCLIIT {
 
     private InMemoryWriter stream = new InMemoryWriter(System.out);
-    // private static final String BROKER_URL =
-    // "tcp://localhost:61616?daemon=true";
-    private static final boolean TEST_ENABLED = true;
 
     @BeforeClass
     public void prepare() throws Exception {
@@ -56,7 +54,7 @@ public class FalconCLIIT {
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
 
-        filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         Assert.assertEquals(
                 0,
                 executeWithURL("entity -submit -type cluster -file " + filePath));
@@ -64,7 +62,7 @@ public class FalconCLIIT {
         Assert.assertEquals(stream.buffer.toString().trim(),
                 "default/Submit successful (cluster) " + context.getClusterName());
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
         Assert.assertEquals(
@@ -72,7 +70,7 @@ public class FalconCLIIT {
                 "default/Submit successful (feed) "
                         + overlay.get("inputFeedName"));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
         Assert.assertEquals(
@@ -80,7 +78,7 @@ public class FalconCLIIT {
                 "default/Submit successful (feed) "
                         + overlay.get("outputFeedName"));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(
                 0,
                 executeWithURL("entity -submit -type process -file " + filePath));
@@ -102,29 +100,29 @@ public class FalconCLIIT {
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
 
-        filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+        filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
         Assert.assertEquals(-1,
                 executeWithURL("entity -submitAndSchedule -type cluster -file "
                         + filePath));
         context.setCluster(overlay.get("cluster"));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submitAndSchedule -type feed -file "
                         + filePath));
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submitAndSchedule -type feed -file "
                         + filePath));
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submitAndSchedule -type process -file "
                         + filePath));
@@ -136,7 +134,7 @@ public class FalconCLIIT {
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
 
-        filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+        filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -validate -type cluster -file "
                         + filePath));
@@ -146,19 +144,19 @@ public class FalconCLIIT {
                 executeWithURL("entity -submit -type cluster -file " + filePath));
         context.setCluster(overlay.get("cluster"));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -validate -type feed -file " + filePath));
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -validate -type feed -file " + filePath));
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -validate -type process -file "
                         + filePath));
@@ -239,7 +237,7 @@ public class FalconCLIIT {
                 executeWithURL("entity -schedule -type process -name "
                         + overlay.get("processName")));
 
-        context.waitForProcessWFtoStart();
+        OozieTestUtils.waitForProcessWFtoStart(context);
 
         Assert.assertEquals(
                 0,
@@ -329,7 +327,7 @@ public class FalconCLIIT {
 
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
-        context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(-1,
                 executeWithURL("entity -submit -type feed -name " + "name"));
 
@@ -349,7 +347,7 @@ public class FalconCLIIT {
         Assert.assertEquals(0,
                 executeWithURL("entity -schedule -type feed -name "
                         + overlay.get("outputFeedName")));
-        context.waitForProcessWFtoStart();
+        OozieTestUtils.waitForProcessWFtoStart(context);
 
         Assert.assertEquals(0,
                 executeWithURL("instance -status -type feed -name "
@@ -378,7 +376,7 @@ public class FalconCLIIT {
         Assert.assertEquals(0,
                 executeWithURL("entity -schedule -type feed -name "
                         + overlay.get("outputFeedName")));
-        context.waitForProcessWFtoStart();
+        OozieTestUtils.waitForProcessWFtoStart(context);
 
         Assert.assertEquals(0,
                 executeWithURL("instance -status -type feed -name "
@@ -428,7 +426,7 @@ public class FalconCLIIT {
                 executeWithURL("entity -schedule -type process -name "
                         + overlay.get("processName")));
 
-        context.waitForProcessWFtoStart();
+        OozieTestUtils.waitForProcessWFtoStart(context);
         Assert.assertEquals(
                 0,
                 executeWithURL("instance -kill -type process -name "
@@ -452,7 +450,7 @@ public class FalconCLIIT {
                 executeWithURL("entity -schedule -type process -name "
                         + overlay.get("processName")));
 
-        context.waitForProcessWFtoStart();
+        OozieTestUtils.waitForProcessWFtoStart(context);
         Assert.assertEquals(
                 0,
                 executeWithURL("instance -kill -type process -name "
@@ -481,7 +479,6 @@ public class FalconCLIIT {
         Assert.assertEquals(-1,
                 executeWithURL("instance -kill -type process -name "
                         + " -start 2010-01-01T01:00Z  -end 2010-01-01T03:00Z"));
-
     }
 
     public void testFalconURL() throws Exception {
@@ -495,8 +492,6 @@ public class FalconCLIIT {
                         + "processName -url http://unknownhost:1234/"
                         + " -start 2010-01-01T01:00Z  -end 2010-01-01T03:00Z")
                         .split("\\s")));
-
-
     }
 
     public void testClientProperties() throws Exception {
@@ -504,8 +499,7 @@ public class FalconCLIIT {
         Map<String, String> overlay = context.getUniqueOverlay();
         submitTestFiles(context, overlay);
 
-        Assert.assertEquals(
-                0,
+        Assert.assertEquals(0,
                 new FalconCLI().run(("entity -schedule -type feed -name "
                         + overlay.get("outputFeedName") + " -url "
                         + TestContext.BASE_URL).split("\\s+")));
@@ -514,15 +508,21 @@ public class FalconCLIIT {
                 new FalconCLI().run(("entity -schedule -type process -name "
                         + overlay.get("processName")+ " -url "
                         + TestContext.BASE_URL).split("\\s+")));
-
     }
 
     public void testGetVersion() throws Exception {
         Assert.assertEquals(0,
-                new FalconCLI().run("admin -version".split("\\s")));
+                new FalconCLI().run(("admin -version -url " + TestContext.BASE_URL).split("\\s")));
+    }
+
+    public void testGetStatus() throws Exception {
+        Assert.assertEquals(0,
+                new FalconCLI().run(("admin -status -url " + TestContext.BASE_URL).split("\\s")));
+    }
 
+    public void testGetThreadStackDump() throws Exception {
         Assert.assertEquals(0,
-                new FalconCLI().run("admin -stack".split("\\s")));
+                new FalconCLI().run(("admin -stack -url " + TestContext.BASE_URL).split("\\s")));
     }
 
     public void testInstanceGetLogs() throws Exception {
@@ -538,7 +538,6 @@ public class FalconCLIIT {
                 executeWithURL("instance -logs -type process -name "
                         + overlay.get("processName")
                         + " -start " + START_INSTANCE + " -end " + START_INSTANCE));
-
     }
 
     private int executeWithURL(String command) throws Exception {
@@ -560,22 +559,22 @@ public class FalconCLIIT {
 
     private void submitTestFiles(TestContext context, Map<String, String> overlay) throws Exception {
 
-        String filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(),
+        String filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(),
                 overlay);
         Assert.assertEquals(
                 0,
                 executeWithURL("entity -submit -type cluster -file " + filePath));
         context.setCluster(overlay.get("cluster"));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(
                 0,
                 executeWithURL("entity -submit -type process -file " + filePath));
@@ -596,6 +595,7 @@ public class FalconCLIIT {
             super.println(x);
         }
 
+        @SuppressWarnings("UnusedDeclaration")
         public String getBuffer() {
             return buffer.toString();
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
index 55f240f..d503735 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.cli;
 
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.OozieTestUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -44,39 +45,39 @@ public class FalconCLISmokeIT {
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
 
-        filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+        filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
         Assert.assertEquals(-1,
                 executeWithURL("entity -submitAndSchedule -type cluster -file "
                         + filePath));
         context.setCluster(overlay.get("cluster"));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submitAndSchedule -type feed -file "
                         + filePath));
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submitAndSchedule -type feed -file "
                         + filePath));
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -validate -type process -file "
                         + filePath));
 
-        filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submitAndSchedule -type process -file "
                         + filePath));
 
-        context.waitForProcessWFtoStart();
+        OozieTestUtils.waitForProcessWFtoStart(context);
 
         Assert.assertEquals(0,
                 executeWithURL("entity -definition -type cluster -name "
@@ -90,7 +91,6 @@ public class FalconCLISmokeIT {
         Assert.assertEquals(0,
                 executeWithURL("instance -running -type process -name "
                         + overlay.get("processName")));
-
     }
 
     private int executeWithURL(String command) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index 6cfa4e6..ab60307 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -67,7 +67,7 @@ public class LateDataHandlerIT {
     public void prepare() throws Exception {
         TestContext.cleanupStore();
 
-        String filePath = context.overlayParametersOverTemplate(
+        String filePath = TestContext.overlayParametersOverTemplate(
                 TestContext.CLUSTER_TEMPLATE, context.getUniqueOverlay());
         context.setCluster(filePath);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
index 058b35c..92ff8ac 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -64,7 +64,7 @@ public class FileSystemFeedReplicationIT {
         TestContext.cleanupStore();
 
         Map<String, String> overlay = sourceContext.getUniqueOverlay();
-        String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        String sourceFilePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
         sourceContext.setCluster(sourceFilePath);
 
         final Cluster sourceCluster = sourceContext.getCluster().getCluster();
@@ -74,21 +74,21 @@ public class FileSystemFeedReplicationIT {
         final String sourcePath = sourceStorageUrl + SOURCE_LOCATION + PARTITION_VALUE;
         FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourcePath);
 
-        String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        String targetFilePath = TestContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
         targetContext.setCluster(targetFilePath);
 
         final Cluster targetCluster = targetContext.getCluster().getCluster();
         copyLibsToHDFS(targetCluster);
 
-        String file = targetAlphaContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+        String file = TestContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
         targetAlphaContext.setCluster(file);
         copyLibsToHDFS(targetAlphaContext.getCluster().getCluster());
 
-        file = targetBetaContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+        file = TestContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
         targetBetaContext.setCluster(file);
         copyLibsToHDFS(targetBetaContext.getCluster().getCluster());
 
-        file = targetGammaContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+        file = TestContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
         targetGammaContext.setCluster(file);
         copyLibsToHDFS(targetGammaContext.getCluster().getCluster());
     }
@@ -123,17 +123,17 @@ public class FileSystemFeedReplicationIT {
     @Test (enabled = false)
     public void testFSReplicationSingleSourceToTarget() throws Exception {
         final Map<String, String> overlay = sourceContext.getUniqueOverlay();
-        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        String filePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
-        filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
         // verify if the partition on the source exists - precondition
         FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
         Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
 
-        filePath = sourceContext.overlayParametersOverTemplate("/table/customer-fs-replicating-feed.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/customer-fs-replicating-feed.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
 
         // wait until the workflow job completes
@@ -148,7 +148,7 @@ public class FileSystemFeedReplicationIT {
         Assert.assertTrue(fs.exists(new Path(TARGET_LOCATION + PARTITION_VALUE)));
 
         InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
-                .header("Remote-User", "guest")
+                .header("Cookie", targetContext.getAuthenticationToken())
                 .accept(MediaType.APPLICATION_JSON)
                 .get(InstancesResult.class);
         Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
@@ -161,23 +161,23 @@ public class FileSystemFeedReplicationIT {
     @Test (enabled = false)
     public void testFSReplicationSingleSourceToMultipleTargets() throws Exception {
         final Map<String, String> overlay = sourceContext.getUniqueOverlay();
-        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        String filePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
-        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
-        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
-        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
         // verify if the partition on the source exists - precondition
         FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
         Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
 
-        filePath = sourceContext.overlayParametersOverTemplate("/table/multiple-targets-replicating-feed.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/multiple-targets-replicating-feed.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
 
         // wait until the workflow job completes
@@ -201,7 +201,7 @@ public class FileSystemFeedReplicationIT {
                 gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE)));
 
         InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
-                .header("Remote-User", "guest")
+                .header("Cookie", targetContext.getAuthenticationToken())
                 .accept(MediaType.APPLICATION_JSON)
                 .get(InstancesResult.class);
         Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
@@ -226,19 +226,19 @@ public class FileSystemFeedReplicationIT {
         Assert.assertTrue(sourceFS.exists(sourcePath));
 
         final Map<String, String> overlay = sourceContext.getUniqueOverlay();
-        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        String filePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
-        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
-        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
 
         // verify if the partition on the source exists - precondition
         Assert.assertTrue(sourceFS.exists(sourcePath));
 
-        filePath = sourceContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay);
+        filePath = TestContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
 
         // wait until the workflow job completes
@@ -257,7 +257,7 @@ public class FileSystemFeedReplicationIT {
         Assert.assertTrue(beta.exists(new Path("/localDC/rc/billing/ua2/" + partitionValue)));
 
         InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
-                .header("Remote-User", "guest")
+                .header("Cookie", targetContext.getAuthenticationToken())
                 .accept(MediaType.APPLICATION_JSON)
                 .get(InstancesResult.class);
         Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 37226e2..770780e 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -356,6 +356,7 @@ public class TableStorageFeedEvictorIT {
             super.println(x);
         }
 
+        @SuppressWarnings("UnusedDeclaration")
         public String getBuffer() {
             return buffer.toString();
         }