You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/02/16 04:31:26 UTC

[5/5] git commit: FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam

FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3c51f105
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3c51f105
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3c51f105

Branch: refs/heads/master
Commit: 3c51f1053a3524557d1e1e58f0a4fc778431e75c
Parents: 2cb42df
Author: Venkatesh Seetharam <ve...@hortonworks.com>
Authored: Sat Feb 15 19:30:01 2014 -0800
Committer: Venkatesh Seetharam <ve...@hortonworks.com>
Committed: Sat Feb 15 19:30:01 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 client/pom.xml                                  |  24 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |  43 +-
 .../org/apache/falcon/client/FalconClient.java  | 159 +++---
 common/pom.xml                                  |   5 +
 .../falcon/catalog/AbstractCatalogService.java  |  19 +-
 .../falcon/catalog/HiveCatalogService.java      |  88 ++-
 .../falcon/cleanup/AbstractCleanupHandler.java  |  19 +-
 .../falcon/cleanup/FeedCleanupHandler.java      |   2 +-
 .../apache/falcon/entity/CatalogStorage.java    |   6 -
 .../org/apache/falcon/entity/ClusterHelper.java |  24 +-
 .../org/apache/falcon/entity/EntityUtil.java    |   3 +-
 .../apache/falcon/entity/FileSystemStorage.java |   6 -
 .../java/org/apache/falcon/entity/Storage.java  |   9 -
 .../entity/parser/ClusterEntityParser.java      |  50 +-
 .../falcon/entity/parser/FeedEntityParser.java  |  29 +-
 .../entity/parser/ProcessEntityParser.java      |  13 +-
 .../falcon/entity/store/ConfigurationStore.java |  27 +-
 .../falcon/hadoop/HadoopClientFactory.java      | 202 +++++++
 .../AuthenticationInitializationService.java    | 122 ++++
 .../org/apache/falcon/security/CurrentUser.java |   3 +-
 .../falcon/security/FalconLoginModule.java      |  89 ---
 .../security/FalconSecurityConfiguration.java   |  52 --
 .../falcon/security/SecurityConstants.java      |  38 --
 .../apache/falcon/security/SecurityUtil.java    | 102 ++++
 .../org/apache/falcon/update/UpdateHelper.java  |  10 +-
 common/src/main/resources/startup.properties    |  64 ++-
 .../apache/falcon/entity/AbstractTestBase.java  |  13 +-
 .../falcon/entity/FileSystemStorageTest.java    |  12 -
 .../falcon/hadoop/HadoopClientFactoryTest.java  | 101 ++++
 ...AuthenticationInitializationServiceTest.java | 142 +++++
 .../falcon/security/SecurityUtilTest.java       |  69 +++
 docs/src/site/twiki/Security.twiki              | 193 +++++++
 docs/src/site/twiki/index.twiki                 |   3 +-
 docs/src/site/twiki/restapi/AdminConfig.twiki   |   1 -
 docs/src/site/twiki/restapi/AdminStack.twiki    |   1 -
 docs/src/site/twiki/restapi/AdminVersion.twiki  |   1 -
 .../site/twiki/restapi/EntityDefinition.twiki   |   1 -
 docs/src/site/twiki/restapi/EntityDelete.twiki  |   1 -
 .../site/twiki/restapi/EntityDependencies.twiki |   1 -
 docs/src/site/twiki/restapi/EntityList.twiki    |   2 -
 docs/src/site/twiki/restapi/EntityResume.twiki  |   1 -
 .../src/site/twiki/restapi/EntitySchedule.twiki |   1 -
 docs/src/site/twiki/restapi/EntityStatus.twiki  |   1 -
 docs/src/site/twiki/restapi/EntitySubmit.twiki  |   4 +-
 .../twiki/restapi/EntitySubmitAndSchedule.twiki |   1 -
 docs/src/site/twiki/restapi/EntitySuspend.twiki |   1 -
 docs/src/site/twiki/restapi/EntityUpdate.twiki  |   1 -
 .../src/site/twiki/restapi/EntityValidate.twiki |   6 +-
 docs/src/site/twiki/restapi/InstanceKill.twiki  |   1 -
 docs/src/site/twiki/restapi/InstanceLogs.twiki  |   1 -
 docs/src/site/twiki/restapi/InstanceRerun.twiki |   1 -
 .../src/site/twiki/restapi/InstanceResume.twiki |   1 -
 .../site/twiki/restapi/InstanceRunning.twiki    |   1 -
 .../src/site/twiki/restapi/InstanceStatus.twiki |   1 -
 .../site/twiki/restapi/InstanceSuspend.twiki    |   1 -
 docs/src/site/twiki/restapi/ResourceList.twiki  |  24 +
 .../falcon/converter/OozieFeedMapper.java       |  16 +-
 .../config/workflow/replication-workflow.xml    |   4 +
 .../config/workflow/retention-workflow.xml      |   6 +-
 .../falcon/converter/OozieFeedMapperTest.java   |   3 +
 hadoop-webapp/pom.xml                           |   5 -
 html5-ui/js/falcon.js                           |  13 +-
 messaging/pom.xml                               |   5 +
 .../falcon/messaging/EntityInstanceMessage.java |   3 +-
 .../messaging/EntityInstanceMessageCreator.java |  10 +-
 .../falcon/messaging/MessageProducer.java       |  13 +-
 .../messaging/FalconTopicProducerTest.java      |   6 +-
 .../falcon/messaging/FeedProducerTest.java      |   3 +
 .../falcon/messaging/ProcessProducerTest.java   |   5 +-
 .../org/apache/falcon/aspect/GenericAlert.java  |   4 +
 .../converter/AbstractOozieEntityMapper.java    |  40 +-
 .../org/apache/falcon/logging/LogMover.java     |   2 +-
 .../org/apache/falcon/logging/LogProvider.java  |  20 +-
 .../service/SharedLibraryHostingService.java    |  19 +-
 .../falcon/workflow/FalconPostProcessing.java   |  15 +-
 .../workflow/engine/OozieClientFactory.java     |  24 +-
 .../engine/OozieHouseKeepingService.java        |   3 +-
 .../workflow/engine/OozieWorkflowEngine.java    |  89 ++-
 .../apache/oozie/client/CustomOozieClient.java  | 101 ----
 .../apache/oozie/client/ProxyOozieClient.java   | 562 +++++++++++++++++++
 .../workflow/FalconPostProcessingTest.java      |   5 +
 pom.xml                                         |  17 +-
 .../falcon/resource/channel/HTTPChannel.java    |  24 +-
 .../apache/falcon/security/BasicAuthFilter.java | 186 ++++--
 ...eUserInHeaderBasedAuthenticationHandler.java |  49 ++
 .../falcon/service/FalconTopicSubscriber.java   |  67 +--
 .../service/ProcessSubscriberService.java       |   4 +-
 .../apache/falcon/aspect/GenericAlertTest.java  |   4 +-
 .../service/FalconTopicSubscriberTest.java      |   8 +-
 .../falcon/converter/OozieProcessMapper.java    |  43 +-
 .../config/workflow/process-parent-workflow.xml |   4 +
 .../converter/OozieProcessMapperTest.java       |   3 +
 .../apache/falcon/latedata/LateDataHandler.java |  22 +-
 .../apache/falcon/rerun/event/LaterunEvent.java |   9 +-
 .../apache/falcon/rerun/event/RerunEvent.java   |   8 +-
 .../falcon/rerun/event/RerunEventFactory.java   |   4 +-
 .../apache/falcon/rerun/event/RetryEvent.java   |   6 +-
 .../rerun/handler/AbstractRerunConsumer.java    |   8 +-
 .../rerun/handler/AbstractRerunHandler.java     |   7 +-
 .../falcon/rerun/handler/LateRerunConsumer.java |  26 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |  18 +-
 .../falcon/rerun/handler/RetryConsumer.java     |   6 +-
 .../falcon/rerun/handler/RetryHandler.java      |  15 +-
 .../falcon/rerun/queue/InMemoryQueue.java       |   5 +-
 .../rerun/handler/TestLateRerunHandler.java     |   2 +-
 .../apache/falcon/rerun/queue/ActiveMQTest.java |   2 +-
 .../falcon/rerun/queue/InMemoryQueueTest.java   |   6 +-
 src/bin/falcon                                  |   2 +-
 src/conf/log4j.xml                              |  29 +
 src/conf/startup.properties                     |  68 ++-
 .../falcon/cluster/util/EmbeddedCluster.java    |  10 +-
 webapp/pom.xml                                  |   5 +
 webapp/src/conf/oozie/conf/oozie-site.xml       |  44 +-
 webapp/src/main/resources/log4j.xml             |  29 +
 .../falcon/catalog/HiveCatalogServiceIT.java    |  16 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  72 +--
 .../org/apache/falcon/cli/FalconCLISmokeIT.java |  18 +-
 .../apache/falcon/late/LateDataHandlerIT.java   |   2 +-
 .../lifecycle/FileSystemFeedReplicationIT.java  |  40 +-
 .../lifecycle/TableStorageFeedEvictorIT.java    |   1 +
 .../TableStorageFeedReplicationIT.java          |  20 +-
 .../org/apache/falcon/process/PigProcessIT.java |  12 +-
 .../falcon/process/TableStorageProcessIT.java   |  16 +-
 .../falcon/resource/EntityManagerJerseyIT.java  | 235 ++++----
 .../resource/EntityManagerJerseySmokeIT.java    |  36 +-
 .../resource/ProcessInstanceManagerIT.java      |  33 +-
 .../org/apache/falcon/resource/TestContext.java | 258 +++------
 .../falcon/security/BasicAuthFilterTest.java    |  92 ++-
 .../org/apache/falcon/util/OozieTestUtils.java  | 104 +++-
 .../util/ResourcesReflectionUtilTest.java       |   2 +-
 .../validation/ClusterEntityValidationIT.java   |   4 +-
 .../validation/FeedEntityValidationIT.java      |   2 +-
 133 files changed, 3123 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0462902..ecab28a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@ Apache Falcon (incubating) Change log
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES
+    FALCON-11 Add support for security in Falcon (Venkatesh Seetharam)
 
   NEW FEATURES
     FALCON-281 Design Action Interface. (Srikanth Sundarrajan)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index a43f7f5..63c4cce 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -32,6 +32,30 @@
     <name>Apache Falcon CLI client</name>
     <packaging>jar</packaging>
 
+    <profiles>
+        <profile>
+            <id>hadoop-1</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-core</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>hadoop-2</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
     <dependencies>
         <dependency>
             <groupId>commons-cli</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index aa712ad..a414e32 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -138,12 +138,18 @@ public class FalconCLI {
             int exitValue = 0;
             if (command.getName().equals(HELP_CMD)) {
                 parser.showHelp();
-            } else if (command.getName().equals(ADMIN_CMD)) {
-                exitValue = adminCommand(command.getCommandLine());
-            } else if (command.getName().equals(ENTITY_CMD)) {
-                entityCommand(command.getCommandLine());
-            } else if (command.getName().equals(INSTANCE_CMD)) {
-                instanceCommand(command.getCommandLine());
+            } else {
+                CommandLine commandLine = command.getCommandLine();
+                String falconUrl = getFalconEndpoint(commandLine);
+                FalconClient client = new FalconClient(falconUrl);
+
+                if (command.getName().equals(ADMIN_CMD)) {
+                    exitValue = adminCommand(commandLine, client, falconUrl);
+                } else if (command.getName().equals(ENTITY_CMD)) {
+                    entityCommand(commandLine, client);
+                } else if (command.getName().equals(INSTANCE_CMD)) {
+                    instanceCommand(commandLine, client);
+                }
             }
 
             return exitValue;
@@ -167,10 +173,8 @@ public class FalconCLI {
         }
     }
 
-    private void instanceCommand(CommandLine commandLine) throws FalconCLIException, IOException {
-        String falconUrl = getFalconEndpoint(commandLine);
-        FalconClient client = new FalconClient(falconUrl);
-
+    private void instanceCommand(CommandLine commandLine, FalconClient client)
+        throws FalconCLIException, IOException {
         Set<String> optionsList = new HashSet<String>();
         for (Option option : commandLine.getOptions()) {
             optionsList.add(option.getOpt());
@@ -257,12 +261,8 @@ public class FalconCLI {
         }
     }
 
-    private void entityCommand(CommandLine commandLine)
+    private void entityCommand(CommandLine commandLine, FalconClient client)
         throws FalconCLIException, IOException {
-
-        String falconUrl = getFalconEndpoint(commandLine);
-        FalconClient client = new FalconClient(falconUrl);
-
         Set<String> optionsList = new HashSet<String>();
         for (Option option : commandLine.getOptions()) {
             optionsList.add(option.getOpt());
@@ -395,9 +395,12 @@ public class FalconCLI {
                 "show the current system status");
         Option version = new Option(VERSION_OPTION, false,
                 "show Falcon server build version");
+        Option stack = new Option(STACK_OPTION, false,
+                "show the thread stack dump");
         Option help = new Option("help", false, "show Falcon help");
         group.addOption(status);
         group.addOption(version);
+        group.addOption(stack);
         group.addOption(help);
 
         adminOptions.addOptionGroup(group);
@@ -587,11 +590,9 @@ public class FalconCLI {
         return url;
     }
 
-    private int adminCommand(CommandLine commandLine) throws FalconCLIException, IOException {
+    private int adminCommand(CommandLine commandLine, FalconClient client,
+                             String falconUrl) throws FalconCLIException, IOException {
         String result;
-        String falconUrl = getFalconEndpoint(commandLine);
-        FalconClient client = new FalconClient(falconUrl);
-
         Set<String> optionsList = new HashSet<String>();
         for (Option option : commandLine.getOptions()) {
             optionsList.add(option.getOpt());
@@ -603,9 +604,8 @@ public class FalconCLI {
         }
         int exitValue = 0;
         if (optionsList.contains(STATUS_OPTION)) {
-            int status = 0;
             try {
-                status = client.getStatus();
+                int status = client.getStatus();
                 if (status != 200) {
                     ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
                             + "Please check log files.");
@@ -623,6 +623,7 @@ public class FalconCLI {
         } else if (optionsList.contains(HELP_CMD)) {
             OUT.get().println("Falcon Help");
         }
+
         return exitValue;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 367fcc5..a5c31c2 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -27,6 +27,9 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
 import org.apache.falcon.resource.InstancesSummaryResult;
 
 import javax.ws.rs.HttpMethod;
@@ -41,6 +44,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.net.URL;
 import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
@@ -51,55 +55,83 @@ import java.util.Properties;
  */
 public class FalconClient {
 
-    private final WebResource service;
     public static final String WS_HEADER_PREFIX = "header:";
-    private static final String REMOTE_USER = "Remote-User";
-    private static final String USER = System.getProperty("user.name");
+    public static final String USER = System.getProperty("user.name");
+    public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER;
+
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
     private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
 
     /**
+     * Name of the HTTP cookie used for the authentication token between the client and the server.
+     */
+    public static final String AUTH_COOKIE = "hadoop.auth";
+    private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
+    private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
+
+    private final WebResource service;
+    private final AuthenticatedURL.Token authenticationToken;
+
+    /**
      * Create a Falcon client instance.
      *
      * @param falconUrl of the server to which client interacts
-     * @throws IOException
+     * @throws FalconCLIException
      */
-    public FalconClient(String falconUrl) throws IOException {
+    public FalconClient(String falconUrl) throws FalconCLIException {
         String baseUrl = notEmpty(falconUrl, "FalconUrl");
         if (!baseUrl.endsWith("/")) {
             baseUrl += "/";
         }
+
         Client client = Client.create(new DefaultClientConfig());
         setFalconTimeOut(client);
         service = client.resource(UriBuilder.fromUri(baseUrl).build());
-        client.resource(UriBuilder.fromUri(baseUrl).build());
 
-        // addHeaders();
+        authenticationToken = getToken(baseUrl);
     }
 
-    private void setFalconTimeOut(Client client) throws IOException {
-        Properties prop = new Properties();
-        int readTimeout;
-        int connectTimeout;
-        InputStream inputStream = null;
+    private void setFalconTimeOut(Client client) throws FalconCLIException {
         try {
-            inputStream = FalconClient.class.getResourceAsStream("/client.properties");
-            if (inputStream != null) {
-                prop.load(inputStream);
-                readTimeout = prop.containsKey("falcon.read.timeout") ? Integer
-                        .parseInt(prop.getProperty("falcon.read.timeout")) : 180000;
-                connectTimeout = prop.containsKey("falcon.connect.timeout") ? Integer
-                        .parseInt(prop.getProperty("falcon.connect.timeout"))
-                        : 180000;
-            } else {
-                readTimeout = 180000;
-                connectTimeout = 180000;
+            Properties prop = new Properties();
+            int readTimeout;
+            int connectTimeout;
+            InputStream inputStream = null;
+            try {
+                inputStream = FalconClient.class.getResourceAsStream("/client.properties");
+                if (inputStream != null) {
+                    prop.load(inputStream);
+                    readTimeout = prop.containsKey("falcon.read.timeout") ? Integer
+                            .parseInt(prop.getProperty("falcon.read.timeout")) : 180000;
+                    connectTimeout = prop.containsKey("falcon.connect.timeout") ? Integer
+                            .parseInt(prop.getProperty("falcon.connect.timeout"))
+                            : 180000;
+                } else {
+                    readTimeout = 180000;
+                    connectTimeout = 180000;
+                }
+            } finally {
+                IOUtils.closeQuietly(inputStream);
             }
-        } finally {
-            IOUtils.closeQuietly(inputStream);
+            client.setConnectTimeout(connectTimeout);
+            client.setReadTimeout(readTimeout);
+        } catch (IOException e) {
+            throw new FalconCLIException("An error occurred while reading client.properties file.", e);
         }
-        client.setConnectTimeout(connectTimeout);
-        client.setReadTimeout(readTimeout);
+    }
+
+    public static AuthenticatedURL.Token getToken(String baseUrl) throws FalconCLIException {
+        AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token();
+        try {
+            URL url = new URL(baseUrl + AUTH_URL);
+            // using KerberosAuthenticator which falls back to PsuedoAuthenticator
+            // instead of passing authentication type from the command line - bad factory
+            new AuthenticatedURL(AUTHENTICATOR).openConnection(url, currentToken);
+        } catch (Exception ex) {
+            throw new FalconCLIException("Could not authenticate, " + ex.getMessage(), ex);
+        }
+
+        return currentToken;
     }
 
     /**
@@ -234,10 +266,11 @@ public class FalconClient {
         if (effectiveTime != null) {
             resource = resource.queryParam("time", SchemaHelper.formatDateUTC(effectiveTime));
         }
-        ClientResponse clientResponse = resource.header(REMOTE_USER, USER)
+        ClientResponse clientResponse = resource
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(MediaType.TEXT_XML)
                 .method(operation.method, ClientResponse.class, entityStream);
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
         return parseAPIResult(clientResponse);
     }
 
@@ -253,7 +286,6 @@ public class FalconClient {
         throws FalconCLIException {
 
         return sendEntityRequest(Entities.STATUS, entityType, entityName, colo);
-
     }
 
     public String getDefinition(String entityType, String entityName)
@@ -261,7 +293,6 @@ public class FalconClient {
 
         return sendDefinitionRequest(Entities.DEFINITION, entityType,
                 entityName);
-
     }
 
     public String getDependency(String entityType, String entityName)
@@ -378,8 +409,9 @@ public class FalconClient {
     public int getStatus() throws FalconCLIException {
         AdminOperations job =  AdminOperations.VERSION;
         ClientResponse clientResponse = service.path(job.path)
-                .header(REMOTE_USER, USER).accept(job.mimeType)
-                .type(MediaType.TEXT_PLAIN).method(job.method, ClientResponse.class);
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(job.mimeType).type(MediaType.TEXT_PLAIN)
+                .method(job.method, ClientResponse.class);
         return clientResponse.getStatus();
     }
 
@@ -443,11 +475,12 @@ public class FalconClient {
         if (colo != null) {
             resource = resource.queryParam("colo", colo);
         }
-        ClientResponse clientResponse = resource.header(REMOTE_USER, USER)
+        ClientResponse clientResponse = resource
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class);
 
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
 
         return parseAPIResult(clientResponse);
     }
@@ -455,24 +488,26 @@ public class FalconClient {
     private String sendDefinitionRequest(Entities entities, String entityType,
                                          String entityName) throws FalconCLIException {
 
-        ClientResponse clientResponse = service.path(entities.path)
-                .path(entityType).path(entityName).header(REMOTE_USER, USER)
+        ClientResponse clientResponse = service
+                .path(entities.path).path(entityType).path(entityName)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class);
 
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
         return clientResponse.getEntity(String.class);
     }
 
     private String sendDependencyRequest(Entities entities, String entityType,
                                          String entityName) throws FalconCLIException {
 
-        ClientResponse clientResponse = service.path(entities.path)
-                .path(entityType).path(entityName).header(REMOTE_USER, USER)
+        ClientResponse clientResponse = service
+                .path(entities.path).path(entityType).path(entityName)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class);
 
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
 
         return parseEntityList(clientResponse);
     }
@@ -480,12 +515,13 @@ public class FalconClient {
     private String sendListRequest(Entities entities, String entityType)
         throws FalconCLIException {
 
-        ClientResponse clientResponse = service.path(entities.path)
-                .path(entityType).header(REMOTE_USER, USER)
+        ClientResponse clientResponse = service
+                .path(entities.path).path(entityType)
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class);
 
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
 
         return parseEntityList(clientResponse);
     }
@@ -497,29 +533,16 @@ public class FalconClient {
         if (colo != null) {
             resource = resource.queryParam("colo", colo);
         }
-        ClientResponse clientResponse = resource.header(REMOTE_USER, USER)
+        ClientResponse clientResponse = resource
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class, requestObject);
 
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
 
         return parseAPIResult(clientResponse);
     }
 
-    public InstancesResult instanceCmd(Instances instances, String type, String name,
-                                       String start, String end, String colo) {
-        WebResource resource = service.path(instances.path).path(type).path(name);
-        resource = resource.queryParam("start", start);
-        if (end != null) {
-            resource = resource.queryParam("end", end);
-        }
-        resource = resource.queryParam("colo", colo);
-
-        return resource.header(REMOTE_USER, USER)
-                .accept(instances.mimeType)
-                .method(instances.method, InstancesResult.class);
-    }
-
     //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
     private String sendInstanceRequest(Instances instances, String type,
                                        String entity, String start, String end, InputStream props,
@@ -541,15 +564,17 @@ public class FalconClient {
 
         ClientResponse clientResponse;
         if (props == null) {
-            clientResponse = resource.header(REMOTE_USER, USER)
+            clientResponse = resource
+                    .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                     .accept(instances.mimeType)
                     .method(instances.method, ClientResponse.class);
         } else {
-            clientResponse = resource.header(REMOTE_USER, USER)
+            clientResponse = resource
+                    .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                     .accept(instances.mimeType)
                     .method(instances.method, ClientResponse.class, props);
         }
-        checkIfSuccessfull(clientResponse);
+        checkIfSuccessful(clientResponse);
 
         if (instances.name().equals("LOG")) {
             return parseProcessInstanceResultLogs(clientResponse, runid);
@@ -566,8 +591,10 @@ public class FalconClient {
         throws FalconCLIException {
 
         ClientResponse clientResponse = service.path(job.path)
-                .header(REMOTE_USER, USER).accept(job.mimeType)
-                .type(MediaType.TEXT_PLAIN).method(job.method, ClientResponse.class);
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(job.mimeType)
+                .type(job.mimeType)
+                .method(job.method, ClientResponse.class);
         return parseStringResult(clientResponse);
     }
 
@@ -720,7 +747,7 @@ public class FalconClient {
         return sb.toString();
     }
 
-    private void checkIfSuccessfull(ClientResponse clientResponse)
+    private void checkIfSuccessful(ClientResponse clientResponse)
         throws FalconCLIException {
 
         if (clientResponse.getStatus() == Response.Status.BAD_REQUEST

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 068a22c..c55c989 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -153,6 +153,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>net.sourceforge.findbugs</groupId>
             <artifactId>annotations</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 691d805..fc9c3b1 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -32,11 +32,13 @@ public abstract class AbstractCatalogService {
     /**
      * This method checks if the catalog service is alive.
      *
-     * @param catalogBaseUrl url for the catalog service
+     * @param catalogUrl url for the catalog service
+     * @param metaStorePrincipal kerberos principal for hive metastore as this is executed in falcon on behalf of user
      * @return if the service was reachable
      * @throws FalconException exception
      */
-    public abstract boolean isAlive(String catalogBaseUrl) throws FalconException;
+    public abstract boolean isAlive(String catalogUrl,
+                                    String metaStorePrincipal) throws FalconException;
 
     /**
      * This method checks if the given table exists in the catalog.
@@ -44,14 +46,15 @@ public abstract class AbstractCatalogService {
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
+     * @param metaStorePrincipal kerberos principal for hive metastore as this is executed in falcon on behalf of user
      * @return if the table exists
      * @throws FalconException exception
      */
-    public abstract boolean tableExists(String catalogUrl, String database, String tableName)
-        throws FalconException;
+    public abstract boolean tableExists(String catalogUrl, String database, String tableName,
+                                        String metaStorePrincipal) throws FalconException;
 
     /**
-     * Returns if the table is external or not.
+     * Returns if the table is external or not. Executed in the workflow engine.
      *
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
@@ -63,7 +66,7 @@ public abstract class AbstractCatalogService {
                                             String tableName) throws FalconException;
 
     /**
-     * List partitions by filter.
+     * List partitions by filter. Executed in the workflow engine.
      *
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
@@ -79,7 +82,7 @@ public abstract class AbstractCatalogService {
         throws FalconException;
 
     /**
-     * Drops a given partition.
+     * Drops a given partition. Executed in the workflow engine.
      *
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
@@ -92,7 +95,7 @@ public abstract class AbstractCatalogService {
                                            Map<String, String> partitions) throws FalconException;
 
     /**
-     * Gets the partition.
+     * Gets the partition. Executed in the workflow engine.
      *
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 51e4d6e..3c3660e 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -23,6 +23,7 @@ import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hcatalog.api.HCatClient;
 import org.apache.hcatalog.api.HCatDatabase;
 import org.apache.hcatalog.api.HCatPartition;
@@ -32,6 +33,8 @@ import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -67,63 +70,94 @@ public class HiveCatalogService extends AbstractCatalogService {
 
     private static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
         try {
-            HiveConf hcatConf = new HiveConf();
-            hcatConf.set("hive.metastore.local", "false");
-            hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
-            hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
-            hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
-                    HCatSemanticAnalyzer.class.getName());
-            hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-
-            hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-            hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-
+            HiveConf hcatConf = createHiveConf(metastoreUrl);
             return HCatClient.create(hcatConf);
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
         }
     }
 
+    private static HiveConf createHiveConf(String metastoreUrl) {
+        HiveConf hcatConf = new HiveConf();
+        hcatConf.set("hive.metastore.local", "false");
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        return hcatConf;
+    }
+
+    public static synchronized HCatClient getProxiedClient(String catalogUrl,
+                                                           String metaStorePrincipal) throws FalconException {
+        if (!CACHE.containsKey(catalogUrl)) {
+            try {
+                final HiveConf hcatConf = createHiveConf(catalogUrl);
+                if (UserGroupInformation.isSecurityEnabled()) {
+                    hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metaStorePrincipal);
+                    hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+                }
+
+                LOG.info("Creating and caching HCatalog client object for " + catalogUrl);
+                UserGroupInformation currentUser = UserGroupInformation.getLoginUser();
+                HCatClient hcatClient = currentUser.doAs(new PrivilegedExceptionAction<HCatClient>() {
+                    public HCatClient run() throws Exception {
+                        return HCatClient.create(hcatConf);
+                    }
+                });
+                CACHE.putIfAbsent(catalogUrl, hcatClient);
+            } catch (IOException e) {
+                throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+            } catch (InterruptedException e) {
+                throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+            }
+        }
+
+        return CACHE.get(catalogUrl);
+    }
 
     @Override
-    public boolean isAlive(String catalogBaseUrl) throws FalconException {
-        LOG.info("Checking if the service is alive for: " + catalogBaseUrl);
+    public boolean isAlive(final String catalogUrl,
+                           final String metaStorePrincipal) throws FalconException {
+        LOG.info("Checking if the service is alive for: " + catalogUrl);
 
         try {
-            HCatClient client = get(catalogBaseUrl);
-            client.close();
+            HCatClient client = getProxiedClient(catalogUrl, metaStorePrincipal);
             HCatDatabase database = client.getDatabase("default");
             return database != null;
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e);
         }
     }
 
     @Override
-    public boolean tableExists(String catalogUrl, String database, String tableName)
-        throws FalconException {
+    public boolean tableExists(final String catalogUrl, final String database, final String tableName,
+                               final String metaStorePrincipal) throws FalconException {
         LOG.info("Checking if the table exists: " + tableName);
 
         try {
-            HCatClient client = get(catalogUrl);
+            HCatClient client = getProxiedClient(catalogUrl, metaStorePrincipal);
             HCatTable table = client.getTable(database, tableName);
             return table != null;
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
         }
     }
 
     @Override
     public boolean isTableExternal(String catalogUrl, String database, String tableName)
         throws FalconException {
-        LOG.info("Returns a list of table properties for:" + tableName);
+        LOG.info("Checking if the table is external:" + tableName);
 
         try {
             HCatClient client = get(catalogUrl);
             HCatTable table = client.getTable(database, tableName);
             return !table.getTabletype().equals("MANAGED_TABLE");
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e);
         }
     }
 
@@ -145,7 +179,7 @@ public class HiveCatalogService extends AbstractCatalogService {
 
             return catalogPartitionList;
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
         }
     }
 
@@ -180,7 +214,7 @@ public class HiveCatalogService extends AbstractCatalogService {
             HCatClient client = get(catalogUrl);
             client.dropPartitions(database, tableName, partitions, true);
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
         }
 
         return true;
@@ -189,14 +223,14 @@ public class HiveCatalogService extends AbstractCatalogService {
     @Override
     public CatalogPartition getPartition(String catalogUrl, String database, String tableName,
                                          Map<String, String> partitionSpec) throws FalconException {
-        LOG.info("List partitions for : " + tableName + ", partition spec: " + partitionSpec);
+        LOG.info("Fetch partition for : " + tableName + ", partition spec: " + partitionSpec);
 
         try {
             HCatClient client = get(catalogUrl);
             HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
             return createCatalogPartition(hCatPartition);
         } catch (HCatException e) {
-            throw new FalconException(e);
+            throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 644afd2..20d46c3 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -26,9 +26,9 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,8 +44,8 @@ import java.io.IOException;
  */
 public abstract class AbstractCleanupHandler {
 
-    protected static final Logger LOG = Logger
-            .getLogger(AbstractCleanupHandler.class);
+    protected static final Logger LOG = Logger.getLogger(AbstractCleanupHandler.class);
+
     protected static final ConfigurationStore STORE = ConfigurationStore.get();
     public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
     public static final ExpressionHelper RESOLVER = ExpressionHelper.get();
@@ -66,7 +66,6 @@ public abstract class AbstractCleanupHandler {
     private String getRetentionValue(Frequency.TimeUnit timeunit) {
         return RuntimeProperties.get().getProperty(
                 "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
-
     }
 
     protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
@@ -87,14 +86,7 @@ public abstract class AbstractCleanupHandler {
     protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
         throws FalconException {
 
-        FileSystem fs;
-        try {
-            fs = new Path(ClusterHelper.getStorageUrl(cluster))
-                    .getFileSystem(new Configuration());
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-        return fs;
+        return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
     }
 
     protected void delete(Cluster cluster, Entity entity, long retention)
@@ -116,8 +108,7 @@ public abstract class AbstractCleanupHandler {
         for (FileStatus log : logs) {
             if (now - log.getModificationTime() > retention) {
                 try {
-                    boolean isDeleted = getFileSystem(cluster).delete(
-                            log.getPath(), true);
+                    boolean isDeleted = getFileSystem(cluster).delete(log.getPath(), true);
                     if (!isDeleted) {
                         LOG.error("Unable to delete path: " + log.getPath());
                     } else {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 7dbac58..58d2199 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -56,7 +56,7 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
                     delete(currentCluster, feed, retention);
                     deleteStagedData(currentCluster, feed, retention);
                 } else {
-                    LOG.info("Ignoring cleanup for process:" + feedName
+                    LOG.info("Ignoring cleanup for feed:" + feedName
                             + " in  cluster: " + cluster.getName() + " as this does not belong to current colo");
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 32f7605..ed9b238 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -325,11 +324,6 @@ public class CatalogStorage implements Storage {
     }
 
     @Override
-    public boolean exists() throws FalconException {
-        return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
-    }
-
-    @Override
     public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
         CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 38b5c5b..c0f3ee2 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,14 +18,11 @@
 
 package org.apache.falcon.entity;
 
-import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.cluster.*;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import java.io.IOException;
-
 /**
  * Helper to get end points relating to the cluster.
  */
@@ -37,22 +34,21 @@ public final class ClusterHelper {
 
     public static Configuration getConfiguration(Cluster cluster) {
         Configuration conf = new Configuration();
-        conf.set("fs.default.name", getStorageUrl(cluster));
-        conf.set("mapred.job.tracker", getMREndPoint(cluster));
+
+        final String storageUrl = getStorageUrl(cluster);
+        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
+
+        final String executeEndPoint = getMREndPoint(cluster);
+        conf.set(HadoopClientFactory.MR_JOB_TRACKER_KEY, executeEndPoint);
+        conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);
+
         if (cluster.getProperties() != null) {
             for (Property prop : cluster.getProperties().getProperties()) {
                 conf.set(prop.getName(), prop.getValue());
             }
         }
-        return conf;
-    }
 
-    public static FileSystem getFileSystem(Cluster cluster) throws FalconException {
-        try {
-            return FileSystem.get(getConfiguration(cluster));
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
+        return conf;
     }
 
     public static String getOozieUrl(Cluster cluster) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index a3ad83d..b4bc07d 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -34,6 +34,7 @@ import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.fs.FileStatus;
@@ -553,7 +554,7 @@ public final class EntityUtil {
     private static Path getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Path path)
         throws FalconException {
         try {
-            FileSystem fs = ClusterHelper.getFileSystem(cluster);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
             FileStatus latest = null;
             FileStatus[] files = fs.globStatus(path, new PathFilter() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 68370c7..41917c8 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -170,12 +170,6 @@ public class FileSystemStorage implements Storage {
     }
 
     @Override
-    public boolean exists() throws FalconException {
-        // Directories on FS will be created if they don't exist.
-        return true;
-    }
-
-    @Override
     public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
         FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
         final List<Location> fsStorageLocations = fsStorage.getLocations();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index 0634969..60c87c5 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -65,15 +65,6 @@ public interface Storage {
     String getUriTemplate(LocationType locationType);
 
     /**
-     * Check if the storage, filesystem location or catalog table exists.
-     * Filesystem location always returns true.
-     *
-     * @return true if table exists else false
-     * @throws FalconException an exception
-     */
-    boolean exists() throws FalconException;
-
-    /**
      * Check for equality of this instance against the one in question.
      *
      * @param toCompareAgainst instance to compare

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index e633838..831bfdc 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import javax.jms.ConnectionFactory;
 
+import org.apache.commons.lang.Validate;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.ClusterHelper;
@@ -29,16 +30,17 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
+
 /**
  * Parser that parses cluster entity definition.
  */
@@ -51,8 +53,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     @Override
-    public void validate(Cluster cluster) throws StoreAccessException,
-                                                 ValidationException {
+    public void validate(Cluster cluster) throws StoreAccessException, ValidationException {
         // validating scheme in light of fail-early
         validateScheme(cluster, Interfacetype.READONLY);
         validateScheme(cluster, Interfacetype.WRITE);
@@ -88,23 +89,34 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster);
         LOG.info("Validating read interface: " + readOnlyStorageUrl);
 
-        validateFileSystem(readOnlyStorageUrl);
+        validateFileSystem(cluster, readOnlyStorageUrl);
     }
 
     private void validateWriteInterface(Cluster cluster) throws ValidationException {
         final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster);
         LOG.info("Validating write interface: " + writeStorageUrl);
 
-        validateFileSystem(writeStorageUrl);
+        validateFileSystem(cluster, writeStorageUrl);
     }
 
-    private void validateFileSystem(String storageUrl) throws ValidationException {
+    private void validateFileSystem(Cluster cluster, String storageUrl) throws ValidationException {
         try {
             Configuration conf = new Configuration();
-            conf.set("fs.default.name", storageUrl);
+            conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
             conf.setInt("ipc.client.connect.max.retries", 10);
-            FileSystem.get(conf);
-        } catch (IOException e) {
+
+            if (UserGroupInformation.isSecurityEnabled()) {
+                String nameNodePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.NN_PRINCIPAL);
+                Validate.notEmpty(nameNodePrincipal,
+                    "Cluster definition missing required namenode credential property: " + SecurityUtil.NN_PRINCIPAL);
+
+                conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
+            }
+
+            // todo: ideally check if the end user has access using createProxiedFileSystem
+            // hftp won't work and bug is logged at HADOOP-10215
+            HadoopClientFactory.get().createFileSystem(conf);
+        } catch (FalconException e) {
             throw new ValidationException("Invalid storage server or port: " + storageUrl, e);
         }
     }
@@ -114,11 +126,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         LOG.info("Validating execute interface: " + executeUrl);
 
         try {
-            JobConf jobConf = new JobConf();
-            jobConf.set("mapred.job.tracker", executeUrl);
-            jobConf.set("yarn.resourcemanager.address", executeUrl);
-            JobClient jobClient = new JobClient(jobConf);
-            jobClient.getClusterStatus().getMapTasks();
+            HadoopClientFactory.validateJobClient(executeUrl);
         } catch (IOException e) {
             throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
         }
@@ -173,7 +181,15 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         LOG.info("Validating catalog registry interface: " + catalogUrl);
 
         try {
-            if (!CatalogServiceFactory.getCatalogService().isAlive(catalogUrl)) {
+            String metaStorePrincipal = null;
+            if (UserGroupInformation.isSecurityEnabled()) {
+                metaStorePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+                Validate.notEmpty(metaStorePrincipal,
+                        "Cluster definition missing required metastore credential property: "
+                                + SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+            }
+
+            if (!CatalogServiceFactory.getCatalogService().isAlive(catalogUrl, metaStorePrincipal)) {
                 throw new ValidationException("Unable to reach Catalog server:" + catalogUrl);
             }
         } catch (FalconException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 5c1d9ad..d138179 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,7 +20,9 @@ package org.apache.falcon.entity.parser;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
@@ -38,6 +40,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
+import org.apache.falcon.security.SecurityUtil;
 import org.apache.log4j.Logger;
 
 import java.util.Date;
@@ -344,6 +347,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
     }
 
     private void validateStorageExists(Feed feed) throws FalconException {
+        StringBuilder buffer = new StringBuilder();
         for (Cluster cluster : feed.getClusters().getClusters()) {
             org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
                     EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
@@ -352,12 +356,27 @@ public class FeedEntityParser extends EntityParser<Feed> {
             }
 
             final Storage storage = FeedHelper.createStorage(cluster, feed);
-            if (!storage.exists()) {
-                // this is only true for table, filesystem always returns true
-                CatalogStorage catalogStorage = (CatalogStorage) storage;
-                throw new ValidationException("Table [" + catalogStorage.getTable()
-                        + "] does not exist for feed: " + feed.getName() + ", cluster: " + cluster.getName());
+            // this is only true for table, filesystem always returns true
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                continue;
+            }
+
+            CatalogStorage catalogStorage = (CatalogStorage) storage;
+            String metaStorePrincipal = ClusterHelper.getPropertyValue(clusterEntity,
+                    SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+            if (!CatalogServiceFactory.getCatalogService().tableExists(catalogStorage.getCatalogUrl(),
+                    catalogStorage.getDatabase(), catalogStorage.getTable(), metaStorePrincipal)) {
+                buffer.append("Table [")
+                        .append(catalogStorage.getTable())
+                        .append("] does not exist for feed: ")
+                        .append(feed.getName())
+                        .append(" in cluster: ")
+                        .append(cluster.getName());
             }
         }
+
+        if (buffer.length() > 0) {
+            throw new ValidationException(buffer.toString());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 8647d43..837b86a 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -41,6 +41,7 @@ import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -97,6 +98,13 @@ public class ProcessEntityParser extends EntityParser<Process> {
         validateLateInputs(process);
     }
 
+    /**
+     * Validate if the user submitting this entity has access to the specific dirs on HDFS.
+     *
+     * @param process process
+     * @param clusterName cluster the process is materialized on
+     * @throws FalconException
+     */
     private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
         org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
                 clusterName);
@@ -109,9 +117,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
         String libPath = process.getWorkflow().getLib();
         String nameNode = getNameNode(cluster, clusterName);
         try {
-            Configuration configuration = new Configuration();
-            configuration.set("fs.default.name", nameNode);
-            FileSystem fs = FileSystem.get(configuration);
+            Configuration configuration = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
             if (!fs.exists(new Path(workflowPath))) {
                 throw new ValidationException(
                         "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 18ceb6e..156fafe 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -21,14 +21,16 @@ package org.apache.falcon.entity.store;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.log4j.Logger;
 
 import javax.xml.bind.JAXBException;
@@ -84,8 +86,26 @@ public final class ConfigurationStore implements FalconService {
 
         String uri = StartupProperties.get().getProperty("config.store.uri");
         storePath = new Path(uri);
+        fs = initializeFileSystem();
+    }
+
+    /**
+     * Falcon owns this dir on HDFS which no one has permissions to read.
+     *
+     * @return FileSystem handle
+     */
+    private FileSystem initializeFileSystem() {
         try {
-            fs = FileSystem.get(storePath.toUri(), new Configuration());
+            FileSystem fileSystem = HadoopClientFactory.get().createFileSystem(storePath.toUri());
+            if (!fileSystem.exists(storePath)) {
+                LOG.info("Creating configuration store directory: " + storePath);
+                fileSystem.mkdirs(storePath);
+                // set permissions so config store dir is owned by falcon alone
+                FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+                fileSystem.setPermission(storePath, permission);
+            }
+
+            return fileSystem;
         } catch (Exception e) {
             throw new RuntimeException("Unable to bring up config store", e);
         }
@@ -305,8 +325,7 @@ public final class ConfigurationStore implements FalconService {
         Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
         fs.mkdirs(archivePath);
         fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
-                new Path(archivePath,
-                        URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
+                new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
         LOG.info("Archived configuration " + type + "/" + name);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
new file mode 100644
index 0000000..d5fbda8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hadoop;
+
+import org.apache.commons.lang.Validate;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A factory implementation to dole out FileSystem handles based on the logged in user.
+ */
+public final class HadoopClientFactory {
+
+    public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+    public static final String MR_JOB_TRACKER_KEY = "mapred.job.tracker";
+    public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
+
+    private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
+
+    private HadoopClientFactory() {
+    }
+
+    public static HadoopClientFactory get() {
+        return INSTANCE;
+    }
+
+    /**
+     * This method is only used by Falcon internally to talk to the config store on HDFS.
+     *
+     * @param uri file system URI for config store.
+     * @return FileSystem created with the provided proxyUser/group.
+     * @throws org.apache.falcon.FalconException
+     *          if the filesystem could not be created.
+     */
+    public FileSystem createFileSystem(final URI uri) throws FalconException {
+        Validate.notNull(uri, "uri cannot be null");
+
+        try {
+            Configuration conf = new Configuration();
+            if (UserGroupInformation.isSecurityEnabled()) {
+                conf.set(SecurityUtil.NN_PRINCIPAL, StartupProperties.get().getProperty(SecurityUtil.NN_PRINCIPAL));
+            }
+
+            return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for: " + uri, e);
+        }
+    }
+
+    public FileSystem createFileSystem(final Configuration conf)
+        throws FalconException {
+        Validate.notNull(conf, "configuration cannot be null");
+
+        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+        try {
+            return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+        }
+    }
+
+    public FileSystem createFileSystem(final URI uri, final Configuration conf)
+        throws FalconException {
+        Validate.notNull(uri, "uri cannot be null");
+
+        try {
+            return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting FileSystem for: " + uri, e);
+        }
+    }
+
+    public FileSystem createProxiedFileSystem(final Configuration conf)
+        throws FalconException {
+        Validate.notNull(conf, "configuration cannot be null");
+
+        String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+        try {
+            return createProxiedFileSystem(CurrentUser.getUser(), new URI(nameNode), conf);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+        }
+    }
+
+    /**
+     * Return a FileSystem created with the provided proxyUser for the specified URI.
+     *
+     * @param proxyUser proxyUser
+     * @param uri  file system URI.
+     * @param conf Configuration with all necessary information to create the FileSystem.
+     * @return FileSystem created with the provided proxyUser/group.
+     * @throws org.apache.falcon.FalconException
+     *          if the filesystem could not be created.
+     */
+    public FileSystem createProxiedFileSystem(String proxyUser, final URI uri, final Configuration conf)
+        throws FalconException {
+        Validate.notEmpty(proxyUser, "proxyUser cannot be null");
+
+        try {
+            UserGroupInformation proxyUgi = SecurityUtil.getProxyUser(proxyUser);
+            return createFileSystem(proxyUgi, uri, conf);
+        } catch (IOException ex) {
+            throw new FalconException("Exception while getting FileSystem: " + ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Return a FileSystem created with the provided user for the specified URI.
+     *
+     * @param ugi user group information
+     * @param uri  file system URI.
+     * @param conf Configuration with all necessary information to create the FileSystem.
+     * @return FileSystem created with the provided user/group.
+     * @throws org.apache.falcon.FalconException
+     *          if the filesystem could not be created.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, final Configuration conf)
+        throws FalconException {
+        Validate.notNull(ugi, "ugi cannot be null");
+        Validate.notNull(conf, "configuration cannot be null");
+
+        String nameNode = uri.getAuthority();
+        if (nameNode == null) {
+            nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+            if (nameNode != null) {
+                try {
+                    new URI(nameNode).getAuthority();
+                } catch (URISyntaxException ex) {
+                    throw new FalconException("Exception while getting FileSystem", ex);
+                }
+            }
+        }
+
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(uri, conf);
+                }
+            });
+        } catch (InterruptedException ex) {
+            throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
+        } catch (IOException ex) {
+            throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * This method validates if the execute url is able to reach the MR endpoint.
+     *
+     * @param executeUrl jt url or RM url
+     * @throws IOException
+     */
+    public static void validateJobClient(String executeUrl) throws IOException {
+        final JobConf jobConf = new JobConf();
+        jobConf.set(MR_JOB_TRACKER_KEY, executeUrl);
+        jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
+
+        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+        try {
+            JobClient jobClient = loginUser.doAs(new PrivilegedExceptionAction<JobClient>() {
+                public JobClient run() throws Exception {
+                    return new JobClient(jobConf);
+                }
+            });
+
+            jobClient.getClusterStatus().getMapTasks();
+        } catch (InterruptedException e) {
+            throw new IOException("Exception creating job client:" + e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java b/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
new file mode 100644
index 0000000..264d5b8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.Validate;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.Properties;
+
+
+/**
+ * Authentication Service at startup that initializes the authentication credentials
+ * based on authentication type. If Kerberos is enabled, it logs in the user with the key tab.
+ */
+public class AuthenticationInitializationService implements FalconService {
+
+    private static final Logger LOG = Logger.getLogger(AuthenticationInitializationService.class);
+
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    protected static final String CONFIG_PREFIX = "falcon.service.authentication.";
+
+    /**
+     * Constant for the configuration property that indicates the keytab file path.
+     */
+    protected static final String KERBEROS_KEYTAB = CONFIG_PREFIX + KerberosAuthenticationHandler.KEYTAB;
+    /**
+     * Constant for the configuration property that indicates the kerberos principal.
+     */
+    protected static final String KERBEROS_PRINCIPAL = CONFIG_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
+
+
+    @Override
+    public String getName() {
+        return "Authentication initialization service";
+    }
+
+    @Override
+    public void init() throws FalconException {
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            LOG.info("Falcon Kerberos Authentication Enabled!");
+            initializeKerberos();
+        } else {
+            LOG.info("Falcon Simple Authentication Enabled!");
+            Configuration ugiConf = new Configuration();
+            ugiConf.set("hadoop.security.authentication", "simple");
+            UserGroupInformation.setConfiguration(ugiConf);
+        }
+    }
+
+    protected void initializeKerberos() throws FalconException {
+        try {
+            Properties configuration = StartupProperties.get();
+            String principal = configuration.getProperty(KERBEROS_PRINCIPAL);
+            Validate.notEmpty(principal,
+                    "Missing required configuration property: " + KERBEROS_PRINCIPAL);
+            principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
+                    principal, SecurityUtil.getLocalHostName());
+
+            String keytabFilePath = configuration.getProperty(KERBEROS_KEYTAB);
+            Validate.notEmpty(keytabFilePath,
+                    "Missing required configuration property: " + KERBEROS_KEYTAB);
+            checkIsReadable(keytabFilePath);
+
+            Configuration conf = new Configuration();
+            conf.set("hadoop.security.authentication", "kerberos");
+
+            UserGroupInformation.setConfiguration(conf);
+            UserGroupInformation.loginUserFromKeytab(principal, keytabFilePath);
+
+            LOG.info("Got Kerberos ticket, keytab: " + keytabFilePath
+                    + ", Falcon principal principal: " + principal);
+        } catch (Exception ex) {
+            throw new FalconException("Could not initialize " + getName()
+                    + ": " + ex.getMessage(), ex);
+        }
+    }
+
+    private static void checkIsReadable(String keytabFilePath) {
+        File keytabFile = new File(keytabFilePath);
+        if (!keytabFile.exists()) {
+            throw new IllegalArgumentException("The keytab file does not exist! " + keytabFilePath);
+        }
+
+        if (!keytabFile.isFile()) {
+            throw new IllegalArgumentException("The keytab file cannot be a directory! " + keytabFilePath);
+        }
+
+        if (!keytabFile.canRead()) {
+            throw new IllegalArgumentException("The keytab file is not readable! " + keytabFilePath);
+        }
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 4d2299e..cd7d0b0 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -37,8 +37,7 @@ public final class CurrentUser {
         return INSTANCE;
     }
 
-    private final ThreadLocal<Subject> currentSubject =
-            new ThreadLocal<Subject>();
+    private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>();
 
     public static void authenticate(String user) {
         if (user == null || user.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java b/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
deleted file mode 100644
index d95e147..0000000
--- a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-
-import com.sun.security.auth.UnixPrincipal;
-import org.apache.log4j.Logger;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.LoginException;
-import javax.security.auth.spi.LoginModule;
-import java.security.Principal;
-import java.util.Map;
-
-/**
- * Falcon JAAS login module.
- */
-public class FalconLoginModule implements LoginModule {
-    private static final Logger LOG = Logger.getLogger(FalconLoginModule.class);
-
-    private Subject subject;
-
-    public Subject getSubject() {
-        return subject;
-    }
-
-    @Override
-    public boolean abort() throws LoginException {
-        return true;
-    }
-
-    private <T extends Principal> T getCanonicalUser(Class<T> cls) {
-        for (T user : subject.getPrincipals(cls)) {
-            return user;
-        }
-        return null;
-    }
-
-    @Override
-    public boolean commit() throws LoginException {
-        if (!subject.getPrincipals(SecurityConstants.OS_PRINCIPAL_CLASS).
-                isEmpty()) {
-            return true;
-        }
-
-        Principal user = getCanonicalUser(SecurityConstants.OS_PRINCIPAL_CLASS);
-        if (user != null) {
-            subject.getPrincipals().add(new UnixPrincipal(user.getName()));
-            return true;
-        }
-        LOG.error("No such user " + subject);
-        throw new LoginException("No such user " + subject);
-    }
-
-    //SUSPEND CHECKSTYLE CHECK HiddenFieldCheck
-    @Override
-    public void initialize(Subject subject, CallbackHandler callbackHandler,
-                           Map<String, ?> sharedState, Map<String, ?> options) {
-        this.subject = subject;
-    }
-    //RESUME CHECKSTYLE CHECK HiddenFieldCheck
-
-    @Override
-    public boolean login() throws LoginException {
-        return true;
-    }
-
-    @Override
-    public boolean logout() throws LoginException {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
deleted file mode 100644
index b80ab6d..0000000
--- a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.util.HashMap;
-
-/**
- * Falcon JAAS security configuration.
- */
-public class FalconSecurityConfiguration extends Configuration {
-
-    private static final AppConfigurationEntry OS_SPECIFIC_LOGIN =
-            new AppConfigurationEntry(SecurityConstants.OS_LOGIN_MODULE_NAME,
-                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-                    new HashMap<String, String>());
-
-    private static final AppConfigurationEntry[] SIMPLE_CONF =
-            new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN};
-
-    private final Configuration parent;
-
-    public FalconSecurityConfiguration(Configuration parent) {
-        this.parent = parent;
-    }
-
-    @Override
-    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
-        if (parent == null || appName.equals(SecurityConstants.FALCON_LOGIN)) {
-            return SIMPLE_CONF.clone();
-        } else {
-            return parent.getAppConfigurationEntry(appName);
-        }
-    }
-}