You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/02/16 04:31:26 UTC
[5/5] git commit: FALCON-11 Add support for security in Falcon.
Contributed by Venkatesh Seetharam
FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3c51f105
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3c51f105
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3c51f105
Branch: refs/heads/master
Commit: 3c51f1053a3524557d1e1e58f0a4fc778431e75c
Parents: 2cb42df
Author: Venkatesh Seetharam <ve...@hortonworks.com>
Authored: Sat Feb 15 19:30:01 2014 -0800
Committer: Venkatesh Seetharam <ve...@hortonworks.com>
Committed: Sat Feb 15 19:30:01 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
client/pom.xml | 24 +
.../java/org/apache/falcon/cli/FalconCLI.java | 43 +-
.../org/apache/falcon/client/FalconClient.java | 159 +++---
common/pom.xml | 5 +
.../falcon/catalog/AbstractCatalogService.java | 19 +-
.../falcon/catalog/HiveCatalogService.java | 88 ++-
.../falcon/cleanup/AbstractCleanupHandler.java | 19 +-
.../falcon/cleanup/FeedCleanupHandler.java | 2 +-
.../apache/falcon/entity/CatalogStorage.java | 6 -
.../org/apache/falcon/entity/ClusterHelper.java | 24 +-
.../org/apache/falcon/entity/EntityUtil.java | 3 +-
.../apache/falcon/entity/FileSystemStorage.java | 6 -
.../java/org/apache/falcon/entity/Storage.java | 9 -
.../entity/parser/ClusterEntityParser.java | 50 +-
.../falcon/entity/parser/FeedEntityParser.java | 29 +-
.../entity/parser/ProcessEntityParser.java | 13 +-
.../falcon/entity/store/ConfigurationStore.java | 27 +-
.../falcon/hadoop/HadoopClientFactory.java | 202 +++++++
.../AuthenticationInitializationService.java | 122 ++++
.../org/apache/falcon/security/CurrentUser.java | 3 +-
.../falcon/security/FalconLoginModule.java | 89 ---
.../security/FalconSecurityConfiguration.java | 52 --
.../falcon/security/SecurityConstants.java | 38 --
.../apache/falcon/security/SecurityUtil.java | 102 ++++
.../org/apache/falcon/update/UpdateHelper.java | 10 +-
common/src/main/resources/startup.properties | 64 ++-
.../apache/falcon/entity/AbstractTestBase.java | 13 +-
.../falcon/entity/FileSystemStorageTest.java | 12 -
.../falcon/hadoop/HadoopClientFactoryTest.java | 101 ++++
...AuthenticationInitializationServiceTest.java | 142 +++++
.../falcon/security/SecurityUtilTest.java | 69 +++
docs/src/site/twiki/Security.twiki | 193 +++++++
docs/src/site/twiki/index.twiki | 3 +-
docs/src/site/twiki/restapi/AdminConfig.twiki | 1 -
docs/src/site/twiki/restapi/AdminStack.twiki | 1 -
docs/src/site/twiki/restapi/AdminVersion.twiki | 1 -
.../site/twiki/restapi/EntityDefinition.twiki | 1 -
docs/src/site/twiki/restapi/EntityDelete.twiki | 1 -
.../site/twiki/restapi/EntityDependencies.twiki | 1 -
docs/src/site/twiki/restapi/EntityList.twiki | 2 -
docs/src/site/twiki/restapi/EntityResume.twiki | 1 -
.../src/site/twiki/restapi/EntitySchedule.twiki | 1 -
docs/src/site/twiki/restapi/EntityStatus.twiki | 1 -
docs/src/site/twiki/restapi/EntitySubmit.twiki | 4 +-
.../twiki/restapi/EntitySubmitAndSchedule.twiki | 1 -
docs/src/site/twiki/restapi/EntitySuspend.twiki | 1 -
docs/src/site/twiki/restapi/EntityUpdate.twiki | 1 -
.../src/site/twiki/restapi/EntityValidate.twiki | 6 +-
docs/src/site/twiki/restapi/InstanceKill.twiki | 1 -
docs/src/site/twiki/restapi/InstanceLogs.twiki | 1 -
docs/src/site/twiki/restapi/InstanceRerun.twiki | 1 -
.../src/site/twiki/restapi/InstanceResume.twiki | 1 -
.../site/twiki/restapi/InstanceRunning.twiki | 1 -
.../src/site/twiki/restapi/InstanceStatus.twiki | 1 -
.../site/twiki/restapi/InstanceSuspend.twiki | 1 -
docs/src/site/twiki/restapi/ResourceList.twiki | 24 +
.../falcon/converter/OozieFeedMapper.java | 16 +-
.../config/workflow/replication-workflow.xml | 4 +
.../config/workflow/retention-workflow.xml | 6 +-
.../falcon/converter/OozieFeedMapperTest.java | 3 +
hadoop-webapp/pom.xml | 5 -
html5-ui/js/falcon.js | 13 +-
messaging/pom.xml | 5 +
.../falcon/messaging/EntityInstanceMessage.java | 3 +-
.../messaging/EntityInstanceMessageCreator.java | 10 +-
.../falcon/messaging/MessageProducer.java | 13 +-
.../messaging/FalconTopicProducerTest.java | 6 +-
.../falcon/messaging/FeedProducerTest.java | 3 +
.../falcon/messaging/ProcessProducerTest.java | 5 +-
.../org/apache/falcon/aspect/GenericAlert.java | 4 +
.../converter/AbstractOozieEntityMapper.java | 40 +-
.../org/apache/falcon/logging/LogMover.java | 2 +-
.../org/apache/falcon/logging/LogProvider.java | 20 +-
.../service/SharedLibraryHostingService.java | 19 +-
.../falcon/workflow/FalconPostProcessing.java | 15 +-
.../workflow/engine/OozieClientFactory.java | 24 +-
.../engine/OozieHouseKeepingService.java | 3 +-
.../workflow/engine/OozieWorkflowEngine.java | 89 ++-
.../apache/oozie/client/CustomOozieClient.java | 101 ----
.../apache/oozie/client/ProxyOozieClient.java | 562 +++++++++++++++++++
.../workflow/FalconPostProcessingTest.java | 5 +
pom.xml | 17 +-
.../falcon/resource/channel/HTTPChannel.java | 24 +-
.../apache/falcon/security/BasicAuthFilter.java | 186 ++++--
...eUserInHeaderBasedAuthenticationHandler.java | 49 ++
.../falcon/service/FalconTopicSubscriber.java | 67 +--
.../service/ProcessSubscriberService.java | 4 +-
.../apache/falcon/aspect/GenericAlertTest.java | 4 +-
.../service/FalconTopicSubscriberTest.java | 8 +-
.../falcon/converter/OozieProcessMapper.java | 43 +-
.../config/workflow/process-parent-workflow.xml | 4 +
.../converter/OozieProcessMapperTest.java | 3 +
.../apache/falcon/latedata/LateDataHandler.java | 22 +-
.../apache/falcon/rerun/event/LaterunEvent.java | 9 +-
.../apache/falcon/rerun/event/RerunEvent.java | 8 +-
.../falcon/rerun/event/RerunEventFactory.java | 4 +-
.../apache/falcon/rerun/event/RetryEvent.java | 6 +-
.../rerun/handler/AbstractRerunConsumer.java | 8 +-
.../rerun/handler/AbstractRerunHandler.java | 7 +-
.../falcon/rerun/handler/LateRerunConsumer.java | 26 +-
.../falcon/rerun/handler/LateRerunHandler.java | 18 +-
.../falcon/rerun/handler/RetryConsumer.java | 6 +-
.../falcon/rerun/handler/RetryHandler.java | 15 +-
.../falcon/rerun/queue/InMemoryQueue.java | 5 +-
.../rerun/handler/TestLateRerunHandler.java | 2 +-
.../apache/falcon/rerun/queue/ActiveMQTest.java | 2 +-
.../falcon/rerun/queue/InMemoryQueueTest.java | 6 +-
src/bin/falcon | 2 +-
src/conf/log4j.xml | 29 +
src/conf/startup.properties | 68 ++-
.../falcon/cluster/util/EmbeddedCluster.java | 10 +-
webapp/pom.xml | 5 +
webapp/src/conf/oozie/conf/oozie-site.xml | 44 +-
webapp/src/main/resources/log4j.xml | 29 +
.../falcon/catalog/HiveCatalogServiceIT.java | 16 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 72 +--
.../org/apache/falcon/cli/FalconCLISmokeIT.java | 18 +-
.../apache/falcon/late/LateDataHandlerIT.java | 2 +-
.../lifecycle/FileSystemFeedReplicationIT.java | 40 +-
.../lifecycle/TableStorageFeedEvictorIT.java | 1 +
.../TableStorageFeedReplicationIT.java | 20 +-
.../org/apache/falcon/process/PigProcessIT.java | 12 +-
.../falcon/process/TableStorageProcessIT.java | 16 +-
.../falcon/resource/EntityManagerJerseyIT.java | 235 ++++----
.../resource/EntityManagerJerseySmokeIT.java | 36 +-
.../resource/ProcessInstanceManagerIT.java | 33 +-
.../org/apache/falcon/resource/TestContext.java | 258 +++------
.../falcon/security/BasicAuthFilterTest.java | 92 ++-
.../org/apache/falcon/util/OozieTestUtils.java | 104 +++-
.../util/ResourcesReflectionUtilTest.java | 2 +-
.../validation/ClusterEntityValidationIT.java | 4 +-
.../validation/FeedEntityValidationIT.java | 2 +-
133 files changed, 3123 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0462902..ecab28a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@ Apache Falcon (incubating) Change log
Trunk (Unreleased)
INCOMPATIBLE CHANGES
+ FALCON-11 Add support for security in Falcon (Venkatesh Seetharam)
NEW FEATURES
FALCON-281 Design Action Interface. (Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index a43f7f5..63c4cce 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -32,6 +32,30 @@
<name>Apache Falcon CLI client</name>
<packaging>jar</packaging>
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>commons-cli</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index aa712ad..a414e32 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -138,12 +138,18 @@ public class FalconCLI {
int exitValue = 0;
if (command.getName().equals(HELP_CMD)) {
parser.showHelp();
- } else if (command.getName().equals(ADMIN_CMD)) {
- exitValue = adminCommand(command.getCommandLine());
- } else if (command.getName().equals(ENTITY_CMD)) {
- entityCommand(command.getCommandLine());
- } else if (command.getName().equals(INSTANCE_CMD)) {
- instanceCommand(command.getCommandLine());
+ } else {
+ CommandLine commandLine = command.getCommandLine();
+ String falconUrl = getFalconEndpoint(commandLine);
+ FalconClient client = new FalconClient(falconUrl);
+
+ if (command.getName().equals(ADMIN_CMD)) {
+ exitValue = adminCommand(commandLine, client, falconUrl);
+ } else if (command.getName().equals(ENTITY_CMD)) {
+ entityCommand(commandLine, client);
+ } else if (command.getName().equals(INSTANCE_CMD)) {
+ instanceCommand(commandLine, client);
+ }
}
return exitValue;
@@ -167,10 +173,8 @@ public class FalconCLI {
}
}
- private void instanceCommand(CommandLine commandLine) throws FalconCLIException, IOException {
- String falconUrl = getFalconEndpoint(commandLine);
- FalconClient client = new FalconClient(falconUrl);
-
+ private void instanceCommand(CommandLine commandLine, FalconClient client)
+ throws FalconCLIException, IOException {
Set<String> optionsList = new HashSet<String>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
@@ -257,12 +261,8 @@ public class FalconCLI {
}
}
- private void entityCommand(CommandLine commandLine)
+ private void entityCommand(CommandLine commandLine, FalconClient client)
throws FalconCLIException, IOException {
-
- String falconUrl = getFalconEndpoint(commandLine);
- FalconClient client = new FalconClient(falconUrl);
-
Set<String> optionsList = new HashSet<String>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
@@ -395,9 +395,12 @@ public class FalconCLI {
"show the current system status");
Option version = new Option(VERSION_OPTION, false,
"show Falcon server build version");
+ Option stack = new Option(STACK_OPTION, false,
+ "show the thread stack dump");
Option help = new Option("help", false, "show Falcon help");
group.addOption(status);
group.addOption(version);
+ group.addOption(stack);
group.addOption(help);
adminOptions.addOptionGroup(group);
@@ -587,11 +590,9 @@ public class FalconCLI {
return url;
}
- private int adminCommand(CommandLine commandLine) throws FalconCLIException, IOException {
+ private int adminCommand(CommandLine commandLine, FalconClient client,
+ String falconUrl) throws FalconCLIException, IOException {
String result;
- String falconUrl = getFalconEndpoint(commandLine);
- FalconClient client = new FalconClient(falconUrl);
-
Set<String> optionsList = new HashSet<String>();
for (Option option : commandLine.getOptions()) {
optionsList.add(option.getOpt());
@@ -603,9 +604,8 @@ public class FalconCLI {
}
int exitValue = 0;
if (optionsList.contains(STATUS_OPTION)) {
- int status = 0;
try {
- status = client.getStatus();
+ int status = client.getStatus();
if (status != 200) {
ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
+ "Please check log files.");
@@ -623,6 +623,7 @@ public class FalconCLI {
} else if (optionsList.contains(HELP_CMD)) {
OUT.get().println("Falcon Help");
}
+
return exitValue;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 367fcc5..a5c31c2 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -27,6 +27,9 @@ import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.InstancesResult;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.apache.falcon.resource.InstancesSummaryResult;
import javax.ws.rs.HttpMethod;
@@ -41,6 +44,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
+import java.net.URL;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
@@ -51,55 +55,83 @@ import java.util.Properties;
*/
public class FalconClient {
- private final WebResource service;
public static final String WS_HEADER_PREFIX = "header:";
- private static final String REMOTE_USER = "Remote-User";
- private static final String USER = System.getProperty("user.name");
+ public static final String USER = System.getProperty("user.name");
+ public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER;
+
private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
/**
+ * Name of the HTTP cookie used for the authentication token between the client and the server.
+ */
+ public static final String AUTH_COOKIE = "hadoop.auth";
+ private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
+ private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
+
+ private final WebResource service;
+ private final AuthenticatedURL.Token authenticationToken;
+
+ /**
* Create a Falcon client instance.
*
* @param falconUrl of the server to which client interacts
- * @throws IOException
+ * @throws FalconCLIException
*/
- public FalconClient(String falconUrl) throws IOException {
+ public FalconClient(String falconUrl) throws FalconCLIException {
String baseUrl = notEmpty(falconUrl, "FalconUrl");
if (!baseUrl.endsWith("/")) {
baseUrl += "/";
}
+
Client client = Client.create(new DefaultClientConfig());
setFalconTimeOut(client);
service = client.resource(UriBuilder.fromUri(baseUrl).build());
- client.resource(UriBuilder.fromUri(baseUrl).build());
- // addHeaders();
+ authenticationToken = getToken(baseUrl);
}
- private void setFalconTimeOut(Client client) throws IOException {
- Properties prop = new Properties();
- int readTimeout;
- int connectTimeout;
- InputStream inputStream = null;
+ private void setFalconTimeOut(Client client) throws FalconCLIException {
try {
- inputStream = FalconClient.class.getResourceAsStream("/client.properties");
- if (inputStream != null) {
- prop.load(inputStream);
- readTimeout = prop.containsKey("falcon.read.timeout") ? Integer
- .parseInt(prop.getProperty("falcon.read.timeout")) : 180000;
- connectTimeout = prop.containsKey("falcon.connect.timeout") ? Integer
- .parseInt(prop.getProperty("falcon.connect.timeout"))
- : 180000;
- } else {
- readTimeout = 180000;
- connectTimeout = 180000;
+ Properties prop = new Properties();
+ int readTimeout;
+ int connectTimeout;
+ InputStream inputStream = null;
+ try {
+ inputStream = FalconClient.class.getResourceAsStream("/client.properties");
+ if (inputStream != null) {
+ prop.load(inputStream);
+ readTimeout = prop.containsKey("falcon.read.timeout") ? Integer
+ .parseInt(prop.getProperty("falcon.read.timeout")) : 180000;
+ connectTimeout = prop.containsKey("falcon.connect.timeout") ? Integer
+ .parseInt(prop.getProperty("falcon.connect.timeout"))
+ : 180000;
+ } else {
+ readTimeout = 180000;
+ connectTimeout = 180000;
+ }
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
- } finally {
- IOUtils.closeQuietly(inputStream);
+ client.setConnectTimeout(connectTimeout);
+ client.setReadTimeout(readTimeout);
+ } catch (IOException e) {
+ throw new FalconCLIException("An error occurred while reading client.properties file.", e);
}
- client.setConnectTimeout(connectTimeout);
- client.setReadTimeout(readTimeout);
+ }
+
+ public static AuthenticatedURL.Token getToken(String baseUrl) throws FalconCLIException {
+ AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token();
+ try {
+ URL url = new URL(baseUrl + AUTH_URL);
+ // using KerberosAuthenticator which falls back to PseudoAuthenticator
+ // instead of passing authentication type from the command line - bad factory
+ new AuthenticatedURL(AUTHENTICATOR).openConnection(url, currentToken);
+ } catch (Exception ex) {
+ throw new FalconCLIException("Could not authenticate, " + ex.getMessage(), ex);
+ }
+
+ return currentToken;
}
/**
@@ -234,10 +266,11 @@ public class FalconClient {
if (effectiveTime != null) {
resource = resource.queryParam("time", SchemaHelper.formatDateUTC(effectiveTime));
}
- ClientResponse clientResponse = resource.header(REMOTE_USER, USER)
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
.method(operation.method, ClientResponse.class, entityStream);
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
return parseAPIResult(clientResponse);
}
@@ -253,7 +286,6 @@ public class FalconClient {
throws FalconCLIException {
return sendEntityRequest(Entities.STATUS, entityType, entityName, colo);
-
}
public String getDefinition(String entityType, String entityName)
@@ -261,7 +293,6 @@ public class FalconClient {
return sendDefinitionRequest(Entities.DEFINITION, entityType,
entityName);
-
}
public String getDependency(String entityType, String entityName)
@@ -378,8 +409,9 @@ public class FalconClient {
public int getStatus() throws FalconCLIException {
AdminOperations job = AdminOperations.VERSION;
ClientResponse clientResponse = service.path(job.path)
- .header(REMOTE_USER, USER).accept(job.mimeType)
- .type(MediaType.TEXT_PLAIN).method(job.method, ClientResponse.class);
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(job.mimeType).type(MediaType.TEXT_PLAIN)
+ .method(job.method, ClientResponse.class);
return clientResponse.getStatus();
}
@@ -443,11 +475,12 @@ public class FalconClient {
if (colo != null) {
resource = resource.queryParam("colo", colo);
}
- ClientResponse clientResponse = resource.header(REMOTE_USER, USER)
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
.method(entities.method, ClientResponse.class);
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
return parseAPIResult(clientResponse);
}
@@ -455,24 +488,26 @@ public class FalconClient {
private String sendDefinitionRequest(Entities entities, String entityType,
String entityName) throws FalconCLIException {
- ClientResponse clientResponse = service.path(entities.path)
- .path(entityType).path(entityName).header(REMOTE_USER, USER)
+ ClientResponse clientResponse = service
+ .path(entities.path).path(entityType).path(entityName)
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
.method(entities.method, ClientResponse.class);
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
return clientResponse.getEntity(String.class);
}
private String sendDependencyRequest(Entities entities, String entityType,
String entityName) throws FalconCLIException {
- ClientResponse clientResponse = service.path(entities.path)
- .path(entityType).path(entityName).header(REMOTE_USER, USER)
+ ClientResponse clientResponse = service
+ .path(entities.path).path(entityType).path(entityName)
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
.method(entities.method, ClientResponse.class);
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
return parseEntityList(clientResponse);
}
@@ -480,12 +515,13 @@ public class FalconClient {
private String sendListRequest(Entities entities, String entityType)
throws FalconCLIException {
- ClientResponse clientResponse = service.path(entities.path)
- .path(entityType).header(REMOTE_USER, USER)
+ ClientResponse clientResponse = service
+ .path(entities.path).path(entityType)
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
.method(entities.method, ClientResponse.class);
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
return parseEntityList(clientResponse);
}
@@ -497,29 +533,16 @@ public class FalconClient {
if (colo != null) {
resource = resource.queryParam("colo", colo);
}
- ClientResponse clientResponse = resource.header(REMOTE_USER, USER)
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
.method(entities.method, ClientResponse.class, requestObject);
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
return parseAPIResult(clientResponse);
}
- public InstancesResult instanceCmd(Instances instances, String type, String name,
- String start, String end, String colo) {
- WebResource resource = service.path(instances.path).path(type).path(name);
- resource = resource.queryParam("start", start);
- if (end != null) {
- resource = resource.queryParam("end", end);
- }
- resource = resource.queryParam("colo", colo);
-
- return resource.header(REMOTE_USER, USER)
- .accept(instances.mimeType)
- .method(instances.method, InstancesResult.class);
- }
-
//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
private String sendInstanceRequest(Instances instances, String type,
String entity, String start, String end, InputStream props,
@@ -541,15 +564,17 @@ public class FalconClient {
ClientResponse clientResponse;
if (props == null) {
- clientResponse = resource.header(REMOTE_USER, USER)
+ clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(instances.mimeType)
.method(instances.method, ClientResponse.class);
} else {
- clientResponse = resource.header(REMOTE_USER, USER)
+ clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(instances.mimeType)
.method(instances.method, ClientResponse.class, props);
}
- checkIfSuccessfull(clientResponse);
+ checkIfSuccessful(clientResponse);
if (instances.name().equals("LOG")) {
return parseProcessInstanceResultLogs(clientResponse, runid);
@@ -566,8 +591,10 @@ public class FalconClient {
throws FalconCLIException {
ClientResponse clientResponse = service.path(job.path)
- .header(REMOTE_USER, USER).accept(job.mimeType)
- .type(MediaType.TEXT_PLAIN).method(job.method, ClientResponse.class);
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(job.mimeType)
+ .type(job.mimeType)
+ .method(job.method, ClientResponse.class);
return parseStringResult(clientResponse);
}
@@ -720,7 +747,7 @@ public class FalconClient {
return sb.toString();
}
- private void checkIfSuccessfull(ClientResponse clientResponse)
+ private void checkIfSuccessful(ClientResponse clientResponse)
throws FalconCLIException {
if (clientResponse.getStatus() == Response.Status.BAD_REQUEST
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 068a22c..c55c989 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -153,6 +153,11 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>net.sourceforge.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 691d805..fc9c3b1 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -32,11 +32,13 @@ public abstract class AbstractCatalogService {
/**
* This method checks if the catalog service is alive.
*
- * @param catalogBaseUrl url for the catalog service
+ * @param catalogUrl url for the catalog service
+ * @param metaStorePrincipal kerberos principal for hive metastore as this is executed in falcon on behalf of user
* @return if the service was reachable
* @throws FalconException exception
*/
- public abstract boolean isAlive(String catalogBaseUrl) throws FalconException;
+ public abstract boolean isAlive(String catalogUrl,
+ String metaStorePrincipal) throws FalconException;
/**
* This method checks if the given table exists in the catalog.
@@ -44,14 +46,15 @@ public abstract class AbstractCatalogService {
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
* @param tableName tableName to check if it exists
+ * @param metaStorePrincipal kerberos principal for hive metastore as this is executed in falcon on behalf of user
* @return if the table exists
* @throws FalconException exception
*/
- public abstract boolean tableExists(String catalogUrl, String database, String tableName)
- throws FalconException;
+ public abstract boolean tableExists(String catalogUrl, String database, String tableName,
+ String metaStorePrincipal) throws FalconException;
/**
- * Returns if the table is external or not.
+ * Returns if the table is external or not. Executed in the workflow engine.
*
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
@@ -63,7 +66,7 @@ public abstract class AbstractCatalogService {
String tableName) throws FalconException;
/**
- * List partitions by filter.
+ * List partitions by filter. Executed in the workflow engine.
*
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
@@ -79,7 +82,7 @@ public abstract class AbstractCatalogService {
throws FalconException;
/**
- * Drops a given partition.
+ * Drops a given partition. Executed in the workflow engine.
*
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
@@ -92,7 +95,7 @@ public abstract class AbstractCatalogService {
Map<String, String> partitions) throws FalconException;
/**
- * Gets the partition.
+ * Gets the partition. Executed in the workflow engine.
*
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 51e4d6e..3c3660e 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -23,6 +23,7 @@ import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hcatalog.api.HCatClient;
import org.apache.hcatalog.api.HCatDatabase;
import org.apache.hcatalog.api.HCatPartition;
@@ -32,6 +33,8 @@ import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -67,63 +70,94 @@ public class HiveCatalogService extends AbstractCatalogService {
private static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
try {
- HiveConf hcatConf = new HiveConf();
- hcatConf.set("hive.metastore.local", "false");
- hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
- hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
- hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
- HCatSemanticAnalyzer.class.getName());
- hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-
- hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-
+ HiveConf hcatConf = createHiveConf(metastoreUrl);
return HCatClient.create(hcatConf);
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
}
}
+ private static HiveConf createHiveConf(String metastoreUrl) {
+ HiveConf hcatConf = new HiveConf();
+ hcatConf.set("hive.metastore.local", "false");
+ hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+ hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+ hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ return hcatConf;
+ }
+
+ public static synchronized HCatClient getProxiedClient(String catalogUrl,
+ String metaStorePrincipal) throws FalconException {
+ if (!CACHE.containsKey(catalogUrl)) {
+ try {
+ final HiveConf hcatConf = createHiveConf(catalogUrl);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metaStorePrincipal);
+ hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+ }
+
+ LOG.info("Creating and caching HCatalog client object for " + catalogUrl);
+ UserGroupInformation currentUser = UserGroupInformation.getLoginUser();
+ HCatClient hcatClient = currentUser.doAs(new PrivilegedExceptionAction<HCatClient>() {
+ public HCatClient run() throws Exception {
+ return HCatClient.create(hcatConf);
+ }
+ });
+ CACHE.putIfAbsent(catalogUrl, hcatClient);
+ } catch (IOException e) {
+ throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+ }
+ }
+
+ return CACHE.get(catalogUrl);
+ }
@Override
- public boolean isAlive(String catalogBaseUrl) throws FalconException {
- LOG.info("Checking if the service is alive for: " + catalogBaseUrl);
+ public boolean isAlive(final String catalogUrl,
+ final String metaStorePrincipal) throws FalconException {
+ LOG.info("Checking if the service is alive for: " + catalogUrl);
try {
- HCatClient client = get(catalogBaseUrl);
- client.close();
+ HCatClient client = getProxiedClient(catalogUrl, metaStorePrincipal);
HCatDatabase database = client.getDatabase("default");
return database != null;
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e);
}
}
@Override
- public boolean tableExists(String catalogUrl, String database, String tableName)
- throws FalconException {
+ public boolean tableExists(final String catalogUrl, final String database, final String tableName,
+ final String metaStorePrincipal) throws FalconException {
LOG.info("Checking if the table exists: " + tableName);
try {
- HCatClient client = get(catalogUrl);
+ HCatClient client = getProxiedClient(catalogUrl, metaStorePrincipal);
HCatTable table = client.getTable(database, tableName);
return table != null;
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
}
}
@Override
public boolean isTableExternal(String catalogUrl, String database, String tableName)
throws FalconException {
- LOG.info("Returns a list of table properties for:" + tableName);
+ LOG.info("Checking if the table is external:" + tableName);
try {
HCatClient client = get(catalogUrl);
HCatTable table = client.getTable(database, tableName);
return !table.getTabletype().equals("MANAGED_TABLE");
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e);
}
}
@@ -145,7 +179,7 @@ public class HiveCatalogService extends AbstractCatalogService {
return catalogPartitionList;
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
}
}
@@ -180,7 +214,7 @@ public class HiveCatalogService extends AbstractCatalogService {
HCatClient client = get(catalogUrl);
client.dropPartitions(database, tableName, partitions, true);
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
}
return true;
@@ -189,14 +223,14 @@ public class HiveCatalogService extends AbstractCatalogService {
@Override
public CatalogPartition getPartition(String catalogUrl, String database, String tableName,
Map<String, String> partitionSpec) throws FalconException {
- LOG.info("List partitions for : " + tableName + ", partition spec: " + partitionSpec);
+ LOG.info("Fetch partition for : " + tableName + ", partition spec: " + partitionSpec);
try {
HCatClient client = get(catalogUrl);
HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
return createCatalogPartition(hCatPartition);
} catch (HCatException e) {
- throw new FalconException(e);
+ throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 644afd2..20d46c3 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -26,9 +26,9 @@ import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,8 +44,8 @@ import java.io.IOException;
*/
public abstract class AbstractCleanupHandler {
- protected static final Logger LOG = Logger
- .getLogger(AbstractCleanupHandler.class);
+ protected static final Logger LOG = Logger.getLogger(AbstractCleanupHandler.class);
+
protected static final ConfigurationStore STORE = ConfigurationStore.get();
public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
public static final ExpressionHelper RESOLVER = ExpressionHelper.get();
@@ -66,7 +66,6 @@ public abstract class AbstractCleanupHandler {
private String getRetentionValue(Frequency.TimeUnit timeunit) {
return RuntimeProperties.get().getProperty(
"log.cleanup.frequency." + timeunit + ".retention", "days(1)");
-
}
protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
@@ -87,14 +86,7 @@ public abstract class AbstractCleanupHandler {
protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
throws FalconException {
- FileSystem fs;
- try {
- fs = new Path(ClusterHelper.getStorageUrl(cluster))
- .getFileSystem(new Configuration());
- } catch (IOException e) {
- throw new FalconException(e);
- }
- return fs;
+ return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
}
protected void delete(Cluster cluster, Entity entity, long retention)
@@ -116,8 +108,7 @@ public abstract class AbstractCleanupHandler {
for (FileStatus log : logs) {
if (now - log.getModificationTime() > retention) {
try {
- boolean isDeleted = getFileSystem(cluster).delete(
- log.getPath(), true);
+ boolean isDeleted = getFileSystem(cluster).delete(log.getPath(), true);
if (!isDeleted) {
LOG.error("Unable to delete path: " + log.getPath());
} else {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 7dbac58..58d2199 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -56,7 +56,7 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
delete(currentCluster, feed, retention);
deleteStagedData(currentCluster, feed, retention);
} else {
- LOG.info("Ignoring cleanup for process:" + feedName
+ LOG.info("Ignoring cleanup for feed:" + feedName
+ " in cluster: " + cluster.getName() + " as this does not belong to current colo");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 32f7605..ed9b238 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -19,7 +19,6 @@
package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -325,11 +324,6 @@ public class CatalogStorage implements Storage {
}
@Override
- public boolean exists() throws FalconException {
- return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
- }
-
- @Override
public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 38b5c5b..c0f3ee2 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,14 +18,11 @@
package org.apache.falcon.entity;
-import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.cluster.*;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import java.io.IOException;
-
/**
* Helper to get end points relating to the cluster.
*/
@@ -37,22 +34,21 @@ public final class ClusterHelper {
public static Configuration getConfiguration(Cluster cluster) {
Configuration conf = new Configuration();
- conf.set("fs.default.name", getStorageUrl(cluster));
- conf.set("mapred.job.tracker", getMREndPoint(cluster));
+
+ final String storageUrl = getStorageUrl(cluster);
+ conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
+
+ final String executeEndPoint = getMREndPoint(cluster);
+ conf.set(HadoopClientFactory.MR_JOB_TRACKER_KEY, executeEndPoint);
+ conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);
+
if (cluster.getProperties() != null) {
for (Property prop : cluster.getProperties().getProperties()) {
conf.set(prop.getName(), prop.getValue());
}
}
- return conf;
- }
- public static FileSystem getFileSystem(Cluster cluster) throws FalconException {
- try {
- return FileSystem.get(getConfiguration(cluster));
- } catch (IOException e) {
- throw new FalconException(e);
- }
+ return conf;
}
public static String getOozieUrl(Cluster cluster) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index a3ad83d..b4bc07d 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -34,6 +34,7 @@ import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.*;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.hadoop.fs.FileStatus;
@@ -553,7 +554,7 @@ public final class EntityUtil {
private static Path getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Path path)
throws FalconException {
try {
- FileSystem fs = ClusterHelper.getFileSystem(cluster);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
FileStatus latest = null;
FileStatus[] files = fs.globStatus(path, new PathFilter() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 68370c7..41917c8 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -170,12 +170,6 @@ public class FileSystemStorage implements Storage {
}
@Override
- public boolean exists() throws FalconException {
- // Directories on FS will be created if they don't exist.
- return true;
- }
-
- @Override
public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
final List<Location> fsStorageLocations = fsStorage.getLocations();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index 0634969..60c87c5 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -65,15 +65,6 @@ public interface Storage {
String getUriTemplate(LocationType locationType);
/**
- * Check if the storage, filesystem location or catalog table exists.
- * Filesystem location always returns true.
- *
- * @return true if table exists else false
- * @throws FalconException an exception
- */
- boolean exists() throws FalconException;
-
- /**
* Check for equality of this instance against the one in question.
*
* @param toCompareAgainst instance to compare
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index e633838..831bfdc 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import javax.jms.ConnectionFactory;
+import org.apache.commons.lang.Validate;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.ClusterHelper;
@@ -29,16 +30,17 @@ import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
+
/**
* Parser that parses cluster entity definition.
*/
@@ -51,8 +53,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
}
@Override
- public void validate(Cluster cluster) throws StoreAccessException,
- ValidationException {
+ public void validate(Cluster cluster) throws StoreAccessException, ValidationException {
// validating scheme in light of fail-early
validateScheme(cluster, Interfacetype.READONLY);
validateScheme(cluster, Interfacetype.WRITE);
@@ -88,23 +89,34 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster);
LOG.info("Validating read interface: " + readOnlyStorageUrl);
- validateFileSystem(readOnlyStorageUrl);
+ validateFileSystem(cluster, readOnlyStorageUrl);
}
private void validateWriteInterface(Cluster cluster) throws ValidationException {
final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster);
LOG.info("Validating write interface: " + writeStorageUrl);
- validateFileSystem(writeStorageUrl);
+ validateFileSystem(cluster, writeStorageUrl);
}
- private void validateFileSystem(String storageUrl) throws ValidationException {
+ private void validateFileSystem(Cluster cluster, String storageUrl) throws ValidationException {
try {
Configuration conf = new Configuration();
- conf.set("fs.default.name", storageUrl);
+ conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
conf.setInt("ipc.client.connect.max.retries", 10);
- FileSystem.get(conf);
- } catch (IOException e) {
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String nameNodePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.NN_PRINCIPAL);
+ Validate.notEmpty(nameNodePrincipal,
+ "Cluster definition missing required namenode credential property: " + SecurityUtil.NN_PRINCIPAL);
+
+ conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
+ }
+
+ // todo: ideally check if the end user has access using createProxiedFileSystem
+ // hftp won't work and bug is logged at HADOOP-10215
+ HadoopClientFactory.get().createFileSystem(conf);
+ } catch (FalconException e) {
throw new ValidationException("Invalid storage server or port: " + storageUrl, e);
}
}
@@ -114,11 +126,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
LOG.info("Validating execute interface: " + executeUrl);
try {
- JobConf jobConf = new JobConf();
- jobConf.set("mapred.job.tracker", executeUrl);
- jobConf.set("yarn.resourcemanager.address", executeUrl);
- JobClient jobClient = new JobClient(jobConf);
- jobClient.getClusterStatus().getMapTasks();
+ HadoopClientFactory.validateJobClient(executeUrl);
} catch (IOException e) {
throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
}
@@ -173,7 +181,15 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
LOG.info("Validating catalog registry interface: " + catalogUrl);
try {
- if (!CatalogServiceFactory.getCatalogService().isAlive(catalogUrl)) {
+ String metaStorePrincipal = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ metaStorePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+ Validate.notEmpty(metaStorePrincipal,
+ "Cluster definition missing required metastore credential property: "
+ + SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+ }
+
+ if (!CatalogServiceFactory.getCatalogService().isAlive(catalogUrl, metaStorePrincipal)) {
throw new ValidationException("Unable to reach Catalog server:" + catalogUrl);
}
} catch (FalconException e) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 5c1d9ad..d138179 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,7 +20,9 @@ package org.apache.falcon.entity.parser;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
@@ -38,6 +40,7 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.group.FeedGroup;
import org.apache.falcon.group.FeedGroupMap;
+import org.apache.falcon.security.SecurityUtil;
import org.apache.log4j.Logger;
import java.util.Date;
@@ -344,6 +347,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
private void validateStorageExists(Feed feed) throws FalconException {
+ StringBuilder buffer = new StringBuilder();
for (Cluster cluster : feed.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
@@ -352,12 +356,27 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
final Storage storage = FeedHelper.createStorage(cluster, feed);
- if (!storage.exists()) {
- // this is only true for table, filesystem always returns true
- CatalogStorage catalogStorage = (CatalogStorage) storage;
- throw new ValidationException("Table [" + catalogStorage.getTable()
- + "] does not exist for feed: " + feed.getName() + ", cluster: " + cluster.getName());
+ // this is only true for table, filesystem always returns true
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ continue;
+ }
+
+ CatalogStorage catalogStorage = (CatalogStorage) storage;
+ String metaStorePrincipal = ClusterHelper.getPropertyValue(clusterEntity,
+ SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+ if (!CatalogServiceFactory.getCatalogService().tableExists(catalogStorage.getCatalogUrl(),
+ catalogStorage.getDatabase(), catalogStorage.getTable(), metaStorePrincipal)) {
+ buffer.append("Table [")
+ .append(catalogStorage.getTable())
+ .append("] does not exist for feed: ")
+ .append(feed.getName())
+ .append(" in cluster: ")
+ .append(cluster.getName());
}
}
+
+ if (buffer.length() > 0) {
+ throw new ValidationException(buffer.toString());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 8647d43..837b86a 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -41,6 +41,7 @@ import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -97,6 +98,13 @@ public class ProcessEntityParser extends EntityParser<Process> {
validateLateInputs(process);
}
+ /**
+ * Validate if the user submitting this entity has access to the specific dirs on HDFS.
+ *
+ * @param process process
+ * @param clusterName cluster the process is materialized on
+ * @throws FalconException
+ */
private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
clusterName);
@@ -109,9 +117,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
String libPath = process.getWorkflow().getLib();
String nameNode = getNameNode(cluster, clusterName);
try {
- Configuration configuration = new Configuration();
- configuration.set("fs.default.name", nameNode);
- FileSystem fs = FileSystem.get(configuration);
+ Configuration configuration = ClusterHelper.getConfiguration(cluster);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
if (!fs.exists(new Path(workflowPath))) {
throw new ValidationException(
"Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 18ceb6e..156fafe 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -21,14 +21,16 @@ package org.apache.falcon.entity.store;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.log4j.Logger;
import javax.xml.bind.JAXBException;
@@ -84,8 +86,26 @@ public final class ConfigurationStore implements FalconService {
String uri = StartupProperties.get().getProperty("config.store.uri");
storePath = new Path(uri);
+ fs = initializeFileSystem();
+ }
+
+ /**
+ * Creates the config store directory on HDFS if absent. Falcon owns this directory,
+ * and permissions are restricted so that no other user or group can read it.
+ *
+ * @return FileSystem handle
+ */
+ private FileSystem initializeFileSystem() {
try {
- fs = FileSystem.get(storePath.toUri(), new Configuration());
+ FileSystem fileSystem = HadoopClientFactory.get().createFileSystem(storePath.toUri());
+ if (!fileSystem.exists(storePath)) {
+ LOG.info("Creating configuration store directory: " + storePath);
+ fileSystem.mkdirs(storePath);
+ // set permissions so config store dir is owned by falcon alone
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+ fileSystem.setPermission(storePath, permission);
+ }
+
+ return fileSystem;
} catch (Exception e) {
throw new RuntimeException("Unable to bring up config store", e);
}
@@ -305,8 +325,7 @@ public final class ConfigurationStore implements FalconService {
Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
fs.mkdirs(archivePath);
fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
- new Path(archivePath,
- URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
+ new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
LOG.info("Archived configuration " + type + "/" + name);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
new file mode 100644
index 0000000..d5fbda8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hadoop;
+
+import org.apache.commons.lang.Validate;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A factory implementation to dole out FileSystem handles based on the logged in user.
+ */
+public final class HadoopClientFactory {
+
+ public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+ public static final String MR_JOB_TRACKER_KEY = "mapred.job.tracker";
+ public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
+
+ private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
+
+ private HadoopClientFactory() {
+ }
+
+ public static HadoopClientFactory get() {
+ return INSTANCE;
+ }
+
+ /**
+ * This method is only used by Falcon internally to talk to the config store on HDFS.
+ *
+ * @param uri file system URI for config store.
+ * @return FileSystem handle created as the Falcon login (service) user.
+ * @throws org.apache.falcon.FalconException
+ * if the filesystem could not be created.
+ */
+ public FileSystem createFileSystem(final URI uri) throws FalconException {
+ Validate.notNull(uri, "uri cannot be null");
+
+ try {
+ Configuration conf = new Configuration();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ conf.set(SecurityUtil.NN_PRINCIPAL, StartupProperties.get().getProperty(SecurityUtil.NN_PRINCIPAL));
+ }
+
+ return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
+ } catch (IOException e) {
+ throw new FalconException("Exception while getting FileSystem for: " + uri, e);
+ }
+ }
+
+ public FileSystem createFileSystem(final Configuration conf)
+ throws FalconException {
+ Validate.notNull(conf, "configuration cannot be null");
+
+ String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+ try {
+ return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
+ } catch (URISyntaxException e) {
+ throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+ } catch (IOException e) {
+ throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+ }
+ }
+
+ public FileSystem createFileSystem(final URI uri, final Configuration conf)
+ throws FalconException {
+ Validate.notNull(uri, "uri cannot be null");
+
+ try {
+ return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
+ } catch (IOException e) {
+ throw new FalconException("Exception while getting FileSystem for: " + uri, e);
+ }
+ }
+
+ public FileSystem createProxiedFileSystem(final Configuration conf)
+ throws FalconException {
+ Validate.notNull(conf, "configuration cannot be null");
+
+ String nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+ try {
+ return createProxiedFileSystem(CurrentUser.getUser(), new URI(nameNode), conf);
+ } catch (URISyntaxException e) {
+ throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
+ }
+ }
+
+ /**
+ * Return a FileSystem created with the provided proxyUser for the specified URI.
+ *
+ * @param proxyUser proxyUser
+ * @param uri file system URI.
+ * @param conf Configuration with all necessary information to create the FileSystem.
+ * @return FileSystem created with the provided proxyUser/group.
+ * @throws org.apache.falcon.FalconException
+ * if the filesystem could not be created.
+ */
+ public FileSystem createProxiedFileSystem(String proxyUser, final URI uri, final Configuration conf)
+ throws FalconException {
+ Validate.notEmpty(proxyUser, "proxyUser cannot be null");
+
+ try {
+ UserGroupInformation proxyUgi = SecurityUtil.getProxyUser(proxyUser);
+ return createFileSystem(proxyUgi, uri, conf);
+ } catch (IOException ex) {
+ throw new FalconException("Exception while getting FileSystem: " + ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Return a FileSystem created with the provided user for the specified URI.
+ *
+ * @param ugi user group information
+ * @param uri file system URI.
+ * @param conf Configuration with all necessary information to create the FileSystem.
+ * @return FileSystem created with the provided user/group.
+ * @throws org.apache.falcon.FalconException
+ * if the filesystem could not be created.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, final Configuration conf)
+ throws FalconException {
+ Validate.notNull(ugi, "ugi cannot be null");
+ Validate.notNull(conf, "configuration cannot be null");
+
+ String nameNode = uri.getAuthority();
+ if (nameNode == null) {
+ nameNode = conf.get(FS_DEFAULT_NAME_KEY);
+ if (nameNode != null) {
+ try {
+ new URI(nameNode).getAuthority();
+ } catch (URISyntaxException ex) {
+ throw new FalconException("Exception while getting FileSystem", ex);
+ }
+ }
+ }
+
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws Exception {
+ return FileSystem.get(uri, conf);
+ }
+ });
+ } catch (InterruptedException ex) {
+ throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
+ } catch (IOException ex) {
+ throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Validates that the given execute URL (JobTracker or ResourceManager address) is reachable.
+ *
+ * @param executeUrl jt url or RM url
+ * @throws IOException if a JobClient cannot be created or the cluster status cannot be fetched
+ */
+ public static void validateJobClient(String executeUrl) throws IOException {
+ final JobConf jobConf = new JobConf();
+ jobConf.set(MR_JOB_TRACKER_KEY, executeUrl);
+ jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
+
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ try {
+ JobClient jobClient = loginUser.doAs(new PrivilegedExceptionAction<JobClient>() {
+ public JobClient run() throws Exception {
+ return new JobClient(jobConf);
+ }
+ });
+
+ jobClient.getClusterStatus().getMapTasks();
+ } catch (InterruptedException e) {
+ throw new IOException("Exception creating job client:" + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java b/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
new file mode 100644
index 0000000..264d5b8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.Validate;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.Properties;
+
+
+/**
+ * Authentication Service at startup that initializes the authentication credentials
+ * based on authentication type. If Kerberos is enabled, it logs in the user with the key tab.
+ */
+public class AuthenticationInitializationService implements FalconService {
+
+ private static final Logger LOG = Logger.getLogger(AuthenticationInitializationService.class);
+
+ /**
+ * Constant for the configuration property that indicates the prefix.
+ */
+ protected static final String CONFIG_PREFIX = "falcon.service.authentication.";
+
+ /**
+ * Constant for the configuration property that indicates the keytab file path.
+ */
+ protected static final String KERBEROS_KEYTAB = CONFIG_PREFIX + KerberosAuthenticationHandler.KEYTAB;
+ /**
+ * Constant for the configuration property that indicates the kerberos principal.
+ */
+ protected static final String KERBEROS_PRINCIPAL = CONFIG_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
+
+
+ @Override
+ public String getName() {
+ return "Authentication initialization service";
+ }
+
+ @Override
+ public void init() throws FalconException {
+
+ if (SecurityUtil.isSecurityEnabled()) {
+ LOG.info("Falcon Kerberos Authentication Enabled!");
+ initializeKerberos();
+ } else {
+ LOG.info("Falcon Simple Authentication Enabled!");
+ Configuration ugiConf = new Configuration();
+ ugiConf.set("hadoop.security.authentication", "simple");
+ UserGroupInformation.setConfiguration(ugiConf);
+ }
+ }
+
+ protected void initializeKerberos() throws FalconException {
+ try {
+ Properties configuration = StartupProperties.get();
+ String principal = configuration.getProperty(KERBEROS_PRINCIPAL);
+ Validate.notEmpty(principal,
+ "Missing required configuration property: " + KERBEROS_PRINCIPAL);
+ principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
+ principal, SecurityUtil.getLocalHostName());
+
+ String keytabFilePath = configuration.getProperty(KERBEROS_KEYTAB);
+ Validate.notEmpty(keytabFilePath,
+ "Missing required configuration property: " + KERBEROS_KEYTAB);
+ checkIsReadable(keytabFilePath);
+
+ Configuration conf = new Configuration();
+ conf.set("hadoop.security.authentication", "kerberos");
+
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(principal, keytabFilePath);
+
+ LOG.info("Got Kerberos ticket, keytab: " + keytabFilePath
+ + ", Falcon principal: " + principal);
+ } catch (Exception ex) {
+ throw new FalconException("Could not initialize " + getName()
+ + ": " + ex.getMessage(), ex);
+ }
+ }
+
+ private static void checkIsReadable(String keytabFilePath) {
+ File keytabFile = new File(keytabFilePath);
+ if (!keytabFile.exists()) {
+ throw new IllegalArgumentException("The keytab file does not exist! " + keytabFilePath);
+ }
+
+ if (!keytabFile.isFile()) {
+ throw new IllegalArgumentException("The keytab file cannot be a directory! " + keytabFilePath);
+ }
+
+ if (!keytabFile.canRead()) {
+ throw new IllegalArgumentException("The keytab file is not readable! " + keytabFilePath);
+ }
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 4d2299e..cd7d0b0 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -37,8 +37,7 @@ public final class CurrentUser {
return INSTANCE;
}
- private final ThreadLocal<Subject> currentSubject =
- new ThreadLocal<Subject>();
+ private final ThreadLocal<Subject> currentSubject = new ThreadLocal<Subject>();
public static void authenticate(String user) {
if (user == null || user.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java b/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
deleted file mode 100644
index d95e147..0000000
--- a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-
-import com.sun.security.auth.UnixPrincipal;
-import org.apache.log4j.Logger;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.LoginException;
-import javax.security.auth.spi.LoginModule;
-import java.security.Principal;
-import java.util.Map;
-
-/**
- * Falcon JAAS login module.
- */
-public class FalconLoginModule implements LoginModule {
- private static final Logger LOG = Logger.getLogger(FalconLoginModule.class);
-
- private Subject subject;
-
- public Subject getSubject() {
- return subject;
- }
-
- @Override
- public boolean abort() throws LoginException {
- return true;
- }
-
- private <T extends Principal> T getCanonicalUser(Class<T> cls) {
- for (T user : subject.getPrincipals(cls)) {
- return user;
- }
- return null;
- }
-
- @Override
- public boolean commit() throws LoginException {
- if (!subject.getPrincipals(SecurityConstants.OS_PRINCIPAL_CLASS).
- isEmpty()) {
- return true;
- }
-
- Principal user = getCanonicalUser(SecurityConstants.OS_PRINCIPAL_CLASS);
- if (user != null) {
- subject.getPrincipals().add(new UnixPrincipal(user.getName()));
- return true;
- }
- LOG.error("No such user " + subject);
- throw new LoginException("No such user " + subject);
- }
-
- //SUSPEND CHECKSTYLE CHECK HiddenFieldCheck
- @Override
- public void initialize(Subject subject, CallbackHandler callbackHandler,
- Map<String, ?> sharedState, Map<String, ?> options) {
- this.subject = subject;
- }
- //RESUME CHECKSTYLE CHECK HiddenFieldCheck
-
- @Override
- public boolean login() throws LoginException {
- return true;
- }
-
- @Override
- public boolean logout() throws LoginException {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
deleted file mode 100644
index b80ab6d..0000000
--- a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.util.HashMap;
-
-/**
- * Falcon JAAS security configuration.
- */
-public class FalconSecurityConfiguration extends Configuration {
-
- private static final AppConfigurationEntry OS_SPECIFIC_LOGIN =
- new AppConfigurationEntry(SecurityConstants.OS_LOGIN_MODULE_NAME,
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- new HashMap<String, String>());
-
- private static final AppConfigurationEntry[] SIMPLE_CONF =
- new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN};
-
- private final Configuration parent;
-
- public FalconSecurityConfiguration(Configuration parent) {
- this.parent = parent;
- }
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
- if (parent == null || appName.equals(SecurityConstants.FALCON_LOGIN)) {
- return SIMPLE_CONF.clone();
- } else {
- return parent.getAppConfigurationEntry(appName);
- }
- }
-}