Posted to commits@falcon.apache.org by ve...@apache.org on 2014/02/16 04:31:23 UTC
[2/5] FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 87be709..6d0297e 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -41,6 +41,7 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.oozie.coordinator.CONTROLS;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -84,20 +85,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
super(entity);
}
- private void mkdir(FileSystem fs, Path path) throws FalconException {
- try {
- if (!fs.exists(path) && !fs.mkdirs(path)) {
- throw new FalconException("mkdir failed for " + path);
- }
- } catch (IOException e) {
- throw new FalconException("mkdir failed for " + path, e);
- }
- }
-
@Override
protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
try {
- FileSystem fs = ClusterHelper.getFileSystem(cluster);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
Process process = getEntity();
//Copy user workflow and lib to staging dir
@@ -136,11 +127,11 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
try {
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
Process process = getEntity();
Path wfPath = new Path(process.getWorkflow().getPath());
if (fs.isFile(wfPath)) {
- return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName().toString());
+ return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
} else {
return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
}
@@ -151,14 +142,15 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
try {
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
Process process = getEntity();
if (process.getWorkflow().getLib() == null) {
return null;
}
Path libPath = new Path(process.getWorkflow().getLib());
+
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
if (fs.isFile(libPath)) {
- return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName().toString());
+ return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
} else {
return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
}
@@ -516,9 +508,9 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
} else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
- decoratePIGAction(cluster, process, processWorkflow, action.getPig(), parentWfPath);
+ decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
} else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
- decorateHiveAction(cluster, process, processWorkflow, action, parentWfPath);
+ decorateHiveAction(cluster, process, action, parentWfPath);
} else if (FALCON_ACTIONS.contains(actionName)) {
decorateWithOozieRetries(action);
}
@@ -529,7 +521,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
}
private void decoratePIGAction(Cluster cluster, Process process,
- Workflow processWorkflow, PIG pigAction, Path parentWfPath) throws FalconException {
+ PIG pigAction, Path parentWfPath) throws FalconException {
Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
pigAction.setScript("${nameNode}" + userWfPath.toString());
@@ -548,11 +540,11 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
}
- addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive(),
+ addArchiveForCustomJars(cluster, pigAction.getArchive(),
getUserLibPath(cluster, parentWfPath.getParent()));
}
- private void decorateHiveAction(Cluster cluster, Process process, Workflow processWorkflow, ACTION wfAction,
+ private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
Path parentWfPath) throws FalconException {
JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
@@ -571,7 +563,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
setupHiveConfiguration(cluster, parentWfPath, "falcon-");
- addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive(),
+ addArchiveForCustomJars(cluster, hiveAction.getArchive(),
getUserLibPath(cluster, parentWfPath.getParent()));
marshalHiveAction(wfAction, actionJaxbElement);
@@ -750,16 +742,16 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
String prefix) throws FalconException {
String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
try {
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
Path confPath = new Path(wfPath, "conf");
- createHiveConf(fs, confPath, catalogUrl, prefix);
+ createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
} catch (IOException e) {
throw new FalconException(e);
}
}
- private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
- List<String> archiveList, Path libPath) throws FalconException {
+ private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+ Path libPath) throws FalconException {
if (libPath == null) {
return;
}
@@ -814,5 +806,4 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
throw new RuntimeException("Unable to marshall hive action.", e);
}
}
-
}
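A recurring pattern in this patch, visible in the hunks above, is that direct FileSystem.get(...) calls are replaced with HadoopClientFactory.get().createFileSystem(...) or createProxiedFileSystem(...), so the file system is opened either as the Falcon service user or on behalf of the submitting user. The factory itself is not part of this hunk; the following is a minimal sketch of that pattern using Hadoop's UserGroupInformation, with method names taken from the call sites above and the doAsUser parameter added here purely for illustration (the real factory presumably derives the user from CurrentUser).

// Illustrative sketch only, not the actual Falcon HadoopClientFactory.
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public final class HadoopClientFactorySketch {

    // File system handle as the Falcon service (login) user.
    public FileSystem createFileSystem(Configuration conf) throws IOException {
        return FileSystem.get(conf);
    }

    // File system handle on behalf of the submitting user via proxy-user
    // impersonation, so HDFS operations are authorized against that user.
    public FileSystem createProxiedFileSystem(final Configuration conf, String doAsUser)
        throws IOException, InterruptedException {
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(
                doAsUser, UserGroupInformation.getLoginUser());
        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            @Override
            public FileSystem run() throws IOException {
                return FileSystem.get(conf);
            }
        });
    }
}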
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 494bf20..f53c1e7 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -189,6 +189,8 @@
<arg>${userWorkflowEngine}</arg>
<arg>-logDir</arg>
<arg>${logDir}/job-${nominalTime}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
<file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
<file>${wf:conf("falcon.libpath")}/jms.jar</file>
@@ -256,6 +258,8 @@
<arg>${userWorkflowEngine}</arg>
<arg>-logDir</arg>
<arg>${logDir}/job-${nominalTime}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
<file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
<file>${wf:conf("falcon.libpath")}/jms.jar</file>
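Both copies of the Falcon post-processing action above now forward the submitting user via the Oozie EL function ${wf:user()} as a -workflowUser argument. How Falcon parses that argument is outside this hunk; a generic sketch of consuming such a flag (not Falcon's actual argument parser) could look like this:

// Generic flag scan for the -workflowUser argument passed by the parent
// workflow; illustrative only, Falcon's post-processing has its own parser.
public final class WorkflowUserArgSketch {
    public static String workflowUser(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if ("-workflowUser".equals(args[i])) {
                return args[i + 1];
            }
        }
        return null; // flag not present
    }
}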
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 61ddbdc..b4c059a 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -46,6 +46,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.DECISION;
import org.apache.falcon.oozie.workflow.PIG;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +83,8 @@ public class OozieProcessMapperTest extends AbstractTestBase {
@BeforeClass
public void setUpDFS() throws Exception {
+ CurrentUser.authenticate("falcon");
+
EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
Configuration conf = cluster.getConf();
hdfsUrl = conf.get("fs.default.name");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 4b35760..f204b15 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -25,11 +25,14 @@ import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
@@ -125,10 +128,15 @@ public class LateDataHandler extends Configured implements Tool {
return computedMetrics;
}
- private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException {
+ private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException, FalconException {
OutputStream out = null;
try {
- out = file.getFileSystem(getConf()).create(file);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
+ out = fs.create(file);
+
+ // making sure falcon can read this file
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ fs.setPermission(file, permission);
for (Map.Entry<String, Long> entry : metrics.entrySet()) {
out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
@@ -191,7 +199,7 @@ public class LateDataHandler extends Configured implements Tool {
* @throws IOException
*/
private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
- throws IOException {
+ throws IOException, FalconException {
long usage = 0;
for (String pathElement : pathGroup.split(",")) {
Path inPath = new Path(pathElement);
@@ -201,8 +209,8 @@ public class LateDataHandler extends Configured implements Tool {
return usage;
}
- private long usage(Path inPath, Configuration conf) throws IOException {
- FileSystem fs = inPath.getFileSystem(conf);
+ private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(inPath.toUri(), conf);
FileStatus[] fileStatuses = fs.globStatus(inPath);
if (fileStatuses == null || fileStatuses.length == 0) {
return 0;
@@ -251,8 +259,8 @@ public class LateDataHandler extends Configured implements Tool {
throws Exception {
StringBuilder buffer = new StringBuilder();
- BufferedReader in = new BufferedReader(new InputStreamReader(
- file.getFileSystem(conf).open(file)));
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), conf);
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));
String line;
try {
Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
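In persistMetrics the metrics file is now created through the factory, i.e. as the end user, and then chmod'ed to rwxrwxrwx so the Falcon service user can read it back later. The same write-then-setPermission pattern, reduced to the stock Hadoop API with no Falcon classes, is sketched below.

// Write-then-chmod pattern from persistMetrics, using only the stock
// Hadoop FileSystem API (sketch, not the Falcon code).
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public final class MetricsFileSketch {
    public static void writeWorldReadable(Configuration conf, Path file, String contents)
        throws IOException {
        FileSystem fs = file.getFileSystem(conf);
        OutputStream out = fs.create(file);
        try {
            out.write(contents.getBytes());
        } finally {
            out.close();
        }
        // rwxrwxrwx so a different user (the Falcon service) can read the file
        fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
    }
}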
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index b5ac121..2b52762 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -23,10 +23,11 @@ package org.apache.falcon.rerun.event;
public class LaterunEvent extends RerunEvent {
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
- public LaterunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
- String entityType, String entityName, String instance, int runId) {
+ public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
+ long delay, String entityType, String entityName,
+ String instance, int runId, String workflowUser) {
super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
- instance, runId);
+ instance, runId, workflowUser);
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
@@ -37,6 +38,6 @@ public class LaterunEvent extends RerunEvent {
+ "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
+ delayInMilliSec + SEP + "entityType=" + entityType + SEP
+ "entityName=" + entityName + SEP + "instance=" + instance
- + SEP + "runId=" + runId;
+ + SEP + "runId=" + runId + SEP + "workflowUser=" + workflowUser;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index baf4601..254f285 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -38,6 +38,7 @@ public class RerunEvent implements Delayed {
protected String clusterName;
protected String wfId;
+ protected String workflowUser;
protected long msgInsertTime;
protected long delayInMilliSec;
protected String entityType;
@@ -47,9 +48,10 @@ public class RerunEvent implements Delayed {
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
- String entityType, String entityName, String instance, int runId) {
+ String entityType, String entityName, String instance, int runId, String workflowUser) {
this.clusterName = clusterName;
this.wfId = wfId;
+ this.workflowUser = workflowUser;
this.msgInsertTime = msgInsertTime;
this.delayInMilliSec = delay;
this.entityName = entityName;
@@ -67,6 +69,10 @@ public class RerunEvent implements Delayed {
return wfId;
}
+ public String getWorkflowUser() {
+ return workflowUser;
+ }
+
public long getDelayInMilliSec() {
return delayInMilliSec;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index 03230f9..c2a8fe2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -45,7 +45,7 @@ public class RerunEventFactory<T extends RerunEvent> {
return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
map.get("entityType"), map.get("entityName"), map.get("instance"),
- Integer.parseInt(map.get("runId")));
+ Integer.parseInt(map.get("runId")), map.get("workflowUser"));
}
@SuppressWarnings("unchecked")
@@ -55,7 +55,7 @@ public class RerunEventFactory<T extends RerunEvent> {
Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
map.get("entityType"), map.get("entityName"), map.get("instance"),
Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")),
- Integer.parseInt(map.get("failRetryCount")));
+ Integer.parseInt(map.get("failRetryCount")), map.get("workflowUser"));
}
private Map<String, String> getMap(String message) {
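The factory now reads the workflowUser key back out of the serialized message, matching the "workflowUser=" value the events append in toString(), so the user survives the trip through the delayed queue. A simplified sketch of that key=value round trip follows; the separator used here is arbitrary, Falcon's RerunEvent defines the real SEP constant.

// Simplified round trip of the workflowUser field through a key=value string.
// Separator and parsing are illustrative; Falcon's RerunEvent/RerunEventFactory
// define the real format.
import java.util.HashMap;
import java.util.Map;

public final class RerunEventRoundTripSketch {
    private static final String SEP = "*"; // stand-in separator for illustration

    public static void main(String[] args) {
        String message = "clusterName=local" + SEP + "wfId=0000001-oozie-W"
                + SEP + "runId=0" + SEP + "workflowUser=falcon";

        Map<String, String> map = new HashMap<String, String>();
        for (String pair : message.split("\\" + SEP)) {
            String[] kv = pair.split("=", 2);
            map.put(kv[0], kv[1]);
        }
        // RerunEventFactory would now pass map.get("workflowUser") into the event.
        System.out.println(map.get("workflowUser")); // prints "falcon"
    }
}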
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 1396f19..b5312a6 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -28,9 +28,9 @@ public class RetryEvent extends RerunEvent {
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public RetryEvent(String clusterName, String wfId, long msgInsertTime,
long delay, String entityType, String entityName, String instance,
- int runId, int attempts, int failRetryCount) {
+ int runId, int attempts, int failRetryCount, String workflowUser) {
super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
- instance, runId);
+ instance, runId, workflowUser);
this.attempts = attempts;
this.failRetryCount = failRetryCount;
}
@@ -56,7 +56,7 @@ public class RetryEvent extends RerunEvent {
+ delayInMilliSec + SEP + "entityType=" + entityType + SEP
+ "entityName=" + entityName + SEP + "instance=" + instance
+ SEP + "runId=" + runId + SEP + "attempts=" + attempts + SEP
- + "failRetryCount=" + failRetryCount;
+ + "failRetryCount=" + failRetryCount + SEP + "workflowUser=" + workflowUser;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index b073117..ca2304e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -24,6 +24,7 @@ import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.security.CurrentUser;
import org.apache.log4j.Logger;
/**
@@ -51,7 +52,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
Frequency frequency = new Frequency("minutes(1)");
while (true) {
try {
- T message = null;
+ T message;
try {
message = handler.takeFromQueue();
attempt = 1;
@@ -64,6 +65,8 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
attempt++;
continue;
}
+
+ CurrentUser.authenticate(message.getWorkflowUser());
String jobStatus = handler.getWfEngine().getWorkflowStatus(
message.getClusterName(), message.getWfId());
handleRerun(message.getClusterName(), jobStatus, message);
@@ -72,8 +75,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
LOG.error("Error in rerun consumer:", e);
}
}
-
}
- protected abstract void handleRerun(String cluster, String jobStatus, T message);
+ protected abstract void handleRerun(String clusterName, String jobStatus, T message);
}
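Before asking the workflow engine for status, the consumer now re-establishes the workflow owner with CurrentUser.authenticate(message.getWorkflowUser()). CurrentUser itself is not part of this hunk; a minimal thread-local sketch of that idea, with names assumed from the call sites, is:

// Minimal thread-local "current user" sketch; the real
// org.apache.falcon.security.CurrentUser may differ.
public final class CurrentUserSketch {
    private static final ThreadLocal<String> USER = new ThreadLocal<String>();

    private CurrentUserSketch() {}

    public static void authenticate(String userName) {
        if (userName == null || userName.isEmpty()) {
            throw new IllegalStateException("Attempt to authenticate an empty user");
        }
        USER.set(userName);
    }

    public static String getUser() {
        String userName = USER.get();
        if (userName == null) {
            throw new IllegalStateException("No user authenticated in this thread");
        }
        return userName;
    }
}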
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index ab7f472..0333918 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -46,8 +46,11 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
this.delayQueue.init();
}
- public abstract void handleRerun(String cluster, String entityType, String entityName,
- String nominalTime, String runId, String wfId, long msgReceivedTime);
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public abstract void handleRerun(String clusterName, String entityType,
+ String entityName, String nominalTime, String runId,
+ String wfId, String workflowUser, long msgReceivedTime);
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
public AbstractWorkflowEngine getWfEngine() {
return wfEngine;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index fffd5cd..17f4337 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -22,6 +22,7 @@ import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.LateInput;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.latedata.LateDataHandler;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
@@ -45,7 +46,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
}
@Override
- protected void handleRerun(String cluster, String jobStatus,
+ protected void handleRerun(String clusterName, String jobStatus,
LaterunEvent message) {
try {
if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
@@ -65,10 +66,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
+ message.getWfId()
+ " at "
+ SchemaHelper.formatDateUTC(new Date()));
- handler.handleRerun(cluster, message.getEntityType(),
- message.getEntityName(), message.getInstance(),
- Integer.toString(message.getRunId()),
- message.getWfId(), System.currentTimeMillis());
+ handler.handleRerun(clusterName, message.getEntityType(), message.getEntityName(),
+ message.getInstance(), Integer.toString(message.getRunId()),
+ message.getWfId(), message.getWorkflowUser(), System.currentTimeMillis());
return;
}
@@ -78,18 +78,14 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
+ " on cluster: " + message.getClusterName());
} catch (Exception e) {
- LOG.warn(
- "Late Re-run failed for instance "
+ LOG.warn("Late Re-run failed for instance "
+ message.getEntityName() + ":"
+ message.getInstance() + " after "
- + message.getDelayInMilliSec() + " with message:",
- e);
- GenericAlert.alertLateRerunFailed(message.getEntityType(),
- message.getEntityName(), message.getInstance(),
- message.getWfId(), Integer.toString(message.getRunId()),
- e.getMessage());
+ + message.getDelayInMilliSec() + " with message:", e);
+ GenericAlert.alertLateRerunFailed(message.getEntityType(), message.getEntityName(),
+ message.getInstance(), message.getWfId(), message.getWorkflowUser(),
+ Integer.toString(message.getRunId()), e.getMessage());
}
-
}
public String detectLate(LaterunEvent message) throws Exception {
@@ -106,7 +102,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
Configuration conf = LateRerunHandler.getConfiguration(storageEndpoint);
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
if (!fs.exists(lateLogPath)) {
LOG.warn("Late log file:" + lateLogPath + " not found:");
return "";
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 897e7ab..72f93cb 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -29,13 +29,13 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.*;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.RerunPolicyFactory;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,8 +50,9 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
AbstractRerunHandler<LaterunEvent, M> {
@Override
- public void handleRerun(String cluster, String entityType, String entityName,
- String nominalTime, String runId, String wfId, long msgReceivedTime) {
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public void handleRerun(String cluster, String entityType, String entityName, String nominalTime,
+ String runId, String wfId, String workflowUser, long msgReceivedTime) {
try {
Entity entity = EntityUtil.getEntity(entityType, entityName);
try {
@@ -66,6 +67,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
LOG.error("Unable to get Late Process for entity:" + entityName);
return;
}
+
int intRunId = Integer.parseInt(runId);
Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
Long wait = getEventDelay(entity, nominalTime);
@@ -81,7 +83,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
LOG.info("Going to delete path:" + lateLogPath);
final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
- FileSystem fs = FileSystem.get(getConfiguration(storageEndpoint));
+ Configuration conf = getConfiguration(storageEndpoint);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
if (fs.exists(lateLogPath)) {
boolean deleted = fs.delete(lateLogPath, true);
if (deleted) {
@@ -95,16 +98,17 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
+ entityType + "(" + entityName + ")" + ":" + nominalTime
+ " And WorkflowId: " + wfId);
LaterunEvent event = new LaterunEvent(cluster, wfId, msgInsertTime.getTime(),
- wait, entityType, entityName, nominalTime, intRunId);
+ wait, entityType, entityName, nominalTime, intRunId, workflowUser);
offerToQueue(event);
} catch (Exception e) {
LOG.error("Unable to schedule late rerun for entity instance : "
+ entityType + "(" + entityName + ")" + ":" + nominalTime
+ " And WorkflowId: " + wfId, e);
GenericAlert.alertLateRerunFailed(entityType, entityName,
- nominalTime, wfId, runId, e.getMessage());
+ nominalTime, wfId, workflowUser, runId, e.getMessage());
}
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
@@ -217,7 +221,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
public static Configuration getConfiguration(String storageEndpoint) throws FalconException {
Configuration conf = new Configuration();
- conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, storageEndpoint);
+ conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageEndpoint);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 63dade8..bb0b34a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -38,7 +38,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
}
@Override
- protected void handleRerun(String cluster, String jobStatus,
+ protected void handleRerun(String clusterName, String jobStatus,
RetryEvent message) {
try {
if (!jobStatus.equals("KILLED")) {
@@ -80,7 +80,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
LOG.error("Unable to re-offer to queue:", ex);
GenericAlert.alertRetryFailed(message.getEntityType(),
message.getEntityName(), message.getInstance(),
- message.getWfId(),
+ message.getWfId(), message.getWorkflowUser(),
Integer.toString(message.getRunId()),
ex.getMessage());
}
@@ -91,7 +91,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
+ message.getInstance(), e);
GenericAlert.alertRetryFailed(message.getEntityType(),
message.getEntityName(), message.getInstance(),
- message.getWfId(),
+ message.getWfId(), message.getWorkflowUser(),
Integer.toString(message.getRunId()),
"Failure retry attempts exhausted");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 2b41a7c..ef49c3a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,8 +38,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
AbstractRerunHandler<RetryEvent, M> {
@Override
- public void handleRerun(String cluster, String entityType, String entityName,
- String nominalTime, String runId, String wfId, long msgReceivedTime) {
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public void handleRerun(String clusterName, String entityType, String entityName, String nominalTime,
+ String runId, String wfId, String workflowUser, long msgReceivedTime) {
try {
Entity entity = getEntity(entityType, entityName);
Retry retry = getRetry(entity);
@@ -58,9 +59,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
if (attempts > intRunId) {
AbstractRerunPolicy rerunPolicy = RerunPolicyFactory.getRetryPolicy(policy);
long delayTime = rerunPolicy.getDelay(delay, Integer.parseInt(runId));
- RetryEvent event = new RetryEvent(cluster, wfId,
+ RetryEvent event = new RetryEvent(clusterName, wfId,
msgReceivedTime, delayTime, entityType, entityName,
- nominalTime, intRunId, attempts, 0);
+ nominalTime, intRunId, attempts, 0, workflowUser);
offerToQueue(event);
} else {
LOG.warn("All retry attempt failed out of configured: "
@@ -69,15 +70,17 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
+ wfId);
GenericAlert.alertRetryFailed(entityType, entityName,
- nominalTime, wfId, runId,
+ nominalTime, wfId, workflowUser, runId,
"All retry attempt failed out of configured: "
+ attempts + " attempt for entity instance::");
}
} catch (FalconException e) {
LOG.error("Error during retry of entity instance " + entityName + ":" + nominalTime, e);
- GenericAlert.alertRetryFailed(entityType, entityName, nominalTime, wfId, runId, e.getMessage());
+ GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,
+ wfId, workflowUser, runId, e.getMessage());
}
}
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
@Override
public void init(M aDelayQueue) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index 8234d8a..bc7c999 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -109,9 +109,8 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
if (!retryFile.exists()) {
LOG.warn("Rerun file deleted or renamed for process-instance: "
+ event.getEntityName() + ":" + event.getInstance());
- GenericAlert.alertRetryFailed(event.getEntityType(),
- event.getEntityName(), event.getInstance(),
- event.getWfId(), Integer.toString(event.getRunId()),
+ GenericAlert.alertRetryFailed(event.getEntityType(), event.getEntityName(), event.getInstance(),
+ event.getWfId(), event.getWorkflowUser(), Integer.toString(event.getRunId()),
"Rerun file deleted or renamed for process-instance:");
} else {
if (!retryFile.delete()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
index e02b495..8137f60 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateRerunHandler.java
@@ -18,12 +18,12 @@
package org.apache.falcon.rerun.handler;
-import junit.framework.Assert;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LateArrival;
+import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Date;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 01d0415..6b6b834 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -51,7 +51,7 @@ public class ActiveMQTest {
RerunEvent event = new LaterunEvent("clusterName", "wfId",
System.currentTimeMillis(), 60 * 1000, "entityType",
- "entityName", "instance", 0);
+ "entityName", "instance", 0, "falcon");
try {
activeMQueue.offer(event);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index 6aafaa5..8508d37 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -45,7 +45,7 @@ public class InMemoryQueueTest {
long time = System.currentTimeMillis();
int delay = ((5 - index) / 2) * 50;
MyEvent event = new MyEvent("someCluster", Integer.toString(index),
- time, delay, "someType", "someName", "someInstance", 0);
+ time, delay, "someType", "someName", "someInstance", 0, "falcon");
queue.offer(event);
boolean inserted = false;
for (int posn = 0; posn < events.size(); posn++) {
@@ -73,9 +73,9 @@ public class InMemoryQueueTest {
//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
public MyEvent(String clusterName, String wfId,
long msgInsertTime, long delay, String entityType,
- String entityName, String instance, int runId) {
+ String entityName, String instance, int runId, String workflowUser) {
super(clusterName, wfId, msgInsertTime, delay,
- entityType, entityName, instance, runId);
+ entityType, entityName, instance, runId, workflowUser);
}
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/src/bin/falcon
----------------------------------------------------------------------
diff --git a/src/bin/falcon b/src/bin/falcon
index d196a5d..113c9a7 100755
--- a/src/bin/falcon
+++ b/src/bin/falcon
@@ -30,4 +30,4 @@ BASEDIR=`dirname ${PRG}`
BASEDIR=`cd ${BASEDIR}/..;pwd`
. ${BASEDIR}/bin/falcon-config.sh 'client'
-${JAVA_BIN} -cp ${FALCONCPPATH} org.apache.falcon.cli.FalconCLI "${@}"
+${JAVA_BIN} -cp ${FALCONCPPATH} -Dfalcon.log.dir=$HOME -Dfalcon.app.type=client org.apache.falcon.cli.FalconCLI "${@}"
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 58ebd80..90abe26 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -51,6 +51,15 @@
</layout>
</appender>
+ <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${falcon.log.dir}/${falcon.app.type}.security.audit.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %x %m%n"/>
+ </layout>
+ </appender>
+
<logger name="org.apache.falcon" additivity="false">
<level value="debug"/>
<appender-ref ref="FILE"/>
@@ -66,6 +75,26 @@
<appender-ref ref="METRIC"/>
</logger>
+ <logger name="org.apache.hadoop.security" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="SECURITY"/>
+ </logger>
+
+ <logger name="org.apache.hadoop" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.apache.oozie" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.apache.hadoop.hive" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
<root>
<priority value="info"/>
<appender-ref ref="FILE"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 79cd211..0d0ab41 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -32,8 +32,9 @@
*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
-*.application.services=org.apache.falcon.entity.store.ConfigurationStore,\
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
org.apache.falcon.service.ProcessSubscriberService,\
+ org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
org.apache.falcon.service.SLAMonitoringService,\
@@ -51,18 +52,79 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
######### Implementation classes #########
+
+######### System startup parameters #########
+
+# Location to store user entity configurations
*.config.store.uri=file://${falcon.home}/store
+
+# Location of libraries that is shipped to Hadoop
*.system.lib.location=${falcon.home}/server/webapp/falcon/WEB-INF/lib
prism.system.lib.location=${falcon.home}/server/webapp/prism/WEB-INF/lib
-*.broker.url=tcp://localhost:61616
+
*.retry.recorder.path=${falcon.log.dir}/retry
*.falcon.cleanup.service.frequency=days(1)
-#default time-to-live for a JMS message 3 days (time in minutes)
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
+*.broker.url=tcp://localhost:61616
+
+# default time-to-live for a JMS message 3 days (time in minutes)
*.broker.ttlInMins=4320
*.entity.topic=FALCON.ENTITY.TOPIC
*.max.retry.failure.count=1
######### Properties for configuring iMon client and metric #########
*.internal.queue.size=1000
+
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=true
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+######### Authentication Properties #########
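The new block documents the keys and leaves the Kerberos-specific values empty for the default 'simple' mode. For a secured deployment the same keys would be filled in along these lines; the principals and keytab paths below are placeholders, not defaults shipped with Falcon:

# Hypothetical values for a kerberized deployment (placeholders only)
*.falcon.authentication.type=kerberos
*.falcon.service.authentication.kerberos.principal=falcon/_HOST@EXAMPLE.COM
*.falcon.service.authentication.kerberos.keytab=/etc/security/keytabs/falcon.service.keytab
*.dfs.namenode.kerberos.principal=nn/_HOST@EXAMPLE.COM

*.falcon.http.authentication.type=kerberos
*.falcon.http.authentication.kerberos.principal=HTTP/_HOST@EXAMPLE.COM
*.falcon.http.authentication.kerberos.keytab=/etc/security/keytabs/spnego.service.keytab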
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index 2b55407..af29f93 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -43,7 +43,6 @@ public class EmbeddedCluster {
protected EmbeddedCluster() {
}
- //private MiniDFSCluster dfsCluster;
protected Configuration conf = newConfiguration();
protected Cluster clusterEntity;
@@ -81,6 +80,7 @@ public class EmbeddedCluster {
cluster.conf.set("jail.base", System.getProperty("hadoop.tmp.dir",
cluster.conf.get("hadoop.tmp.dir", "/tmp")));
cluster.conf.set("fs.default.name", "jail://" + (global ? "global" : name) + ":00");
+
String hdfsUrl = cluster.conf.get("fs.default.name");
LOG.info("Cluster Namenode = " + hdfsUrl);
cluster.buildClusterObject(name);
@@ -133,17 +133,9 @@ public class EmbeddedCluster {
}
public void shutdown() {
- //dfsCluster.shutdown();
}
public Cluster getCluster() {
return clusterEntity;
}
-
- public Cluster clone(String cloneName) {
- EmbeddedCluster clone = new EmbeddedCluster();
- clone.conf = this.conf;
- clone.buildClusterObject(cloneName);
- return clone.clusterEntity;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 8c37409..0c9e601 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -207,6 +207,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.hcatalog</groupId>
+ <artifactId>webhcat-java-client</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-hadoop-webapp</artifactId>
<type>war</type>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/conf/oozie/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/oozie-site.xml b/webapp/src/conf/oozie/conf/oozie-site.xml
index e5f404a..5f644a2 100644
--- a/webapp/src/conf/oozie/conf/oozie-site.xml
+++ b/webapp/src/conf/oozie/conf/oozie-site.xml
@@ -304,7 +304,7 @@
<property>
<name>oozie.authentication.simple.anonymous.allowed</name>
- <value>true</value>
+ <value>false</value>
<description>
Indicates if anonymous requests are allowed.
This setting is meaningful only when using 'simple' authentication.
@@ -485,51 +485,11 @@
<!-- Proxyuser Configuration -->
<property>
<name>oozie.service.ProxyUserService.proxyuser.${user.name}.hosts</name>
- <value>localhost</value>
- <description></description>
- </property>
- <property>
- <name>oozie.service.ProxyUserService.proxyuser.${user.name}.groups</name>
- <value>users</value>
- <description></description>
- </property>
-
- <!--
-
- <property>
- <name>oozie.service.ProxyUserService.proxyuser.#USER#.hosts</name>
<value>*</value>
- <description>
- List of hosts the '#USER#' user is allowed to perform 'doAs'
- operations.
-
- The '#USER#' must be replaced with the username o the user who is
- allowed to perform 'doAs' operations.
-
- The value can be the '*' wildcard or a list of hostnames.
-
- For multiple users copy this property and replace the user name
- in the property name.
- </description>
</property>
-
<property>
- <name>oozie.service.ProxyUserService.proxyuser.#USER#.groups</name>
+ <name>oozie.service.ProxyUserService.proxyuser.${user.name}.groups</name>
<value>*</value>
- <description>
- List of groups the '#USER#' user is allowed to impersonate users
- from to perform 'doAs' operations.
-
- The '#USER#' must be replaced with the username o the user who is
- allowed to perform 'doAs' operations.
-
- The value can be the '*' wildcard or a list of groups.
-
- For multiple users copy this property and replace the user name
- in the property name.
- </description>
</property>
- -->
-
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index d133b8e..5ba6f16 100644
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
</layout>
</appender>
+ <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${user.dir}/target/logs/security.audit.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %x %m%n"/>
+ </layout>
+ </appender>
+
<logger name="org.apache.falcon" additivity="false">
<level value="debug"/>
<appender-ref ref="FILE"/>
@@ -69,6 +78,26 @@
<appender-ref ref="METRIC"/>
</logger>
+ <logger name="org.apache.hadoop.security" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="SECURITY"/>
+ </logger>
+
+ <logger name="org.apache.hadoop" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.apache.oozie" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.apache.hadoop.hive" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
<root>
<priority value="info"/>
<appender-ref ref="FILE"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index 9909140..fd004a1 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -19,6 +19,8 @@
package org.apache.falcon.catalog;
import org.apache.falcon.FalconException;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
import org.apache.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hcatalog.api.HCatClient;
import org.apache.hcatalog.api.HCatCreateDBDesc;
@@ -55,6 +57,9 @@ public class HiveCatalogServiceIT {
@BeforeClass
public void setUp() throws Exception {
+ // setup a logged in user
+ CurrentUser.authenticate(TestContext.REMOTE_USER);
+
hiveCatalogService = new HiveCatalogService();
client = HiveCatalogService.get(METASTORE_URL);
@@ -168,22 +173,23 @@ public class HiveCatalogServiceIT {
@Test
public void testIsAlive() throws Exception {
- Assert.assertTrue(hiveCatalogService.isAlive(METASTORE_URL));
+ Assert.assertTrue(hiveCatalogService.isAlive(METASTORE_URL, "metaStorePrincipal"));
}
- @Test (expectedExceptions = FalconException.class)
+ @Test (expectedExceptions = Exception.class)
public void testIsAliveNegative() throws Exception {
- hiveCatalogService.isAlive("thrift://localhost:9999");
+ hiveCatalogService.isAlive("thrift://localhost:9999", "metaStorePrincipal");
}
@Test (expectedExceptions = FalconException.class)
public void testTableExistsNegative() throws Exception {
- hiveCatalogService.tableExists(METASTORE_URL, DATABASE_NAME, "blah");
+ hiveCatalogService.tableExists(METASTORE_URL, DATABASE_NAME, "blah", "metaStorePrincipal");
}
@Test
public void testTableExists() throws Exception {
- Assert.assertTrue(hiveCatalogService.tableExists(METASTORE_URL, DATABASE_NAME, TABLE_NAME));
+ Assert.assertTrue(hiveCatalogService.tableExists(
+ METASTORE_URL, DATABASE_NAME, TABLE_NAME, "metaStorePrincipal"));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 72369c0..5cd7beb 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -19,6 +19,7 @@
package org.apache.falcon.cli;
import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.OozieTestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -39,9 +40,6 @@ import java.util.Map;
public class FalconCLIIT {
private InMemoryWriter stream = new InMemoryWriter(System.out);
- // private static final String BROKER_URL =
- // "tcp://localhost:61616?daemon=true";
- private static final boolean TEST_ENABLED = true;
@BeforeClass
public void prepare() throws Exception {
@@ -56,7 +54,7 @@ public class FalconCLIIT {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
- filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
Assert.assertEquals(
0,
executeWithURL("entity -submit -type cluster -file " + filePath));
@@ -64,7 +62,7 @@ public class FalconCLIIT {
Assert.assertEquals(stream.buffer.toString().trim(),
"default/Submit successful (cluster) " + context.getClusterName());
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
Assert.assertEquals(
@@ -72,7 +70,7 @@ public class FalconCLIIT {
"default/Submit successful (feed) "
+ overlay.get("inputFeedName"));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
Assert.assertEquals(
@@ -80,7 +78,7 @@ public class FalconCLIIT {
"default/Submit successful (feed) "
+ overlay.get("outputFeedName"));
- filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Assert.assertEquals(
0,
executeWithURL("entity -submit -type process -file " + filePath));
@@ -102,29 +100,29 @@ public class FalconCLIIT {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
- filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
Assert.assertEquals(-1,
executeWithURL("entity -submitAndSchedule -type cluster -file "
+ filePath));
context.setCluster(overlay.get("cluster"));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type feed -file "
+ filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type feed -file "
+ filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type process -file "
+ filePath));
@@ -136,7 +134,7 @@ public class FalconCLIIT {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
- filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
Assert.assertEquals(0,
executeWithURL("entity -validate -type cluster -file "
+ filePath));
@@ -146,19 +144,19 @@ public class FalconCLIIT {
executeWithURL("entity -submit -type cluster -file " + filePath));
context.setCluster(overlay.get("cluster"));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -validate -type feed -file " + filePath));
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -validate -type feed -file " + filePath));
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Assert.assertEquals(0,
executeWithURL("entity -validate -type process -file "
+ filePath));
@@ -239,7 +237,7 @@ public class FalconCLIIT {
executeWithURL("entity -schedule -type process -name "
+ overlay.get("processName")));
- context.waitForProcessWFtoStart();
+ OozieTestUtils.waitForProcessWFtoStart(context);
Assert.assertEquals(
0,
@@ -329,7 +327,7 @@ public class FalconCLIIT {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
- context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(-1,
executeWithURL("entity -submit -type feed -name " + "name"));
@@ -349,7 +347,7 @@ public class FalconCLIIT {
Assert.assertEquals(0,
executeWithURL("entity -schedule -type feed -name "
+ overlay.get("outputFeedName")));
- context.waitForProcessWFtoStart();
+ OozieTestUtils.waitForProcessWFtoStart(context);
Assert.assertEquals(0,
executeWithURL("instance -status -type feed -name "
@@ -378,7 +376,7 @@ public class FalconCLIIT {
Assert.assertEquals(0,
executeWithURL("entity -schedule -type feed -name "
+ overlay.get("outputFeedName")));
- context.waitForProcessWFtoStart();
+ OozieTestUtils.waitForProcessWFtoStart(context);
Assert.assertEquals(0,
executeWithURL("instance -status -type feed -name "
@@ -428,7 +426,7 @@ public class FalconCLIIT {
executeWithURL("entity -schedule -type process -name "
+ overlay.get("processName")));
- context.waitForProcessWFtoStart();
+ OozieTestUtils.waitForProcessWFtoStart(context);
Assert.assertEquals(
0,
executeWithURL("instance -kill -type process -name "
@@ -452,7 +450,7 @@ public class FalconCLIIT {
executeWithURL("entity -schedule -type process -name "
+ overlay.get("processName")));
- context.waitForProcessWFtoStart();
+ OozieTestUtils.waitForProcessWFtoStart(context);
Assert.assertEquals(
0,
executeWithURL("instance -kill -type process -name "
@@ -481,7 +479,6 @@ public class FalconCLIIT {
Assert.assertEquals(-1,
executeWithURL("instance -kill -type process -name "
+ " -start 2010-01-01T01:00Z -end 2010-01-01T03:00Z"));
-
}
public void testFalconURL() throws Exception {
@@ -495,8 +492,6 @@ public class FalconCLIIT {
+ "processName -url http://unknownhost:1234/"
+ " -start 2010-01-01T01:00Z -end 2010-01-01T03:00Z")
.split("\\s")));
-
-
}
public void testClientProperties() throws Exception {
@@ -504,8 +499,7 @@ public class FalconCLIIT {
Map<String, String> overlay = context.getUniqueOverlay();
submitTestFiles(context, overlay);
- Assert.assertEquals(
- 0,
+ Assert.assertEquals(0,
new FalconCLI().run(("entity -schedule -type feed -name "
+ overlay.get("outputFeedName") + " -url "
+ TestContext.BASE_URL).split("\\s+")));
@@ -514,15 +508,21 @@ public class FalconCLIIT {
new FalconCLI().run(("entity -schedule -type process -name "
+ overlay.get("processName")+ " -url "
+ TestContext.BASE_URL).split("\\s+")));
-
}
public void testGetVersion() throws Exception {
Assert.assertEquals(0,
- new FalconCLI().run("admin -version".split("\\s")));
+ new FalconCLI().run(("admin -version -url " + TestContext.BASE_URL).split("\\s")));
+ }
+
+ public void testGetStatus() throws Exception {
+ Assert.assertEquals(0,
+ new FalconCLI().run(("admin -status -url " + TestContext.BASE_URL).split("\\s")));
+ }
+ public void testGetThreadStackDump() throws Exception {
Assert.assertEquals(0,
- new FalconCLI().run("admin -stack".split("\\s")));
+ new FalconCLI().run(("admin -stack -url " + TestContext.BASE_URL).split("\\s")));
}
public void testInstanceGetLogs() throws Exception {
@@ -538,7 +538,6 @@ public class FalconCLIIT {
executeWithURL("instance -logs -type process -name "
+ overlay.get("processName")
+ " -start " + START_INSTANCE + " -end " + START_INSTANCE));
-
}
private int executeWithURL(String command) throws Exception {
@@ -560,22 +559,22 @@ public class FalconCLIIT {
private void submitTestFiles(TestContext context, Map<String, String> overlay) throws Exception {
- String filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(),
+ String filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(),
overlay);
Assert.assertEquals(
0,
executeWithURL("entity -submit -type cluster -file " + filePath));
context.setCluster(overlay.get("cluster"));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Assert.assertEquals(
0,
executeWithURL("entity -submit -type process -file " + filePath));
@@ -596,6 +595,7 @@ public class FalconCLIIT {
super.println(x);
}
+ @SuppressWarnings("UnusedDeclaration")
public String getBuffer() {
return buffer.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
index 55f240f..d503735 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
@@ -19,6 +19,7 @@
package org.apache.falcon.cli;
import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.OozieTestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -44,39 +45,39 @@ public class FalconCLISmokeIT {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
- filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
Assert.assertEquals(-1,
executeWithURL("entity -submitAndSchedule -type cluster -file "
+ filePath));
context.setCluster(overlay.get("cluster"));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type feed -file "
+ filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type feed -file "
+ filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submit -type feed -file " + filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Assert.assertEquals(0,
executeWithURL("entity -validate -type process -file "
+ filePath));
- filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type process -file "
+ filePath));
- context.waitForProcessWFtoStart();
+ OozieTestUtils.waitForProcessWFtoStart(context);
Assert.assertEquals(0,
executeWithURL("entity -definition -type cluster -name "
@@ -90,7 +91,6 @@ public class FalconCLISmokeIT {
Assert.assertEquals(0,
executeWithURL("instance -running -type process -name "
+ overlay.get("processName")));
-
}
private int executeWithURL(String command) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index 6cfa4e6..ab60307 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -67,7 +67,7 @@ public class LateDataHandlerIT {
public void prepare() throws Exception {
TestContext.cleanupStore();
- String filePath = context.overlayParametersOverTemplate(
+ String filePath = TestContext.overlayParametersOverTemplate(
TestContext.CLUSTER_TEMPLATE, context.getUniqueOverlay());
context.setCluster(filePath);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
index 058b35c..92ff8ac 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -64,7 +64,7 @@ public class FileSystemFeedReplicationIT {
TestContext.cleanupStore();
Map<String, String> overlay = sourceContext.getUniqueOverlay();
- String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ String sourceFilePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
sourceContext.setCluster(sourceFilePath);
final Cluster sourceCluster = sourceContext.getCluster().getCluster();
@@ -74,21 +74,21 @@ public class FileSystemFeedReplicationIT {
final String sourcePath = sourceStorageUrl + SOURCE_LOCATION + PARTITION_VALUE;
FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourcePath);
- String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ String targetFilePath = TestContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
targetContext.setCluster(targetFilePath);
final Cluster targetCluster = targetContext.getCluster().getCluster();
copyLibsToHDFS(targetCluster);
- String file = targetAlphaContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+ String file = TestContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
targetAlphaContext.setCluster(file);
copyLibsToHDFS(targetAlphaContext.getCluster().getCluster());
- file = targetBetaContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+ file = TestContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
targetBetaContext.setCluster(file);
copyLibsToHDFS(targetBetaContext.getCluster().getCluster());
- file = targetGammaContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+ file = TestContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
targetGammaContext.setCluster(file);
copyLibsToHDFS(targetGammaContext.getCluster().getCluster());
}
@@ -123,17 +123,17 @@ public class FileSystemFeedReplicationIT {
@Test (enabled = false)
public void testFSReplicationSingleSourceToTarget() throws Exception {
final Map<String, String> overlay = sourceContext.getUniqueOverlay();
- String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ String filePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
- filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
// verify if the partition on the source exists - precondition
FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
- filePath = sourceContext.overlayParametersOverTemplate("/table/customer-fs-replicating-feed.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/customer-fs-replicating-feed.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
// wait until the workflow job completes
@@ -148,7 +148,7 @@ public class FileSystemFeedReplicationIT {
Assert.assertTrue(fs.exists(new Path(TARGET_LOCATION + PARTITION_VALUE)));
InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
- .header("Remote-User", "guest")
+ .header("Cookie", targetContext.getAuthenticationToken())
.accept(MediaType.APPLICATION_JSON)
.get(InstancesResult.class);
Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
@@ -161,23 +161,23 @@ public class FileSystemFeedReplicationIT {
@Test (enabled = false)
public void testFSReplicationSingleSourceToMultipleTargets() throws Exception {
final Map<String, String> overlay = sourceContext.getUniqueOverlay();
- String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ String filePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
- filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
- filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
- filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
// verify if the partition on the source exists - precondition
FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
- filePath = sourceContext.overlayParametersOverTemplate("/table/multiple-targets-replicating-feed.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/multiple-targets-replicating-feed.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
// wait until the workflow job completes
@@ -201,7 +201,7 @@ public class FileSystemFeedReplicationIT {
gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE)));
InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
- .header("Remote-User", "guest")
+ .header("Cookie", targetContext.getAuthenticationToken())
.accept(MediaType.APPLICATION_JSON)
.get(InstancesResult.class);
Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
@@ -226,19 +226,19 @@ public class FileSystemFeedReplicationIT {
Assert.assertTrue(sourceFS.exists(sourcePath));
final Map<String, String> overlay = sourceContext.getUniqueOverlay();
- String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ String filePath = TestContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
- filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
- filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
// verify if the partition on the source exists - precondition
Assert.assertTrue(sourceFS.exists(sourcePath));
- filePath = sourceContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay);
+ filePath = TestContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
// wait until the workflow job completes
@@ -257,7 +257,7 @@ public class FileSystemFeedReplicationIT {
Assert.assertTrue(beta.exists(new Path("/localDC/rc/billing/ua2/" + partitionValue)));
InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
- .header("Remote-User", "guest")
+ .header("Cookie", targetContext.getAuthenticationToken())
.accept(MediaType.APPLICATION_JSON)
.get(InstancesResult.class);
Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
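The functional change in FileSystemFeedReplicationIT is in how the test REST calls authenticate: the pseudo-auth "Remote-User: guest" header is replaced by a Cookie header carrying the token from TestContext.getAuthenticationToken(). A minimal sketch of that request shape, assuming a Jersey 1.x WebResource such as the one returned by getService() (the wrapper class and method below are illustrative only):

import javax.ws.rs.core.MediaType;

import com.sun.jersey.api.client.WebResource;
import org.apache.falcon.resource.InstancesResult;

// Illustrative helper, not part of the patch: shows the authenticated call shape.
public final class AuthenticatedInstanceQuery {

    private AuthenticatedInstanceQuery() {}

    public static InstancesResult runningFeedInstances(WebResource service,
                                                       String feedName,
                                                       String authToken) {
        // The auth token (a hadoop.auth cookie value) replaces the old
        // "Remote-User" pseudo-authentication header on every REST call.
        return service.path("api/instance/running/feed/" + feedName)
                .header("Cookie", authToken)
                .accept(MediaType.APPLICATION_JSON)
                .get(InstancesResult.class);
    }
}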
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 37226e2..770780e 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -356,6 +356,7 @@ public class TableStorageFeedEvictorIT {
super.println(x);
}
+ @SuppressWarnings("UnusedDeclaration")
public String getBuffer() {
return buffer.toString();
}