You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2015/06/29 16:06:27 UTC
falcon git commit: FALCON-1186. Add filtering capability to result of
instance summary. Contributed by Suhas Vasu
Repository: falcon
Updated Branches:
refs/heads/master 98e12502a -> aabf5e136
FALCON-1186. Add filtering capability to result of instance summary. Contributed by Suhas Vasu
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aabf5e13
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aabf5e13
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aabf5e13
Branch: refs/heads/master
Commit: aabf5e1361c87e52fd49141c57850f58f68fc785
Parents: 98e1250
Author: Suhas Vasu <su...@inmobi.com>
Authored: Mon Jun 29 19:35:57 2015 +0530
Committer: Suhas Vasu <su...@inmobi.com>
Committed: Mon Jun 29 19:35:57 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/falcon/cli/FalconCLI.java | 14 +-
.../org/apache/falcon/client/FalconClient.java | 6 +-
.../falcon/resource/InstancesSummaryResult.java | 7 +
docs/src/site/twiki/FalconCLI.twiki | 9 +-
.../site/twiki/restapi/InstanceSummary.twiki | 73 ++++++++-
.../falcon/resource/AbstractEntityManager.java | 73 ++++++---
.../resource/AbstractInstanceManager.java | 151 ++++++++++++++++---
.../AbstractSchedulableEntityManager.java | 4 +-
.../resource/proxy/InstanceManagerProxy.java | 8 +-
.../apache/falcon/resource/InstanceManager.java | 8 +-
11 files changed, 295 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 14658f1..d2d589e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
IMPROVEMENTS
+ FALCON-1186 Add filtering capability to result of instance summary (Suhas Vasu)
+
FALCON-1293 Update CHANGES.txt to change 0.6.1 branch to release (Shaik Idris Ali via Ajay Yadava)
FALCON-1116 Rule for Oozie 4+ doesn't match 5+ versions (Ruslan Ostafiychuk)
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index cc041c0..e393f82 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -36,6 +36,7 @@ import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.FeedLookupResult;
import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
import java.io.IOException;
import java.io.InputStream;
@@ -281,10 +282,12 @@ public class FalconCLI {
lifeCycles,
filterBy, orderBy, sortOrder, offset, numResults));
} else if (optionsList.contains(SUMMARY_OPT)) {
+ validateOrderBy(orderBy, "summary");
+ validateFilterBy(filterBy, "summary");
result =
ResponseHelper.getString(client
.getSummaryOfInstances(type, entity, start, end, colo,
- lifeCycles));
+ lifeCycles, filterBy, orderBy, sortOrder));
} else if (optionsList.contains(KILL_OPT)) {
validateNotEmpty(start, START_OPT);
validateNotEmpty(end, END_OPT);
@@ -584,6 +587,8 @@ public class FalconCLI {
EntityList.EntityFilterByFields.valueOf(tempKeyVal[0].toUpperCase());
} else if (filterType.equals("instance")) {
InstancesResult.InstanceFilterFields.valueOf(tempKeyVal[0].toUpperCase());
+ }else if (filterType.equals("summary")) {
+ InstancesSummaryResult.InstanceSummaryFilterFields.valueOf(tempKeyVal[0].toUpperCase());
} else {
throw new IllegalArgumentException("Invalid API call");
}
@@ -606,8 +611,13 @@ public class FalconCLI {
if (Arrays.asList(new String[] {"type", "name"}).contains(orderBy.toLowerCase())) {
return;
}
+ } else if (action.equals("summary")) {
+ if (Arrays.asList(new String[]{"cluster"})
+ .contains(orderBy.toLowerCase())) {
+ return;
+ }
}
- throw new FalconCLIException("Invalid orderBy argument : " + ORDER_BY_OPT);
+ throw new FalconCLIException("Invalid orderBy argument : " + orderBy);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5df8626..555bdb7 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -437,10 +437,12 @@ public class FalconClient {
public InstancesSummaryResult getSummaryOfInstances(String type, String entity,
String start, String end,
- String colo, List<LifeCycle> lifeCycles) throws FalconCLIException {
+ String colo, List<LifeCycle> lifeCycles,
+ String filterBy, String orderBy, String sortOrder) throws FalconCLIException {
return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, null,
- null, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS).getEntity(InstancesSummaryResult.class);
+ null, colo, lifeCycles, filterBy, orderBy, sortOrder, 0, DEFAULT_NUM_RESULTS)
+ .getEntity(InstancesSummaryResult.class);
}
public FeedInstanceResult getFeedListing(String type, String entity, String start,
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
index a3dcbe4..aa0db99 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
@@ -32,6 +32,13 @@ import java.util.Map;
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class InstancesSummaryResult extends APIResult {
+ /**
+ * RestAPI supports filterBy these fields of instanceSummary.
+ */
+ public static enum InstanceSummaryFilterFields {
+ STATUS, CLUSTER
+ }
+
@XmlElement
private InstanceSummary[] instancesSummary;
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index aeab8f8..9203699 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -198,7 +198,7 @@ If the instance is in WAITING state, missing dependencies are listed
Example : Suppose a process has 3 instance, one has succeeded,one is in running state and other one is waiting, the expected output is:
-{"status":"SUCCEEDED","message":"getStatus is successful","instances":[{"instance":"2012-05-07T05:02Z","status":"SUCCEEDED","logFile":"http://oozie-dashboard-url"},{"instance":"2012-05-07T05:07Z","status":"RUNNING","logFile":"http://oozie-dashboard-url"}, {"instance":"2010-01-02T11:05Z","status":"WAITING"}]
+{"status":"SUCCEEDED","message":"getStatus is successful","instances":[{"instance":"2012-05-07T05:02Z","status":"SUCCEEDED","logFile":"http://oozie-dashboard-url"},{"instance":"2012-05-07T05:07Z","status":"RUNNING","logFile":"http://oozie-dashboard-url"}, {"instance":"2010-01-02T11:05Z","status":"WAITING"}]}
Usage:
$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -list
@@ -217,13 +217,14 @@ The unscheduled instances between the specified time period are included as UNSC
Example : Suppose a process has 3 instance, one has succeeded,one is in running state and other one is waiting, the expected output is:
-{"status":"SUCCEEDED","message":"getSummary is successful", "cluster": <<name>> [{"SUCCEEDED":"1"}, {"WAITING":"1"}, {"RUNNING":"1"}]}
+{"status":"SUCCEEDED","message":"getSummary is successful", "instancesSummary":[{"cluster": <<name>>, "map":[{"SUCCEEDED":"1"}, {"WAITING":"1"}, {"RUNNING":"1"}]}]}
Usage:
$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -summary
-Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
--colo <<colo>> -lifecycle <<lifecycles>>
+Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" -colo <<colo>>
+-filterBy <<field1:value1,field2:value2>> -lifecycle <<lifecycles>>
+-orderBy field -sortOrder <<sortOrder>>
<a href="./Restapi/InstanceSummary.html">Optional params described here.</a>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/docs/src/site/twiki/restapi/InstanceSummary.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSummary.twiki b/docs/src/site/twiki/restapi/InstanceSummary.twiki
index 9d1041e..2e44598 100644
--- a/docs/src/site/twiki/restapi/InstanceSummary.twiki
+++ b/docs/src/site/twiki/restapi/InstanceSummary.twiki
@@ -8,14 +8,23 @@
Get summary of instance/instances of an entity.
---++ Parameters
- * :entity-type Valid options are cluster, feed or process.
+ * :entity-type Valid options are feed or process.
* :entity-name Name of the entity.
* start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
- * By default, it is set to (end - (10 * entityFrequency)).
+ * By default, it is set to (end - (10 * entityFrequency)).
* end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
- * Default is set to now.
+ * Default is set to now.
* colo <optional param> Colo on which the query should be run.
* lifecycle <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process is Execution(default).
+ * filterBy <optional param> Filter results by list of field:value pairs.
+ Example1: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
+ Example2: filterBy=Status:RUNNING,Status:KILLED
+ * Supported filter fields are STATUS, CLUSTER
+ * Query will do an AND among different filterBy fields and an OR among multiple values given for the same field.
+ * orderBy <optional param> Field by which results should be ordered.
+ * Supports ordering by "cluster".
+ * sortOrder <optional param> Valid options are "asc" and "desc".
+ Example: orderBy=cluster sortOrder=asc
---++ Results
Summary of the instances over the specified time range
@@ -39,10 +48,66 @@ GET http://localhost:15000/api/instance/summary/process/WordCount?colo=*&start=2
"entry":
{
"key":"SUCCEEDED",
- "key2":"value"
+ "value":"value"
}
}
}
}
}
</verbatim>
+
+---+++ Rest Call
+<verbatim>
+GET https://localhost:16443/api/instance/summary/process/WordCount?filterBy=Status:KILLED,Status:RUNNING&start=2015-06-24T16:00Z&end=2015-06-24T23:00Z&colo=*
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "status":"SUCCEEDED",
+ "message":"local/SUMMARY\n",
+ "requestId":"local/1246061948@qtp-1059149611-5 - 34d8c3bb-f461-4fd5-87cd-402c9c6b1ed2\n",
+ "instancesSummary":[
+ {
+ "cluster":"local",
+ "map":{
+ "entry":{
+ "key":"RUNNING",
+ "value":"1"
+ },
+ "entry":{
+ "key":"KILLED",
+ "value":"1"
+ }
+ }
+ }
+ ]
+}
+</verbatim>
+
+---+++ Rest Call
+<verbatim>
+GET https://localhost:16443/api/instance/summary/process/WordCount?orderBy=cluster&sortOrder=asc&start=2015-06-24T16:00Z&end=2015-06-24T23:00Z&colo=*
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "status":"SUCCEEDED",
+ "message":"local/SUMMARY\n",
+ "requestId":"local/1246061948@qtp-1059149611-5 - 42e2040d-6b6e-4bfd-a090-83db5ed1a429\n",
+ "instancesSummary":[
+ {
+ "cluster":"local",
+ "map":{
+ "entry":{
+ "key":"SUCCEEDED",
+ "value":"6"
+ },
+ "entry":{
+ "key":"KILLED",
+ "value":"1"
+ }
+ }
+ }
+ ]
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index a721666..a3801e9 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -581,10 +581,10 @@ public abstract class AbstractEntityManager {
String orderBy, String sortOrder, Integer offset, Integer resultsPerPage) {
HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toUpperCase().split(",")));
- Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
+ Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
validateEntityFilterByClause(filterByFieldsValues);
if (StringUtils.isNotEmpty(filterTags)) {
- filterByFieldsValues.put(EntityList.EntityFilterByFields.TAGS.name(), filterTags);
+ filterByFieldsValues.put(EntityList.EntityFilterByFields.TAGS.name(), Arrays.asList(filterTags));
}
// get filtered entities
@@ -634,8 +634,8 @@ public abstract class AbstractEntityManager {
return entitiesReturn;
}
- protected Map<String, String> validateEntityFilterByClause(Map<String, String> filterByFieldsValues) {
- for (Map.Entry<String, String> entry : filterByFieldsValues.entrySet()) {
+ protected Map<String, List<String>> validateEntityFilterByClause(Map<String, List<String>> filterByFieldsValues) {
+ for (Map.Entry<String, List<String>> entry : filterByFieldsValues.entrySet()) {
try {
EntityList.EntityFilterByFields.valueOf(entry.getKey().toUpperCase());
} catch (IllegalArgumentException e) {
@@ -646,14 +646,15 @@ public abstract class AbstractEntityManager {
return filterByFieldsValues;
}
- protected Map<String, String> validateEntityFilterByClause(String entityFilterByClause) {
- Map<String, String> filterByFieldsValues = getFilterByFieldsValues(entityFilterByClause);
+ protected Map<String, List<String>> validateEntityFilterByClause(String entityFilterByClause) {
+ Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(entityFilterByClause);
return validateEntityFilterByClause(filterByFieldsValues);
}
protected List<Entity> getFilteredEntities(
- EntityType entityType, String nameSubsequence, String tagKeywords, Map<String, String> filterByFieldsValues,
- String startDate, String endDate, String cluster) throws FalconException, IOException {
+ EntityType entityType, String nameSubsequence, String tagKeywords,
+ Map<String, List<String>> filterByFieldsValues, String startDate, String endDate, String cluster)
+ throws FalconException, IOException {
Collection<String> entityNames = configStore.getEntities(entityType);
if (entityNames.isEmpty()) {
return Collections.emptyList();
@@ -661,7 +662,12 @@ public abstract class AbstractEntityManager {
List<Entity> entities = new ArrayList<Entity>();
char[] subsequence = nameSubsequence.toLowerCase().toCharArray();
- final List<String> tagKeywordsList = getFilterByTags(tagKeywords.toLowerCase());
+ List<String> tagKeywordsList;
+ if (StringUtils.isEmpty(tagKeywords)) {
+ tagKeywordsList = new ArrayList<>();
+ } else {
+ tagKeywordsList = getFilterByTags(Arrays.asList(tagKeywords.toLowerCase()));
+ }
for (String entityName : entityNames) {
Entity entity;
try {
@@ -765,16 +771,21 @@ public abstract class AbstractEntityManager {
return false;
}
- protected static Map<String, String> getFilterByFieldsValues(String filterBy) {
+ protected static Map<String, List<String>> getFilterByFieldsValues(String filterBy) {
// Filter the results by specific field:value, eliminate empty values
- Map<String, String> filterByFieldValues = new HashMap<String, String>();
+ Map<String, List<String>> filterByFieldValues = new HashMap<String, List<String>>();
if (StringUtils.isNotEmpty(filterBy)) {
String[] fieldValueArray = filterBy.split(",");
for (String fieldValue : fieldValueArray) {
String[] splits = fieldValue.split(":", 2);
String filterByField = splits[0];
if (splits.length == 2 && !splits[1].equals("")) {
- filterByFieldValues.put(filterByField, splits[1]);
+ List<String> currentValue = filterByFieldValues.get(filterByField);
+ if (currentValue == null) {
+ currentValue = new ArrayList<String>();
+ filterByFieldValues.put(filterByField, currentValue);
+ }
+ currentValue.add(splits[1]);
}
}
}
@@ -782,13 +793,16 @@ public abstract class AbstractEntityManager {
return filterByFieldValues;
}
- private static List<String> getFilterByTags(String filterTags) {
+ private static List<String> getFilterByTags(List<String> filterTags) {
ArrayList<String> filterTagsList = new ArrayList<String>();
- if (StringUtils.isNotEmpty(filterTags)) {
- String[] splits = filterTags.split(",");
- for (String tag : splits) {
- filterTagsList.add(tag.trim());
+ if (filterTags!= null && !filterTags.isEmpty()) {
+ for (String filterTag: filterTags) {
+ String[] splits = filterTag.split(",");
+ for (String tag : splits) {
+ filterTagsList.add(tag.trim());
+ }
}
+
}
return filterTagsList;
}
@@ -834,12 +848,12 @@ public abstract class AbstractEntityManager {
return false;
}
- private boolean isFilteredByFields(Entity entity, Map<String, String> filterKeyVals) {
+ private boolean isFilteredByFields(Entity entity, Map<String, List<String>> filterKeyVals) {
if (filterKeyVals.isEmpty()) {
return false;
}
- for (Map.Entry<String, String> pair : filterKeyVals.entrySet()) {
+ for (Map.Entry<String, List<String>> pair : filterKeyVals.entrySet()) {
EntityList.EntityFilterByFields filter =
EntityList.EntityFilterByFields.valueOf(pair.getKey().toUpperCase());
if (isEntityFiltered(entity, filter, pair)) {
@@ -851,16 +865,16 @@ public abstract class AbstractEntityManager {
}
private boolean isEntityFiltered(Entity entity, EntityList.EntityFilterByFields filter,
- Map.Entry<String, String> pair) {
+ Map.Entry<String, List<String>> pair) {
switch (filter) {
case TYPE:
- return !entity.getEntityType().toString().equalsIgnoreCase(pair.getValue());
+ return !containsIgnoreCase(pair.getValue(), entity.getEntityType().toString());
case NAME:
- return !entity.getName().equalsIgnoreCase(pair.getValue());
+ return !containsIgnoreCase(pair.getValue(), entity.getName());
case STATUS:
- return !getStatusString(entity).equalsIgnoreCase(pair.getValue());
+ return !containsIgnoreCase(pair.getValue(), getStatusString(entity));
case PIPELINES:
if (!entity.getEntityType().equals(EntityType.PROCESS)) {
@@ -868,10 +882,10 @@ public abstract class AbstractEntityManager {
"Invalid filterBy key for non process entities " + pair.getKey(),
Response.Status.BAD_REQUEST);
}
- return !EntityUtil.getPipelines(entity).contains(pair.getValue());
+ return !EntityUtil.getPipelines(entity).contains(pair.getValue().get(0));
case CLUSTER:
- return !EntityUtil.getClustersDefined(entity).contains(pair.getValue());
+ return !EntityUtil.getClustersDefined(entity).contains(pair.getValue().get(0));
case TAGS:
return isFilteredByTags(getFilterByTags(pair.getValue()), EntityUtil.getTags(entity));
@@ -1082,4 +1096,13 @@ public abstract class AbstractEntityManager {
throw new FalconRuntimException("Unable to consolidate result.", e);
}
}
+
+ private boolean containsIgnoreCase(List<String> strList, String str) {
+ for (String s : strList) {
+ if (s.equalsIgnoreCase(str)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 13e0c82..310e73b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -41,6 +41,7 @@ import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.logging.LogProvider;
import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
@@ -56,11 +57,11 @@ import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
@@ -134,13 +135,13 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
protected void validateInstanceFilterByClause(String entityFilterByClause) {
- Map<String, String> filterByFieldsValues = getFilterByFieldsValues(entityFilterByClause);
- for (Map.Entry<String, String> entry : filterByFieldsValues.entrySet()) {
+ Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(entityFilterByClause);
+ for (Map.Entry<String, List<String>> entry : filterByFieldsValues.entrySet()) {
try {
InstancesResult.InstanceFilterFields filterKey =
InstancesResult.InstanceFilterFields .valueOf(entry.getKey().toUpperCase());
if (filterKey == InstancesResult.InstanceFilterFields.STARTEDAFTER) {
- EntityUtil.parseDateUTC(entry.getValue());
+ getEarliestDate(entry.getValue());
}
} catch (IllegalArgumentException e) {
throw FalconWebException.newInstanceException(
@@ -245,7 +246,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr,
- String colo, List<LifeCycle> lifeCycles) {
+ String colo, List<LifeCycle> lifeCycles,
+ String filterBy, String orderBy, String sortOrder) {
checkColo(colo);
checkType(type);
try {
@@ -255,10 +257,12 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Pair<Date, Date> startAndEndDate = getStartAndEndDate(entityObject, startStr, endStr);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.getSummary(entityObject, startAndEndDate.first, startAndEndDate.second,
- lifeCycles);
+ return getInstanceSummaryResultSubset(wfEngine.getSummary(entityObject,
+ startAndEndDate.first, startAndEndDate.second, lifeCycles),
+ filterBy, orderBy, sortOrder);
+
} catch (Throwable e) {
- LOG.error("Failed to get instances status", e);
+ LOG.error("Failed to get instance summary", e);
throw FalconWebException.newInstanceSummaryException(e, Response.Status.BAD_REQUEST);
}
}
@@ -297,7 +301,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
// Filter instances
- Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
+ Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
List<Instance> instanceSet = getFilteredInstanceSet(resultSet, filterByFieldsValues);
int pageCount = super.getRequiredNumberOfResults(instanceSet.size(), offset, numResults);
@@ -314,8 +318,28 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
return result;
}
+ private InstancesSummaryResult getInstanceSummaryResultSubset(InstancesSummaryResult resultSet, String filterBy,
+ String orderBy, String sortOrder) throws FalconException {
+ if (resultSet.getInstancesSummary() == null) {
+ // return the empty resultSet
+ resultSet.setInstancesSummary(new InstancesSummaryResult.InstanceSummary[0]);
+ return resultSet;
+ }
+
+ // Filter instances
+ Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
+ List<InstanceSummary> instanceSet = getFilteredInstanceSummarySet(resultSet, filterByFieldsValues);
+
+ InstancesSummaryResult result = new InstancesSummaryResult(resultSet.getStatus(), resultSet.getMessage());
+
+ // Sort the ArrayList using orderBy
+ instanceSet = sortInstanceSummary(instanceSet, orderBy.toLowerCase(), sortOrder);
+ result.setInstancesSummary(instanceSet.toArray(new InstanceSummary[instanceSet.size()]));
+ return result;
+ }
+
private List<Instance> getFilteredInstanceSet(InstancesResult resultSet,
- Map<String, String> filterByFieldsValues)
+ Map<String, List<String>> filterByFieldsValues)
throws FalconException {
// If filterBy is empty, return all instances. Else return instances with matching filter.
if (filterByFieldsValues.size() == 0) {
@@ -327,7 +351,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
boolean isInstanceFiltered = false;
// for each filter
- for (Map.Entry<String, String> pair : filterByFieldsValues.entrySet()) {
+ for (Map.Entry<String, List<String>> pair : filterByFieldsValues.entrySet()) {
if (isInstanceFiltered(instance, pair)) { // wait until all filters are applied
isInstanceFiltered = true;
break; // no use to continue other filters as the current one filtered this
@@ -342,25 +366,80 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
return instanceSet;
}
+ private List<InstanceSummary> getFilteredInstanceSummarySet(InstancesSummaryResult resultSet,
+ Map<String, List<String>> filterByFieldsValues)
+ throws FalconException {
+ // If filterBy is empty, return all instances. Else return instances with matching filter.
+ if (filterByFieldsValues.size() == 0) {
+ return Arrays.asList(resultSet.getInstancesSummary());
+ }
+
+ List<InstanceSummary> instanceSet = new ArrayList<>();
+ // for each instance
+ for (InstanceSummary instance : resultSet.getInstancesSummary()) {
+ // for each filter
+ boolean isInstanceFiltered = false;
+ Map<String, Long> newSummaryMap = null;
+ for (Map.Entry<String, List<String>> pair : filterByFieldsValues.entrySet()) {
+ switch (InstancesSummaryResult.InstanceSummaryFilterFields.valueOf(pair.getKey().toUpperCase())) {
+ case CLUSTER:
+ if (instance.getCluster() == null || !containsIgnoreCase(pair.getValue(), instance.getCluster())) {
+ isInstanceFiltered = true;
+ }
+ break;
+
+ case STATUS:
+ if (newSummaryMap == null) {
+ newSummaryMap = new HashMap<>();
+ }
+ if (instance.getSummaryMap() == null || instance.getSummaryMap().isEmpty()) {
+ isInstanceFiltered = true;
+ } else {
+ for (Map.Entry<String, Long> entry : instance.getSummaryMap().entrySet()) {
+ if (containsIgnoreCase(pair.getValue(), entry.getKey())) {
+ newSummaryMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ }
+ break;
+
+ default:
+ isInstanceFiltered = true;
+ }
+
+ if (isInstanceFiltered) { // wait until all filters are applied
+ break; // no use to continue other filters as the current one filtered this
+ }
+ }
+
+ if (!isInstanceFiltered) { // survived all filters
+ instanceSet.add(new InstanceSummary(instance.getCluster(), newSummaryMap));
+ }
+ }
+
+ return instanceSet;
+ }
+
private boolean isInstanceFiltered(Instance instance,
- Map.Entry<String, String> pair) throws FalconException {
- final String filterValue = pair.getValue();
+ Map.Entry<String, List<String>> pair) throws FalconException {
+ final List<String> filterValue = pair.getValue();
switch (InstancesResult.InstanceFilterFields.valueOf(pair.getKey().toUpperCase())) {
case STATUS:
return instance.getStatus() == null
- || !instance.getStatus().toString().equalsIgnoreCase(filterValue);
+ || !containsIgnoreCase(filterValue, instance.getStatus().toString());
case CLUSTER:
return instance.getCluster() == null
- || !instance.getCluster().equalsIgnoreCase(filterValue);
+ || !containsIgnoreCase(filterValue, instance.getCluster());
case SOURCECLUSTER:
return instance.getSourceCluster() == null
- || !instance.getSourceCluster().equalsIgnoreCase(filterValue);
+ || !containsIgnoreCase(filterValue, instance.getSourceCluster());
case STARTEDAFTER:
return instance.getStartTime() == null
- || instance.getStartTime().before(EntityUtil.parseDateUTC(filterValue));
+ || instance.getStartTime().before(getEarliestDate(filterValue));
default:
return true;
@@ -417,6 +496,22 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
return instanceSet;
}
+ private List<InstanceSummary> sortInstanceSummary(List<InstanceSummary> instanceSet,
+ String orderBy, String sortOrder) {
+ final String order = getValidSortOrder(sortOrder, orderBy);
+ if (orderBy.equals("cluster")) {
+ Collections.sort(instanceSet, new Comparator<InstanceSummary>() {
+ @Override
+ public int compare(InstanceSummary i1, InstanceSummary i2) {
+ return (order.equalsIgnoreCase("asc")) ? i1.getCluster().compareTo(i2.getCluster())
+ : i2.getCluster().compareTo(i1.getCluster());
+ }
+ });
+ }//Default : no sort
+
+ return instanceSet;
+ }
+
public FeedInstanceResult getListing(String type, String entity, String startStr,
String endStr, String colo) {
checkColo(colo);
@@ -833,4 +928,26 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
throw new ValidationException("Parameter " + field + " is empty");
}
}
+
+ private boolean containsIgnoreCase(List<String> strList, String str) {
+ for (String s : strList) {
+ if (s.equalsIgnoreCase(str)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Date getEarliestDate(List<String> dateList) throws FalconException {
+ if (dateList.size() == 1) {
+ return EntityUtil.parseDateUTC(dateList.get(0));
+ }
+ Date earliestDate = EntityUtil.parseDateUTC(dateList.get(0));
+ for (int i = 1; i < dateList.size(); i++) {
+ if (earliestDate.after(EntityUtil.parseDateUTC(dateList.get(i)))) {
+ earliestDate = EntityUtil.parseDateUTC(dateList.get(i));
+ }
+ }
+ return earliestDate;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 95e0e69..e38749a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -203,10 +203,10 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
HashSet<String> fieldSet = new HashSet<String>(Arrays.asList(fields.toLowerCase().split(",")));
Pair<Date, Date> startAndEndDates = getStartEndDatesForSummary(startDate, endDate);
validateTypeForEntitySummary(type);
- Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
+ Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
validateEntityFilterByClause(filterByFieldsValues);
if (StringUtils.isNotEmpty(filterTags)) {
- filterByFieldsValues.put(EntityList.EntityFilterByFields.TAGS.name(), filterTags);
+ filterByFieldsValues.put(EntityList.EntityFilterByFields.TAGS.name(), Arrays.asList(filterTags));
}
List<Entity> entities;
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 0d59c22..ac3e5db 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -180,12 +180,16 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("start-time") @QueryParam("start") final String startStr,
@Dimension("end-time") @QueryParam("end") final String endStr,
@Dimension("colo") @QueryParam("colo") final String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") final String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") final String orderBy,
+ @DefaultValue("") @QueryParam("sortOrder") final String sortOrder) {
return new InstanceProxy<InstancesSummaryResult>(InstancesSummaryResult.class) {
@Override
protected InstancesSummaryResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getSummary",
- type, entity, startStr, endStr, colo, lifeCycles);
+ type, entity, startStr, endStr, colo, lifeCycles,
+ filterBy, orderBy, sortOrder);
}
}.execute(colo, type, entity);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aabf5e13/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index 7249ba4..9c5538b 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -116,8 +116,12 @@ public class InstanceManager extends AbstractInstanceManager {
@Dimension("start-time") @QueryParam("start") String startStr,
@Dimension("end-time") @QueryParam("end") String endStr,
@Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
- return super.getSummary(type, entity, startStr, endStr, colo, lifeCycles);
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("") @QueryParam("sortOrder") String sortOrder) {
+ return super.getSummary(type, entity, startStr, endStr, colo, lifeCycles,
+ filterBy, orderBy, sortOrder);
}
@GET