You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/05/21 11:31:39 UTC
[2/2] git commit: FALCON-240 Instance status from CLI on a feed
doesn't give the retention details. Contributed by pavan kumar kolamuri
FALCON-240 Instance status from CLI on a feed doesn't give the retention details. Contributed by pavan kumar kolamuri
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/089ecf41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/089ecf41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/089ecf41
Branch: refs/heads/master
Commit: 089ecf419f989a97ea6aa8e68669b25f0ab9a13e
Parents: 5afbf35
Author: Shwetha GS <sh...@inmobi.com>
Authored: Wed May 21 15:01:27 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Wed May 21 15:01:27 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 6 +-
.../main/java/org/apache/falcon/LifeCycle.java | 39 +++++
.../java/org/apache/falcon/cli/FalconCLI.java | 50 ++++--
.../org/apache/falcon/client/FalconClient.java | 84 ++++++---
.../apache/falcon/entity/v0/SchemaHelper.java | 2 +-
.../workflow/engine/AbstractWorkflowEngine.java | 30 ++--
docs/src/site/twiki/FalconCLI.twiki | 8 +
docs/src/site/twiki/restapi/InstanceKill.twiki | 1 +
docs/src/site/twiki/restapi/InstanceLogs.twiki | 1 +
docs/src/site/twiki/restapi/InstanceRerun.twiki | 1 +
.../src/site/twiki/restapi/InstanceResume.twiki | 2 +-
.../site/twiki/restapi/InstanceRunning.twiki | 2 +-
.../src/site/twiki/restapi/InstanceStatus.twiki | 1 +
.../site/twiki/restapi/InstanceSummary.twiki | 1 +
.../site/twiki/restapi/InstanceSuspend.twiki | 1 +
.../apache/falcon/hadoop/JailedFileSystem.java | 3 +-
.../workflow/engine/OozieWorkflowEngine.java | 169 +++++++++++++------
.../resource/AbstractInstanceManager.java | 91 +++++++---
.../resource/proxy/InstanceManagerProxy.java | 108 +++++++-----
.../falcon/cluster/util/EmbeddedCluster.java | 2 -
.../apache/falcon/resource/InstanceManager.java | 58 ++++---
.../java/org/apache/falcon/cli/FalconCLIIT.java | 76 +++++++++
22 files changed, 532 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2d46e52..b85fecc 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,12 +7,14 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
- FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré via Shwetha GS)
+ FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré
+ via Shwetha GS)
OPTIMIZATIONS
BUG FIXES
-
+ FALCON-240 Instance status from CLI on a feed doesn't give the retention details.
+ (pavan kumar kolamuri via Shwetha GS)
Release Version: 0.5-incubating
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/client/src/main/java/org/apache/falcon/LifeCycle.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/LifeCycle.java b/client/src/main/java/org/apache/falcon/LifeCycle.java
new file mode 100644
index 0000000..58a2a6c
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/LifeCycle.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon;
+
+/**
+ * Represents life cycle of an entity.
+ * <p/>
+ * Each constant is paired with the {@link Tag} handed to its constructor;
+ * that tag is read back through {@link #getTag()}.
+ */
+
+public enum LifeCycle {
+ EXECUTION(Tag.DEFAULT),
+ EVICTION(Tag.RETENTION), // note: constant is named EVICTION while its tag is RETENTION
+ REPLICATION(Tag.REPLICATION);
+
+ private final Tag tag; // tag paired with this life cycle, fixed at construction
+
+ LifeCycle(Tag tag) {
+ this.tag = tag;
+ }
+
+ public Tag getTag() { // returns the tag given to this constant's constructor
+ return this.tag;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 2fb0729..c51c3c0 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -37,6 +38,8 @@ import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -89,6 +92,7 @@ public class FalconCLI {
public static final String SOURCECLUSTER_OPT = "sourceClusters";
public static final String CURRENT_COLO = "current.colo";
public static final String CLIENT_PROPERTIES = "/client.properties";
+ public static final String LIFECYCLE_OPT = "lifecycle";
// Graph Commands
public static final String GRAPH_CMD = "graph";
@@ -126,7 +130,6 @@ public class FalconCLI {
+ "' option",
"custom headers for Falcon web services can be specified using '-D"
+ FalconClient.WS_HEADER_PREFIX + "NAME=VALUE'", };
-
/**
* Run a CLI programmatically.
* <p/>
@@ -213,29 +216,31 @@ public class FalconCLI {
String colo = commandLine.getOptionValue(COLO_OPT);
String clusters = commandLine.getOptionValue(CLUSTERS_OPT);
String sourceClusters = commandLine.getOptionValue(SOURCECLUSTER_OPT);
+ List<LifeCycle> lifeCycles = getLifeCycle(commandLine.getOptionValue(LIFECYCLE_OPT));
colo = getColo(colo);
validateInstanceCommands(optionsList, entity, type, start, colo);
if (optionsList.contains(RUNNING_OPT)) {
- result = client.getRunningInstances(type, entity, colo);
+ result = client.getRunningInstances(type, entity, colo, lifeCycles);
} else if (optionsList.contains(STATUS_OPT)) {
- result = client.getStatusOfInstances(type, entity, start, end, colo);
+ result = client.getStatusOfInstances(type, entity, start, end, colo, lifeCycles);
} else if (optionsList.contains(SUMMARY_OPT)) {
- result = client.getSummaryOfInstances(type, entity, start, end, colo);
+ result = client.getSummaryOfInstances(type, entity, start, end, colo, lifeCycles);
} else if (optionsList.contains(KILL_OPT)) {
- result = client.killInstances(type, entity, start, end, colo, clusters, sourceClusters);
+ result = client.killInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
} else if (optionsList.contains(SUSPEND_OPT)) {
- result = client.suspendInstances(type, entity, start, end, colo, clusters, sourceClusters);
+ result = client.suspendInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
} else if (optionsList.contains(RESUME_OPT)) {
- result = client.resumeInstances(type, entity, start, end, colo, clusters, sourceClusters);
+ result = client.resumeInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
} else if (optionsList.contains(RERUN_OPT)) {
- result = client.rerunInstances(type, entity, start, end, filePath, colo, clusters, sourceClusters);
+ result = client.rerunInstances(type, entity, start, end, filePath, colo, clusters, sourceClusters,
+ lifeCycles);
} else if (optionsList.contains(CONTINUE_OPT)) {
- result = client.rerunInstances(type, entity, start, end, colo, clusters, sourceClusters);
+ result = client.rerunInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
} else if (optionsList.contains(LOG_OPT)) {
- result = client.getLogsOfInstances(type, entity, start, end, colo, runid);
+ result = client.getLogsOfInstances(type, entity, start, end, colo, runid, lifeCycles);
} else {
throw new FalconCLIException("Invalid command");
}
@@ -579,9 +584,14 @@ public class FalconCLI {
Option entityType = new Option(ENTITY_TYPE_OPT, true,
"Entity type, can be feed or process xml");
Option entityName = new Option(ENTITY_NAME_OPT, true,
- "Entity type, can be feed or process xml");
+ "Entity name, can be feed or process name");
Option colo = new Option(COLO_OPT, true,
"Colo on which the cmd has to be executed");
+ Option lifecycle = new Option(LIFECYCLE_OPT,
+ true,
+ "describes life cycle of entity , for feed it can be replication/retention "
+ + "and for process it can be execution");
+
instanceOptions.addOption(url);
instanceOptions.addOptionGroup(group);
@@ -594,6 +604,7 @@ public class FalconCLI {
instanceOptions.addOption(clusters);
instanceOptions.addOption(sourceClusters);
instanceOptions.addOption(colo);
+ instanceOptions.addOption(lifecycle);
return instanceOptions;
}
@@ -755,4 +766,21 @@ public class FalconCLI {
IOUtils.closeQuietly(inputStream);
}
}
+
+    /**
+     * Parses the comma separated value of the lifecycle option into {@link LifeCycle} constants.
+     * Each element is trimmed and matched case-insensitively against the enum constant names.
+     *
+     * @param lifeCycleValue raw option value, e.g. "eviction,replication"; may be null
+     * @return the parsed life cycles in the order given, or null when lifeCycleValue is null
+     *         (i.e. the option was not supplied)
+     * @throws FalconCLIException if any element does not name a LifeCycle constant
+     */
+    public static List<LifeCycle> getLifeCycle(String lifeCycleValue) throws FalconCLIException {
+        if (lifeCycleValue == null) {
+            return null;
+        }
+
+        List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
+        for (String lifeCycle : lifeCycleValue.split(",")) {
+            try {
+                // Locale.ROOT keeps the upper-casing locale independent: under a Turkish default
+                // locale "eviction" would otherwise become "EVİCTİON" and never match.
+                lifeCycles.add(LifeCycle.valueOf(lifeCycle.trim().toUpperCase(java.util.Locale.ROOT)));
+            } catch (IllegalArgumentException e) {
+                // Report what the user typed, not the partially built result list.
+                throw new FalconCLIException("Invalid life cycle values: " + lifeCycleValue, e);
+            }
+        }
+        return lifeCycles;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 7c75d57..beecc0f 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -25,6 +25,8 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.net.util.TrustManagerUtils;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
@@ -54,6 +56,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.security.SecureRandom;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -329,60 +332,59 @@ public class FalconClient {
return sendListRequest(Entities.LIST, entityType);
}
- public String getRunningInstances(String type, String entity, String colo)
+ public String getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles)
throws FalconCLIException {
return sendInstanceRequest(Instances.RUNNING, type, entity, null, null,
- null, null, colo);
+ null, null, colo, lifeCycles);
}
public String getStatusOfInstances(String type, String entity,
String start, String end,
- String colo) throws FalconCLIException {
+ String colo, List<LifeCycle> lifeCycles) throws FalconCLIException {
return sendInstanceRequest(Instances.STATUS, type, entity, start, end,
- null, null, colo);
+ null, null, colo, lifeCycles);
}
public String getSummaryOfInstances(String type, String entity,
- String start, String end,
- String colo) throws FalconCLIException {
+ String start, String end,
+ String colo, List<LifeCycle> lifeCycles) throws FalconCLIException {
return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end,
- null, null, colo);
+ null, null, colo, lifeCycles);
}
-
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public String killInstances(String type, String entity, String start,
String end, String colo, String clusters,
- String sourceClusters)
+ String sourceClusters, List<LifeCycle> lifeCycles)
throws FalconCLIException, UnsupportedEncodingException {
return sendInstanceRequest(Instances.KILL, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, null), null, colo);
+ getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles);
}
public String suspendInstances(String type, String entity, String start,
String end, String colo, String clusters,
- String sourceClusters)
+ String sourceClusters, List<LifeCycle> lifeCycles)
throws FalconCLIException, UnsupportedEncodingException {
return sendInstanceRequest(Instances.SUSPEND, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, null), null, colo);
+ getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles);
}
public String resumeInstances(String type, String entity, String start,
String end, String colo, String clusters,
- String sourceClusters)
+ String sourceClusters, List<LifeCycle> lifeCycles)
throws FalconCLIException, UnsupportedEncodingException {
return sendInstanceRequest(Instances.RESUME, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, null), null, colo);
+ getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles);
}
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public String rerunInstances(String type, String entity, String start,
String end, String filePath, String colo,
- String clusters, String sourceClusters)
+ String clusters, String sourceClusters, List<LifeCycle> lifeCycles)
throws FalconCLIException, IOException {
StringBuilder buffer = new StringBuilder();
@@ -401,25 +403,28 @@ public class FalconClient {
}
String temp = (buffer.length() == 0) ? null : buffer.toString();
return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, temp), null, colo);
+ getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles);
}
- //RESUME CHECKSTYLE CHECK ParameterNumberCheck
public String rerunInstances(String type, String entity, String start,
- String end, String colo, String clusters, String sourceClusters)
+ String end, String colo, String clusters, String sourceClusters,
+ List<LifeCycle> lifeCycles)
throws FalconCLIException, UnsupportedEncodingException {
return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, "oozie.wf.rerun.failnodes=true\n"), null, colo);
+ getServletInputStream(clusters, sourceClusters, "oozie.wf.rerun.failnodes=true\n"), null, colo,
+ lifeCycles);
}
public String getLogsOfInstances(String type, String entity, String start,
- String end, String colo, String runId)
+ String end, String colo, String runId,
+ List<LifeCycle> lifeCycles)
throws FalconCLIException {
return sendInstanceRequest(Instances.LOG, type, entity, start, end,
- null, runId, colo);
+ null, runId, colo, lifeCycles);
}
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
public String getThreadDump() throws FalconCLIException {
return sendAdminRequest(AdminOperations.STACK);
@@ -558,7 +563,9 @@ public class FalconClient {
//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
private String sendInstanceRequest(Instances instances, String type,
String entity, String start, String end, InputStream props,
- String runid, String colo) throws FalconCLIException {
+ String runid, String colo,
+ List<LifeCycle> lifeCycles) throws FalconCLIException {
+ checkType(type);
WebResource resource = service.path(instances.path).path(type)
.path(entity);
if (start != null) {
@@ -574,6 +581,13 @@ public class FalconClient {
resource = resource.queryParam("colo", colo);
}
+ if (lifeCycles != null) {
+ checkLifeCycleOption(lifeCycles, type);
+ for (LifeCycle lifeCycle : lifeCycles) {
+ resource = resource.queryParam("lifecycle", lifeCycle.toString());
+ }
+ }
+
ClientResponse clientResponse;
if (props == null) {
clientResponse = resource
@@ -599,6 +613,30 @@ public class FalconClient {
}
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    /**
+     * Verifies that every requested life cycle applies to the given entity type, by comparing
+     * the entity type against the type reported by each life cycle's tag.
+     *
+     * @param lifeCycles life cycles requested by the caller; null or empty means nothing to check
+     * @param type entity type name; matched case-insensitively against EntityType
+     * @throws FalconCLIException if a life cycle's tag belongs to a different entity type
+     */
+    private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException {
+        if (lifeCycles == null || lifeCycles.isEmpty()) {
+            return;
+        }
+
+        EntityType entityType = EntityType.valueOf(type.toUpperCase().trim());
+        for (LifeCycle lifeCycle : lifeCycles) {
+            if (entityType != lifeCycle.getTag().getType()) {
+                // The separating space before "for" was missing, gluing the life cycle name to the text.
+                throw new FalconCLIException("Incorrect lifecycle: " + lifeCycle + " for given type: " + type);
+            }
+        }
+    }
+
+    /**
+     * Validates the entity type supplied for an instance request.
+     *
+     * @param type entity type name; matched case-insensitively against EntityType
+     * @throws FalconCLIException if type is null, blank, not a known EntityType,
+     *                            or names the CLUSTER entity type
+     */
+    protected void checkType(String type) throws FalconCLIException {
+        // Treat whitespace-only input like an empty type instead of letting it reach valueOf("").
+        if (type == null || type.trim().isEmpty()) {
+            throw new FalconCLIException("entity type is empty");
+        }
+
+        EntityType entityType;
+        try {
+            entityType = EntityType.valueOf(type.toUpperCase().trim());
+        } catch (IllegalArgumentException e) {
+            // Surface an unknown type as the declared CLI exception rather than a raw runtime exception.
+            throw new FalconCLIException("Invalid entity type: " + type, e);
+        }
+
+        if (entityType == EntityType.CLUSTER) {
+            throw new FalconCLIException(
+                    "Instance management functions don't apply to Cluster entities");
+        }
+    }
+
+
private String sendAdminRequest(AdminOperations job)
throws FalconCLIException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java b/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
index 9061a9e..62b810c 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
@@ -37,7 +37,7 @@ public final class SchemaHelper {
return tz.getID();
}
- private static DateFormat getDateFormat() {
+ public static DateFormat getDateFormat() {
DateFormat dateFormat = new SimpleDateFormat(ISO8601_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index f700b8f..c28cb03 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -19,15 +19,18 @@
package org.apache.falcon.workflow.engine;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
-import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.Date;
+
/**
* Workflow engine should minimally support the
@@ -62,23 +65,26 @@ public abstract class AbstractWorkflowEngine {
public abstract boolean isSuspended(Entity entity) throws FalconException;
- public abstract InstancesResult getRunningInstances(Entity entity) throws FalconException;
+ public abstract InstancesResult getRunningInstances(Entity entity,
+ List<LifeCycle> lifeCycles) throws FalconException;
- public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException;
+ public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties props,
+ List<LifeCycle> lifeCycles) throws FalconException;
- public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException;
+ public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
+ List<LifeCycle> lifeCycles) throws FalconException;
- public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException;
+ public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props,
+ List<LifeCycle> lifeCycles) throws FalconException;
- public abstract InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException;
+ public abstract InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props,
+ List<LifeCycle> lifeCycles) throws FalconException;
- public abstract InstancesResult getStatus(Entity entity, Date start, Date end) throws FalconException;
+ public abstract InstancesResult getStatus(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException;
- public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end) throws FalconException;
+ public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException;
public abstract Date update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime)
throws FalconException;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index bf2f15d..dbb6981 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -164,6 +164,14 @@ Get logs for instance actions
Usage:
$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -logs -start "yyyy-MM-dd'T'HH:mm'Z'" [-end "yyyy-MM-dd'T'HH:mm'Z'"] [-runid <<runid>>]
+---+++LifeCycle
+
+Describes the list of life cycles of an entity: for a feed it can be replication/eviction, and for a process it can be execution.
+This can be used with instance management options. Default values are replication for feed and execution for process.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -status -lifecycle <<lifecycletype>> -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
+
---++ Graphs Options
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceKill.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceKill.twiki b/docs/src/site/twiki/restapi/InstanceKill.twiki
index eff2893..b8dbc4a 100644
--- a/docs/src/site/twiki/restapi/InstanceKill.twiki
+++ b/docs/src/site/twiki/restapi/InstanceKill.twiki
@@ -11,6 +11,7 @@ Kill a currently running instance.
* :entity-type can either be a feed or a process.
* :entity-name is name of the entity.
* start start time of the entity.
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Result of the kill operation.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceLogs.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceLogs.twiki b/docs/src/site/twiki/restapi/InstanceLogs.twiki
index 599f2d7..8522e75 100644
--- a/docs/src/site/twiki/restapi/InstanceLogs.twiki
+++ b/docs/src/site/twiki/restapi/InstanceLogs.twiki
@@ -12,6 +12,7 @@ Get log of a specific instance of an entity.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
* end <optional param> is the end time of the instance that you want to refer to
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Log of specified instance.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceRerun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki
index 77608e0..f622480 100644
--- a/docs/src/site/twiki/restapi/InstanceRerun.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki
@@ -11,6 +11,7 @@ Rerun a specific instance of an entity.
* :entity-type can either be a feed or a process.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Results of the rerun command.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceResume.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceResume.twiki b/docs/src/site/twiki/restapi/InstanceResume.twiki
index 2d29569..0ea8669 100644
--- a/docs/src/site/twiki/restapi/InstanceResume.twiki
+++ b/docs/src/site/twiki/restapi/InstanceResume.twiki
@@ -11,7 +11,7 @@ Resume a specific instance of an entity.
* :entity-type can either be a feed or a process.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
-
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Results of the resume command.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceRunning.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRunning.twiki b/docs/src/site/twiki/restapi/InstanceRunning.twiki
index 116565f..7fde90c 100644
--- a/docs/src/site/twiki/restapi/InstanceRunning.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRunning.twiki
@@ -10,7 +10,7 @@ Get a list of instances currently running for a given entity.
---++ Parameters
* :entity-type can either be a feed or a process.
* :entity-name is name of the entity.
-
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
List of instances currently running.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index 99497d1..519f55e 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -12,6 +12,7 @@ Get status of a specific instance of an entity.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
* end <optional param> is the end time of the instance that you want to refer to
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Status of the specified instance.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceSummary.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSummary.twiki b/docs/src/site/twiki/restapi/InstanceSummary.twiki
index bd1d2e5..ee1d41f 100644
--- a/docs/src/site/twiki/restapi/InstanceSummary.twiki
+++ b/docs/src/site/twiki/restapi/InstanceSummary.twiki
@@ -12,6 +12,7 @@ Get summary of instance/instances of an entity.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
* end <optional param> is the end time of the instance that you want to refer to
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Summary of the instances over the specified time range
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/docs/src/site/twiki/restapi/InstanceSuspend.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSuspend.twiki b/docs/src/site/twiki/restapi/InstanceSuspend.twiki
index 8f5f7c3..c2adfdc 100644
--- a/docs/src/site/twiki/restapi/InstanceSuspend.twiki
+++ b/docs/src/site/twiki/restapi/InstanceSuspend.twiki
@@ -11,6 +11,7 @@ Suspend a specific instance of an entity.
* :entity-type can either be a feed or a process.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
Results of the suspend command.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
index cdf758e..107ad13 100644
--- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -58,7 +58,8 @@ public class JailedFileSystem extends FileSystem {
throw new IOException("Incomplete Jail URI, no jail base: "+ name);
}
basePath = new Path(conf.get("jail.base", System.getProperty("hadoop.tmp.dir",
- conf.get("hadoop.tmp.dir", "/tmp"))) + "/jail-fs/" + base).toUri().getPath();
+ System.getProperty("user.dir") + "/webapp/target/tmp-hadoop-"
+ + System.getProperty("user.name"))) + "/jail-fs/" + base).toUri().getPath();
this.uri = URI.create(name.getScheme()+"://"+name.getAuthority());
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 57fca0f..34192c0 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -20,12 +20,16 @@ package org.apache.falcon.workflow.engine;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.*;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityGraph;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.resource.APIResult;
@@ -36,20 +40,39 @@ import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.OozieWorkflowBuilder;
import org.apache.falcon.workflow.WorkflowBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.oozie.client.*;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.ProxyOozieClient;
+import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.RestConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
/**
* Workflow engine which uses oozies APIs.
@@ -63,7 +86,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private static final List<WorkflowJob.Status> WF_KILL_PRECOND =
Arrays.asList(WorkflowJob.Status.PREP, WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED,
- WorkflowJob.Status.FAILED);
+ WorkflowJob.Status.FAILED);
private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
@@ -377,7 +400,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public InstancesResult getRunningInstances(Entity entity) throws FalconException {
+ public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException {
try {
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
@@ -385,7 +408,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
String[] wfNames = builder.getWorkflowNames();
List<String> coordNames = new ArrayList<String>();
for (String wfName : wfNames) {
- if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
+ if (!isCoordApplicable(wfName, lifeCycles)) {
continue;
}
coordNames.add(wfName);
@@ -420,38 +443,41 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public InstancesResult killInstances(Entity entity, Date start, Date end, Properties props) throws FalconException {
- return doJobAction(JobAction.KILL, entity, start, end, props);
+ public InstancesResult killInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.KILL, entity, start, end, props, lifeCycles);
}
@Override
- public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException {
- return doJobAction(JobAction.RERUN, entity, start, end, props);
+ public InstancesResult reRunInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles);
}
@Override
- public InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException {
- return doJobAction(JobAction.SUSPEND, entity, start, end, props);
+ public InstancesResult suspendInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.SUSPEND, entity, start, end, props, lifeCycles);
}
@Override
- public InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props)
- throws FalconException {
- return doJobAction(JobAction.RESUME, entity, start, end, props);
+ public InstancesResult resumeInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.RESUME, entity, start, end, props, lifeCycles);
}
@Override
- public InstancesResult getStatus(Entity entity, Date start, Date end) throws FalconException {
+ public InstancesResult getStatus(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
- return doJobAction(JobAction.STATUS, entity, start, end, null);
+ return doJobAction(JobAction.STATUS, entity, start, end, null, lifeCycles);
}
@Override
- public InstancesSummaryResult getSummary(Entity entity, Date start, Date end) throws FalconException {
+ public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
- return doSummaryJobAction(entity, start, end, null);
+ return doSummaryJobAction(entity, start, end, null, lifeCycles);
}
private static enum JobAction {
@@ -466,9 +492,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props)
- throws FalconException {
- Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end);
+ private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end, lifeCycles);
List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
APIResult.Status overallStatus = APIResult.Status.SUCCEEDED;
@@ -522,8 +548,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return instancesResult;
}
- private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start, Date end, Properties props)
- throws FalconException {
+ private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start,
+ Date end, Properties props,
+ List<LifeCycle> lifeCycles) throws FalconException {
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
List<InstanceSummary> instances = new ArrayList<InstanceSummary>();
@@ -538,7 +565,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
List<BundleJob> bundles = entry.getValue();
ProxyOozieClient client = OozieClientFactory.get(cluster);
- List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client, start, end, bundles);
+ List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client, start, end,
+ bundles, lifeCycles);
long unscheduledInstances = 0;
boolean isLastCoord = false;
@@ -550,7 +578,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Date iterEnd = (coord.getLastActionTime() != null && coord.getLastActionTime().before(end)
? coord.getLastActionTime() : end);
- if (i == (applicableCoords.size() - 1)) {
+ if (i == 0) {
isLastCoord = true;
}
@@ -741,8 +769,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end)
- throws FalconException {
+ protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
Map<String, List<CoordinatorAction>> actionsMap = new HashMap<String, List<CoordinatorAction>>();
@@ -750,8 +778,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
String cluster = entry.getKey();
List<BundleJob> bundles = entry.getValue();
ProxyOozieClient client = OozieClientFactory.get(cluster);
- List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client, start, end, bundles);
+ List<CoordinatorJob> applicableCoords =
+ getApplicableCoords(entity, client, start, end, bundles, lifeCycles);
List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>();
+ int maxRetentionInstancesCount =
+ Integer.valueOf(RuntimeProperties.get().getProperty("retention.instances.displaycount", "2"));
+ int retentionInstancesCount = 0;
for (CoordinatorJob coord : applicableCoords) {
Date nextMaterializedTime = coord.getNextMaterializedTime();
@@ -759,28 +791,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
continue;
}
+ boolean retentionCoord = isRetentionCoord(coord);
Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, start);
- Date iterEnd = (nextMaterializedTime.before(end) ? nextMaterializedTime : end);
-
- while (!iterStart.after(iterEnd)) {
- int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterStart);
- String actionId = coord.getId() + "@" + sequence;
+ Date iterEnd = ((nextMaterializedTime.before(end) || retentionCoord) ? nextMaterializedTime : end);
- try {
- CoordinatorAction coordActionInfo = client.getCoordActionInfo(actionId);
- if (coordActionInfo != null) {
- actions.add(coordActionInfo);
+ while (iterStart.before(iterEnd)) {
+ if (retentionCoord) {
+ if (retentionInstancesCount >= maxRetentionInstancesCount) {
+ break;
}
- } catch (OozieClientException e) {
- LOG.debug("Unable to get action for {}", actionId, e);
+ retentionInstancesCount++;
}
- Calendar startCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
- startCal.setTime(iterStart);
- startCal.add(freq.getTimeUnit().getCalendarUnit(), Integer.valueOf((coord.getFrequency())));
- iterStart = startCal.getTime();
+ int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterEnd);
+ String actionId = coord.getId() + "@" + sequence;
+ addCoordAction(client, actions, actionId);
+ Calendar endCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
+ endCal.setTime(iterEnd);
+ endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.valueOf((coord.getFrequency()))));
+ iterEnd = endCal.getTime();
}
}
actionsMap.put(cluster, actions);
@@ -788,6 +819,25 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return actionsMap;
}
+/**
+ * Tells whether the given coordinator is a retention (eviction) coordinator.
+ *
+ * @param coord coordinator job to inspect
+ * @return true when the coordinator's app name contains the name of the EVICTION lifecycle tag
+ */
+private boolean isRetentionCoord(CoordinatorJob coord) {
+    final String appName = coord.getAppName();
+    return appName.contains(LifeCycle.EVICTION.getTag().name());
+}
+
+/**
+ * Looks up a coordinator action by id and appends it to {@code actions} when found.
+ * Best effort: a lookup failure is logged at debug level and the action is skipped.
+ *
+ * @param client   Oozie client used for the lookup
+ * @param actions  list that receives the action info
+ * @param actionId id of the coordinator action to fetch
+ */
+private void addCoordAction(ProxyOozieClient client, List<CoordinatorAction> actions, String actionId) {
+    try {
+        CoordinatorAction coordActionInfo = client.getCoordActionInfo(actionId);
+        if (coordActionInfo != null) {
+            actions.add(coordActionInfo);
+        }
+    } catch (OozieClientException e) {
+        // Pass the exception as the trailing argument so the logger records the full
+        // stack trace rather than only the message text.
+        LOG.debug("Unable to get action for {}", actionId, e);
+    }
+}
+
private Frequency createFrequency(String frequency, Timeunit timeUnit) {
return new Frequency(frequency, OozieTimeUnit.valueOf(timeUnit.name()).getFalconTimeUnit());
}
@@ -814,15 +864,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private List<CoordinatorJob> getApplicableCoords(Entity entity, ProxyOozieClient client, Date start,
- Date end, List<BundleJob> bundles) throws FalconException {
- String retentionCoordName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
+ Date end, List<BundleJob> bundles,
+ List<LifeCycle> lifeCycles) throws FalconException {
List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>();
try {
for (BundleJob bundle : bundles) {
List<CoordinatorJob> coords = client.getBundleJobInfo(bundle.getId()).getCoordinators();
for (CoordinatorJob coord : coords) {
// ignore coords in PREP state, not yet running and retention coord
- if (coord.getStatus() == Status.PREP || retentionCoordName.equals(coord.getAppName())) {
+
+ if (coord.getStatus() == Status.PREP
+ || !isCoordApplicable(coord.getAppName(), lifeCycles)) {
continue;
}
@@ -833,24 +885,34 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
}
- sortCoordsByStartTime(applicableCoords);
+ sortDescByStartTime(applicableCoords);
return applicableCoords;
} catch (OozieClientException e) {
throw new FalconException(e);
}
}
- protected void sortCoordsByStartTime(List<CoordinatorJob> consideredCoords) {
+/**
+ * Checks whether a coordinator belongs to one of the requested lifecycles.
+ *
+ * @param appName    coordinator application name
+ * @param lifeCycles lifecycles to match against
+ * @return true if {@code appName} contains the tag name of at least one lifecycle, false otherwise
+ */
+private boolean isCoordApplicable(String appName, List<LifeCycle> lifeCycles) {
+    boolean applicable = false;
+    for (LifeCycle candidate : lifeCycles) {
+        String tagName = candidate.getTag().name();
+        if (appName.contains(tagName)) {
+            // first match is enough; remaining lifecycles need not be checked
+            applicable = true;
+            break;
+        }
+    }
+    return applicable;
+}
+
+ protected void sortDescByStartTime(List<CoordinatorJob> consideredCoords) {
Collections.sort(consideredCoords, new Comparator<CoordinatorJob>() {
@Override
public int compare(CoordinatorJob left, CoordinatorJob right) {
Date leftStart = left.getStartTime();
Date rightStart = right.getStartTime();
- return leftStart.compareTo(rightStart);
+ return rightStart.compareTo(leftStart);
}
});
}
+
private boolean canUpdateBundle(Entity oldEntity, Entity newEntity, boolean wfUpdated) throws FalconException {
return !wfUpdated && EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS);
}
@@ -1330,4 +1392,5 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
throw new FalconException(e);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 0bdf94b..3b87469 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -19,9 +19,7 @@
package org.apache.falcon.resource;
import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconWebException;
-import org.apache.falcon.Pair;
+import org.apache.falcon.*;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.Entity;
@@ -37,9 +35,7 @@ import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.io.IOException;
-import java.util.Date;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
/**
* A base class for managing Entity's Instance operations.
@@ -61,14 +57,36 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
- public InstancesResult getRunningInstances(String type, String entity, String colo) {
+/**
+ * Validates the requested lifecycles against the entity type, or picks a default when none are given.
+ *
+ * @param lifeCycleValues lifecycles requested by the caller; may be null or empty
+ * @param type            entity type name; upper-cased and trimmed before being resolved to an EntityType
+ * @return the input list when it is non-empty and valid; otherwise a new list holding EXECUTION for a
+ *         process, REPLICATION for a feed, and nothing for any other entity type
+ * @throws FalconException if a requested lifecycle's tag belongs to a different entity type
+ */
+protected List<LifeCycle> checkAndUpdateLifeCycle(List<LifeCycle> lifeCycleValues,
+        String type) throws FalconException {
+    EntityType entityType = EntityType.valueOf(type.toUpperCase().trim());
+    if (lifeCycleValues == null || lifeCycleValues.isEmpty()) {
+        // Nothing requested: fall back to the default lifecycle of the entity type.
+        List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
+        if (entityType == EntityType.PROCESS) {
+            lifeCycles.add(LifeCycle.EXECUTION);
+        } else if (entityType == EntityType.FEED) {
+            lifeCycles.add(LifeCycle.REPLICATION);
+        }
+        return lifeCycles;
+    }
+    for (LifeCycle lifeCycle : lifeCycleValues) {
+        if (entityType != lifeCycle.getTag().getType()) {
+            // The space before "for" keeps the lifecycle name from fusing with the next word.
+            throw new FalconException("Incorrect lifecycle: " + lifeCycle + " for given type: " + type);
+        }
+    }
+    return lifeCycleValues;
+}
+
+ public InstancesResult getRunningInstances(String type, String entity,
+ String colo, List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
validateNotEmpty("entityName", entity);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
Entity entityObject = EntityUtil.getEntity(type, entity);
- return wfEngine.getRunningInstances(entityObject);
+ return wfEngine.getRunningInstances(entityObject, lifeCycles);
} catch (Throwable e) {
LOG.error("Failed to get running instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
@@ -77,19 +95,19 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
public InstancesResult getStatus(String type, String entity, String startStr, String endStr,
- String colo) {
+ String colo, List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
validateParams(type, entity, startStr, endStr);
-
Date start = EntityUtil.parseDateUTC(startStr);
Date end = getEndDate(start, endStr);
Entity entityObject = EntityUtil.getEntity(type, entity);
-
+ // LifeCycle lifeCycleObject = EntityUtil.getLifeCycle(lifeCycle);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
return wfEngine.getStatus(
- entityObject, start, end);
+ entityObject, start, end, lifeCycles);
} catch (Throwable e) {
LOG.error("Failed to get instances status", e);
throw FalconWebException
@@ -97,10 +115,12 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
- public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr, String colo) {
+ public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr,
+ String colo, List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
validateParams(type, entity, startStr, endStr);
Date start = EntityUtil.parseDateUTC(startStr);
@@ -108,7 +128,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Entity entityObject = EntityUtil.getEntity(type, entity);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.getSummary(entityObject, start, end);
+ return wfEngine.getSummary(entityObject, start, end, lifeCycles);
} catch (Throwable e) {
LOG.error("Failed to get instances status", e);
throw FalconWebException.newInstanceSummaryException(e, Response.Status.BAD_REQUEST);
@@ -116,11 +136,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesResult getLogs(String type, String entity, String startStr,
- String endStr, String colo, String runId) {
+ String endStr, String colo, String runId,
+ List<LifeCycle> lifeCycles) {
+
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
// TODO getStatus does all validations and filters clusters
InstancesResult result = getStatus(type, entity, startStr, endStr,
- colo);
+ colo, lifeCycles);
LogProvider logProvider = new LogProvider();
Entity entityObject = EntityUtil.getEntity(type, entity);
for (Instance instance : result.getInstances()) {
@@ -135,11 +158,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesResult killInstance(HttpServletRequest request,
- String type, String entity, String startStr, String endStr, String colo) {
+ String type, String entity, String startStr,
+ String endStr, String colo,
+ List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
audit(request, entity, type, "INSTANCE_KILL");
validateParams(type, entity, startStr, endStr);
@@ -149,7 +175,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.killInstances(entityObject, start, end, props);
+ return wfEngine.killInstances(entityObject, start, end, props, lifeCycles);
} catch (Throwable e) {
LOG.error("Failed to kill instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
@@ -157,11 +183,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesResult suspendInstance(HttpServletRequest request,
- String type, String entity, String startStr, String endStr, String colo) {
+ String type, String entity, String startStr,
+ String endStr, String colo,
+ List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
audit(request, entity, type, "INSTANCE_SUSPEND");
validateParams(type, entity, startStr, endStr);
@@ -171,7 +200,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.suspendInstances(entityObject, start, end, props);
+ return wfEngine.suspendInstances(entityObject, start, end, props, lifeCycles);
} catch (Throwable e) {
LOG.error("Failed to suspend instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
@@ -179,11 +208,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesResult resumeInstance(HttpServletRequest request,
- String type, String entity, String startStr, String endStr, String colo) {
+ String type, String entity, String startStr,
+ String endStr, String colo,
+ List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
audit(request, entity, type, "INSTANCE_RESUME");
validateParams(type, entity, startStr, endStr);
@@ -193,19 +225,21 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.resumeInstances(entityObject, start, end, props);
+ return wfEngine.resumeInstances(entityObject, start, end, props, lifeCycles);
} catch (Throwable e) {
LOG.error("Failed to resume instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
}
}
- public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr,
- HttpServletRequest request, String colo) {
+ public InstancesResult reRunInstance(String type, String entity, String startStr,
+ String endStr, HttpServletRequest request,
+ String colo, List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
audit(request, entity, type, "INSTANCE_RERUN");
validateParams(type, entity, startStr, endStr);
@@ -215,7 +249,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.reRunInstances(entityObject, start, end, props);
+ return wfEngine.reRunInstances(entityObject, start, end, props, lifeCycles);
} catch (Exception e) {
LOG.error("Failed to rerun instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
@@ -244,7 +278,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
return end;
}
- private void validateParams(String type, String entity, String startStr, String endStr) throws FalconException {
+ private void validateParams(String type, String entity, String startStr,
+ String endStr) throws FalconException {
validateNotEmpty("entityType", type);
validateNotEmpty("entityName", entity);
validateNotEmpty("start", startStr);
@@ -272,8 +307,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
private void validateDateRangeFor(Entity entity, Pair<Date, String> clusterMinStart,
- Pair<Date, String> clusterMaxEnd, String start, String end)
- throws FalconException {
+ Pair<Date, String> clusterMaxEnd, String start,
+ String end) throws FalconException{
Date instStart = EntityUtil.parseDateUTC(start);
if (instStart.before(clusterMinStart.first)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 407f39a..42b4aeb 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -21,6 +21,7 @@ package org.apache.falcon.resource.proxy;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.resource.APIResult;
@@ -75,14 +76,16 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "running")
@Override
- public InstancesResult getRunningInstances(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ public InstancesResult getRunningInstances(
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).
- invoke("getRunningInstances", type, entity, colo);
+ invoke("getRunningInstances", type, entity, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -92,16 +95,18 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-status")
@Override
- public InstancesResult getStatus(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") final String colo) {
+ public InstancesResult getStatus(
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") final String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getStatus",
- type, entity, startStr, endStr, colo);
+ type, entity, startStr, endStr, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -111,16 +116,18 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-summary")
@Override
- public InstancesSummaryResult getSummary(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") final String colo) {
+ public InstancesSummaryResult getSummary(
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") final String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
return new InstanceSummaryProxy() {
@Override
protected InstancesSummaryResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getSummary",
- type, entity, startStr, endStr, colo);
+ type, entity, startStr, endStr, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -136,12 +143,13 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("start-time") @QueryParam("start") final String startStr,
@Dimension("end-time") @QueryParam("end") final String endStr,
@Dimension("colo") @QueryParam("colo") final String colo,
- @Dimension("run-id") @QueryParam("runid") final String runId) {
+ @Dimension("run-id") @QueryParam("runid") final String runId,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getLogs",
- type, entity, startStr, endStr, colo, runId);
+ type, entity, startStr, endStr, colo, runId, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -151,19 +159,21 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "kill-instance")
@Override
- public InstancesResult killInstance(@Context HttpServletRequest request,
- @Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") final String colo) {
+ public InstancesResult killInstance(
+ @Context HttpServletRequest request,
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") final String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("killInstance",
- bufferedRequest, type, entity, startStr, endStr, colo);
+ bufferedRequest, type, entity, startStr, endStr, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -173,18 +183,20 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "suspend-instance")
@Override
- public InstancesResult suspendInstance(@Context HttpServletRequest request,
- @Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ public InstancesResult suspendInstance(
+ @Context HttpServletRequest request,
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("suspendInstance",
- bufferedRequest, type, entity, startStr, endStr, colo);
+ bufferedRequest, type, entity, startStr, endStr, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -194,19 +206,21 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "resume-instance")
@Override
- public InstancesResult resumeInstance(@Context HttpServletRequest request,
- @Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ public InstancesResult resumeInstance(
+ @Context HttpServletRequest request,
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("resumeInstance",
- bufferedRequest, type, entity, startStr, endStr, colo);
+ bufferedRequest, type, entity, startStr, endStr, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
@@ -216,19 +230,21 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "re-run-instance")
@Override
- public InstancesResult reRunInstance(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Context HttpServletRequest request,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ public InstancesResult reRunInstance(
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Context HttpServletRequest request,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("reRunInstance",
- type, entity, startStr, endStr, bufferedRequest, colo);
+ type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles);
}
}.execute(colo, type, entity);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/089ecf41/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index 6e2fad5..9512fa8 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -78,8 +78,6 @@ public class EmbeddedCluster {
private static EmbeddedCluster createClusterAsUser(String name, boolean global) throws IOException {
EmbeddedCluster cluster = new EmbeddedCluster();
- cluster.conf.set("jail.base", System.getProperty("hadoop.tmp.dir",
- cluster.conf.get("hadoop.tmp.dir", "/tmp")));
cluster.conf.set("fs.default.name", "jail://" + (global ? "global" : name) + ":00");
String hdfsUrl = cluster.conf.get("fs.default.name");