You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/28 20:13:15 UTC
git commit: FALCON-474 Add Bulk APIs to drive the dashboard needs.
Contributed by Balu Vellanki
Repository: incubator-falcon
Updated Branches:
refs/heads/master e21ec9367 -> 305feb0b4
FALCON-474 Add Bulk APIs to drive the dashboard needs. Contributed by Balu Vellanki
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/305feb0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/305feb0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/305feb0b
Branch: refs/heads/master
Commit: 305feb0b4deb9ba949631ccd5963772848b4981b
Parents: e21ec93
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Aug 28 11:13:11 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Aug 28 11:13:11 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/falcon/cli/FalconCLI.java | 40 +++-
.../org/apache/falcon/client/FalconClient.java | 78 ++++++-
.../org/apache/falcon/resource/EntityList.java | 2 +-
.../falcon/resource/EntitySummaryResult.java | 220 +++++++++++++++++++
.../org/apache/falcon/entity/EntityUtil.java | 47 ++++
docs/src/site/twiki/FalconCLI.twiki | 13 ++
docs/src/site/twiki/restapi/EntityList.twiki | 2 +-
docs/src/site/twiki/restapi/EntitySummary.twiki | 72 ++++++
docs/src/site/twiki/restapi/ResourceList.twiki | 1 +
.../falcon/resource/AbstractEntityManager.java | 131 ++++++-----
.../resource/AbstractInstanceManager.java | 2 +-
.../AbstractSchedulableEntityManager.java | 107 ++++++++-
.../proxy/SchedulableEntityManagerProxy.java | 24 ++
.../falcon/resource/EntityManagerTest.java | 30 ++-
.../resource/SchedulableEntityManager.java | 23 ++
.../java/org/apache/falcon/cli/FalconCLIIT.java | 25 +++
.../falcon/resource/EntityManagerJerseyIT.java | 13 ++
.../org/apache/falcon/resource/TestContext.java | 5 +-
19 files changed, 758 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8bd1ff9..a358be4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-474 Add Bulk APIs to drive the dashboard needs (Balu Vellanki via
+ Venkatesh Seetharam)
+
FALCON-166 Instance status start and end dates are rigid and inconvenient
(Balu Vellanki via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index cb46d46..f5b30f0 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -65,6 +65,7 @@ public class FalconCLI {
public static final String ENTITY_CMD = "entity";
public static final String ENTITY_TYPE_OPT = "type";
public static final String COLO_OPT = "colo";
+ public static final String CLUSTER_OPT = "cluster";
public static final String ENTITY_NAME_OPT = "name";
public static final String FILE_PATH_OPT = "file";
public static final String SUBMIT_OPT = "submit";
@@ -87,6 +88,7 @@ public class FalconCLI {
public static final String ORDER_BY_OPT = "orderBy";
public static final String OFFSET_OPT = "offset";
public static final String NUM_RESULTS_OPT = "numResults";
+ public static final String NUM_INSTANCES_OPT = "numInstances";
public static final String INSTANCE_CMD = "instance";
public static final String START_OPT = "start";
@@ -339,6 +341,9 @@ public class FalconCLI {
String entityName = commandLine.getOptionValue(ENTITY_NAME_OPT);
String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
String colo = commandLine.getOptionValue(COLO_OPT);
+ String cluster = commandLine.getOptionValue(CLUSTER_OPT);
+ String start = commandLine.getOptionValue(START_OPT);
+ String end = commandLine.getOptionValue(END_OPT);
String time = commandLine.getOptionValue(EFFECTIVE_OPT);
String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
@@ -347,7 +352,9 @@ public class FalconCLI {
Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
FalconClient.DEFAULT_NUM_RESULTS, "numResults");
+ Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances");
validateEntityType(entityType);
+ String entityAction = "entity";
if (optionsList.contains(SUBMIT_OPT)) {
validateFilePath(filePath);
@@ -398,11 +405,18 @@ public class FalconCLI {
} else if (optionsList.contains(LIST_OPT)) {
validateColo(optionsList);
validateEntityFields(fields);
- validateOrderBy(orderBy, "entity");
- validateFilterBy(filterBy, "entity");
+ validateOrderBy(orderBy, entityAction);
+ validateFilterBy(filterBy, entityAction);
EntityList entityList = client.getEntityList(entityType, fields, filterBy,
filterTags, orderBy, offset, numResults);
result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
+ } else if (optionsList.contains(SUMMARY_OPT)) {
+ validateCluster(cluster);
+ validateEntityFields(fields);
+ validateFilterBy(filterBy, entityAction);
+ validateOrderBy(orderBy, entityAction);
+ result = client.getEntitySummary(entityType, cluster, start, end, fields, filterBy, filterTags,
+ orderBy, offset, numResults, numInstances);
} else if (optionsList.contains(HELP_CMD)) {
OUT.get().println("Falcon Help");
} else {
@@ -411,6 +425,12 @@ public class FalconCLI {
OUT.get().println(result);
}
+ private void validateCluster(String cluster) throws FalconCLIException {
+ if (StringUtils.isEmpty(cluster)) {
+ throw new FalconCLIException("Missing argument: cluster");
+ }
+ }
+
private String getColo(String colo) throws FalconCLIException, IOException {
if (colo == null) {
Properties prop = getClientProperties();
@@ -565,6 +585,8 @@ public class FalconCLI {
"Gets the dependencies of entity");
Option list = new Option(LIST_OPT, false,
"List entities registerd for a type");
+ Option entitySummary = new Option(SUMMARY_OPT, false,
+ "Get summary of instances for list of entities");
OptionGroup group = new OptionGroup();
group.addOption(submit);
@@ -579,6 +601,7 @@ public class FalconCLI {
group.addOption(definition);
group.addOption(dependency);
group.addOption(list);
+ group.addOption(entitySummary);
Option url = new Option(URL_OPTION, true, "Falcon URL");
Option entityType = new Option(ENTITY_TYPE_OPT, true,
@@ -588,8 +611,10 @@ public class FalconCLI {
"Path to entity xml file");
Option entityName = new Option(ENTITY_NAME_OPT, true,
"Entity type, can be cluster, feed or process xml");
- Option colo = new Option(COLO_OPT, true,
- "Colo name");
+ Option start = new Option(START_OPT, true, "Start time is optional for summary");
+ Option end = new Option(END_OPT, true, "End time is optional for summary");
+ Option colo = new Option(COLO_OPT, true, "Colo name");
+ Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
colo.setRequired(false);
Option effective = new Option(EFFECTIVE_OPT, true, "Effective time for update");
Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
@@ -602,6 +627,8 @@ public class FalconCLI {
"Start returning entities from this offset");
Option numResults = new Option(NUM_RESULTS_OPT, true,
"Number of results to return per request");
+ Option numInstances = new Option(NUM_INSTANCES_OPT, true,
+ "Number of instances to return per entity summary request");
entityOptions.addOption(url);
entityOptions.addOptionGroup(group);
@@ -609,6 +636,9 @@ public class FalconCLI {
entityOptions.addOption(entityName);
entityOptions.addOption(filePath);
entityOptions.addOption(colo);
+ entityOptions.addOption(cluster);
+ entityOptions.addOption(start);
+ entityOptions.addOption(end);
entityOptions.addOption(effective);
entityOptions.addOption(fields);
entityOptions.addOption(filterBy);
@@ -616,6 +646,7 @@ public class FalconCLI {
entityOptions.addOption(orderBy);
entityOptions.addOption(offset);
entityOptions.addOption(numResults);
+ entityOptions.addOption(numInstances);
return entityOptions;
}
@@ -672,7 +703,6 @@ public class FalconCLI {
false,
"Displays the workflow parameters for a given instance of specified nominal time");
-
OptionGroup group = new OptionGroup();
group.addOption(running);
group.addOption(list);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5e9543c..619955b 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
+import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@@ -180,6 +181,7 @@ public class FalconClient {
STATUS("api/entities/status/", HttpMethod.GET, MediaType.TEXT_XML),
DEFINITION("api/entities/definition/", HttpMethod.GET, MediaType.TEXT_XML),
LIST("api/entities/list/", HttpMethod.GET, MediaType.TEXT_XML),
+ SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON),
DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML);
private String path;
@@ -333,13 +335,22 @@ public class FalconClient {
return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName);
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+
public EntityList getEntityList(String entityType, String fields, String filterBy, String filterTags,
String orderBy, Integer offset, Integer numResults) throws FalconCLIException {
return sendListRequest(Entities.LIST, entityType, fields, filterBy,
filterTags, orderBy, offset, numResults);
}
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public String getEntitySummary(String entityType, String cluster, String start, String end,
+ String fields, String filterBy, String filterTags,
+ String orderBy, Integer offset, Integer numResults, Integer numInstances)
+ throws FalconCLIException {
+ return sendEntitySummaryRequest(Entities.SUMMARY, entityType, cluster, start, end, fields, filterBy, filterTags,
+ orderBy, offset, numResults, numInstances);
+ }
+
public String getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles,
String filterBy, String orderBy, Integer offset, Integer numResults)
throws FalconCLIException {
@@ -445,7 +456,6 @@ public class FalconClient {
return sendInstanceRequest(Instances.PARAMS, type, entity,
start, null, null, null, colo, lifeCycles);
}
- //RESUME CHECKSTYLE CHECK ParameterNumberCheck
public String getThreadDump() throws FalconCLIException {
return sendAdminRequest(AdminOperations.STACK);
@@ -523,6 +533,51 @@ public class FalconClient {
return parseAPIResult(clientResponse);
}
+ private String sendEntitySummaryRequest(Entities entities, String entityType, String cluster,
+ String start, String end,
+ String fields, String filterBy, String filterTags,
+ String orderBy, Integer offset, Integer numResults,
+ Integer numInstances) throws FalconCLIException {
+ WebResource resource;
+ if (StringUtils.isEmpty(cluster)) {
+ resource = service.path(entities.path).path(entityType);
+ } else {
+ resource = service.path(entities.path).path(entityType).path(cluster);
+ }
+
+ if (!StringUtils.isEmpty(fields)) {
+ resource = resource.queryParam("fields", fields);
+ }
+ if (!StringUtils.isEmpty(filterTags)) {
+ resource = resource.queryParam("tags", filterTags);
+ }
+ if (!StringUtils.isEmpty(filterBy)) {
+ resource = resource.queryParam("filterBy", filterBy);
+ }
+ if (!StringUtils.isEmpty(orderBy)) {
+ resource = resource.queryParam("orderBy", orderBy);
+ }
+ if (!StringUtils.isEmpty(start)) {
+ resource = resource.queryParam("start", start);
+ }
+ if (!StringUtils.isEmpty(end)) {
+ resource = resource.queryParam("end", end);
+ }
+
+ resource = resource.queryParam("offset", offset.toString());
+ resource = resource.queryParam("numResults", numResults.toString());
+ resource = resource.queryParam("numInstances", numInstances.toString());
+
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(entities.mimeType).type(MediaType.TEXT_XML)
+ .method(entities.method, ClientResponse.class);
+
+ checkIfSuccessful(clientResponse);
+ return parseProcessEntitySummaryResult(clientResponse);
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
private String sendDefinitionRequest(Entities entities, String entityType,
String entityName) throws FalconCLIException {
@@ -724,6 +779,25 @@ public class FalconClient {
return clientResponse.getEntity(String.class);
}
+ private String parseProcessEntitySummaryResult(ClientResponse clientResponse) {
+ EntitySummaryResult result = clientResponse.getEntity(EntitySummaryResult.class);
+ StringBuilder sb = new StringBuilder();
+ String toAppend;
+ sb.append("Consolidated Status: ").append(result.getStatus()).append("\n");
+ sb.append("\nEntity Summary Result :\n");
+ if (result.getEntitySummaries() != null) {
+ for (EntitySummaryResult.EntitySummary entitySummary : result.getEntitySummaries()) {
+
+ toAppend = entitySummary.toString();
+ sb.append(toAppend).append("\n");
+ }
+ }
+ sb.append("\nAdditional Information:\n");
+ sb.append("Response: ").append(result.getMessage());
+ sb.append("Request Id: ").append(result.getRequestId());
+ return sb.toString();
+ }
+
private String summarizeProcessInstanceResult(ClientResponse clientResponse) {
InstancesSummaryResult result = clientResponse
.getEntity(InstancesSummaryResult.class);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index f67b84b..243c119 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -51,7 +51,7 @@ public class EntityList {
* Filter by these Fields is supported by RestAPI.
*/
public static enum EntityFilterByFields {
- TYPE, NAME, STATUS, PIPELINES
+ TYPE, NAME, STATUS, PIPELINES, CLUSTER
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
new file mode 100644
index 0000000..4a885ec
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Arrays;
+import java.util.Date;
+
+/**
+ * Pojo for JAXB marshalling / unmarshalling.
+ */
+//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+@XmlRootElement
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class EntitySummaryResult extends APIResult {
+
+ /**
+ * Workflow status of an instance, as set in the result object.
+ */
+ public static enum WorkflowStatus {
+ WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR
+ }
+
+ @XmlElement
+ private EntitySummary[] entitySummaries;
+
+ //For JAXB
+ public EntitySummaryResult() {
+ super();
+ }
+
+ public EntitySummaryResult(String message, EntitySummary[] entitySummaries) {
+ this(Status.SUCCEEDED, message, entitySummaries);
+ }
+
+ public EntitySummaryResult(Status status, String message, EntitySummary[] entitySummaries) {
+ super(status, message);
+ this.entitySummaries = entitySummaries;
+ }
+
+ public EntitySummaryResult(Status status, String message) {
+ super(status, message);
+ }
+
+ public EntitySummary[] getEntitySummaries() {
+ return this.entitySummaries;
+ }
+
+ public void setEntitySummaries(EntitySummary[] entitySummaries) {
+ this.entitySummaries = entitySummaries;
+ }
+
+ /**
+ * A single entity object inside entity summary result.
+ */
+ @XmlRootElement(name = "entitySummary")
+ public static class EntitySummary {
+
+ @XmlElement
+ public String type;
+ @XmlElement
+ public String name;
+ @XmlElement
+ public String status;
+ @XmlElement
+ public String[] tags;
+ @XmlElement
+ public String[] pipelines;
+ @XmlElement
+ public Instance[] instances;
+
+ public EntitySummary() {
+ }
+
+ public EntitySummary(String entityName, String entityType) {
+ this.name = entityName;
+ this.type = entityType;
+ }
+
+ public EntitySummary(String name, String type, String status,
+ String[] tags, String[] pipelines,
+ Instance[] instances) {
+ this.name = name;
+ this.type = type;
+ this.status = status;
+ this.pipelines = pipelines;
+ this.tags = tags;
+ this.instances = instances;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public String getStatus() {
+ return this.status;
+ }
+
+ public String[] getTags() {
+ return this.tags;
+ }
+
+ public String[] getPipelines() {
+ return this.pipelines;
+ }
+
+ public Instance[] getInstances() {
+ return this.instances;
+ }
+
+ @Override
+ public String toString() {
+ return "{Entity: " + (this.name == null ? "" : this.name)
+ + ", Type: " + (this.type == null ? "" : this.type)
+ + ", Status: " + (this.status == null ? "" : this.status)
+ + ", Tags: " + (this.tags == null ? "[]" : Arrays.toString(this.tags))
+ + ", Pipelines: " + (this.pipelines == null ? "[]" : Arrays.toString(this.pipelines))
+ + ", InstanceSummary: " + (this.instances == null ? "[]" : Arrays.toString(this.instances))
+ +"}";
+ }
+ }
+
+ /**
+ * A single instance object inside an entity summary.
+ */
+ @XmlRootElement(name = "instances")
+ public static class Instance {
+ @XmlElement
+ public String instance;
+
+ @XmlElement
+ public WorkflowStatus status;
+
+ @XmlElement
+ public String logFile;
+
+ @XmlElement
+ public String cluster;
+
+ @XmlElement
+ public String sourceCluster;
+
+ @XmlElement
+ public Date startTime;
+
+ @XmlElement
+ public Date endTime;
+
+ public Instance() {
+ }
+
+ public Instance(String cluster, String instance, WorkflowStatus status) {
+ this.cluster = cluster;
+ this.instance = instance;
+ this.status = status;
+ }
+
+ public String getInstance() {
+ return instance;
+ }
+
+ public WorkflowStatus getStatus() {
+ return status;
+ }
+
+ public String getLogFile() {
+ return logFile;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public String getSourceCluster() {
+ return sourceCluster;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public Date getEndTime() {
+ return endTime;
+ }
+
+ @Override
+ public String toString() {
+ return "{instance: " + (this.instance == null ? "" : this.instance)
+ + ", status: " + (this.status == null ? "" : this.status)
+ + (this.logFile == null ? "" : ", log: " + this.logFile)
+ + (this.sourceCluster == null ? "" : ", source-cluster: " + this.sourceCluster)
+ + (this.cluster == null ? "" : ", cluster: " + this.cluster)
+ + (this.startTime == null ? "" : ", startTime: " + this.startTime)
+ + (this.endTime == null ? "" : ", endTime: " + this.endTime)
+ + "}";
+ }
+ }
+}
+//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index e75f28e..8f258fb 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -21,6 +21,7 @@ package org.apache.falcon.entity;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.Tag;
@@ -688,6 +689,52 @@ public final class EntityUtil {
return Storage.TYPE.TABLE == storageType;
}
+ public static List<String> getTags(Entity entity) {
+ String rawTags = null;
+
+ switch (entity.getEntityType()) {
+ case PROCESS:
+ rawTags = ((Process) entity).getTags();
+ break;
+
+ case FEED:
+ rawTags = ((Feed) entity).getTags();
+ break;
+
+ case CLUSTER:
+ rawTags = ((Cluster) entity).getTags();
+ break;
+
+ default:
+ break;
+ }
+
+ List<String> tags = new ArrayList<String>();
+ if (!StringUtils.isEmpty(rawTags)) {
+ for(String tag : rawTags.split(",")) {
+ tags.add(tag.trim());
+ }
+ }
+
+ return tags;
+ }
+
+ public static List<String> getPipelines(Entity entity) {
+ List<String> pipelines = new ArrayList<String>();
+
+ if (entity.getEntityType().equals(EntityType.PROCESS)) {
+ Process process = (Process) entity;
+ String pipelineString = process.getPipelines();
+ if (pipelineString != null) {
+ for (String pipeline : pipelineString.split(",")) {
+ pipelines.add(pipeline.trim());
+ }
+ }
+ } // else : Pipelines are only set for Process entities
+
+ return pipelines;
+ }
+
public static Pair<Date, Date> getEntityStartEndDates(Entity entityObject) {
Set<String> clusters = EntityUtil.getClustersDefined(entityObject);
Pair<Date, String> clusterMinStartDate = null;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 77a306e..4ea80c1 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -56,6 +56,19 @@ Optional Args : -fields <<field1,field2>> -filterBy <<field1:value1,field2:value
<a href="./Restapi/EntityList.html">Optional params described here.</a>
+---+++Summary
+
+Lists a summary of the entities of a given type that belong to a given cluster. Each entity summary includes the N most recent instances of that entity.
+
+Usage:
+$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -cluster <<cluster name>> -summary
+
+Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" -fields <<field1,field2>>
+-filterBy <<field1:value1,field2:value2>> -tags <<tagkey=tagvalue,tagkey=tagvalue>>
+-orderBy <<field>> -offset 0 -numResults 10 -numInstances 7
+
+<a href="./Restapi/EntitySummary.html">Optional params described here.</a>
+
---+++Update
Update operation allows an already submitted/scheduled entity to be updated. Cluster update is currently
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/docs/src/site/twiki/restapi/EntityList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityList.twiki b/docs/src/site/twiki/restapi/EntityList.twiki
index 0697561..353007c 100644
--- a/docs/src/site/twiki/restapi/EntityList.twiki
+++ b/docs/src/site/twiki/restapi/EntityList.twiki
@@ -12,7 +12,7 @@ Get list of the entities.
* fields <optional param> Fields of entity that the user wants to view, separated by commas.
* Valid options are STATUS, TAGS, PIPELINES.
* filterBy <optional param> Filter results by list of field:value pairs. Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
- * Supported filter fields are NAME, STATUS, PIPELINES.
+ * Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER.
* Query will do an AND among filterBy fields.
* tags <optional param> Return list of entities that have specified tags, separated by a comma. Query will do AND on tag values.
* Example: tags=consumer=consumer@xyz.com,owner=producer@xyz.com
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/docs/src/site/twiki/restapi/EntitySummary.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntitySummary.twiki b/docs/src/site/twiki/restapi/EntitySummary.twiki
new file mode 100644
index 0000000..6e6ddf4
--- /dev/null
+++ b/docs/src/site/twiki/restapi/EntitySummary.twiki
@@ -0,0 +1,72 @@
+---++ GET /api/entities/summary/:entity-type/:cluster
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+Given an EntityType and cluster, get list of entities along with summary of N recent instances of each entity
+
+---++ Parameters
+ * :entity-type Valid options are cluster, feed or process.
+ * :cluster Show entities that belong to this cluster.
+ * start <optional param> Show entity summaries from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+ * By default, it is set to (end - 2 days).
+ * end <optional param> Show entity summary up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+ * Default is set to now.
+ * fields <optional param> Fields of entity that the user wants to view, separated by commas.
+ * Valid options are STATUS, TAGS, PIPELINES.
+ * filterBy <optional param> Filter results by list of field:value pairs. Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
+ * Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER.
+ * Query will do an AND among filterBy fields.
+ * tags <optional param> Return list of entities that have specified tags, separated by a comma. Query will do AND on tag values.
+ * Example: tags=consumer=consumer@xyz.com,owner=producer@xyz.com
+ * orderBy <optional param> Field by which results should be ordered.
+ * Supports ordering by "name".
+ * offset <optional param> Show results from the offset, used for pagination. Defaults to 0.
+ * numResults <optional param> Number of results to show per request, used for pagination. Only integers > 0 are valid. Default is 10.
+ * numInstances <optional param> Number of recent instances to show per entity. Only integers > 0 are valid. Default is 7.
+
+---++ Results
+Show entities along with summary of N instances for each entity.
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/entities/summary/feed/primary-cluster?filterBy=STATUS:RUNNING&fields=status&tags=consumer=consumer@xyz.com&orderBy=name&offset=0&numResults=1&numInstances=2
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "entitySummary": [
+ {
+ "name" : "SampleOutput",
+ "type" : "feed",
+ "status": "RUNNING",
+ "instances": [
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:40:26-07:00",
+ "startTime": "2013-10-21T14:39:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+ "status": "RUNNING",
+ "instance": "2012-04-03T07:00Z"
+ },
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:42:27-07:00",
+ "startTime": "2013-10-21T14:41:57-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933397-oozie-rgau-W",
+ "status": "RUNNING",
+ "instance": "2012-04-03T08:00Z"
+ }
+ ]
+ }
+ ],
+ "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index 9284810..d9cb3cb 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -49,6 +49,7 @@ See also: [[../Security.twiki][Security in Falcon]]
| GET | [[EntityStatus][api/entities/status/:entity-type/:entity-name]] | Get the status of the entity |
| GET | [[EntityDefinition][api/entities/definition/:entity-type/:entity-name]] | Get the definition of the entity |
| GET | [[EntityList][api/entities/list/:entity-type]] | Get the list of entities |
+| GET | [[EntitySummary][api/entities/summary/:entity-type/:cluster]] | Get instance summary of all entities |
| GET | [[EntityDependencies][api/entities/dependencies/:entity-type/:entity-name]] | Get the dependencies of the entity |
---++ REST Call on Feed and Process Instances
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index a6d1b29..d12dede 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -20,6 +20,7 @@ package org.apache.falcon.resource;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
@@ -33,8 +34,6 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.store.EntityAlreadyExistsException;
import org.apache.falcon.entity.v0.*;
import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.resource.APIResult.Status;
import org.apache.falcon.resource.EntityList.EntityElement;
import org.apache.falcon.security.CurrentUser;
@@ -44,7 +43,6 @@ import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -461,27 +459,46 @@ public abstract class AbstractEntityManager {
}
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
/**
* Returns the list of entities registered of a given type.
*
* @param type Only return entities of this type
* @param fieldStr fields that the query is interested in, separated by comma
* @param filterBy filter by a specific field.
- * @param offset Pagination offset
- * @param resultsPerPage Number of results that should be returned starting at the offset
+ * @param filterTags filter by these tags.
+ * @param orderBy order result by these fields.
+ * @param offset Pagination offset.
+ * @param resultsPerPage Number of results that should be returned starting at the offset.
* @return EntityList
*/
public EntityList getEntityList(String type, String fieldStr, String filterBy, String filterTags,
String orderBy, Integer offset, Integer resultsPerPage) {
HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toLowerCase().split(",")));
+ List<Entity> entities;
+ try {
+ entities = getEntities(type, "", "", "", filterBy, filterTags, orderBy, offset, resultsPerPage);
+ } catch (Exception e) {
+ LOG.error("Failed to get entity list", e);
+ throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+ }
+
+ return entities.size() == 0
+ ? new EntityList(new Entity[]{})
+ : new EntityList(buildEntityElements(fields, entities));
+ }
+
+ protected List<Entity> getEntities(String type, String startDate, String endDate, String cluster,
+ String filterBy, String filterTags, String orderBy,
+ int offset, int resultsPerPage) throws FalconException {
final HashMap<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
final ArrayList<String> filterByTags = getFilterByTags(filterTags);
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Collection<String> entityNames = configStore.getEntities(entityType);
if (entityNames == null || entityNames.isEmpty()) {
- return new EntityList(new Entity[]{});
+ return Collections.emptyList();
}
ArrayList<Entity> entities = new ArrayList<Entity>();
@@ -497,8 +514,12 @@ public abstract class AbstractEntityManager {
throw FalconWebException.newException(e1, Response.Status.BAD_REQUEST);
}
- List<String> tags = getTags(entity);
- List<String> pipelines = getPipelines(entity);
+ if (filterEntityByDatesAndCluster(entity, startDate, endDate, cluster)) {
+ continue;
+ }
+
+ List<String> tags = EntityUtil.getTags(entity);
+ List<String> pipelines = EntityUtil.getPipelines(entity);
String entityStatus = getStatusString(entity);
if (filterEntity(entity, entityStatus,
@@ -512,10 +533,37 @@ public abstract class AbstractEntityManager {
int pageCount = getRequiredNumberOfResults(entities.size(), offset, resultsPerPage);
if (pageCount == 0) { // handle pagination
- return new EntityList(new Entity[]{});
+ return new ArrayList<Entity>();
+ }
+
+ return new ArrayList<Entity>(entities.subList(offset, (offset + pageCount)));
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ private boolean filterEntityByDatesAndCluster(Entity entity, String startDate, String endDate, String cluster)
+ throws FalconException {
+ if (StringUtils.isEmpty(cluster)) {
+ return false; // no filtering necessary on cluster
+ }
+ Set<String> clusters = EntityUtil.getClustersDefined(entity);
+ if (!clusters.contains(cluster)) {
+ return true; // entity does not have this cluster
+ }
+
+ if (!StringUtils.isEmpty(startDate)) {
+ Date parsedDate = EntityUtil.parseDateUTC(startDate);
+ if (parsedDate.after(EntityUtil.getEndTime(entity, cluster))) {
+ return true;
+ }
+ }
+ if (!StringUtils.isEmpty(endDate)) {
+ Date parseDate = EntityUtil.parseDateUTC(endDate);
+ if (parseDate.before(EntityUtil.getStartTime(entity, cluster))) {
+ return true;
+ }
}
- return new EntityList(buildEntityElements(offset, fields, entities, pageCount));
+ return false;
}
protected static HashMap<String, String> getFilterByFieldsValues(String filterBy) {
@@ -547,51 +595,7 @@ public abstract class AbstractEntityManager {
return filterTagsList;
}
- private List<String> getTags(Entity entity) {
- String rawTags = null;
- switch (entity.getEntityType()) {
- case PROCESS:
- rawTags = ((Process) entity).getTags();
- break;
-
- case FEED:
- rawTags = ((Feed) entity).getTags();
- break;
-
- case CLUSTER:
- rawTags = ((Cluster) entity).getTags();
- break;
-
- default:
- break;
- }
-
- List<String> tags = new ArrayList<String>();
- if (!StringUtils.isEmpty(rawTags)) {
- for(String tag : rawTags.split(",")) {
- LOG.info("Adding tag - "+ tag);
- tags.add(tag.trim());
- }
- }
-
- return tags;
- }
-
- private List<String> getPipelines(Entity entity) {
- List<String> pipelines = new ArrayList<String>();
- if (entity.getEntityType().equals(EntityType.PROCESS)) {
- Process process = (Process) entity;
- String pipelineString = process.getPipelines();
- if (pipelineString != null) {
- for (String pipeline : pipelineString.split(",")) {
- pipelines.add(pipeline.trim());
- }
- }
- }
- return pipelines;
- }
-
- private String getStatusString(Entity entity) {
+ protected String getStatusString(Entity entity) {
String statusString;
try {
statusString = getStatus(entity, entity.getEntityType()).name();
@@ -681,6 +685,12 @@ public abstract class AbstractEntityManager {
}
break;
+ case CLUSTER:
+ Set<String> clusters = EntityUtil.getClustersDefined(entity);
+ if (!clusters.contains(filterValue)) { // entity is not defined on the cluster being filtered on
+ filterEntity = true;
+ }
+ break; // explicit break: do not fall through into default
default:
break;
}
@@ -751,11 +761,10 @@ public abstract class AbstractEntityManager {
return retLen;
}
- private EntityElement[] buildEntityElements(Integer offset, HashSet<String> fields,
- ArrayList<Entity> entities, int pageCount) {
- EntityElement[] elements = new EntityElement[pageCount];
+ private EntityElement[] buildEntityElements(HashSet<String> fields, List<Entity> entities) {
+ EntityElement[] elements = new EntityElement[entities.size()];
int elementIndex = 0;
- for (Entity entity : entities.subList(offset, (offset + pageCount))) {
+ for (Entity entity : entities) {
elements[elementIndex++] = getEntityElement(entity, fields);
}
return elements;
@@ -769,10 +778,10 @@ public abstract class AbstractEntityManager {
elem.status = getStatusString(entity);
}
if (fields.contains("pipelines")) {
- elem.pipelines = getPipelines(entity);
+ elem.pipelines = EntityUtil.getPipelines(entity);
}
if (fields.contains("tags")) {
- elem.tag = getTags(entity);
+ elem.tag = EntityUtil.getTags(entity);
}
return elem;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 6862b99..1ffe471 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -46,7 +46,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
private static final long MINUTE_IN_MILLIS = 60000L;
private static final long HOUR_IN_MILLIS = 3600000L;
- private static final long DAY_IN_MILLIS = 86400000L;
+ protected static final long DAY_IN_MILLIS = 86400000L;
private static final long MONTH_IN_MILLIS = 2592000000L;
protected void checkType(String type) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index f5329ef..6fba6df 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -18,12 +18,16 @@
package org.apache.falcon.resource;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.UnschedulableEntityException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.monitors.Dimension;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.slf4j.Logger;
@@ -33,12 +37,13 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
+import java.util.*;
/**
* REST resource of allowed actions on Schedulable Entities, Only Process and
* Feed can have schedulable actions.
*/
-public abstract class AbstractSchedulableEntityManager extends AbstractEntityManager {
+public abstract class AbstractSchedulableEntityManager extends AbstractInstanceManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSchedulableEntityManager.class);
@@ -152,6 +157,106 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
}
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ /**
+ * Returns a summary of the most recent N instances of each entity of the given type, filtered by cluster.
+ *
+ * @param type Only return entities of this type.
+ * @param startDate For each entity, show instances after startDate.
+ * @param endDate For each entity, show instances before endDate.
+ * @param cluster Return entities for specific cluster.
+ * @param fields fields that the query is interested in, separated by comma
+ * @param filterBy filter by a specific field.
+ * @param filterTags filter by these tags.
+ * @param orderBy order result by these fields.
+ * @param offset Pagination offset.
+ * @param resultsPerPage Number of results that should be returned starting at the offset.
+ * @param numInstances Number of instance summaries to show per entity
+ * @return EntitySummaryResult
+ */
+ public EntitySummaryResult getEntitySummary(String type, String cluster, String startDate, String endDate,
+ String fields, String filterBy, String filterTags,
+ String orderBy, Integer offset,
+ Integer resultsPerPage, Integer numInstances) {
+ HashSet<String> fieldSet = new HashSet<String>(Arrays.asList(fields.toLowerCase().split(",")));
+ Pair<Date, Date> startAndEndDates = getStartEndDatesForSummary(startDate, endDate);
+
+ List<Entity> entities;
+ String colo;
+ try {
+ entities = getEntities(type,
+ SchemaHelper.getDateFormat().format(startAndEndDates.first),
+ SchemaHelper.getDateFormat().format(startAndEndDates.second),
+ cluster, filterBy, filterTags, orderBy, offset, resultsPerPage);
+ colo = ((Cluster) configStore.get(EntityType.CLUSTER, cluster)).getColo();
+ } catch (Exception e) {
+ LOG.error("Failed to get entities", e);
+ throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+ }
+
+ List<EntitySummaryResult.EntitySummary> entitySummaries = new ArrayList<EntitySummaryResult.EntitySummary>();
+ for (Entity entity : entities) {
+ InstancesResult instancesResult = getInstances(entity.getEntityType().name(), entity.getName(),
+ SchemaHelper.getDateFormat().format(startAndEndDates.first),
+ SchemaHelper.getDateFormat().format(startAndEndDates.second),
+ colo, null, "", "", 0, numInstances);
+
+ /* ToDo - Use oozie bulk API after FALCON-591 is implemented
+ * getBulkInstances(entity, cluster,
+ * startAndEndDates.first, startAndEndDates.second, colo, "starttime", 0, numInstances);
+ */
+ List<EntitySummaryResult.Instance> entitySummaryInstances =
+ getElementsFromInstanceResult(instancesResult);
+
+ List<String> pipelines = new ArrayList<String>();
+ List<String> tags = new ArrayList<String>();
+ if (fieldSet.contains("pipelines")) { pipelines = EntityUtil.getPipelines(entity); }
+ if (fieldSet.contains("tags")) { tags = EntityUtil.getTags(entity); }
+
+ EntitySummaryResult.EntitySummary entitySummary =
+ new EntitySummaryResult.EntitySummary(entity.getName(), entity.getEntityType().toString(),
+ getStatusString(entity),
+ tags.toArray(new String[tags.size()]),
+ pipelines.toArray(new String[pipelines.size()]),
+ entitySummaryInstances.toArray(
+ new EntitySummaryResult.Instance[entitySummaryInstances.size()]));
+ entitySummaries.add(entitySummary);
+ }
+ return new EntitySummaryResult("Entity Summary Result",
+ entitySummaries.toArray(new EntitySummaryResult.EntitySummary[entitySummaries.size()]));
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ private Pair<Date, Date> getStartEndDatesForSummary(String startDate, String endDate) {
+ Date end = (StringUtils.isEmpty(endDate)) ? new Date() : SchemaHelper.parseDateUTC(endDate);
+
+ long startMillisecs = end.getTime() - (2* DAY_IN_MILLIS); // default - 2 days before end
+ Date start = (StringUtils.isEmpty(startDate))
+ ? new Date(startMillisecs) : SchemaHelper.parseDateUTC(startDate);
+
+ return new Pair<Date, Date>(start, end);
+ }
+
+ private List<EntitySummaryResult.Instance> getElementsFromInstanceResult(InstancesResult instancesResult) {
+ ArrayList<EntitySummaryResult.Instance> elemInstanceList =
+ new ArrayList<EntitySummaryResult.Instance>();
+ InstancesResult.Instance[] instances = instancesResult.getInstances();
+ if (instances != null && instances.length > 0) {
+ for (InstancesResult.Instance rawInstance : instances) {
+ EntitySummaryResult.Instance instance = new EntitySummaryResult.Instance(rawInstance.getCluster(),
+ rawInstance.getInstance(),
+ EntitySummaryResult.WorkflowStatus.valueOf(rawInstance.getStatus().toString()));
+ instance.logFile = rawInstance.getLogFile();
+ instance.sourceCluster = rawInstance.sourceCluster;
+ instance.startTime = rawInstance.startTime;
+ instance.endTime = rawInstance.endTime;
+ elemInstanceList.add(instance);
+ }
+ }
+
+ return elemInstanceList;
+ }
+
private void checkSchedulableEntity(String type) throws UnschedulableEntityException {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
if (!entityType.isSchedulable()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index f743005..e69e531 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -30,6 +30,7 @@ import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
import org.apache.falcon.resource.EntityList;
+import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
import org.apache.falcon.util.DeploymentUtil;
@@ -403,6 +404,29 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
}.execute();
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ @GET
+ @Path("summary/{type}/{cluster}")
+ @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
+ @Monitored(event = "summary")
+ @Override
+ public EntitySummaryResult getEntitySummary(
+ @Dimension("type") @PathParam("type") final String type,
+ @Dimension("cluster") @PathParam("cluster") final String cluster,
+ @DefaultValue("") @QueryParam("start") String startStr,
+ @DefaultValue("") @QueryParam("end") String endStr,
+ @DefaultValue("") @QueryParam("fields") final String entityFields,
+ @DefaultValue("") @QueryParam("filterBy") final String entityFilter,
+ @DefaultValue("") @QueryParam("tags") final String entityTags,
+ @DefaultValue("") @QueryParam("orderBy") final String entityOrderBy,
+ @DefaultValue("0") @QueryParam("offset") final Integer entityOffset,
+ @DefaultValue("10") @QueryParam("numResults") final Integer numEntities,
+ @DefaultValue("7") @QueryParam("numInstances") final Integer numInstanceResults) {
+ return super.getEntitySummary(type, cluster, startStr, endStr, entityFields, entityFilter, entityTags,
+ entityOrderBy, entityOffset, numEntities, numInstanceResults);
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
private abstract class EntityProxy {
private String type;
private String name;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index 2c0fa25..0b55eb3 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -21,7 +21,9 @@ import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.Clusters;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.StartupProperties;
import org.mockito.Mock;
@@ -35,6 +37,7 @@ import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Date;
import static org.mockito.Mockito.when;
@@ -48,6 +51,7 @@ public class EntityManagerTest extends AbstractEntityManager {
private static final String SAMPLE_PROCESS_XML = "/process-version-0.xml";
private static final String SAMPLE_INVALID_PROCESS_XML = "/process-invalid.xml";
+ private static final long DAY_IN_MILLIS = 86400000L;
@BeforeClass
public void init() {
@@ -199,14 +203,28 @@ public class EntityManagerTest extends AbstractEntityManager {
acl.setGroup("hdfs");
acl.setPermission("*");
- Process p = new Process();
- p.setName(name);
- p.setACL(acl);
- p.setPipelines(pipelines);
- p.setTags(tags);
- return p;
+ Process process = new Process();
+ process.setName(name);
+ process.setACL(acl);
+ process.setPipelines(pipelines);
+ process.setTags(tags);
+ process.setClusters(buildClusters("cluster" + name));
+ return process;
}
+ private Clusters buildClusters(String name) {
+ Validity validity = new Validity();
+ long startMilliSecs = new Date().getTime() - (2 * DAY_IN_MILLIS);
+ validity.setStart(new Date(startMilliSecs));
+ validity.setEnd(new Date());
+ org.apache.falcon.entity.v0.process.Cluster cluster = new org.apache.falcon.entity.v0.process.Cluster();
+ cluster.setName(name);
+ cluster.setValidity(validity);
+
+ Clusters clusters = new Clusters();
+ clusters.getClusters().add(cluster);
+ return clusters;
+ }
/**
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 58a3bd2..c10301b 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -69,6 +69,29 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
return super.getEntityList(type, fields, filterBy, tags, orderBy, offset, resultsPerPage);
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ @GET
+ @Path("summary/{type}/{cluster}")
+ @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
+ @Monitored(event = "summary")
+ @Override
+ public EntitySummaryResult getEntitySummary(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("cluster") @PathParam("cluster") String cluster,
+ @DefaultValue("") @QueryParam("start") String startStr,
+ @DefaultValue("") @QueryParam("end") String endStr,
+ @DefaultValue("") @QueryParam("fields") String fields,
+ @DefaultValue("") @QueryParam("filterBy") String entityFilter,
+ @DefaultValue("") @QueryParam("tags") String entityTags,
+ @DefaultValue("") @QueryParam("orderBy") String entityOrderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer entityOffset,
+ @DefaultValue("10") @QueryParam("numResults") Integer numEntities,
+ @DefaultValue("7") @QueryParam("numInstances") Integer numInstanceResults) {
+ return super.getEntitySummary(type, cluster, startStr, endStr, fields, entityFilter, entityTags,
+ entityOrderBy, entityOffset, numEntities, numInstanceResults);
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
@GET
@Path("definition/{type}/{entity}")
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 4ac978c..df99c23 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -285,6 +285,25 @@ public class FalconCLIIT {
executeWithURL("entity -status -type process -name "
+ overlay.get("processName")));
+ Assert.assertEquals(0,
+ executeWithURL("entity -summary -type feed -cluster "+ overlay.get("cluster")
+ + " -fields status,tags -start " + START_INSTANCE
+ + " -filterBy TYPE:FEED -orderBy name "
+ + " -offset 0 -numResults 1 -numInstances 5"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -summary -type process -fields status,pipelines"
+ + " -cluster " + overlay.get("cluster")
+ + " -start " + SchemaHelper.getDateFormat().format(new Date(0))
+ + " -end " + SchemaHelper.getDateFormat().format(new Date())
+ + " -filterBy TYPE:PROCESS -orderBy name "
+ + " -offset 0 -numResults 1 -numInstances 7"));
+ // No start or end date
+ Assert.assertEquals(0,
+ executeWithURL("entity -summary -type process -fields status,pipelines"
+ + " -cluster " + overlay.get("cluster")
+ + " -filterBy TYPE:PROCESS -orderBy name "
+ + " -offset 0 -numResults 1 -numInstances 7"));
+
}
public void testSubCommandPresence() throws Exception {
@@ -457,6 +476,12 @@ public class FalconCLIIT {
+ overlay.get("outputFeedName")
+ " -start "+ SchemaHelper.getDateFormat().format(new Date())
+" -filterBy STATUS:SUCCEEDED -offset 0 -numResults 1"));
+ // When you get a cluster for which there are no feed entities, the summary call should still succeed.
+ Assert.assertEquals(0,
+ executeWithURL("entity -summary -type feed -cluster " + overlay.get("cluster") + " -fields status,tags"
+ + " -start "+ SchemaHelper.getDateFormat().format(new Date())
+ + " -offset 0 -numResults 1 -numInstances 3"));
+
}
public void testInstanceRunningAndSummaryCommands() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 0a41fe0..4755c30 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -803,6 +803,19 @@ public class EntityManagerJerseyIT {
for (EntityList.EntityElement entityElement : result.getElements()) {
Assert.assertNotNull(entityElement.status); // status is null
}
+
+ response = context.service
+ .path("api/entities/summary/process/" + overlay.get("cluster"))
+ .queryParam("fields", "status,pipelines")
+ .queryParam("numInstances", "1")
+ .queryParam("orderBy", "name")
+ .header("Cookie", context.getAuthenticationToken())
+ .type(MediaType.APPLICATION_JSON)
+ .accept(MediaType.APPLICATION_JSON)
+ .get(ClientResponse.class);
+ Assert.assertEquals(response.getStatus(), 200);
+ EntitySummaryResult summaryResult = response.getEntity(EntitySummaryResult.class);
+ Assert.assertNotNull(summaryResult);
}
public Date getEndTime() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/305feb0b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 9752518..40da789 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -38,6 +38,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -350,7 +351,7 @@ public class TestContext {
long time = System.currentTimeMillis();
clusterName = "cluster" + time;
overlay.put("cluster", clusterName);
- overlay.put("colo", "gs");
+ overlay.put("colo", DeploymentUtil.getCurrentColo());
overlay.put("inputFeedName", "in" + time);
//only feeds with future dates can be scheduled
Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000);
@@ -390,7 +391,7 @@ public class TestContext {
Map<String, String> overlay = new HashMap<String, String>();
overlay.put("cluster", RandomStringUtils.randomAlphabetic(5));
- overlay.put("colo", "gs");
+ overlay.put("colo", DeploymentUtil.getCurrentColo());
TestContext.overlayParametersOverTemplate(clusterTemplate, overlay);
EmbeddedCluster cluster = EmbeddedCluster.newCluster(overlay.get("cluster"), true);