Posted to commits@falcon.apache.org by ve...@apache.org on 2014/02/16 04:31:25 UTC

[4/5] FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/SecurityConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityConstants.java b/common/src/main/java/org/apache/falcon/security/SecurityConstants.java
deleted file mode 100644
index 8f7ba4a..0000000
--- a/common/src/main/java/org/apache/falcon/security/SecurityConstants.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import com.sun.security.auth.UnixPrincipal;
-
-import java.security.Principal;
-
-/**
- * Constants for the security module.
- */
-public final class SecurityConstants {
-
-    private SecurityConstants() {}
-
-    public static final String OS_LOGIN_MODULE_NAME =
-            "com.sun.security.auth.module.UnixLoginModule";
-    public static final Class<? extends Principal> OS_PRINCIPAL_CLASS =
-            UnixPrincipal.class;
-
-    public static final String FALCON_LOGIN = "FALCON_DEFAULT_LOGIN";
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
new file mode 100644
index 0000000..f78043f
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Security Util - bunch of security related helper methods.
+ * Also doles out proxied UserGroupInformation. Caches proxied users.
+ */
+public final class SecurityUtil {
+
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    private static final String CONFIG_PREFIX = "falcon.authentication.";
+
+    /**
+     * Constant for the configuration property that indicates the authentication type.
+     */
+    public static final String AUTHENTICATION_TYPE = CONFIG_PREFIX + "type";
+
+    /**
+     * Constant for the configuration property that indicates the Name node principal.
+     */
+    public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
+
+    /**
+     * Constant for the configuration property that indicates the Hive Metastore principal.
+     * This is used to talk to Hive Meta Store during parsing and validations only.
+     */
+    public static final String HIVE_METASTORE_PRINCIPAL = "hive.metastore.kerberos.principal";
+
+
+    private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
+            new ConcurrentHashMap<String, UserGroupInformation>();
+
+    private SecurityUtil() {
+    }
+
+    public static String getAuthenticationType() {
+        return StartupProperties.get().getProperty(
+                AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
+    }
+
+    public static boolean isSecurityEnabled() {
+        String authenticationType = StartupProperties.get().getProperty(
+                AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
+
+        final boolean useKerberos;
+        if (authenticationType == null || PseudoAuthenticationHandler.TYPE.equals(authenticationType)) {
+            useKerberos = false;
+        } else if (KerberosAuthenticationHandler.TYPE.equals(authenticationType)) {
+            useKerberos = true;
+        } else {
+            throw new IllegalArgumentException("Invalid attribute value for "
+                    + AUTHENTICATION_TYPE + " of " + authenticationType);
+        }
+
+        return useKerberos;
+    }
+
+    public static UserGroupInformation getProxyUser(String proxyUser) throws IOException {
+        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+        if (proxyUgi == null) {
+            // taking care of a race condition, the latest UGI will be discarded
+            proxyUgi = UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+        }
+
+        return proxyUgi;
+    }
+
+    public static String getLocalHostName() throws UnknownHostException {
+        return InetAddress.getLocalHost().getCanonicalHostName();
+    }
+}
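
A caller-side sketch (not part of this patch) of how the proxied UGI handed out by SecurityUtil.getProxyUser might typically be used; the user name "joe" and the HDFS path below are illustrative only:

import java.security.PrivilegedExceptionAction;

import org.apache.falcon.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
    public static void main(String[] args) throws Exception {
        // resolve (and cache) a proxy UGI for the remote user, proxied by the Falcon login user
        final UserGroupInformation proxyUgi = SecurityUtil.getProxyUser("joe");

        // perform HDFS operations as the proxied user
        boolean exists = proxyUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
            public Boolean run() throws Exception {
                FileSystem fs = FileSystem.get(new Configuration());
                return fs.exists(new Path("/tmp"));
            }
        });
        System.out.println("Path exists as proxied user: " + exists);
    }
}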

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 7ed4394..4580bad 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -112,7 +113,8 @@ public final class UpdateHelper {
             }
 
             Path checksum = new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE);
-            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(clusterEntity));
+            Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
             if (!fs.exists(checksum)) {
                 //Update if there is no checksum file(for migration)
                 return true;
@@ -126,11 +128,7 @@ public final class UpdateHelper {
             }
 
             //Update if the user wf/lib is updated i.e., if checksums are different
-            if (!wfPaths.equals(checksums)) {
-                return true;
-            }
-
-            return false;
+            return !wfPaths.equals(checksums);
         } catch (IOException e) {
             throw new FalconException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 3014418..51b3364 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -28,8 +28,9 @@
 *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
 *.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
 
-*.application.services=org.apache.falcon.entity.store.ConfigurationStore,\
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
 						org.apache.falcon.rerun.service.LateRunService,\
 						org.apache.falcon.service.SLAMonitoringService,\
@@ -43,8 +44,13 @@
 
 ######### Implementation classes #########
 
+
+######### System startup parameters #########
+
+# Location of libraries that are shipped to Hadoop
 *.system.lib.location=${FALCON_HOME}/sharedlibs
 
+# Location to store user entity configurations
 debug.config.store.uri=file://${user.dir}/target/store
 debug.config.oozie.conf.uri=${user.dir}/target/oozie
 debug.system.lib.location=${system.lib.location}
@@ -56,8 +62,12 @@ debug.libext.process.paths=${falcon.libext}
 
 *.falcon.cleanup.service.frequency=days(1)
 
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
 *.broker.url=tcp://localhost:61616
-#default time-to-live for a JMS message 3 days (time in minutes)
+
+# default time-to-live for a JMS message 3 days (time in minutes)
 *.broker.ttlInMins=4320
 *.entity.topic=FALCON.ENTITY.TOPIC
 *.max.retry.failure.count=1
@@ -65,3 +75,53 @@ debug.libext.process.paths=${falcon.libext}
 
 ######### Properties for configuring iMon client and metric #########
 *.internal.queue.size=1000
+
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=false
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The kerberos name rules used to resolve kerberos principal names; refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma-separated list of blacklisted users
+*.falcon.http.authentication.blacklisted.users=
+
+######### Authentication Properties #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 7668c7f..6ec5b41 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -25,10 +25,10 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +71,8 @@ public class AbstractTestBase {
                 listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""));
         store = ConfigurationStore.get();
         store.init();
+
+        CurrentUser.authenticate("falcon");
     }
 
     protected void cleanupStore() throws FalconException {
@@ -136,13 +138,4 @@ public class AbstractTestBase {
         marshaller.marshal(entity, stringWriter);
         return stringWriter.toString();
     }
-
-    private Interface newInterface(Interfacetype type, String endPoint,
-                                   String version) {
-        Interface iface = new Interface();
-        iface.setType(type);
-        iface.setEndpoint(endPoint);
-        iface.setVersion(version);
-        return iface;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 7b48d2b..1d56a78 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -162,18 +162,6 @@ public class FileSystemStorageTest {
     }
 
     @Test
-    public void testExists() throws Exception {
-        final Location location = new Location();
-        location.setPath("/foo/bar");
-        location.setType(LocationType.DATA);
-        List<Location> locations = new ArrayList<Location>();
-        locations.add(location);
-
-        FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
-        Assert.assertTrue(storage.exists());
-    }
-
-    @Test
     public void testIsIdentical() throws Exception {
         final String storageUrl = "jail://global:00";
         final Location location1 = new Location();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
new file mode 100644
index 0000000..3b4e7f0
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hadoop;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+
+/**
+ * Unit tests for HadoopClientFactory that doles out FileSystem handles.
+ */
+public class HadoopClientFactoryTest {
+
+    private EmbeddedCluster embeddedCluster;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        embeddedCluster = EmbeddedCluster.newCluster(getClass().getSimpleName());
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        if (embeddedCluster != null) {
+            embeddedCluster.shutdown();
+        }
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        HadoopClientFactory clientFactory = HadoopClientFactory.get();
+        Assert.assertNotNull(clientFactory);
+    }
+
+    @Test (enabled = false) // todo: cheated the conf to impersonate as same user
+    public void testCreateFileSystemWithSameUser() {
+        String user = System.getProperty("user.name");
+        try {
+            Configuration conf = embeddedCluster.getConf();
+            URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+            Assert.assertNotNull(uri);
+            HadoopClientFactory.get().createProxiedFileSystem(user, uri, conf);
+            Assert.fail("Impersonation should have failed.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
+        }
+    }
+
+    @Test
+    public void testCreateFileSystem() throws Exception {
+        Configuration conf = embeddedCluster.getConf();
+
+        UserGroupInformation.setConfiguration(conf);
+        UserGroupInformation realUser = UserGroupInformation.createUserForTesting(
+                "testuser", new String[]{"testgroup"});
+        UserGroupInformation.createProxyUserForTesting("proxyuser", realUser, new String[]{"proxygroup"});
+
+        URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+        Assert.assertNotNull(uri);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("testuser", uri, conf);
+        Assert.assertNotNull(fs);
+    }
+
+    @Test
+    public void testCreateFileSystemWithUser() throws Exception {
+        Configuration conf = embeddedCluster.getConf();
+
+        UserGroupInformation realUser = UserGroupInformation.createUserForTesting(
+                "testuser", new String[]{"testgroup"});
+        UserGroupInformation.createProxyUserForTesting("proxyuser", realUser, new String[]{"proxygroup"});
+        UserGroupInformation.setConfiguration(conf);
+
+        URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+        Assert.assertNotNull(uri);
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("seetharam", uri, conf);
+        Assert.assertNotNull(fs);
+    }
+}
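
For reference, one plausible way to build such a proxied FileSystem with stock Hadoop APIs is sketched below. This only illustrates the mechanics the tests above exercise; it is not necessarily how HadoopClientFactory itself is implemented, and the class and method names here are hypothetical:

import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public final class ProxiedFileSystemSketch {

    private ProxiedFileSystemSketch() {}

    // create a FileSystem handle that acts on behalf of 'user', proxied by the login user;
    // the cluster must also whitelist the login user via hadoop.proxyuser.* settings
    public static FileSystem createProxiedFileSystem(String user, final URI uri, final Configuration conf)
        throws Exception {
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(
                user, UserGroupInformation.getLoginUser());
        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            public FileSystem run() throws Exception {
                return FileSystem.get(uri, conf);
            }
        });
    }
}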

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java b/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
new file mode 100644
index 0000000..9b76a61
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+
+/**
+ * Unit test for AuthenticationInitializationService that employs mocks.
+ */
+public class AuthenticationInitializationServiceTest {
+
+    private AuthenticationInitializationService authenticationService;
+
+    @Mock
+    private UserGroupInformation mockLoginUser;
+
+    @BeforeClass
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        authenticationService = new AuthenticationInitializationService();
+    }
+
+    @Test
+    public void testGetName() {
+        Assert.assertEquals("Authentication initialization service",
+                authenticationService.getName());
+    }
+
+    @Test
+    public void testInitForSimpleAuthenticationMethod() {
+        try {
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+                    PseudoAuthenticationHandler.TYPE);
+            authenticationService.init();
+
+            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+            Assert.assertFalse(loginUser.isFromKeytab());
+            Assert.assertEquals(loginUser.getAuthenticationMethod().name().toLowerCase(),
+                    PseudoAuthenticationHandler.TYPE);
+            Assert.assertEquals(System.getProperty("user.name"), loginUser.getUserName());
+        } catch (Exception e) {
+            Assert.fail("AuthenticationInitializationService init failed.", e);
+        }
+    }
+
+    @Test
+    public void testKerberosAuthenticationWithKeytabFileDoesNotExist() {
+        try {
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+                    KerberosAuthenticationHandler.TYPE);
+            StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/blah/blah");
+            authenticationService.init();
+            Assert.fail("The keytab file does not exist! must have been thrown.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+        }
+    }
+
+    @Test
+    public void testKerberosAuthenticationWithKeytabFileIsADirectory() {
+        try {
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+                    KerberosAuthenticationHandler.TYPE);
+            StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/tmp/");
+            authenticationService.init();
+            Assert.fail("The keytab file cannot be a directory! must have been thrown.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+        }
+    }
+
+    @Test
+    public void testKerberosAuthenticationWithKeytabFileNotReadable() {
+        File tempFile = new File(".keytabFile");
+        try {
+            assert tempFile.createNewFile();
+            assert tempFile.setReadable(false);
+
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+                    KerberosAuthenticationHandler.TYPE);
+            StartupProperties.get().setProperty(
+                    AuthenticationInitializationService.KERBEROS_KEYTAB, tempFile.toString());
+            authenticationService.init();
+            Assert.fail("The keytab file is not readable! must have been thrown.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+        } finally {
+            assert tempFile.delete();
+        }
+    }
+
+    @Test (enabled = false)
+    public void testInitForKerberosAuthenticationMethod() throws FalconException {
+        Mockito.when(mockLoginUser.getAuthenticationMethod())
+                .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
+        Mockito.when(mockLoginUser.getUserName()).thenReturn("falcon");
+        Mockito.when(mockLoginUser.isFromKeytab()).thenReturn(Boolean.TRUE);
+
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+                KerberosAuthenticationHandler.TYPE);
+        StartupProperties.get().setProperty(
+                AuthenticationInitializationService.KERBEROS_KEYTAB, "falcon.kerberos.keytab");
+        StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_PRINCIPAL, "falcon");
+
+        authenticationService.init();
+
+        Assert.assertTrue(mockLoginUser.isFromKeytab());
+        Assert.assertEquals(mockLoginUser.getAuthenticationMethod().name(),
+                KerberosAuthenticationHandler.TYPE);
+        Assert.assertEquals("falcon", mockLoginUser.getUserName());
+    }
+}
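
The service under test is not part of this hunk. As a rough sketch, a Kerberos login of the kind these tests validate is typically performed with stock Hadoop APIs along the following lines; the class name and validation details are assumptions, and the principal/keytab values are supplied by the caller:

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;     // Hadoop's SecurityUtil, not Falcon's
import org.apache.hadoop.security.UserGroupInformation;

public final class KerberosLoginSketch {

    private KerberosLoginSketch() {}

    public static void login(String principal, String keytabPath) throws Exception {
        // mirror the checks the tests assert on: keytab must exist, be a file and be readable
        File keytab = new File(keytabPath);
        if (!keytab.exists() || keytab.isDirectory() || !keytab.canRead()) {
            throw new IllegalArgumentException("Invalid keytab file: " + keytabPath);
        }

        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // resolve _HOST in the principal to the local host name, then login from the keytab
        String resolvedPrincipal = SecurityUtil.getServerPrincipal(
                principal, java.net.InetAddress.getLocalHost().getCanonicalHostName());
        UserGroupInformation.loginUserFromKeytab(resolvedPrincipal, keytabPath);
    }
}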

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
new file mode 100644
index 0000000..630aa4b
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for Security utils.
+ */
+public class SecurityUtilTest {
+
+    @Test
+    public void testDefaultGetAuthenticationType() throws Exception {
+        Assert.assertEquals(SecurityUtil.getAuthenticationType(), "simple");
+    }
+
+    @Test
+    public void testGetAuthenticationType() throws Exception {
+        try {
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "kerberos");
+            Assert.assertEquals(SecurityUtil.getAuthenticationType(), "kerberos");
+        } finally {
+            // reset
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "simple");
+        }
+    }
+
+    @Test
+    public void testIsSecurityEnabledByDefault() throws Exception {
+        Assert.assertFalse(SecurityUtil.isSecurityEnabled());
+    }
+
+    @Test
+    public void testIsSecurityEnabled() throws Exception {
+        try {
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "kerberos");
+            Assert.assertTrue(SecurityUtil.isSecurityEnabled());
+        } finally {
+            // reset
+            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "simple");
+        }
+    }
+
+    @Test
+    public void testGetProxyUser() throws Exception {
+        UserGroupInformation proxyUgi = SecurityUtil.getProxyUser("proxy");
+        Assert.assertNotNull(proxyUgi);
+        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
new file mode 100644
index 0000000..c1f7656
--- /dev/null
+++ b/docs/src/site/twiki/Security.twiki
@@ -0,0 +1,193 @@
+---+ Securing Falcon
+
+---++ Overview
+
+Apache Falcon enforces authentication on protected resources. Once authentication has been established, Falcon sets a
+signed HTTP cookie that contains an authentication token with the user name, user principal,
+authentication type and expiration time.
+
+It does so by using [[http://hadoop.apache.org/docs/current/hadoop-auth/index.html][Hadoop Auth]].
+Hadoop Auth is a Java library consisting of client and server components that enable Kerberos SPNEGO authentication
+for HTTP. Hadoop Auth also supports additional authentication mechanisms on the client and the server side via two
+simple interfaces.
+
+
+---++ Authentication Methods
+
+Falcon supports two authentication methods out of the box: simple and kerberos.
+
+---+++ Pseudo/Simple Authentication
+
+Falcon authenticates the user by simply trusting the value of the query string parameter 'user.name'. This is the
+default mode Falcon is configured with.
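+
+For example, assuming a Falcon server at localhost:15000, a request in this mode might look like:
+
+<verbatim>
+$ curl "http://localhost:15000/api/admin/version?user.name=venkatesh"
+</verbatim>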
+
+---+++ Kerberos Authentication
+
+Falcon uses HTTP Kerberos SPNEGO to authenticate the user.
+
+---++ Server Side Configuration Setup
+
+---+++ Common Configuration Parameters
+
+<verbatim>
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=kerberos
+</verbatim>
+
+---+++ Kerberos Configuration
+
+<verbatim>
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=falcon/_HOST@EXAMPLE.COM
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=/etc/security/keytabs/falcon.service.keytab
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=nn/_HOST@EXAMPLE.COM
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=kerberos
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=HTTP/_HOST@EXAMPLE.COM
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=/etc/security/keytabs/spnego.service.keytab
+
+# The kerberos name rules used to resolve kerberos principal names; refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma-separated list of blacklisted users
+*.falcon.http.authentication.blacklisted.users=
+</verbatim>
+
+---+++ Pseudo/Simple Configuration
+
+<verbatim>
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+
+# Comma-separated list of blacklisted users
+*.falcon.http.authentication.blacklisted.users=
+</verbatim>
+
+---+++ SSL Configuration
+
+<verbatim>
+*.falcon.enableTLS=true
+*.keystore.file=/path/to/keystore/file
+*.keystore.password=password
+</verbatim>
+
+---+++ Distributed Falcon Setup
+
+Falcon should be configured to communicate with Prism over TLS in secure mode. This is not enabled by default.
+
+
+---++ Changes to ownership and permissions of directories managed by Falcon
+
+| *Directory*             | *Location*                                                        | *Owner* | *Permissions* |
+| Configuration Store     | ${config.store.uri}                                               | falcon  | 750           |
+| Oozie coord/bundle XMLs | ${cluster.staging-location}/workflows/{entity}/{entity-name}      | falcon  | 644           |
+| Shared libs             | {cluster.working}/{lib,libext}                                    | falcon  | 755           |
+| App logs                | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | falcon  | 777           |
+
+
+---++ Backwards compatibility
+
+---+++ Scheduled Entities
+
+Entities already scheduled with an earlier version of Falcon are not compatible with this version.
+
+---+++ Falcon Clients
+
+Older Falcon clients are backwards compatible with respect to authentication. User information sent as part of the
+Remote-User HTTP header is still honoured when the authentication type is configured as below:
+
+<verbatim>
+*.falcon.http.authentication.type=org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler
+</verbatim>
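+
+A hypothetical request from such an older client carries the user in the Remote-User HTTP header, for example:
+
+<verbatim>
+$ curl -H "Remote-User: venkatesh" http://localhost:15000/api/admin/version
+</verbatim>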
+
+---+++ Blacklisted super users for authentication
+
+The blacklist of users used to contain the following super users: hdfs, mapreduce, oozie, and falcon.
+This list has been externalized from code into the startup.properties file; it is empty by default and needs to be
+configured explicitly in that file.
+
+
+---+++ Falcon Dashboard
+
+The dashboard assumes an anonymous user in the Pseudo/Simple method and hence anonymous requests must be allowed for
+it to work.
+<verbatim>
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+</verbatim>
+
+In the Kerberos method, the browser must support HTTP Kerberos SPNEGO.
+
+
+---++ Known Limitations
+
+   * ActiveMQ topics are not secure but will be in the near future
+   * Entities already scheduled with an earlier version of Falcon are not compatible with this version, as new
+   workflow parameters that are now required, such as the user, are passed back into Falcon
+   * Use of hftp as the scheme for the read-only interface in the cluster entity [[https://issues.apache.org/jira/browse/HADOOP-10215][will not work in Oozie]].
+   The alternative is to use the webhdfs scheme instead; it has been tested with DistCp.
+
+
+---++ Examples
+
+---+++ Accessing the server using Falcon CLI (Java client)
+
+There is no change in the way the CLI is used. Internally, the CLI has been updated to work with the configured
+authentication method.
+
+---+++ Accessing the server using curl
+
+Try accessing protected resources using curl. For example:
+
+<verbatim>
+$ kinit
+Please enter the password for venkatesh@LOCALHOST:
+
+$ curl http://localhost:15000/api/admin/version
+
+$ curl http://localhost:15000/api/admin/version?user.name=venkatesh
+
+$ curl --negotiate -u foo -b ~/cookiejar.txt -c ~/cookiejar.txt http://localhost:15000/api/admin/version
+</verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/index.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki
index 81c4c3e..ee48fbb 100644
--- a/docs/src/site/twiki/index.twiki
+++ b/docs/src/site/twiki/index.twiki
@@ -30,7 +30,8 @@ describes steps to on-board a pipeline to Falcon. It also gives a sample pipelin
 describes various options for the command line utility provided by Falcon.
 
 Falcon provides OOTB [[HiveIntegration][lifecycle management for Tables in Hive (HCatalog)]]
-such as table replication for BCP and table eviction.
+such as table replication for BCP and table eviction. Falcon also enforces
+[[Security][kerberos authentication]] on protected resources and enables SSL.
 
 #LicenseInfo
 ---+ Licensing Information

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/AdminConfig.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/AdminConfig.twiki b/docs/src/site/twiki/restapi/AdminConfig.twiki
index 2841b25..675b19e 100644
--- a/docs/src/site/twiki/restapi/AdminConfig.twiki
+++ b/docs/src/site/twiki/restapi/AdminConfig.twiki
@@ -17,7 +17,6 @@ Configuration information of the server.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/admin/config/deploy
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/AdminStack.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/AdminStack.twiki b/docs/src/site/twiki/restapi/AdminStack.twiki
index a241999..79dbd9b 100644
--- a/docs/src/site/twiki/restapi/AdminStack.twiki
+++ b/docs/src/site/twiki/restapi/AdminStack.twiki
@@ -16,7 +16,6 @@ Stack trace of the server.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/admin/stack
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/AdminVersion.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/AdminVersion.twiki b/docs/src/site/twiki/restapi/AdminVersion.twiki
index fbf1405..00b0283 100644
--- a/docs/src/site/twiki/restapi/AdminVersion.twiki
+++ b/docs/src/site/twiki/restapi/AdminVersion.twiki
@@ -16,7 +16,6 @@ Version of the server.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/admin/version
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityDefinition.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityDefinition.twiki b/docs/src/site/twiki/restapi/EntityDefinition.twiki
index 955be71..1f76a4f 100644
--- a/docs/src/site/twiki/restapi/EntityDefinition.twiki
+++ b/docs/src/site/twiki/restapi/EntityDefinition.twiki
@@ -18,7 +18,6 @@ Definition of the entity.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/entities/definition/process/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityDelete.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityDelete.twiki b/docs/src/site/twiki/restapi/EntityDelete.twiki
index 7a7e22a..850b451 100644
--- a/docs/src/site/twiki/restapi/EntityDelete.twiki
+++ b/docs/src/site/twiki/restapi/EntityDelete.twiki
@@ -18,7 +18,6 @@ Results of the delete operation.
 ---+++ Rest Call
 <verbatim>
 DELETE http://localhost:15000/api/entities/delete/cluster/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityDependencies.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityDependencies.twiki b/docs/src/site/twiki/restapi/EntityDependencies.twiki
index 6daab68..c61bc43 100644
--- a/docs/src/site/twiki/restapi/EntityDependencies.twiki
+++ b/docs/src/site/twiki/restapi/EntityDependencies.twiki
@@ -18,7 +18,6 @@ Dependenciess of the entity.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/entities/dependencies/process/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityList.twiki b/docs/src/site/twiki/restapi/EntityList.twiki
index bca84b0..b9cf349 100644
--- a/docs/src/site/twiki/restapi/EntityList.twiki
+++ b/docs/src/site/twiki/restapi/EntityList.twiki
@@ -19,7 +19,6 @@ List of the entities.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/entities/list/feed
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>
@@ -40,7 +39,6 @@ Remote-User: rgautam
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/entities/list/feed?fields=status
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityResume.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityResume.twiki b/docs/src/site/twiki/restapi/EntityResume.twiki
index 223a83f..a2d5184 100644
--- a/docs/src/site/twiki/restapi/EntityResume.twiki
+++ b/docs/src/site/twiki/restapi/EntityResume.twiki
@@ -18,7 +18,6 @@ Result of the resume command.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/resume/process/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySchedule.twiki b/docs/src/site/twiki/restapi/EntitySchedule.twiki
index e481613..dd97141 100644
--- a/docs/src/site/twiki/restapi/EntitySchedule.twiki
+++ b/docs/src/site/twiki/restapi/EntitySchedule.twiki
@@ -18,7 +18,6 @@ Result of the schedule command.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/schedule/process/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityStatus.twiki b/docs/src/site/twiki/restapi/EntityStatus.twiki
index f0e772b..34d166d 100644
--- a/docs/src/site/twiki/restapi/EntityStatus.twiki
+++ b/docs/src/site/twiki/restapi/EntityStatus.twiki
@@ -18,7 +18,6 @@ Status of the entity.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/entities/status/process/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySubmit.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySubmit.twiki b/docs/src/site/twiki/restapi/EntitySubmit.twiki
index e4b608e..925381c 100644
--- a/docs/src/site/twiki/restapi/EntitySubmit.twiki
+++ b/docs/src/site/twiki/restapi/EntitySubmit.twiki
@@ -17,7 +17,6 @@ Result of the submission.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/submit/feed
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Hourly sample input data -->
 
@@ -59,7 +58,6 @@ Remote-User: rgautam
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/submit/process
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
 <process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
@@ -103,4 +101,4 @@ Remote-User: rgautam
     "message": "default\/Submit successful (process) SampleProcess\n",
     "status": "SUCCEEDED"
 }
-</verbatim>
\ No newline at end of file
+</verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki b/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
index fb3649d..042a5fb 100644
--- a/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
+++ b/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
@@ -17,7 +17,6 @@ Result of the submit and schedule command.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/submitAndSchedule/process
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
 <process xmlns="uri:falcon:process:0.1" name="SampleProcess" >

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySuspend.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySuspend.twiki b/docs/src/site/twiki/restapi/EntitySuspend.twiki
index 9d6e9ab..9e5efca 100644
--- a/docs/src/site/twiki/restapi/EntitySuspend.twiki
+++ b/docs/src/site/twiki/restapi/EntitySuspend.twiki
@@ -18,7 +18,6 @@ Status of the entity.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/suspend/process/SampleProcess
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityUpdate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityUpdate.twiki b/docs/src/site/twiki/restapi/EntityUpdate.twiki
index 16ec439..f2c2e7e 100644
--- a/docs/src/site/twiki/restapi/EntityUpdate.twiki
+++ b/docs/src/site/twiki/restapi/EntityUpdate.twiki
@@ -19,7 +19,6 @@ Result of the validation.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/update/process/SampleProcess?effective=2014-01-01T00:00Z
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
 <process xmlns="uri:falcon:process:0.1" name="SampleProcess" >

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityValidate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityValidate.twiki b/docs/src/site/twiki/restapi/EntityValidate.twiki
index bc0f508..89bd155 100644
--- a/docs/src/site/twiki/restapi/EntityValidate.twiki
+++ b/docs/src/site/twiki/restapi/EntityValidate.twiki
@@ -17,7 +17,6 @@ Result of the validation.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/validate/cluster
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
 <cluster xmlns="uri:falcon:cluster:0.1" name="primary-cluster" description="Primary Cluster" colo="west-coast">
     <interfaces>
@@ -46,7 +45,6 @@ Remote-User: rgautam
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/validate/feed
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Hourly sample input data -->
 
@@ -88,7 +86,6 @@ Remote-User: rgautam
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/validate/feed
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Daily sample output data -->
 
@@ -125,7 +122,6 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/entities/validate/process
-Remote-User: rgautam
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
 <process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
@@ -169,4 +165,4 @@ Remote-User: rgautam
     "message": "Validated successfully (PROCESS) SampleProcess",
     "status": "SUCCEEDED"
 }
-</verbatim>
\ No newline at end of file
+</verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceKill.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceKill.twiki b/docs/src/site/twiki/restapi/InstanceKill.twiki
index 5c429f6..eff2893 100644
--- a/docs/src/site/twiki/restapi/InstanceKill.twiki
+++ b/docs/src/site/twiki/restapi/InstanceKill.twiki
@@ -19,7 +19,6 @@ Result of the kill operation.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/instance/kill/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceLogs.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceLogs.twiki b/docs/src/site/twiki/restapi/InstanceLogs.twiki
index f84b828..599f2d7 100644
--- a/docs/src/site/twiki/restapi/InstanceLogs.twiki
+++ b/docs/src/site/twiki/restapi/InstanceLogs.twiki
@@ -20,7 +20,6 @@ Log of specified instance.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/instance/logs/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceRerun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki
index cf35475..77608e0 100644
--- a/docs/src/site/twiki/restapi/InstanceRerun.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki
@@ -19,7 +19,6 @@ Results of the rerun command.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceResume.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceResume.twiki b/docs/src/site/twiki/restapi/InstanceResume.twiki
index 2bdd6e1..2d29569 100644
--- a/docs/src/site/twiki/restapi/InstanceResume.twiki
+++ b/docs/src/site/twiki/restapi/InstanceResume.twiki
@@ -19,7 +19,6 @@ Results of the resume command.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/instance/resume/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceRunning.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRunning.twiki b/docs/src/site/twiki/restapi/InstanceRunning.twiki
index 6b5ee66..116565f 100644
--- a/docs/src/site/twiki/restapi/InstanceRunning.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRunning.twiki
@@ -18,7 +18,6 @@ List of instances currently running.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/instance/running/process/SampleProcess?colo=*
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index eddc2c8..99497d1 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -20,7 +20,6 @@ Status of the specified instance.
 ---+++ Rest Call
 <verbatim>
 GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceSuspend.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSuspend.twiki b/docs/src/site/twiki/restapi/InstanceSuspend.twiki
index 62cf72b..8f5f7c3 100644
--- a/docs/src/site/twiki/restapi/InstanceSuspend.twiki
+++ b/docs/src/site/twiki/restapi/InstanceSuspend.twiki
@@ -19,7 +19,6 @@ Results of the suspend command.
 ---+++ Rest Call
 <verbatim>
 POST http://localhost:15000/api/instance/suspend/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
 </verbatim>
 ---+++ Result
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index b9ec4b6..6ca0dea 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -1,8 +1,32 @@
 ---+ RESTful Resources
+
+---++ Resource List
    * <a href="#REST_Call_on_Entity_Resource">REST Call on Entity Resource</a>
    * <a href="#REST_Call_on_Feed_and_Process_Instances">REST Call on Feed/Process Instances</a>
    * <a href="#REST_Call_on_Admin_Resource">REST Call on Admin Resource</a>
 
+---++ Authentication
+
+When security is off (Pseudo/Simple), the authenticated user is the username specified in the user.name query
+parameter. If the user.name parameter is not set, the server may either set the authenticated user to a configured
+default web user or return an error response.
+
+When security is on (Kerberos), authentication is performed by Kerberos SPNEGO.
+
+Below are examples using the curl command-line tool.
+
+Authentication when security is off (Pseudo/Simple):
+<verbatim>
+curl -i "http://<HOST>:<PORT>/<PATH>?[user.name=<USER>&]<PARAM>=..."
+</verbatim>
+
+Authentication using Kerberos SPNEGO when security is on:
+<verbatim>
+curl -i --negotiate -u : "http://<HOST>:<PORT>/<PATH>?<PARAM>=..."
+</verbatim>
+
+See also: [[../Security.twiki][Security in Falcon]]
+
 ---++ REST Call on Admin Resource
 
 | *Call Type* | *Resource*                                     | *Description*                               |

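Note on the Authentication section added above: the curl examples cover both modes; as a further illustration, here is a
minimal Java sketch of a pseudo-auth call against the instance-running endpoint documented in these twiki pages. The
username "falcon" and the use of plain HttpURLConnection are illustrative assumptions and are not part of the patch.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class PseudoAuthCallSketch {
    public static void main(String[] args) throws Exception {
        // Pseudo/Simple auth: the caller identifies itself via the user.name query parameter.
        // Endpoint copied from the REST API docs above; the username is a placeholder.
        URL url = new URL("http://localhost:15000/api/instance/running/process/SampleProcess?colo=*&user.name=falcon");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(line);
        }
        in.close();
        conn.disconnect();
    }
}
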
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index 6ca2134..d6dee77 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -38,6 +38,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
 import org.apache.falcon.oozie.coordinator.ACTION;
@@ -45,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
@@ -426,7 +428,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                     propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
                     propagateTableCopyProperties(srcCluster, sourceTableStorage,
                             trgCluster, targetTableStorage, props);
-                    setupHiveConfiguration(trgCluster, sourceTableStorage, targetTableStorage, wfPath);
+                    setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
                 }
 
                 propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
@@ -479,9 +481,11 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
         }
 
-        private void setupHiveConfiguration(Cluster trgCluster, CatalogStorage sourceStorage,
-                                            CatalogStorage targetStorage, Path wfPath) throws IOException {
-            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(trgCluster));
+        private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
+                                            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
+            throws IOException, FalconException {
+            Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
             // copy import export scripts to stagingDir
             Path scriptPath = new Path(wfPath, "scripts");
@@ -490,8 +494,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
 
             // create hive conf to stagingDir
             Path confPath = new Path(wfPath + "/conf");
-            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), "falcon-source-");
-            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), "falcon-target-");
+            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
         }
 
         private void copyHiveScript(FileSystem fs, Path scriptPath,

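The key change above replaces a direct FileSystem.get() call with HadoopClientFactory, so that HDFS access while
staging the Hive configuration runs as the proxied request user rather than the Falcon login user. A minimal sketch of
that usage follows, assuming the factory methods exactly as they appear in the hunks (get(),
createProxiedFileSystem(Configuration)); the class name and the path written are placeholders, not part of the patch.

import java.io.IOException;

import org.apache.falcon.FalconException;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ProxiedFileSystemSketch {

    // Obtains a FileSystem bound to the proxied (request) user and writes a
    // placeholder file, mirroring how setupHiveConfiguration obtains its handle.
    public static void touchAsRequestUser(Configuration clusterConf) throws IOException, FalconException {
        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(clusterConf);
        fs.create(new Path("/tmp/falcon-proxied-example")).close();
    }
}
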
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 91d0285..7a95c35 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -218,6 +218,8 @@
             <arg>${wf:id()}</arg>
             <arg>-logDir</arg>
             <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>
@@ -283,6 +285,8 @@
             <arg>${wf:id()}</arg>
             <arg>-logDir</arg>
             <arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 8b444f5..08795b4 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -107,6 +107,8 @@
             <arg>${wf:conf("broker.ttlInMins")}</arg>
             <arg>-cluster</arg>
             <arg>${cluster}</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>
@@ -119,8 +121,8 @@
     </action>
 
     <kill name="fail">
-        <message>Workflow failed, error
-            message[${wf:errorMessage(wf:lastErrorNode())}]
+        <message>
+            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
         </message>
     </kill>
     <end name='end'/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index a37755b..a153462 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -40,6 +40,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.DECISION;
 import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -82,6 +83,8 @@ public class OozieFeedMapperTest {
 
     @BeforeClass
     public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
         srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
         String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index e576310..846a43e 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -161,11 +161,6 @@
 
         <!-- Oozie dependencies -->
         <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.hcatalog</groupId>
             <artifactId>webhcat-java-client</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/html5-ui/js/falcon.js
----------------------------------------------------------------------
diff --git a/html5-ui/js/falcon.js b/html5-ui/js/falcon.js
index ff3a929..0a75f6a 100644
--- a/html5-ui/js/falcon.js
+++ b/html5-ui/js/falcon.js
@@ -17,21 +17,26 @@
 (function(exports) {
   "use strict";
 
-  var USER_ID = 'admin';
+  var USER_ID = 'dashboard';
 
   function onError (msg) {
     alert(msg);
   }
 
   function ajax_impl(options) {
-    $.extend(options, {'headers': { 'Remote-User': USER_ID }});
+    // $.extend(options, add_user(options.url));
     return $.ajax(options);
   }
 
+  function add_user(url) {
+    var paramSeparator = (url.indexOf('?') != -1) ? '&' : '?';
+    return url + paramSeparator + 'user.name=' + USER_ID;
+  }
+
   function getJson_impl(url, success) {
     return ajax_impl({
       'dataType': 'json',
-      'url': url,
+      'url': add_user(url),
       'success': success
     });
   }
@@ -39,7 +44,7 @@
   function getText_impl(url, success) {
     return ajax_impl({
       'dataType': 'text',
-      'url': url,
+      'url': add_user(url),
       'success': success
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index 9aa5347..aa5765c 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -82,6 +82,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-test-util</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index eb49fd5..d8ba4f3 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -70,7 +70,8 @@ public class EntityInstanceMessage {
         topicName("topicName"),
         status("status"),
         brokerTTL("broker.ttlInMins"),
-        cluster("cluster");
+        cluster("cluster"),
+        workflowUser("workflowUser");
 
         private String propName;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
index ecda5eb..c8ea12d 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
@@ -41,18 +41,16 @@ public class EntityInstanceMessageCreator {
 
     public Message createMessage(Session session) throws JMSException {
         mapMessage = session.createMapMessage();
-        for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap()
-                .entrySet()) {
-            mapMessage.setString(entry.getKey().getArgName(), instanceMessage
-                    .getKeyValueMap().get(entry.getKey()));
+        for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap().entrySet()) {
+            mapMessage.setString(entry.getKey().getArgName(),
+                    instanceMessage.getKeyValueMap().get(entry.getKey()));
         }
-        return mapMessage;
 
+        return mapMessage;
     }
 
     @Override
     public String toString() {
         return this.mapMessage.toString();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
index b37931c..0f9e918 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
@@ -65,7 +65,6 @@ public class MessageProducer extends Configured implements Tool {
         producer.setTimeToLive(messageTTL);
         producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
                 .createMessage(session));
-
     }
 
     public static void main(String[] args) throws Exception {
@@ -124,6 +123,8 @@ public class MessageProducer extends Configured implements Tool {
                 "workflow id"));
         addOption(options, new Option(ARG.cluster.getArgName(), true,
                 "cluster name"));
+        addOption(options, new Option(ARG.workflowUser.getArgName(), true,
+                "workflow user id"), false);
 
         return new GnuParser().parse(options, arguments);
     }
@@ -152,14 +153,12 @@ public class MessageProducer extends Configured implements Tool {
             return 0;
         }
 
-        MessageProducer falconMessageProducer = new MessageProducer();
         try {
-            falconMessageProducer.createAndStartConnection(
-                    cmd.getOptionValue(ARG.brokerImplClass.name()), "",
+            createAndStartConnection(cmd.getOptionValue(ARG.brokerImplClass.name()), "",
                     "", cmd.getOptionValue(ARG.brokerUrl.name()));
             for (EntityInstanceMessage message : entityInstanceMessage) {
                 LOG.info("Sending message:" + message.getKeyValueMap());
-                falconMessageProducer.sendMessage(message);
+                sendMessage(message);
             }
         } catch (JMSException e) {
             LOG.error("Error in getConnection:", e);
@@ -167,8 +166,8 @@ public class MessageProducer extends Configured implements Tool {
             LOG.error("Error in getConnection:", e);
         } finally {
             try {
-                if (falconMessageProducer.connection != null) {
-                    falconMessageProducer.connection.close();
+                if (connection != null) {
+                    connection.close();
                 }
             } catch (JMSException e) {
                 LOG.error("Error in closing connection:", e);

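The MessageProducer changes above register the new -workflowUser argument as an optional option via Falcon's addOption
helper. For illustration, a standalone sketch of consuming such an option with Commons CLI directly follows; the literal
option names stand in for ARG.workflowId.getArgName() and ARG.workflowUser.getArgName(), and the default value is an
illustrative assumption.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class WorkflowUserOptionSketch {

    public static void main(String[] args) throws Exception {
        Options options = new Options();

        // Required workflow id, standing in for ARG.workflowId.getArgName().
        Option workflowId = new Option("workflowId", true, "workflow id");
        workflowId.setRequired(true);
        options.addOption(workflowId);

        // Optional workflow user, standing in for the new ARG.workflowUser.getArgName().
        options.addOption(new Option("workflowUser", true, "workflow user id"));

        CommandLine cmd = new GnuParser().parse(options, args);
        String workflowUser = cmd.getOptionValue("workflowUser", "unknown");
        System.out.println("workflowId=" + cmd.getOptionValue("workflowId")
                + ", workflowUser=" + workflowUser);
    }
}
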
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index da126c7..3f0c664 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -37,8 +37,6 @@ import javax.jms.*;
 public class FalconTopicProducerTest {
 
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-    // private static final String BROKER_URL =
-    // "tcp://localhost:61616?daemon=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
     private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
@@ -140,6 +138,7 @@ public class FalconTopicProducerTest {
     private List<String> createCommonArgs() {
         return new ArrayList<String>(Arrays.asList(
                 "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                "-" + ARG.workflowUser.getArgName(), "falcon",
                 "-" + ARG.runId.getArgName(), "1",
                 "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
                 "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
@@ -169,6 +168,7 @@ public class FalconTopicProducerTest {
             }
         };
         t.start();
+        Thread.sleep(100);
         for (String[] message : messages) {
             new MessageProducer().run(message);
         }
@@ -203,6 +203,8 @@ public class FalconTopicProducerTest {
                 "agg-coord");
         Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
                 "workflow-01-00");
+        Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+                "falcon");
         Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
         Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
                 "2011-01-01T01:00Z");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index e707567..57ccdc5 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -73,6 +73,7 @@ public class FeedProducerTest {
                             "-" + ARG.feedInstancePaths.getArgName(),
                             "/click-logs/10/05/05/00/20",
                             "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                            "-" + ARG.workflowUser.getArgName(), "falcon",
                             "-" + ARG.runId.getArgName(), "1",
                             "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
                             "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
@@ -200,6 +201,8 @@ public class FeedProducerTest {
         Assert.assertEquals(m.getString(ARG.operation.getArgName()), "DELETE");
         Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
                 "workflow-01-00");
+        Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+                "falcon");
         Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
         Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
                 "2011-01-01T01:00Z");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 3a40e76..9f8b07f 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -35,8 +35,6 @@ public class ProcessProducerTest {
 
     private String[] args;
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-    // private static final String BROKER_URL =
-    // "tcp://localhost:61616?daemon=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String TOPIC_NAME = "FALCON.PROCESS";
     private BrokerService broker;
@@ -50,6 +48,7 @@ public class ProcessProducerTest {
                             "-" + ARG.feedInstancePaths.getArgName(),
                             "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
                             "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                            "-" + ARG.workflowUser.getArgName(), "falcon",
                             "-" + ARG.runId.getArgName(), "1",
                             "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
                             "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
@@ -139,6 +138,8 @@ public class ProcessProducerTest {
                 TOPIC_NAME);
         Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
                 "workflow-01-00");
+        Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+                "falcon");
         Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
         Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
                 "2011-01-01T01:00Z");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index 275a725..0b680ba 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -38,6 +38,7 @@ public final class GenericAlert {
             @Dimension(value = "entity-name") String entityName,
             @Dimension(value = "nominal-name") String nominalTime,
             @Dimension(value = "wf-id") String wfId,
+            @Dimension(value = "wf-user") String workflowUser,
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "error-message") String message) {
         return "IGNORE";
@@ -49,6 +50,7 @@ public final class GenericAlert {
             @Dimension(value = "entity-name") String entityName,
             @Dimension(value = "nominal-name") String nominalTime,
             @Dimension(value = "wf-id") String wfId,
+            @Dimension(value = "wf-user") String workflowUser,
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "error-message") String message) {
         return "IGNORE";
@@ -62,6 +64,7 @@ public final class GenericAlert {
             @Dimension(value = "entity-name") String entityName,
             @Dimension(value = "nominal-time") String nominalTime,
             @Dimension(value = "wf-id") String workflowId,
+            @Dimension(value = "wf-user") String workflowUser,
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "operation") String operation,
             @Dimension(value = "start-time") String startTime,
@@ -80,6 +83,7 @@ public final class GenericAlert {
             @Dimension(value = "entity-name") String entityName,
             @Dimension(value = "nominal-time") String nominalTime,
             @Dimension(value = "wf-id") String workflowId,
+            @Dimension(value = "wf-user") String workflowUser,
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "operation") String operation,
             @Dimension(value = "start-time") String startTime,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index cecdeef..0762514 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.bundle.COORDINATOR;
@@ -37,6 +38,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.ObjectFactory;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.service.FalconPathFilter;
 import org.apache.falcon.service.SharedLibraryHostingService;
 import org.apache.falcon.util.RuntimeProperties;
@@ -45,8 +47,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.OozieClient;
 
@@ -141,7 +145,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
             Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
             String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
                     EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
-            createTempDir(cluster, coordPath);
+            createLogsDir(cluster, coordPath);
             COORDINATOR bundleCoord = new COORDINATOR();
             bundleCoord.setName(coordinatorapp.getName());
             bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
@@ -192,9 +196,9 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
     }
 
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
-        throws IOException {
+        throws IOException, FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
         addExtensionJars(fs, new Path(libext), wf);
         addExtensionJars(fs, new Path(libext, type.name()), wf);
         if (StringUtils.isNotEmpty(lifecycle)) {
@@ -208,7 +212,6 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
             SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
                     libPath, cluster, FALCON_JAR_FILTER);
         } catch (IOException e) {
-            LOG.error("Failed to copy shared libs on cluster " + cluster.getName(), e);
             throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
         }
     }
@@ -286,11 +289,11 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
 
     protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
         throws FalconException {
-
         try {
             Marshaller marshaller = jaxbContext.createMarshaller();
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-            FileSystem fs = outPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
             OutputStream out = fs.create(outPath);
             try {
                 marshaller.marshal(jaxbElement, out);
@@ -310,12 +313,16 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
         }
     }
 
-    private void createTempDir(Cluster cluster, Path coordPath) throws FalconException {
+    private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
         try {
-            FileSystem fs = coordPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
-            Path tempDir = new Path(coordPath, "../../logs");
-            fs.mkdirs(tempDir);
-            fs.setPermission(tempDir, new FsPermission((short) 511));
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                    coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            Path logsDir = new Path(coordPath, "../../logs");
+            fs.mkdirs(logsDir);
+
+            // logs are copied within Oozie as the workflow user during post-processing and hence need 777 permissions
+            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+            fs.setPermission(logsDir, permission);
         } catch (Exception e) {
             throw new FalconException("Unable to create temp dir in " + coordPath, e);
         }
@@ -334,8 +341,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
 
         marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
                 BUNDLE_JAXB_CONTEXT,
-                new Path(
-                        outPath, "bundle.xml"));
+                new Path(outPath, "bundle.xml"));
     }
 
     protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
@@ -394,11 +400,17 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
     }
 
     protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
-                                  String prefix) throws IOException {
+                                  Cluster cluster, String prefix) throws IOException {
         Configuration hiveConf = new Configuration(false);
         hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
         hiveConf.set("hive.metastore.local", "false");
 
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                    ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
+            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+        }
+
         OutputStream out = null;
         try {
             out = fs.create(new Path(confPath, prefix + "hive-site.xml"));

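createHiveConf now also propagates the metastore Kerberos principal and enables Thrift SASL when Hadoop security is on.
A minimal standalone sketch of the resulting configuration follows; the method parameters replace the cluster lookup via
ClusterHelper/SecurityUtil.HIVE_METASTORE_PRINCIPAL in the patch, so the URI and principal values are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;

public class SecureHiveConfSketch {

    // Builds the per-cluster hive-site configuration that the workflow ships to HDFS.
    // The metastore URI and principal are caller-supplied placeholders; the patch
    // reads the principal from the cluster entity.
    public static Configuration build(String metastoreUrl, String metastorePrincipal) {
        Configuration hiveConf = new Configuration(false);
        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
        hiveConf.set("hive.metastore.local", "false");

        if (UserGroupInformation.isSecurityEnabled()) {
            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
        }
        return hiveConf;
    }
}
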
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index d544311..92b90e7 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -46,7 +46,7 @@ import java.net.URLConnection;
 import java.util.List;
 
 /**
- * Utitlity called in the post process of oozie workflow to move oozie action executor log.
+ * Utility called in the post process of oozie workflow to move oozie action executor log.
  */
 public class LogMover extends Configured implements Tool {