You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/02/16 04:31:25 UTC
[4/5] FALCON-11 Add support for security in Falcon. Contributed by
Venkatesh Seetharam
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/SecurityConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityConstants.java b/common/src/main/java/org/apache/falcon/security/SecurityConstants.java
deleted file mode 100644
index 8f7ba4a..0000000
--- a/common/src/main/java/org/apache/falcon/security/SecurityConstants.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import com.sun.security.auth.UnixPrincipal;
-
-import java.security.Principal;
-
-/**
- * Constants for the security module.
- */
-public final class SecurityConstants {
-
- private SecurityConstants() {}
-
- public static final String OS_LOGIN_MODULE_NAME =
- "com.sun.security.auth.module.UnixLoginModule";
- public static final Class<? extends Principal> OS_PRINCIPAL_CLASS =
- UnixPrincipal.class;
-
- public static final String FALCON_LOGIN = "FALCON_DEFAULT_LOGIN";
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
new file mode 100644
index 0000000..f78043f
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Security Util - bunch of security related helper methods.
+ * Also doles out proxied UserGroupInformation. Caches proxied users.
+ */
+public final class SecurityUtil {
+
+ /**
+ * Constant for the configuration property that indicates the prefix.
+ */
+ private static final String CONFIG_PREFIX = "falcon.authentication.";
+
+ /**
+ * Constant for the configuration property that indicates the authentication type.
+ */
+ public static final String AUTHENTICATION_TYPE = CONFIG_PREFIX + "type";
+
+ /**
+ * Constant for the configuration property that indicates the Name node principal.
+ */
+ public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
+
+ /**
+ * Constant for the configuration property that indicates the Name node principal.
+ * This is used to talk to Hive Meta Store during parsing and validations only.
+ */
+ public static final String HIVE_METASTORE_PRINCIPAL = "hive.metastore.kerberos.principal";
+
+
+ private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
+ new ConcurrentHashMap<String, UserGroupInformation>();
+
+ private SecurityUtil() {
+ }
+
+ public static String getAuthenticationType() {
+ return StartupProperties.get().getProperty(
+ AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
+ }
+
+ public static boolean isSecurityEnabled() {
+ String authenticationType = StartupProperties.get().getProperty(
+ AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
+
+ final boolean useKerberos;
+ if (authenticationType == null || PseudoAuthenticationHandler.TYPE.equals(authenticationType)) {
+ useKerberos = false;
+ } else if (KerberosAuthenticationHandler.TYPE.equals(authenticationType)) {
+ useKerberos = true;
+ } else {
+ throw new IllegalArgumentException("Invalid attribute value for "
+ + AUTHENTICATION_TYPE + " of " + authenticationType);
+ }
+
+ return useKerberos;
+ }
+
+ public static UserGroupInformation getProxyUser(String proxyUser) throws IOException {
+ UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
+ if (proxyUgi == null) {
+ // taking care of a race condition, the latest UGI will be discarded
+ proxyUgi = UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
+ userUgiMap.putIfAbsent(proxyUser, proxyUgi);
+ }
+
+ return proxyUgi;
+ }
+
+ public static String getLocalHostName() throws UnknownHostException {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 7ed4394..4580bad 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -112,7 +113,8 @@ public final class UpdateHelper {
}
Path checksum = new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE);
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(clusterEntity));
+ Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
if (!fs.exists(checksum)) {
//Update if there is no checksum file(for migration)
return true;
@@ -126,11 +128,7 @@ public final class UpdateHelper {
}
//Update if the user wf/lib is updated i.e., if checksums are different
- if (!wfPaths.equals(checksums)) {
- return true;
- }
-
- return false;
+ return !wfPaths.equals(checksums);
} catch (IOException e) {
throw new FalconException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 3014418..51b3364 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -28,8 +28,9 @@
*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
-*.application.services=org.apache.falcon.entity.store.ConfigurationStore,\
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
org.apache.falcon.service.ProcessSubscriberService,\
+ org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
org.apache.falcon.service.SLAMonitoringService,\
@@ -43,8 +44,13 @@
######### Implementation classes #########
+
+######### System startup parameters #########
+
+# Location of libraries that is shipped to Hadoop
*.system.lib.location=${FALCON_HOME}/sharedlibs
+# Location to store user entity configurations
debug.config.store.uri=file://${user.dir}/target/store
debug.config.oozie.conf.uri=${user.dir}/target/oozie
debug.system.lib.location=${system.lib.location}
@@ -56,8 +62,12 @@ debug.libext.process.paths=${falcon.libext}
*.falcon.cleanup.service.frequency=days(1)
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
*.broker.url=tcp://localhost:61616
-#default time-to-live for a JMS message 3 days (time in minutes)
+
+# default time-to-live for a JMS message 3 days (time in minutes)
*.broker.ttlInMins=4320
*.entity.topic=FALCON.ENTITY.TOPIC
*.max.retry.failure.count=1
@@ -65,3 +75,53 @@ debug.libext.process.paths=${falcon.libext}
######### Properties for configuring iMon client and metric #########
*.internal.queue.size=1000
+
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=false
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+######### Authentication Properties #########
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 7668c7f..6ec5b41 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -25,10 +25,10 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +71,8 @@ public class AbstractTestBase {
listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""));
store = ConfigurationStore.get();
store.init();
+
+ CurrentUser.authenticate("falcon");
}
protected void cleanupStore() throws FalconException {
@@ -136,13 +138,4 @@ public class AbstractTestBase {
marshaller.marshal(entity, stringWriter);
return stringWriter.toString();
}
-
- private Interface newInterface(Interfacetype type, String endPoint,
- String version) {
- Interface iface = new Interface();
- iface.setType(type);
- iface.setEndpoint(endPoint);
- iface.setVersion(version);
- return iface;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 7b48d2b..1d56a78 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -162,18 +162,6 @@ public class FileSystemStorageTest {
}
@Test
- public void testExists() throws Exception {
- final Location location = new Location();
- location.setPath("/foo/bar");
- location.setType(LocationType.DATA);
- List<Location> locations = new ArrayList<Location>();
- locations.add(location);
-
- FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
- Assert.assertTrue(storage.exists());
- }
-
- @Test
public void testIsIdentical() throws Exception {
final String storageUrl = "jail://global:00";
final Location location1 = new Location();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
new file mode 100644
index 0000000..3b4e7f0
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hadoop;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+
+/**
+ * Unit tests for HadoopClientFactory that doles out FileSystem handles.
+ */
+public class HadoopClientFactoryTest {
+
+ private EmbeddedCluster embeddedCluster;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ embeddedCluster = EmbeddedCluster.newCluster(getClass().getSimpleName());
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ if (embeddedCluster != null) {
+ embeddedCluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ HadoopClientFactory clientFactory = HadoopClientFactory.get();
+ Assert.assertNotNull(clientFactory);
+ }
+
+ @Test (enabled = false) // todo: cheated the conf to impersonate as same user
+ public void testCreateFileSystemWithSameUser() {
+ String user = System.getProperty("user.name");
+ try {
+ Configuration conf = embeddedCluster.getConf();
+ URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+ Assert.assertNotNull(uri);
+ HadoopClientFactory.get().createProxiedFileSystem(user, uri, conf);
+ Assert.fail("Impersonation should have failed.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getCause().getClass(), RemoteException.class);
+ }
+ }
+
+ @Test
+ public void testCreateFileSystem() throws Exception {
+ Configuration conf = embeddedCluster.getConf();
+
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation realUser = UserGroupInformation.createUserForTesting(
+ "testuser", new String[]{"testgroup"});
+ UserGroupInformation.createProxyUserForTesting("proxyuser", realUser, new String[]{"proxygroup"});
+
+ URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+ Assert.assertNotNull(uri);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("testuser", uri, conf);
+ Assert.assertNotNull(fs);
+ }
+
+ @Test
+ public void testCreateFileSystemWithUser() throws Exception {
+ Configuration conf = embeddedCluster.getConf();
+
+ UserGroupInformation realUser = UserGroupInformation.createUserForTesting(
+ "testuser", new String[]{"testgroup"});
+ UserGroupInformation.createProxyUserForTesting("proxyuser", realUser, new String[]{"proxygroup"});
+ UserGroupInformation.setConfiguration(conf);
+
+ URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
+ Assert.assertNotNull(uri);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem("seetharam", uri, conf);
+ Assert.assertNotNull(fs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java b/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
new file mode 100644
index 0000000..9b76a61
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+
+/**
+ * Unit test for AuthenticationInitializationService that employs mocks.
+ */
+public class AuthenticationInitializationServiceTest {
+
+ private AuthenticationInitializationService authenticationService;
+
+ @Mock
+ private UserGroupInformation mockLoginUser;
+
+ @BeforeClass
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ authenticationService = new AuthenticationInitializationService();
+ }
+
+ @Test
+ public void testGetName() {
+ Assert.assertEquals("Authentication initialization service",
+ authenticationService.getName());
+ }
+
+ @Test
+ public void testInitForSimpleAuthenticationMethod() {
+ try {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+ PseudoAuthenticationHandler.TYPE);
+ authenticationService.init();
+
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ Assert.assertFalse(loginUser.isFromKeytab());
+ Assert.assertEquals(loginUser.getAuthenticationMethod().name().toLowerCase(),
+ PseudoAuthenticationHandler.TYPE);
+ Assert.assertEquals(System.getProperty("user.name"), loginUser.getUserName());
+ } catch (Exception e) {
+ Assert.fail("AuthenticationInitializationService init failed.", e);
+ }
+ }
+
+ @Test
+ public void testKerberosAuthenticationWithKeytabFileDoesNotExist() {
+ try {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+ KerberosAuthenticationHandler.TYPE);
+ StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/blah/blah");
+ authenticationService.init();
+ Assert.fail("The keytab file does not exist! must have been thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+ }
+ }
+
+ @Test
+ public void testKerberosAuthenticationWithKeytabFileIsADirectory() {
+ try {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+ KerberosAuthenticationHandler.TYPE);
+ StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/tmp/");
+ authenticationService.init();
+ Assert.fail("The keytab file cannot be a directory! must have been thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+ }
+ }
+
+ @Test
+ public void testKerberosAuthenticationWithKeytabFileNotReadable() {
+ File tempFile = new File(".keytabFile");
+ try {
+ assert tempFile.createNewFile();
+ assert tempFile.setReadable(false);
+
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+ KerberosAuthenticationHandler.TYPE);
+ StartupProperties.get().setProperty(
+ AuthenticationInitializationService.KERBEROS_KEYTAB, tempFile.toString());
+ authenticationService.init();
+ Assert.fail("The keytab file is not readable! must have been thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+ } finally {
+ assert tempFile.delete();
+ }
+ }
+
+ @Test (enabled = false)
+ public void testInitForKerberosAuthenticationMethod() throws FalconException {
+ Mockito.when(mockLoginUser.getAuthenticationMethod())
+ .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
+ Mockito.when(mockLoginUser.getUserName()).thenReturn("falcon");
+ Mockito.when(mockLoginUser.isFromKeytab()).thenReturn(Boolean.TRUE);
+
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
+ KerberosAuthenticationHandler.TYPE);
+ StartupProperties.get().setProperty(
+ AuthenticationInitializationService.KERBEROS_KEYTAB, "falcon.kerberos.keytab");
+ StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_PRINCIPAL, "falcon");
+
+ authenticationService.init();
+
+ Assert.assertTrue(mockLoginUser.isFromKeytab());
+ Assert.assertEquals(mockLoginUser.getAuthenticationMethod().name(),
+ KerberosAuthenticationHandler.TYPE);
+ Assert.assertEquals("falcon", mockLoginUser.getUserName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
new file mode 100644
index 0000000..630aa4b
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for Security utils.
+ */
+public class SecurityUtilTest {
+
+ @Test
+ public void testDefaultGetAuthenticationType() throws Exception {
+ Assert.assertEquals(SecurityUtil.getAuthenticationType(), "simple");
+ }
+
+ @Test
+ public void testGetAuthenticationType() throws Exception {
+ try {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "kerberos");
+ Assert.assertEquals(SecurityUtil.getAuthenticationType(), "kerberos");
+ } finally {
+ // reset
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "simple");
+ }
+ }
+
+ @Test
+ public void testIsSecurityEnabledByDefault() throws Exception {
+ Assert.assertFalse(SecurityUtil.isSecurityEnabled());
+ }
+
+ @Test
+ public void testIsSecurityEnabled() throws Exception {
+ try {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "kerberos");
+ Assert.assertTrue(SecurityUtil.isSecurityEnabled());
+ } finally {
+ // reset
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, "simple");
+ }
+ }
+
+ @Test
+ public void testGetProxyUser() throws Exception {
+ UserGroupInformation proxyUgi = SecurityUtil.getProxyUser("proxy");
+ Assert.assertNotNull(proxyUgi);
+ Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/Security.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Security.twiki b/docs/src/site/twiki/Security.twiki
new file mode 100644
index 0000000..c1f7656
--- /dev/null
+++ b/docs/src/site/twiki/Security.twiki
@@ -0,0 +1,193 @@
+---+ Securing Falcon
+
+---++ Overview
+
+Apache Falcon enforces authentication on protected resources. Once authentication has been established it sets a
+signed HTTP Cookie that contains an authentication token with the user name, user principal,
+authentication type and expiration time.
+
+It does so by using [[http://hadoop.apache .org/docs/current/hadoop-auth/index.html][Hadoop Auth]].
+Hadoop Auth is a Java library consisting of a client and a server components to enable Kerberos SPNEGO authentication
+for HTTP. Hadoop Auth also supports additional authentication mechanisms on the client and the server side via 2
+simple interfaces.
+
+
+---++ Authentication Methods
+
+It supports 2 authentication methods, simple and kerberos out of the box.
+
+---+++ Pseudo/Simple Authentication
+
+Falcon authenticates the user by simply trusting the value of the query string parameter 'user.name'. This is the
+default mode Falcon is configured with.
+
+---+++ Kerberos Authentication
+
+Falcon uses HTTP Kerberos SPNEGO to authenticate the user.
+
+---++ Server Side Configuration Setup
+
+---+++ Common Configuration Parameters
+
+<verbatim>
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=kerberos
+</verbatim>
+
+---+++ Kerberos Configuration
+
+<verbatim>
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=falcon/_HOST@EXAMPLE.COM
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=/etc/security/keytabs/falcon.service.keytab
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=nn/_HOST@EXAMPLE.COM
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=kerberos
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=HTTP/_HOST@EXAMPLE.COM
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=/etc/security/keytabs/spnego.service.keytab
+
+# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+</verbatim>
+
+---+++ Pseudo/Simple Configuration
+
+<verbatim>
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+</verbatim>
+
+---+++ SSL Configuration
+
+<verbatim>
+*.falcon.enableTLS=true
+*.keystore.file=/path/to/keystore/file
+*.keystore.password=password
+</verbatim>
+
+---+++ Distributed Falcon Setup
+
+Falcon should be configured to communicate with Prism over TLS in secure mode. Its not enabled by default.
+
+
+---++ Changes to ownership and permissions of directories managed by Falcon
+
+| *Directory* | *Location* | *Owner* | *Permissions* |
+| Configuration Store | ${config.store.uri} | falcon | 750 |
+| Oozie coord/bundle XMLs | ${cluster.staging-location}/workflows/{entity}/{entity-name} | falcon | 644 |
+| Shared libs | {cluster.working}/{lib,libext} | falcon | 755 |
+| App logs | ${cluster.staging-location}/workflows/{entity}/{entity-name}/logs | falcon | 777 |
+
+
+---++ Backwards compatibility
+
+---+++ Scheduled Entities
+
+Entities already scheduled with an earlier version of Falcon are not compatible with this version
+
+---+++ Falcon Clients
+
+Older Falcon clients are backwards compatible wrt Authentication and user information sent as part of the HTTP
+header, Remote-User is still honoured when the authentication type is configured as below:
+
+<verbatim>
+*.falcon.http.authentication.type=org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler
+</verbatim>
+
+---+++ Blacklisted super users for authentication
+
+The blacklist users used to have the following super users: hdfs, mapreduce, oozie, and falcon.
+The list is externalized from code into Startup.properties file and is empty now and needs to be
+configured specifically in the file.
+
+
+---+++ Falcon Dashboard
+
+The dashboard assumes an anonymous user in Pseudo/Simple method and hence anonymous users must be enabled for it to
+work.
+<verbatim>
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+</verbatim>
+
+In Kerberos method, the browser must support HTTP Kerberos SPNEGO.
+
+
+---++ Known Limitations
+
+ * ActiveMQ topics are not secure but will be in the near future
+ * Entities already scheduled with an earlier version of Falcon are not compatible with this version as new
+ workflow parameters are being passed back into Falcon such as the user are required
+ * Use of hftp as the scheme for read only interface in cluster entity [[https://issues.apache.org/jira/browse/HADOOP-10215][will not work in Oozie]]
+ The alternative is to use webhdfs scheme instead and its been tested with DistCp.
+
+
+---++ Examples
+
+---+++ Accessing the server using Falcon CLI (Java client)
+
+There is no change in the way the CLI is used. The CLI has been changed to work with the configured authentication
+method.
+
+---+++ Accessing the server using curl
+
+Try accessing protected resources using curl. The protected resources are:
+
+<verbatim>
+$ kinit
+Please enter the password for venkatesh@LOCALHOST:
+
+$ curl http://localhost:15000/api/admin/version
+
+$ curl http://localhost:15000/api/admin/version?user.name=venkatesh
+
+$ curl --negotiate -u foo -b ~/cookiejar.txt -c ~/cookiejar.txt curl http://localhost:15000/api/admin/version
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/index.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki
index 81c4c3e..ee48fbb 100644
--- a/docs/src/site/twiki/index.twiki
+++ b/docs/src/site/twiki/index.twiki
@@ -30,7 +30,8 @@ describes steps to on-board a pipeline to Falcon. It also gives a sample pipelin
describes various options for the command line utility provided by Falcon.
Falcon provides OOTB [[HiveIntegration][lifecycle management for Tables in Hive (HCatalog)]]
-such as table replication for BCP and table eviction.
+such as table replication for BCP and table eviction. Falcon also enforces
+[[Security][kerberos authentication]] on protected resources and enables SSL.
#LicenseInfo
---+ Licensing Information
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/AdminConfig.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/AdminConfig.twiki b/docs/src/site/twiki/restapi/AdminConfig.twiki
index 2841b25..675b19e 100644
--- a/docs/src/site/twiki/restapi/AdminConfig.twiki
+++ b/docs/src/site/twiki/restapi/AdminConfig.twiki
@@ -17,7 +17,6 @@ Configuration information of the server.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/admin/config/deploy
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/AdminStack.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/AdminStack.twiki b/docs/src/site/twiki/restapi/AdminStack.twiki
index a241999..79dbd9b 100644
--- a/docs/src/site/twiki/restapi/AdminStack.twiki
+++ b/docs/src/site/twiki/restapi/AdminStack.twiki
@@ -16,7 +16,6 @@ Stack trace of the server.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/admin/stack
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/AdminVersion.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/AdminVersion.twiki b/docs/src/site/twiki/restapi/AdminVersion.twiki
index fbf1405..00b0283 100644
--- a/docs/src/site/twiki/restapi/AdminVersion.twiki
+++ b/docs/src/site/twiki/restapi/AdminVersion.twiki
@@ -16,7 +16,6 @@ Version of the server.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/admin/version
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityDefinition.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityDefinition.twiki b/docs/src/site/twiki/restapi/EntityDefinition.twiki
index 955be71..1f76a4f 100644
--- a/docs/src/site/twiki/restapi/EntityDefinition.twiki
+++ b/docs/src/site/twiki/restapi/EntityDefinition.twiki
@@ -18,7 +18,6 @@ Definition of the entity.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/entities/definition/process/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityDelete.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityDelete.twiki b/docs/src/site/twiki/restapi/EntityDelete.twiki
index 7a7e22a..850b451 100644
--- a/docs/src/site/twiki/restapi/EntityDelete.twiki
+++ b/docs/src/site/twiki/restapi/EntityDelete.twiki
@@ -18,7 +18,6 @@ Results of the delete operation.
---+++ Rest Call
<verbatim>
DELETE http://localhost:15000/api/entities/delete/cluster/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityDependencies.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityDependencies.twiki b/docs/src/site/twiki/restapi/EntityDependencies.twiki
index 6daab68..c61bc43 100644
--- a/docs/src/site/twiki/restapi/EntityDependencies.twiki
+++ b/docs/src/site/twiki/restapi/EntityDependencies.twiki
@@ -18,7 +18,6 @@ Dependenciess of the entity.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/entities/dependencies/process/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityList.twiki b/docs/src/site/twiki/restapi/EntityList.twiki
index bca84b0..b9cf349 100644
--- a/docs/src/site/twiki/restapi/EntityList.twiki
+++ b/docs/src/site/twiki/restapi/EntityList.twiki
@@ -19,7 +19,6 @@ List of the entities.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/entities/list/feed
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
@@ -40,7 +39,6 @@ Remote-User: rgautam
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/entities/list/feed?fields=status
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityResume.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityResume.twiki b/docs/src/site/twiki/restapi/EntityResume.twiki
index 223a83f..a2d5184 100644
--- a/docs/src/site/twiki/restapi/EntityResume.twiki
+++ b/docs/src/site/twiki/restapi/EntityResume.twiki
@@ -18,7 +18,6 @@ Result of the resume command.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/resume/process/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySchedule.twiki b/docs/src/site/twiki/restapi/EntitySchedule.twiki
index e481613..dd97141 100644
--- a/docs/src/site/twiki/restapi/EntitySchedule.twiki
+++ b/docs/src/site/twiki/restapi/EntitySchedule.twiki
@@ -18,7 +18,6 @@ Result of the schedule command.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/schedule/process/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityStatus.twiki b/docs/src/site/twiki/restapi/EntityStatus.twiki
index f0e772b..34d166d 100644
--- a/docs/src/site/twiki/restapi/EntityStatus.twiki
+++ b/docs/src/site/twiki/restapi/EntityStatus.twiki
@@ -18,7 +18,6 @@ Status of the entity.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/entities/status/process/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySubmit.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySubmit.twiki b/docs/src/site/twiki/restapi/EntitySubmit.twiki
index e4b608e..925381c 100644
--- a/docs/src/site/twiki/restapi/EntitySubmit.twiki
+++ b/docs/src/site/twiki/restapi/EntitySubmit.twiki
@@ -17,7 +17,6 @@ Result of the submission.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/submit/feed
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hourly sample input data -->
@@ -59,7 +58,6 @@ Remote-User: rgautam
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/submit/process
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
<process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
@@ -103,4 +101,4 @@ Remote-User: rgautam
"message": "default\/Submit successful (process) SampleProcess\n",
"status": "SUCCEEDED"
}
-</verbatim>
\ No newline at end of file
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki b/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
index fb3649d..042a5fb 100644
--- a/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
+++ b/docs/src/site/twiki/restapi/EntitySubmitAndSchedule.twiki
@@ -17,7 +17,6 @@ Result of the submit and schedule command.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/submitAndSchedule/process
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
<process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntitySuspend.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySuspend.twiki b/docs/src/site/twiki/restapi/EntitySuspend.twiki
index 9d6e9ab..9e5efca 100644
--- a/docs/src/site/twiki/restapi/EntitySuspend.twiki
+++ b/docs/src/site/twiki/restapi/EntitySuspend.twiki
@@ -18,7 +18,6 @@ Status of the entity.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/suspend/process/SampleProcess
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityUpdate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityUpdate.twiki b/docs/src/site/twiki/restapi/EntityUpdate.twiki
index 16ec439..f2c2e7e 100644
--- a/docs/src/site/twiki/restapi/EntityUpdate.twiki
+++ b/docs/src/site/twiki/restapi/EntityUpdate.twiki
@@ -19,7 +19,6 @@ Result of the validation.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/update/process/SampleProcess?effective=2014-01-01T00:00Z
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
<process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/EntityValidate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityValidate.twiki b/docs/src/site/twiki/restapi/EntityValidate.twiki
index bc0f508..89bd155 100644
--- a/docs/src/site/twiki/restapi/EntityValidate.twiki
+++ b/docs/src/site/twiki/restapi/EntityValidate.twiki
@@ -17,7 +17,6 @@ Result of the validation.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/validate/cluster
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster xmlns="uri:falcon:cluster:0.1" name="primary-cluster" description="Primary Cluster" colo="west-coast">
<interfaces>
@@ -46,7 +45,6 @@ Remote-User: rgautam
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/validate/feed
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hourly sample input data -->
@@ -88,7 +86,6 @@ Remote-User: rgautam
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/validate/feed
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample output data -->
@@ -125,7 +122,6 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/entities/validate/process
-Remote-User: rgautam
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
<process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
@@ -169,4 +165,4 @@ Remote-User: rgautam
"message": "Validated successfully (PROCESS) SampleProcess",
"status": "SUCCEEDED"
}
-</verbatim>
\ No newline at end of file
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceKill.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceKill.twiki b/docs/src/site/twiki/restapi/InstanceKill.twiki
index 5c429f6..eff2893 100644
--- a/docs/src/site/twiki/restapi/InstanceKill.twiki
+++ b/docs/src/site/twiki/restapi/InstanceKill.twiki
@@ -19,7 +19,6 @@ Result of the kill operation.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/instance/kill/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceLogs.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceLogs.twiki b/docs/src/site/twiki/restapi/InstanceLogs.twiki
index f84b828..599f2d7 100644
--- a/docs/src/site/twiki/restapi/InstanceLogs.twiki
+++ b/docs/src/site/twiki/restapi/InstanceLogs.twiki
@@ -20,7 +20,6 @@ Log of specified instance.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/instance/logs/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceRerun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki
index cf35475..77608e0 100644
--- a/docs/src/site/twiki/restapi/InstanceRerun.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki
@@ -19,7 +19,6 @@ Results of the rerun command.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceResume.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceResume.twiki b/docs/src/site/twiki/restapi/InstanceResume.twiki
index 2bdd6e1..2d29569 100644
--- a/docs/src/site/twiki/restapi/InstanceResume.twiki
+++ b/docs/src/site/twiki/restapi/InstanceResume.twiki
@@ -19,7 +19,6 @@ Results of the resume command.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/instance/resume/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceRunning.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRunning.twiki b/docs/src/site/twiki/restapi/InstanceRunning.twiki
index 6b5ee66..116565f 100644
--- a/docs/src/site/twiki/restapi/InstanceRunning.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRunning.twiki
@@ -18,7 +18,6 @@ List of instances currently running.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/instance/running/process/SampleProcess?colo=*
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index eddc2c8..99497d1 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -20,7 +20,6 @@ Status of the specified instance.
---+++ Rest Call
<verbatim>
GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/InstanceSuspend.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSuspend.twiki b/docs/src/site/twiki/restapi/InstanceSuspend.twiki
index 62cf72b..8f5f7c3 100644
--- a/docs/src/site/twiki/restapi/InstanceSuspend.twiki
+++ b/docs/src/site/twiki/restapi/InstanceSuspend.twiki
@@ -19,7 +19,6 @@ Results of the suspend command.
---+++ Rest Call
<verbatim>
POST http://localhost:15000/api/instance/suspend/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
-Remote-User: rgautam
</verbatim>
---+++ Result
<verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index b9ec4b6..6ca0dea 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -1,8 +1,32 @@
---+ RESTful Resources
+
+---++ Resource List
* <a href="#REST_Call_on_Entity_Resource">REST Call on Entity Resource</a>
* <a href="#REST_Call_on_Feed_and_Process_Instances">REST Call on Feed/Process Instances</a>
* <a href="#REST_Call_on_Admin_Resource">REST Call on Admin Resource</a>
+---++ Authentication
+
+When security is off (Pseudo/Simple), the authenticated user is the username specified in the user.name query
+parameter. If the user.name parameter is not set, the server may either set the authenticated user to a default web
+user, if there is any, or return an error response.
+
+When security is on (kerberos), authentication is performed by Kerberos SPNEGO.
+
+Below are examples using the curl command tool.
+
+Authentication when security is off (Pseudo/Simple):
+<verbatim>
+curl -i "http://<HOST>:<PORT>/<PATH>?[user.name=<USER>&]<PARAM>=..."
+</verbatim>
+
+Authentication using Kerberos SPNEGO when security is on:
+<verbatim>
+curl -i --negotiate -u : "http://<HOST>:<PORT>/<PATH>?<PARAM>=..."
+</verbatim>
+
+See also: [[../Security.twiki][Security in Falcon]]
+
---++ REST Call on Admin Resource
| *Call Type* | *Resource* | *Description* |
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index 6ca2134..d6dee77 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -38,6 +38,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
import org.apache.falcon.oozie.coordinator.ACTION;
@@ -45,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -426,7 +428,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
propagateTableCopyProperties(srcCluster, sourceTableStorage,
trgCluster, targetTableStorage, props);
- setupHiveConfiguration(trgCluster, sourceTableStorage, targetTableStorage, wfPath);
+ setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
}
propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
@@ -479,9 +481,11 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
}
- private void setupHiveConfiguration(Cluster trgCluster, CatalogStorage sourceStorage,
- CatalogStorage targetStorage, Path wfPath) throws IOException {
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(trgCluster));
+ private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
+ Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
+ throws IOException, FalconException {
+ Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
// copy import export scripts to stagingDir
Path scriptPath = new Path(wfPath, "scripts");
@@ -490,8 +494,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
// create hive conf to stagingDir
Path confPath = new Path(wfPath + "/conf");
- createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), "falcon-source-");
- createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), "falcon-target-");
+ createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+ createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
}
private void copyHiveScript(FileSystem fs, Path scriptPath,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 91d0285..7a95c35 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -218,6 +218,8 @@
<arg>${wf:id()}</arg>
<arg>-logDir</arg>
<arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
<file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
<file>${wf:conf("falcon.libpath")}/jms.jar</file>
@@ -283,6 +285,8 @@
<arg>${wf:id()}</arg>
<arg>-logDir</arg>
<arg>${logDir}/job-${nominalTime}/${srcClusterName}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
<file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
<file>${wf:conf("falcon.libpath")}/jms.jar</file>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 8b444f5..08795b4 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -107,6 +107,8 @@
<arg>${wf:conf("broker.ttlInMins")}</arg>
<arg>-cluster</arg>
<arg>${cluster}</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
<file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
<file>${wf:conf("falcon.libpath")}/jms.jar</file>
@@ -119,8 +121,8 @@
</action>
<kill name="fail">
- <message>Workflow failed, error
- message[${wf:errorMessage(wf:lastErrorNode())}]
+ <message>
+ Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>
<end name='end'/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index a37755b..a153462 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -40,6 +40,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.DECISION;
import org.apache.falcon.oozie.workflow.JAVA;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -82,6 +83,8 @@ public class OozieFeedMapperTest {
@BeforeClass
public void setUpDFS() throws Exception {
+ CurrentUser.authenticate("falcon");
+
srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index e576310..846a43e 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -161,11 +161,6 @@
<!-- Oozie dependencies -->
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.hcatalog</groupId>
<artifactId>webhcat-java-client</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/html5-ui/js/falcon.js
----------------------------------------------------------------------
diff --git a/html5-ui/js/falcon.js b/html5-ui/js/falcon.js
index ff3a929..0a75f6a 100644
--- a/html5-ui/js/falcon.js
+++ b/html5-ui/js/falcon.js
@@ -17,21 +17,26 @@
(function(exports) {
"use strict";
- var USER_ID = 'admin';
+ var USER_ID = 'dashboard';
function onError (msg) {
alert(msg);
}
function ajax_impl(options) {
- $.extend(options, {'headers': { 'Remote-User': USER_ID }});
+ // $.extend(options, add_user(options.url));
return $.ajax(options);
}
+ function add_user(url) {
+ var paramSeparator = (url.indexOf('?') != -1) ? '&' : '?';
+ return url + paramSeparator + 'user.name=' + USER_ID;
+ }
+
function getJson_impl(url, success) {
return ajax_impl({
'dataType': 'json',
- 'url': url,
+ 'url': add_user(url),
'success': success
});
}
@@ -39,7 +44,7 @@
function getText_impl(url, success) {
return ajax_impl({
'dataType': 'text',
- 'url': url,
+ 'url': add_user(url),
'success': success
});
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index 9aa5347..aa5765c 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -82,6 +82,11 @@
<dependencies>
<dependency>
<groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
<artifactId>falcon-test-util</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index eb49fd5..d8ba4f3 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -70,7 +70,8 @@ public class EntityInstanceMessage {
topicName("topicName"),
status("status"),
brokerTTL("broker.ttlInMins"),
- cluster("cluster");
+ cluster("cluster"),
+ workflowUser("workflowUser");
private String propName;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
index ecda5eb..c8ea12d 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
@@ -41,18 +41,16 @@ public class EntityInstanceMessageCreator {
public Message createMessage(Session session) throws JMSException {
mapMessage = session.createMapMessage();
- for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap()
- .entrySet()) {
- mapMessage.setString(entry.getKey().getArgName(), instanceMessage
- .getKeyValueMap().get(entry.getKey()));
+ for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap().entrySet()) {
+ mapMessage.setString(entry.getKey().getArgName(),
+ instanceMessage.getKeyValueMap().get(entry.getKey()));
}
- return mapMessage;
+ return mapMessage;
}
@Override
public String toString() {
return this.mapMessage.toString();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
index b37931c..0f9e918 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
@@ -65,7 +65,6 @@ public class MessageProducer extends Configured implements Tool {
producer.setTimeToLive(messageTTL);
producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
.createMessage(session));
-
}
public static void main(String[] args) throws Exception {
@@ -124,6 +123,8 @@ public class MessageProducer extends Configured implements Tool {
"workflow id"));
addOption(options, new Option(ARG.cluster.getArgName(), true,
"cluster name"));
+ addOption(options, new Option(ARG.workflowUser.getArgName(), true,
+ "workflow user id"), false);
return new GnuParser().parse(options, arguments);
}
@@ -152,14 +153,12 @@ public class MessageProducer extends Configured implements Tool {
return 0;
}
- MessageProducer falconMessageProducer = new MessageProducer();
try {
- falconMessageProducer.createAndStartConnection(
- cmd.getOptionValue(ARG.brokerImplClass.name()), "",
+ createAndStartConnection(cmd.getOptionValue(ARG.brokerImplClass.name()), "",
"", cmd.getOptionValue(ARG.brokerUrl.name()));
for (EntityInstanceMessage message : entityInstanceMessage) {
LOG.info("Sending message:" + message.getKeyValueMap());
- falconMessageProducer.sendMessage(message);
+ sendMessage(message);
}
} catch (JMSException e) {
LOG.error("Error in getConnection:", e);
@@ -167,8 +166,8 @@ public class MessageProducer extends Configured implements Tool {
LOG.error("Error in getConnection:", e);
} finally {
try {
- if (falconMessageProducer.connection != null) {
- falconMessageProducer.connection.close();
+ if (connection != null) {
+ connection.close();
}
} catch (JMSException e) {
LOG.error("Error in closing connection:", e);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index da126c7..3f0c664 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -37,8 +37,6 @@ import javax.jms.*;
public class FalconTopicProducerTest {
private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
- // private static final String BROKER_URL =
- // "tcp://localhost:61616?daemon=true";
private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
@@ -140,6 +138,7 @@ public class FalconTopicProducerTest {
private List<String> createCommonArgs() {
return new ArrayList<String>(Arrays.asList(
"-" + ARG.workflowId.getArgName(), "workflow-01-00",
+ "-" + ARG.workflowUser.getArgName(), "falcon",
"-" + ARG.runId.getArgName(), "1",
"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
@@ -169,6 +168,7 @@ public class FalconTopicProducerTest {
}
};
t.start();
+ Thread.sleep(100);
for (String[] message : messages) {
new MessageProducer().run(message);
}
@@ -203,6 +203,8 @@ public class FalconTopicProducerTest {
"agg-coord");
Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
"workflow-01-00");
+ Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+ "falcon");
Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
"2011-01-01T01:00Z");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index e707567..57ccdc5 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -73,6 +73,7 @@ public class FeedProducerTest {
"-" + ARG.feedInstancePaths.getArgName(),
"/click-logs/10/05/05/00/20",
"-" + ARG.workflowId.getArgName(), "workflow-01-00",
+ "-" + ARG.workflowUser.getArgName(), "falcon",
"-" + ARG.runId.getArgName(), "1",
"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
@@ -200,6 +201,8 @@ public class FeedProducerTest {
Assert.assertEquals(m.getString(ARG.operation.getArgName()), "DELETE");
Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
"workflow-01-00");
+ Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+ "falcon");
Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
"2011-01-01T01:00Z");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 3a40e76..9f8b07f 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -35,8 +35,6 @@ public class ProcessProducerTest {
private String[] args;
private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
- // private static final String BROKER_URL =
- // "tcp://localhost:61616?daemon=true";
private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String TOPIC_NAME = "FALCON.PROCESS";
private BrokerService broker;
@@ -50,6 +48,7 @@ public class ProcessProducerTest {
"-" + ARG.feedInstancePaths.getArgName(),
"/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
"-" + ARG.workflowId.getArgName(), "workflow-01-00",
+ "-" + ARG.workflowUser.getArgName(), "falcon",
"-" + ARG.runId.getArgName(), "1",
"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
"-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
@@ -139,6 +138,8 @@ public class ProcessProducerTest {
TOPIC_NAME);
Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
"workflow-01-00");
+ Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+ "falcon");
Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
"2011-01-01T01:00Z");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index 275a725..0b680ba 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -38,6 +38,7 @@ public final class GenericAlert {
@Dimension(value = "entity-name") String entityName,
@Dimension(value = "nominal-name") String nominalTime,
@Dimension(value = "wf-id") String wfId,
+ @Dimension(value = "wf-user") String workflowUser,
@Dimension(value = "run-id") String runId,
@Dimension(value = "error-message") String message) {
return "IGNORE";
@@ -49,6 +50,7 @@ public final class GenericAlert {
@Dimension(value = "entity-name") String entityName,
@Dimension(value = "nominal-name") String nominalTime,
@Dimension(value = "wf-id") String wfId,
+ @Dimension(value = "wf-user") String workflowUser,
@Dimension(value = "run-id") String runId,
@Dimension(value = "error-message") String message) {
return "IGNORE";
@@ -62,6 +64,7 @@ public final class GenericAlert {
@Dimension(value = "entity-name") String entityName,
@Dimension(value = "nominal-time") String nominalTime,
@Dimension(value = "wf-id") String workflowId,
+ @Dimension(value = "wf-user") String workflowUser,
@Dimension(value = "run-id") String runId,
@Dimension(value = "operation") String operation,
@Dimension(value = "start-time") String startTime,
@@ -80,6 +83,7 @@ public final class GenericAlert {
@Dimension(value = "entity-name") String entityName,
@Dimension(value = "nominal-time") String nominalTime,
@Dimension(value = "wf-id") String workflowId,
+ @Dimension(value = "wf-user") String workflowUser,
@Dimension(value = "run-id") String runId,
@Dimension(value = "operation") String operation,
@Dimension(value = "start-time") String startTime,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index cecdeef..0762514 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.COORDINATOR;
@@ -37,6 +38,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.ObjectFactory;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.service.FalconPathFilter;
import org.apache.falcon.service.SharedLibraryHostingService;
import org.apache.falcon.util.RuntimeProperties;
@@ -45,8 +47,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.oozie.client.OozieClient;
@@ -141,7 +145,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
- createTempDir(cluster, coordPath);
+ createLogsDir(cluster, coordPath);
COORDINATOR bundleCoord = new COORDINATOR();
bundleCoord.setName(coordinatorapp.getName());
bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
@@ -192,9 +196,9 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
}
protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
- throws IOException {
+ throws IOException, FalconException {
String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
addExtensionJars(fs, new Path(libext), wf);
addExtensionJars(fs, new Path(libext, type.name()), wf);
if (StringUtils.isNotEmpty(lifecycle)) {
@@ -208,7 +212,6 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
libPath, cluster, FALCON_JAR_FILTER);
} catch (IOException e) {
- LOG.error("Failed to copy shared libs on cluster " + cluster.getName(), e);
throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
}
}
@@ -286,11 +289,11 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
throws FalconException {
-
try {
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
- FileSystem fs = outPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ outPath.toUri(), ClusterHelper.getConfiguration(cluster));
OutputStream out = fs.create(outPath);
try {
marshaller.marshal(jaxbElement, out);
@@ -310,12 +313,16 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
}
}
- private void createTempDir(Cluster cluster, Path coordPath) throws FalconException {
+ private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
try {
- FileSystem fs = coordPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
- Path tempDir = new Path(coordPath, "../../logs");
- fs.mkdirs(tempDir);
- fs.setPermission(tempDir, new FsPermission((short) 511));
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
+ Path logsDir = new Path(coordPath, "../../logs");
+ fs.mkdirs(logsDir);
+
+ // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ fs.setPermission(logsDir, permission);
} catch (Exception e) {
throw new FalconException("Unable to create temp dir in " + coordPath, e);
}
@@ -334,8 +341,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
BUNDLE_JAXB_CONTEXT,
- new Path(
- outPath, "bundle.xml"));
+ new Path(outPath, "bundle.xml"));
}
protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
@@ -394,11 +400,17 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
}
protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
- String prefix) throws IOException {
+ Cluster cluster, String prefix) throws IOException {
Configuration hiveConf = new Configuration(false);
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
hiveConf.set("hive.metastore.local", "false");
+ if (UserGroupInformation.isSecurityEnabled()) {
+ hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+ ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
+ hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+ }
+
OutputStream out = null;
try {
out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index d544311..92b90e7 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -46,7 +46,7 @@ import java.net.URLConnection;
import java.util.List;
/**
- * Utitlity called in the post process of oozie workflow to move oozie action executor log.
+ * Utility called in the post process of oozie workflow to move oozie action executor log.
*/
public class LogMover extends Configured implements Tool {