You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2014/10/15 10:24:56 UTC
git commit: FALCON-762 Support feed listing for file system storage.
Contributed by Srikanth Sundarrajan
Repository: incubator-falcon
Updated Branches:
refs/heads/master 7cfc6002f -> 9f722b3d0
FALCON-762 Support feed listing for file system storage. Contributed by Srikanth Sundarrajan
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9f722b3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9f722b3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9f722b3d
Branch: refs/heads/master
Commit: 9f722b3d01fe118951ed4f7c87b16c495a3fb45e
Parents: 7cfc600
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Wed Oct 15 13:54:33 2014 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Wed Oct 15 13:54:33 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/falcon/cli/FalconCLI.java | 11 +-
.../org/apache/falcon/client/FalconClient.java | 61 ++++++-
.../org/apache/falcon/resource/APIResult.java | 7 +
.../falcon/resource/FeedInstanceResult.java | 128 +++++++++++++++
.../apache/falcon/resource/InstancesResult.java | 28 ++--
.../falcon/resource/InstancesSummaryResult.java | 28 ++--
.../apache/falcon/entity/CatalogStorage.java | 7 +
.../org/apache/falcon/entity/ClusterHelper.java | 7 +
.../org/apache/falcon/entity/FeedHelper.java | 140 +++++++++++++++-
.../falcon/entity/FeedInstanceStatus.java | 137 ++++++++++++++++
.../apache/falcon/entity/FileSystemStorage.java | 90 +++++++++++
.../java/org/apache/falcon/entity/Storage.java | 10 ++
.../falcon/expression/ExpressionHelper.java | 13 ++
.../falcon/entity/FileSystemStorageTest.java | 142 +++++++++++++++++
docs/src/site/twiki/FalconCLI.twiki | 12 ++
.../twiki/restapi/FeedInstanceListing.twiki | 45 ++++++
.../workflow/engine/OozieWorkflowEngine.java | 9 +-
.../falcon/resource/AbstractEntityManager.java | 36 +++++
.../resource/AbstractInstanceManager.java | 32 +++-
.../resource/proxy/InstanceManagerProxy.java | 159 ++++++-------------
.../proxy/SchedulableEntityManagerProxy.java | 34 +---
.../apache/falcon/resource/InstanceManager.java | 15 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 5 +
24 files changed, 983 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46c58fc..71a2278 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,8 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-762 Support feed listing for file system storage (Srikanth Sundarrajan)
+
FALCON-20 Remove dependency on custom InMobi DistCp (Sowmya Ramesh via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index f7229ec..9cf6339 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -108,6 +108,7 @@ public class FalconCLI {
public static final String CLIENT_PROPERTIES = "/client.properties";
public static final String LIFECYCLE_OPT = "lifecycle";
public static final String PARARMS_OPT = "params";
+ public static final String LISTING_OPT = "listing";
// Graph Commands
public static final String GRAPH_CMD = "graph";
@@ -285,7 +286,9 @@ public class FalconCLI {
filterBy, orderBy, sortOrder, offset, numResults);
} else if (optionsList.contains(PARARMS_OPT)) {
// start time is the nominal time of instance
- result = client.getParamsOfInstance(type, entity, start, colo, clusters, sourceClusters, lifeCycles);
+ result = client.getParamsOfInstance(type, entity, start, colo, lifeCycles);
+ } else if (optionsList.contains(LISTING_OPT)) {
+ result = client.getFeedListing(type, entity, start, end, colo);
} else {
throw new FalconCLIException("Invalid command");
}
@@ -728,6 +731,11 @@ public class FalconCLI {
false,
"Displays the workflow parameters for a given instance of specified nominal time");
+ Option listing = new Option(
+ LISTING_OPT,
+ false,
+ "Displays feed listing and their status between a start and end time range.");
+
OptionGroup group = new OptionGroup();
group.addOption(running);
group.addOption(list);
@@ -741,6 +749,7 @@ public class FalconCLI {
group.addOption(logs);
group.addOption(continues);
group.addOption(params);
+ group.addOption(listing);
Option url = new Option(URL_OPTION, true, "Falcon URL");
Option start = new Option(START_OPT, true,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 7e46f28..576ca09 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -35,6 +35,7 @@ import org.apache.falcon.recipe.RecipeToolArgs;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@@ -235,7 +236,8 @@ public class FalconClient {
RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON),
LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON),
SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON),
- PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON);
+ PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON);
private String path;
private String method;
@@ -406,6 +408,13 @@ public class FalconClient {
null, null, colo, lifeCycles);
}
+ public String getFeedListing(String type, String entity, String start,
+ String end, String colo)
+ throws FalconCLIException {
+
+ return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null, null, colo, null);
+ }
+
public String killInstances(String type, String entity, String start,
String end, String colo, String clusters,
String sourceClusters, List<LifeCycle> lifeCycles)
@@ -479,7 +488,6 @@ public class FalconClient {
public String getParamsOfInstance(String type, String entity,
String start, String colo,
- String clusters, String sourceClusters,
List<LifeCycle> lifeCycles)
throws FalconCLIException, UnsupportedEncodingException {
@@ -729,14 +737,16 @@ public class FalconClient {
}
checkIfSuccessful(clientResponse);
- if (instances.name().equals("LOG")) {
+ switch (instances) {
+ case LOG:
return parseProcessInstanceResultLogs(clientResponse, runid);
- } else if (instances.name().equals("SUMMARY")) {
+ case SUMMARY:
return summarizeProcessInstanceResult(clientResponse);
- } else {
+ case LISTING:
+ return parseFeedInstanceResult(clientResponse);
+ default:
return parseProcessInstanceResult(clientResponse);
}
-
}
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
@@ -1006,6 +1016,45 @@ public class FalconClient {
return sb.toString();
}
+ private String parseFeedInstanceResult(ClientResponse clientResponse) {
+ FeedInstanceResult result = clientResponse.getEntity(FeedInstanceResult.class);
+ StringBuilder sb = new StringBuilder();
+ String toAppend;
+
+ sb.append("Consolidated Status: ").append(result.getStatus()).append("\n");
+
+ sb.append("\nInstances:\n");
+ sb.append("Cluster\t\tInstance\t\tStatus\t\tSize\t\tCreationTime\t\tDetails\n");
+ sb.append("-----------------------------------------------------------------------------------------------\n");
+ if (result.getInstances() != null) {
+ for (FeedInstanceResult.Instance instance : result.getInstances()) {
+
+ toAppend = instance.getCluster() != null ? instance.getCluster() : "-";
+ sb.append(toAppend).append("\t");
+
+ toAppend = instance.getInstance() != null ? instance.getInstance() : "-";
+ sb.append(toAppend).append("\t");
+
+ toAppend = instance.getStatus() != null ? instance.getStatus() : "-";
+ sb.append(toAppend).append("\t");
+
+ toAppend = instance.getSize() != -1 ? String.valueOf(instance.getSize()) : "-";
+ sb.append(toAppend).append("\t");
+
+ toAppend = instance.getCreationTime() != 0
+ ? SchemaHelper.formatDateUTC(new Date(instance.getCreationTime())) : "-";
+ sb.append(toAppend).append("\t");
+
+ toAppend = StringUtils.isEmpty(instance.getUri()) ? "-" : instance.getUri();
+ sb.append(toAppend).append("\n");
+ }
+ }
+ sb.append("\nAdditional Information:\n");
+ sb.append("Response: ").append(result.getMessage());
+ sb.append("Request Id: ").append(result.getRequestId());
+ return sb.toString();
+ }
+
protected static enum GraphOperations {
VERTICES("api/graphs/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/APIResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/APIResult.java b/client/src/main/java/org/apache/falcon/resource/APIResult.java
index 79b8a1d..3b89040 100644
--- a/client/src/main/java/org/apache/falcon/resource/APIResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/APIResult.java
@@ -104,4 +104,11 @@ public class APIResult {
return e.getMessage();
}
}
+
+ public Object[] getCollection() {
+ return null;
+ }
+
+ public void setCollection(Object[] items) {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java b/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java
new file mode 100644
index 0000000..1d55e68
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/FeedInstanceResult.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Pojo for JAXB marshalling / unmarshalling.
+ */
+//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+@XmlRootElement
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class FeedInstanceResult extends APIResult {
+
+ @XmlElement
+ private Instance[] instances;
+
+ private FeedInstanceResult() { // for jaxb
+ super();
+ }
+
+ public FeedInstanceResult(String message, Instance[] instances) {
+ this(Status.SUCCEEDED, message, instances);
+ }
+
+ public FeedInstanceResult(Status status, String message,
+ Instance[] inInstances) {
+ super(status, message);
+ this.instances = inInstances;
+ }
+
+ public FeedInstanceResult(Status status, String message) {
+ super(status, message);
+ }
+
+ public Instance[] getInstances() {
+ return instances;
+ }
+
+ public void setInstances(Instance[] instances) {
+ this.instances = instances;
+ }
+
+ /**
+ * A single instance object inside instance result.
+ */
+ @XmlRootElement(name = "instance")
+ public static class Instance {
+ @XmlElement
+ public String cluster;
+
+ @XmlElement
+ public String instance;
+
+ @XmlElement
+ public String status;
+
+ @XmlElement
+ public String uri;
+
+ @XmlElement
+ public long creationTime;
+
+ @XmlElement
+ public long size;
+
+ public Instance() {
+ }
+
+ public Instance(String cluster, String instance, String status) {
+ this.cluster = cluster;
+ this.instance = instance;
+ this.status = status;
+ }
+
+ public String getInstance() {
+ return instance;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public Long getSize() {
+ return size;
+ }
+
+ @Override
+ public String toString() {
+ return "{instance:"
+ + this.instance
+ + ", status:"
+ + this.status
+ + (this.uri == null ? "" : ", uri: " + this.uri)
+ + (this.cluster == null ? "" : ", cluster:" + this.cluster) + "}";
+ }
+ }
+}
+//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index 5754f97..65355f0 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -53,16 +53,6 @@ public class InstancesResult extends APIResult {
super();
}
- public InstancesResult(String message, Instance[] instances) {
- this(Status.SUCCEEDED, message, instances);
- }
-
- public InstancesResult(Status status, String message,
- Instance[] instanceExes) {
- super(status, message);
- this.instances = instanceExes;
- }
-
public InstancesResult(Status status, String message) {
super(status, message);
}
@@ -76,6 +66,24 @@ public class InstancesResult extends APIResult {
this.instances = instances;
}
+ @Override
+ public Object[] getCollection() {
+ return getInstances();
+ }
+
+ @Override
+ public void setCollection(Object[] items) {
+ if (items == null) {
+ setInstances(new Instance[0]);
+ } else {
+ Instance[] newInstances = new Instance[items.length];
+ for (int index = 0; index < items.length; index++) {
+ newInstances[index] = (Instance)items[index];
+ }
+ setInstances(newInstances);
+ }
+ }
+
/**
* A single instance object inside instance result.
*/
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
index 0758c8b..a3dcbe4 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
@@ -39,16 +39,6 @@ public class InstancesSummaryResult extends APIResult {
super();
}
- public InstancesSummaryResult(String message, InstanceSummary[] instancesSummary) {
- this(Status.SUCCEEDED, message, instancesSummary);
- }
-
- public InstancesSummaryResult(Status status, String message,
- InstanceSummary[] instancesSummary) {
- super(status, message);
- this.instancesSummary = instancesSummary;
- }
-
public InstancesSummaryResult(Status status, String message) {
super(status, message);
}
@@ -61,6 +51,24 @@ public class InstancesSummaryResult extends APIResult {
this.instancesSummary = instancesSummary;
}
+ @Override
+ public Object[] getCollection() {
+ return getInstancesSummary();
+ }
+
+ @Override
+ public void setCollection(Object[] items) {
+ if (items == null) {
+ setInstancesSummary(new InstanceSummary[0]);
+ } else {
+ InstanceSummary[] newInstances = new InstanceSummary[items.length];
+ for (int index = 0; index < items.length; index++) {
+ newInstances[index] = (InstanceSummary)items[index];
+ }
+ setInstancesSummary(newInstances);
+ }
+ }
+
/**
* A single instance object inside instance result.
*/
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 7ad0716..dbb0293 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.feed.LocationType;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -346,6 +347,12 @@ public class CatalogStorage implements Storage {
}
@Override
+ public List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType,
+ Date start, Date end) throws FalconException {
+ throw new UnsupportedOperationException("getListing");
+ }
+
+ @Override
public String toString() {
return "CatalogStorage{"
+ "catalogUrl='" + catalogUrl + '\''
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 6945cea..52b570d 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,6 +18,9 @@
package org.apache.falcon.entity;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -40,6 +43,10 @@ public final class ClusterHelper {
private ClusterHelper() {
}
+ public static Cluster getCluster(String cluster) throws FalconException {
+ return ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+ }
+
public static Configuration getConfiguration(Cluster cluster) {
Configuration conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 4174135..4532669 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -22,28 +22,44 @@ import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.util.BuildProperties;
+import org.apache.hadoop.fs.Path;
import java.net.URISyntaxException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
/**
* Feed entity helper methods.
*/
public final class FeedHelper {
+ private static final String FORMAT = "yyyyMMddHHmm";
+
private FeedHelper() {}
public static Cluster getCluster(Feed feed, String clusterName) {
@@ -227,7 +243,7 @@ public final class FeedHelper {
return normalizePartitionExpression(partition, null);
}
- private static Properties loadClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
+ public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
Properties properties = new Properties();
Map<String, String> clusterVars = new HashMap<String, String>();
clusterVars.put("colo", cluster.getColo());
@@ -244,7 +260,7 @@ public final class FeedHelper {
public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp)
throws FalconException {
- Properties properties = loadClusterProperties(clusterEntity);
+ Properties properties = getClusterProperties(clusterEntity);
ExpressionHelper expHelp = ExpressionHelper.get();
expHelp.setPropertiesForVariable(properties);
return expHelp.evaluateFullExpression(exp, String.class);
@@ -290,4 +306,124 @@ public final class FeedHelper {
props.put("userWorkflowVersion", version);
return props;
}
+
+ public static Properties getFeedProperties(Feed feed) {
+ Properties feedProperties = new Properties();
+ if (feed.getProperties() != null) {
+ for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) {
+ feedProperties.put(property.getName(), property.getValue());
+ }
+ }
+ return feedProperties;
+ }
+
+ /**
+ * Replaces timed variables with corresponding time notations e.g., ${YEAR} with yyyy and so on.
+ * @param templatePath - template feed path
+ * @return time notations
+ */
+ public static String getDateFormatInPath(String templatePath) {
+ String mask = extractDatePartFromPathMask(templatePath, templatePath);
+ //yyyyMMddHHmm
+ return mask.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy")
+ .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM")
+ .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd")
+ .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH")
+ .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm");
+ }
+
+ /**
+ * Extracts the date part of the path and builds a date format mask.
+ * @param mask - Path pattern containing ${YEAR}, ${MONTH}...
+ * @param inPath - Path from which date part need to be extracted
+ * @return - Parts of inPath with non-date-part stripped out.
+ *
+ * Example: extractDatePartFromPathMask("/data/foo/${YEAR}/${MONTH}", "/data/foo/2012/${MONTH}");
+ * Returns: 2012${MONTH}.
+ */
+ private static String extractDatePartFromPathMask(String mask, String inPath) {
+ String[] elements = FeedDataPath.PATTERN.split(mask);
+
+ String out = inPath;
+ for (String element : elements) {
+ out = out.replaceFirst(element, "");
+ }
+ return out;
+ }
+
+ private static Map<FeedDataPath.VARS, String> getDatePartMap(String path, String mask) {
+ Map<FeedDataPath.VARS, String> map = new TreeMap<FeedDataPath.VARS, String>();
+ Matcher matcher = FeedDataPath.DATE_FIELD_PATTERN.matcher(mask);
+ int start = 0;
+ while (matcher.find(start)) {
+ String subMask = mask.substring(matcher.start(), matcher.end());
+ String subPath = path.substring(matcher.start(), matcher.end());
+ FeedDataPath.VARS var = FeedDataPath.VARS.from(subMask);
+ if (!map.containsKey(var)) {
+ map.put(var, subPath);
+ }
+ start = matcher.start() + 1;
+ }
+ return map;
+ }
+
+ /**
+ * Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
+ * @param file - actual data path
+ * @param templatePath - template path from feed definition
+ * @param dateMask - path mask from getDateFormatInPath()
+ * @param timeZone
+ * @return date corresponding to the path
+ */
+ //consider just the first occurrence of the pattern
+ public static Date getDate(Path file, String templatePath, String dateMask, String timeZone) {
+ String path = extractDatePartFromPathMask(templatePath, file.toString());
+ Map<FeedDataPath.VARS, String> map = getDatePartMap(path, dateMask);
+
+ if (map.isEmpty()) {
+ return null;
+ }
+
+ StringBuilder date = new StringBuilder();
+ int ordinal = 0;
+ for (Map.Entry<FeedDataPath.VARS, String> entry : map.entrySet()) {
+ if (ordinal++ == entry.getKey().ordinal()) {
+ date.append(entry.getValue());
+ } else {
+ return null;
+ }
+ }
+
+ try {
+ DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, date.length()));
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ return dateFormat.parse(date.toString());
+ } catch (ParseException e) {
+ return null;
+ }
+ }
+
+ public static FeedInstanceResult getFeedInstanceListing(Entity entityObject,
+ Date start, Date end) throws FalconException {
+ Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
+ FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success");
+ for (String cluster : clusters) {
+ Feed feed = (Feed) entityObject;
+ Storage storage = createStorage(cluster, feed);
+ List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end);
+ FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()];
+ int index = 0;
+ for (FeedInstanceStatus feedStatus : feedListing) {
+ FeedInstanceResult.Instance instance = new
+ FeedInstanceResult.Instance(cluster, feedStatus.getInstance(),
+ feedStatus.getStatus().name());
+ instance.creationTime = feedStatus.getCreationTime();
+ instance.uri = feedStatus.getUri();
+ instance.size = feedStatus.getSize();
+ instances[index++] = instance;
+ }
+ result.setInstances(instances);
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java b/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java
new file mode 100644
index 0000000..ff06554
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.entity;
+
+/**
+ * Feed Instance Status is used to provide feed instance listing and corresponding status.
+ *
+ * This is used for exchanging information for getListing api
+ */
+public class FeedInstanceStatus {
+
+ private String instance;
+
+ private final String uri;
+
+ private long creationTime;
+
+ private long size = -1;
+
+ private AvailabilityStatus status = AvailabilityStatus.MISSING;
+
+ /**
+ * Availability status of a feed instance.
+ *
+ * Missing if the feed partition is entirely missing,
+ * Available if present and the availability flag is also present
+ * Availability flag is configured in feed definition, but availability flag is missing in data path
+ * Empty if the empty
+ */
+ public enum AvailabilityStatus {MISSING, AVAILABLE, PARTIAL, EMPTY}
+
+ public FeedInstanceStatus(String uri) {
+ this.uri = uri;
+ }
+
+ public String getInstance() {
+ return instance;
+ }
+
+ public void setInstance(String instance) {
+ this.instance = instance;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public void setCreationTime(long creationTime) {
+ this.creationTime = creationTime;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public AvailabilityStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(AvailabilityStatus status) {
+ this.status = status;
+ }
+
+ @Override
+ public String toString() {
+ return "FeedInstanceStatus{"
+ + "instance='" + instance + '\''
+ + ", uri='" + uri + '\''
+ + ", creationTime=" + creationTime
+ + ", size=" + size
+ + ", status='" + status + '\''
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FeedInstanceStatus that = (FeedInstanceStatus) o;
+
+ if (creationTime != that.creationTime) {
+ return false;
+ }
+ if (size != that.size) {
+ return false;
+ }
+ if (!instance.equals(that.instance)) {
+ return false;
+ }
+ if (status != that.status) {
+ return false;
+ }
+ if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = instance.hashCode();
+ result = 31 * result + (uri != null ? uri.hashCode() : 0);
+ result = 31 * result + (int) (creationTime ^ (creationTime >>> 32));
+ result = 31 * result + (int) (size ^ (size >>> 32));
+ result = 31 * result + (status != null ? status.hashCode() : 0);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 58506ad..012a6e7 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -22,13 +22,17 @@ import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,8 +43,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
+import java.util.TimeZone;
import java.util.regex.Matcher;
/**
@@ -241,6 +249,16 @@ public class FileSystemStorage implements Storage {
return null;
}
+ public static Properties getFeedProperties(Feed feed) {
+ Properties feedProperties = new Properties();
+ if (feed.getProperties() != null) {
+ for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) {
+ feedProperties.put(property.getName(), property.getValue());
+ }
+ }
+ return feedProperties;
+ }
+
@Override
public void validateACL(AccessControlList acl) throws FalconException {
try {
@@ -272,6 +290,78 @@ public class FileSystemStorage implements Storage {
}
}
+ @Override
+ @SuppressWarnings("MagicConstant")
+ public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
+ Date start, Date end) throws FalconException {
+
+ Calendar calendar = Calendar.getInstance();
+ List<Location> clusterSpecificLocation = FeedHelper.
+ getLocations(FeedHelper.getCluster(feed, clusterName), feed);
+ Location location = getLocation(clusterSpecificLocation, locationType);
+ try {
+ FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
+ Cluster cluster = ClusterHelper.getCluster(clusterName);
+ Properties baseProperties = FeedHelper.getClusterProperties(cluster);
+ baseProperties.putAll(FeedHelper.getFeedProperties(feed));
+ List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
+ Date feedStart = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
+ TimeZone tz = feed.getTimezone();
+ Date alignedStart = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(), tz, start);
+
+ String basePath = location.getPath();
+ while (!end.before(alignedStart)) {
+ Properties allProperties = ExpressionHelper.getTimeVariables(alignedStart, tz);
+ allProperties.putAll(baseProperties);
+ String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties);
+ FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath));
+ FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath);
+ String dateMask = FeedHelper.getDateFormatInPath(basePath);
+ Date date = FeedHelper.getDate(new Path(feedInstancePath), basePath, dateMask, tz.getID());
+ instance.setInstance(SchemaHelper.formatDateUTC(date));
+ if (fileStatus != null) {
+ instance.setCreationTime(fileStatus.getModificationTime());
+ ContentSummary contentSummary = fileSystem.getContentSummary(fileStatus.getPath());
+ if (contentSummary != null) {
+ long size = contentSummary.getSpaceConsumed();
+ instance.setSize(size);
+ if (!StringUtils.isEmpty(feed.getAvailabilityFlag())) {
+ FileStatus doneFile = getFileStatus(fileSystem,
+ new Path(fileStatus.getPath(), feed.getAvailabilityFlag()));
+ if (doneFile != null) {
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
+ } else {
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+ }
+ } else {
+ instance.setStatus(size > 0 ? FeedInstanceStatus.AvailabilityStatus.AVAILABLE
+ : FeedInstanceStatus.AvailabilityStatus.EMPTY);
+ }
+ }
+ }
+ instances.add(instance);
+ calendar.setTime(alignedStart);
+ calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(),
+ feed.getFrequency().getFrequencyAsInt());
+ alignedStart = calendar.getTime();
+ }
+ return instances;
+ } catch (IOException e) {
+ LOG.error("Unable to retrieve listing for {}:{}", locationType, getStorageUrl(), e);
+ throw new FalconException("Unable to retrieve listing for (URI " + getStorageUrl() + ")", e);
+ }
+ }
+
+ public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {
+ FileStatus fileStatus = null;
+ try {
+ fileStatus = fileSystem.getFileStatus(feedInstancePath);
+ } catch (IOException ignore) {
+ //ignore
+ }
+ return fileStatus;
+ }
+
private Configuration getConf() {
Configuration conf = new Configuration();
conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index f88e139..af3040b 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -20,8 +20,12 @@ package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
+import java.util.Date;
+import java.util.List;
+
/**
* A class to encapsulate the storage for a given feed which can either be
* expressed as a path on the file system or a table in a catalog.
@@ -81,4 +85,10 @@ public interface Storage {
* @throws FalconException if the permissions are not valid.
*/
void validateACL(AccessControlList acl) throws FalconException;
+
+ /**
+ * Get Feed Listing for a feed between a date range.
+ */
+ List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType,
+ Date start, Date end) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 79d6e2d..e04f046 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -20,6 +20,7 @@ package org.apache.falcon.expression;
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.common.FeedDataPath;
import javax.servlet.jsp.el.ELException;
import javax.servlet.jsp.el.ExpressionEvaluator;
@@ -97,6 +98,18 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
referenceDate.set(date);
}
+ public static Properties getTimeVariables(Date date, TimeZone tz) {
+ Properties vars = new Properties();
+ Calendar cal = Calendar.getInstance(tz);
+ cal.setTime(date);
+ vars.put(FeedDataPath.VARS.YEAR.name(), String.format("%04d", cal.get(Calendar.YEAR)));
+ vars.put(FeedDataPath.VARS.MONTH.name(), String.format("%02d", (cal.get(Calendar.MONTH) + 1)));
+ vars.put(FeedDataPath.VARS.DAY.name(), String.format("%02d", cal.get(Calendar.DAY_OF_MONTH)));
+ vars.put(FeedDataPath.VARS.HOUR.name(), String.format("%02d", cal.get(Calendar.HOUR_OF_DAY)));
+ vars.put(FeedDataPath.VARS.MINUTE.name(), String.format("%02d", cal.get(Calendar.MINUTE)));
+ return vars;
+ }
+
private static int getDayOffset(String weekDayName) {
int day;
Calendar nominalTime = Calendar.getInstance();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index b97564d..1667161 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -20,11 +20,22 @@ package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.AccessControlList;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -32,8 +43,15 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TimeZone;
/**
* Test class for File System Storage.
@@ -389,4 +407,128 @@ public class FileSystemStorageTest {
return permission;
}
}
+
+ @DataProvider(name = "testListingDataProvider")
+ private Object[][] createTestListingData() {
+ final long millis = 24L * 3600 * 1000;
+ final long now = System.currentTimeMillis();
+ TimeZone utc = TimeZone.getTimeZone("UTC");
+ return new Object[][] {
+ {null, Frequency.fromString("hours(2)"), utc, new Date(now - 60 * millis), new Date(now - 56 * millis)},
+ {null, Frequency.fromString("days(1)"), utc, new Date(now - 20 * millis), new Date(now + 6 * millis)},
+ {null, Frequency.fromString("months(1)"), utc, new Date(now - 85 * millis), new Date(now - 10 * millis)},
+ };
+ }
+
+ @Test (dataProvider = "testListingDataProvider")
+ public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone,
+ Date start, Date end) throws Exception {
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false);
+ FileSystem fs = cluster.getFileSystem();
+ ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster());
+ try {
+ Feed feed = getFeed(availabilityFlag, frequency, timeZone);
+ List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end);
+ FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem().
+ getUri().toString(), feed.getLocations());
+ List<FeedInstanceStatus> actual = fileSystemStorage.
+ getListing(feed, "TestFeedListing", LocationType.DATA, start, end);
+ Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match");
+ } finally {
+ ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName());
+ }
+ }
+
+ @SuppressWarnings("MagicConstant")
+ private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed,
+ Date start, Date end) throws Exception {
+ fs.delete(new Path("/TestFeedListing"), true);
+ Random random = new Random();
+ List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
+ String basePath = feed.getLocations().getLocations().get(0).getPath();
+ Frequency frequency = feed.getFrequency();
+ TimeZone tz = feed.getTimezone();
+ Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(),
+ feed.getFrequency(), tz, new Date(start.getTime()));
+ Date dataEnd = new Date(end.getTime());
+ while (dataStart.before(dataEnd)) {
+ Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz);
+ String path = ExpressionHelper.substitute(basePath, properties);
+ FeedInstanceStatus instance = new FeedInstanceStatus(path);
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
+ instance.setSize(-1);
+ instance.setCreationTime(0);
+ String dateMask = FeedHelper.getDateFormatInPath(basePath);
+ Date date = FeedHelper.getDate(new Path(path), basePath, dateMask, tz.getID());
+ instance.setInstance(SchemaHelper.formatDateUTC(date));
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(dataStart);
+ cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
+ dataStart.setTime(cal.getTimeInMillis());
+ if (random.nextBoolean()) {
+ OutputStream out = fs.create(new Path(path, "file"));
+ out.write("Hello World\n".getBytes());
+ out.close();
+ instance.setSize(12);
+ if (feed.getAvailabilityFlag() == null
+ || (feed.getAvailabilityFlag() != null && random.nextBoolean())) {
+ //If availability is not present or if ok to create availability file, mark as available
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
+ if (feed.getAvailabilityFlag() != null) {
+ fs.create(new Path(path, feed.getAvailabilityFlag())).close();
+ }
+ } else if (feed.getAvailabilityFlag() != null) {
+ //If availability is present or not ok to create availability file, mark as partial
+ fs.mkdirs(new Path(path));
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+ }
+ } else {
+ if (feed.getAvailabilityFlag() == null && random.nextBoolean()) {
+ //If availability is not present or ok to create dir, mark as empty
+ fs.mkdirs(new Path(path));
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+ instance.setSize(0);
+ } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) {
+ //If availability is present and ok to create dir, mark as partial
+ fs.mkdirs(new Path(path));
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+ } else if (feed.getAvailabilityFlag() != null) {
+ //If availability is present and ok to create empty instance
+ fs.create(new Path(path, feed.getAvailabilityFlag())).close();
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+ instance.setSize(0);
+ }
+ }
+ try {
+ FileStatus fileStatus = fs.getFileStatus(new Path(path));
+ instance.setCreationTime(fileStatus.getModificationTime());
+ } catch (IOException e) {
+ //ignore
+ }
+ instances.add(instance);
+ }
+ return instances;
+ }
+
+ private Feed getFeed(String availabilityFlag, Frequency frequency, TimeZone timeZone) {
+ Feed feed = new Feed();
+ feed.setAvailabilityFlag(availabilityFlag);
+ feed.setFrequency(frequency);
+ feed.setTimezone(timeZone);
+ feed.setLocations(new Locations());
+ Location dataLocation = new Location();
+ feed.getLocations().getLocations().add(dataLocation);
+ dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}"
+ + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE");
+ dataLocation.setType(LocationType.DATA);
+ feed.setClusters(new Clusters());
+ Cluster cluster = new Cluster();
+ cluster.setName("TestFeedListing");
+ feed.getClusters().getClusters().add(cluster);
+ Validity validity = new Validity();
+ cluster.setValidity(validity);
+ validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000)));
+ validity.setEnd(new Date(System.currentTimeMillis() + (1000L * 24 * 3600000)));
+ return feed;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 1d536a1..b8d36bb 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -214,6 +214,18 @@ Optional Args : -colo <<colo>> -lifecycle <<lifecycles>>
<a href="./Restapi/InstanceRunning.html">Optional params described here.</a>
+---+++FeedInstanceListing
+
+Get falcon feed instance availability.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -entity feed -name <<name>> -listing
+
+Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
+-colo <<colo>>
+
+<a href="./Restapi/FeedInstanceListing.html">Optional params described here.</a>
+
---+++Logs
Get logs for instance actions
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/docs/src/site/twiki/restapi/FeedInstanceListing.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/FeedInstanceListing.twiki b/docs/src/site/twiki/restapi/FeedInstanceListing.twiki
new file mode 100644
index 0000000..a3e306d
--- /dev/null
+++ b/docs/src/site/twiki/restapi/FeedInstanceListing.twiki
@@ -0,0 +1,45 @@
+---++ GET /api/instance/listing/feed/:entity-name
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+Get falcon feed instance availability.
+
+---++ Parameters
+ * :entity-name Name of the entity.
+ * start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+ * By default, it is set to (end - (10 * entityFrequency)).
+ * end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+ * Default is set to now.
+ * colo <optional param> Colo on which the query should be run.
+
+---++ Results
+Feed instance availability status
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/listing/feed/SampleFeed?colo=*&start=2012-04-03T07:00Z
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "size": "450231212222",
+ "creationTime": "1236679827365",
+ "cluster": "primary-cluster",
+ "uri": "/data/SampleFeed/2012-04-03",
+ "status": "AVAILABLE",
+ "instance": "2012-04-03T07:00Z"
+ }
+ ],
+ "requestId": "default\/3527038e-8334-4e50-8173-76c4fa430d0b\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
+
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index fd8c63f..d5432eb 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -454,7 +454,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
}
- return new InstancesResult("Running Instances", runInstances.toArray(new Instance[runInstances.size()]));
+ InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances");
+ result.setInstances(runInstances.toArray(new Instance[runInstances.size()]));
+ return result;
} catch (OozieClientException e) {
throw new FalconException(e);
@@ -1440,7 +1442,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
instance.cluster = cluster;
instances[0] = instance;
- return new InstancesResult("Instance for workflow id:" + jobId, instances);
+ InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
+ "Instance for workflow id:" + jobId);
+ result.setInstances(instances);
+ return result;
} catch (Exception e) {
throw new FalconException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 41cd601..e50da2d 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -50,6 +50,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Constructor;
import java.util.*;
/**
@@ -807,4 +808,39 @@ public abstract class AbstractEntityManager {
protected AbstractWorkflowEngine getWorkflowEngine() {
return this.workflowEngine;
}
+
+ protected <T extends APIResult> T consolidateResult(Map<String, T> results, Class<T> clazz) {
+ if (results == null || results.isEmpty()) {
+ return null;
+ }
+
+ StringBuilder message = new StringBuilder();
+ StringBuilder requestIds = new StringBuilder();
+ List instances = new ArrayList();
+ int statusCount = 0;
+ for (Map.Entry<String, T> entry : results.entrySet()) {
+ String colo = entry.getKey();
+ T result = results.get(colo);
+ message.append(colo).append('/').append(result.getMessage()).append('\n');
+ requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
+ statusCount += result.getStatus().ordinal();
+
+ if (result.getCollection() == null) {
+ continue;
+ }
+ Collections.addAll(instances, result.getCollection());
+ }
+ Object[] arrInstances = instances.toArray();
+ APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED
+ : ((statusCount == results.size() * 2) ? APIResult.Status.FAILED : APIResult.Status.PARTIAL);
+ try {
+ Constructor<T> constructor = clazz.getConstructor(Status.class, String.class);
+ T result = constructor.newInstance(status, message.toString());
+ result.setCollection(arrInstances);
+ result.setRequestId(requestIds.toString());
+ return result;
+ } catch (Exception e) {
+ throw new FalconRuntimException("Unable to consolidate result.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 2070713..9df6a2b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -21,6 +21,7 @@ package org.apache.falcon.resource;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.*;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -49,7 +50,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
protected static final long DAY_IN_MILLIS = 86400000L;
private static final long MONTH_IN_MILLIS = 2592000000L;
- protected void checkType(String type) {
+ protected EntityType checkType(String type) {
if (StringUtils.isEmpty(type)) {
throw FalconWebException.newInstanceException("entity type is empty",
Response.Status.BAD_REQUEST);
@@ -60,9 +61,11 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
"Instance management functions don't apply to Cluster entities",
Response.Status.BAD_REQUEST);
}
+ return entityType;
}
}
+
protected List<LifeCycle> checkAndUpdateLifeCycle(List<LifeCycle> lifeCycleValues,
String type) throws FalconException {
EntityType entityType = EntityType.valueOf(type.toUpperCase().trim());
@@ -190,14 +193,16 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
instanceSet = filteredInstanceSet(resultSet, instanceSet, getFilterByFieldsValues(filterBy));
int pageCount = super.getRequiredNumberOfResults(instanceSet.size(), offset, numResults);
+ InstancesResult result = new InstancesResult(resultSet.getStatus(), resultSet.getMessage());
if (pageCount == 0) {
// return empty result set
- return new InstancesResult(resultSet.getMessage(), new Instance[0]);
+ result.setInstances(new Instance[0]);
+ return result;
}
// Sort the ArrayList using orderBy
instanceSet = sortInstances(instanceSet, orderBy, sortOrder);
- return new InstancesResult(resultSet.getMessage(),
- instanceSet.subList(offset, (offset+pageCount)).toArray(new Instance[pageCount]));
+ result.setCollection(instanceSet.subList(offset, (offset+pageCount)).toArray(new Instance[pageCount]));
+ return result;
}
private ArrayList<Instance> filteredInstanceSet(InstancesResult resultSet, ArrayList<Instance> instanceSet,
@@ -310,6 +315,25 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
+ public FeedInstanceResult getListing(String type, String entity, String startStr,
+ String endStr, String colo) {
+ checkColo(colo);
+ EntityType entityType = checkType(type);
+ try {
+ if (entityType != EntityType.FEED) {
+ throw new IllegalArgumentException("getLocation is not applicable for " + type);
+ }
+ validateParams(type, entity);
+ Entity entityObject = EntityUtil.getEntity(type, entity);
+ Pair<Date, Date> startAndEndDate = getStartAndEndDate(entityObject, startStr, endStr);
+
+ return FeedHelper.getFeedInstanceListing(entityObject, startAndEndDate.first, startAndEndDate.second);
+ } catch (Throwable e) {
+ LOG.error("Failed to get instances listing", e);
+ throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
public InstancesResult getInstanceParams(String type,
String entity, String startTime,
String colo, List<LifeCycle> lifeCycles) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index d172c3e..e6cf904 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -26,10 +26,9 @@ import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractInstanceManager;
+import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesResult.Instance;
import org.apache.falcon.resource.InstancesSummaryResult;
-import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
@@ -38,6 +37,7 @@ import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.lang.reflect.Constructor;
import java.util.*;
/**
@@ -87,7 +87,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@DefaultValue("") @QueryParam("sortOrder") final String sortOrder,
@DefaultValue("0") @QueryParam("offset") final Integer offset,
@DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) {
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).
@@ -118,7 +118,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@DefaultValue("") @QueryParam("sortOrder") final String sortOrder,
@DefaultValue("0") @QueryParam("offset") final Integer offset,
@DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) {
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getInstances",
@@ -145,7 +145,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@DefaultValue("") @QueryParam("sortOrder") final String sortOrder,
@DefaultValue("0") @QueryParam("offset") final Integer offset,
@DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) {
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getStatus",
@@ -167,7 +167,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("end-time") @QueryParam("end") final String endStr,
@Dimension("colo") @QueryParam("colo") final String colo,
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
- return new InstanceSummaryProxy() {
+ return new InstanceProxy<InstancesSummaryResult>(InstancesSummaryResult.class) {
@Override
protected InstancesSummaryResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getSummary",
@@ -177,6 +177,26 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
@GET
+ @Path("listing/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-listing")
+ @Override
+ public FeedInstanceResult getListing(
+ @Dimension("type") @PathParam("type") final String type,
+ @Dimension("entity") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String start,
+ @Dimension("end-time") @QueryParam("end") final String end,
+ @Dimension("colo") @QueryParam("colo") String colo) {
+ return new InstanceProxy<FeedInstanceResult>(FeedInstanceResult.class) {
+ @Override
+ protected FeedInstanceResult doExecute(String colo) throws FalconException {
+ return getInstanceManager(colo).invoke("getListing",
+ type, entity, start, end, colo);
+ }
+ }.execute(colo, type, entity);
+ }
+
+ @GET
@Path("params/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-params")
@@ -187,7 +207,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("start-time") @QueryParam("start") final String start,
@Dimension("colo") @QueryParam("colo") String colo,
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getInstanceParams",
@@ -196,7 +216,6 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}.execute(colo, type, entity);
}
-
@GET
@Path("logs/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@@ -215,7 +234,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@DefaultValue("") @QueryParam("sortOrder") final String sortOrder,
@DefaultValue("0") @QueryParam("offset") final Integer offset,
@DefaultValue(DEFAULT_NUM_RESULTS) @QueryParam("numResults") final Integer resultsPerPage) {
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getLogs",
@@ -240,7 +259,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("killInstance",
@@ -263,7 +282,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("colo") @QueryParam("colo") String colo,
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("suspendInstance",
@@ -287,7 +306,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("resumeInstance",
@@ -311,7 +330,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- return new InstanceProxy() {
+ return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("reRunInstance",
@@ -321,50 +340,28 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
- private abstract class InstanceProxy {
+ private abstract class InstanceProxy<T extends APIResult> {
- public InstancesResult execute(String coloExpr, String type, String name) {
- Set<String> colos = getColosFromExpression(coloExpr, type, name);
+ private final Class<T> clazz;
- Map<String, InstancesResult> results = new HashMap<String, InstancesResult>();
- for (String colo : colos) {
- try {
- InstancesResult resultHolder = doExecute(colo);
- results.put(colo, resultHolder);
- } catch (FalconException e) {
- results.put(colo, new InstancesResult(APIResult.Status.FAILED,
- e.getClass().getName() + "::" + e.getMessage(),
- new InstancesResult.Instance[0]));
- }
- }
- InstancesResult finalResult = consolidateInstanceResult(results);
- if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) {
- throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST);
- } else {
- return finalResult;
- }
+ public InstanceProxy(Class<T> resultClazz) {
+ this.clazz = resultClazz;
}
- protected abstract InstancesResult doExecute(String colo) throws FalconException;
- }
-
- private abstract class InstanceSummaryProxy {
-
- public InstancesSummaryResult execute(String coloExpr, String type, String name) {
+ public T execute(String coloExpr, String type, String name) {
Set<String> colos = getColosFromExpression(coloExpr, type, name);
- Map<String, InstancesSummaryResult> results = new HashMap<String, InstancesSummaryResult>();
+ Map<String, T> results = new HashMap<String, T>();
for (String colo : colos) {
try {
- InstancesSummaryResult resultHolder = doExecute(colo);
+ T resultHolder = doExecute(colo);
results.put(colo, resultHolder);
} catch (FalconException e) {
- results.put(colo, new InstancesSummaryResult(APIResult.Status.FAILED,
- e.getClass().getName() + "::" + e.getMessage(),
- new InstancesSummaryResult.InstanceSummary[0]));
+ results.put(colo, getResultInstance(APIResult.Status.FAILED,
+ e.getClass().getName() + "::" + e.getMessage()));
}
}
- InstancesSummaryResult finalResult = consolidateInstanceSummaryResult(results);
+ T finalResult = consolidateResult(results, clazz);
if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) {
throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST);
} else {
@@ -372,73 +369,15 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
}
- protected abstract InstancesSummaryResult doExecute(String colo) throws FalconException;
- }
-
- private InstancesResult consolidateInstanceResult(Map<String, InstancesResult> results) {
- if (results == null || results.isEmpty()) {
- return null;
- }
-
- StringBuilder message = new StringBuilder();
- StringBuilder requestIds = new StringBuilder();
- List<Instance> instances = new ArrayList<Instance>();
- int statusCount = 0;
- for (Map.Entry<String, InstancesResult> entry : results.entrySet()) {
- String colo = entry.getKey();
- InstancesResult result = results.get(colo);
- message.append(colo).append('/').append(result.getMessage()).append('\n');
- requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
- statusCount += result.getStatus().ordinal();
-
- if (result.getInstances() == null) {
- continue;
- }
-
- for (Instance instance : result.getInstances()) {
- instance.instance = instance.getInstance();
- instances.add(instance);
- }
- }
- Instance[] arrInstances = new Instance[instances.size()];
- APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED
- : ((statusCount == results.size() * 2) ? APIResult.Status.FAILED : APIResult.Status.PARTIAL);
- InstancesResult result = new InstancesResult(status, message.toString(), instances.toArray(arrInstances));
- result.setRequestId(requestIds.toString());
- return result;
- }
-
- private InstancesSummaryResult consolidateInstanceSummaryResult(Map<String, InstancesSummaryResult> results) {
- if (results == null || results.isEmpty()) {
- return null;
- }
-
- StringBuilder message = new StringBuilder();
- StringBuilder requestIds = new StringBuilder();
- List<InstanceSummary> instances = new ArrayList<InstanceSummary>();
- int statusCount = 0;
- for (Map.Entry<String, InstancesSummaryResult> entry : results.entrySet()) {
- String colo = entry.getKey();
- InstancesSummaryResult result = results.get(colo);
- message.append(colo).append('/').append(result.getMessage()).append('\n');
- requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
- statusCount += result.getStatus().ordinal();
-
- if (result.getInstancesSummary() == null) {
- continue;
- }
+ protected abstract T doExecute(String colo) throws FalconException;
- for (InstanceSummary instance : result.getInstancesSummary()) {
- instance.summaryMap = instance.getSummaryMap();
- instances.add(instance);
+ private T getResultInstance(APIResult.Status status, String message) {
+ try {
+ Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class, String.class);
+ return constructor.newInstance(status, message);
+ } catch (Exception e) {
+ throw new FalconRuntimException("Unable to consolidate result.", e);
}
}
- InstanceSummary[] arrInstances = new InstanceSummary[instances.size()];
- APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED
- : ((statusCount == results.size() * 2) ? APIResult.Status.FAILED : APIResult.Status.PARTIAL);
- InstancesSummaryResult result = new InstancesSummaryResult(status, message.toString(),
- instances.toArray(arrInstances));
- result.setRequestId(requestIds.toString());
- return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index fbccd6b..a4122a5 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -43,7 +43,6 @@ import javax.ws.rs.core.Response;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
/**
@@ -129,7 +128,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
if (!embeddedMode) {
results.put(PRISM_TAG, super.submit(bufferedRequest, type, currentColo));
}
- return consolidateResult(results);
+ return consolidateResult(results, APIResult.class);
}
private Entity getEntity(HttpServletRequest request, String type) {
@@ -187,7 +186,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
if (!embeddedMode) {
results.put(PRISM_TAG, super.delete(bufferedRequest, type, entity, currentColo));
}
- return consolidateResult(results);
+ return consolidateResult(results, APIResult.class);
}
@POST
@@ -258,7 +257,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, effectiveTime));
}
- return consolidateResult(results);
+ return consolidateResult(results, APIResult.class);
}
@GET
@@ -338,7 +337,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
Map<String, APIResult> results = new HashMap<String, APIResult>();
results.put("submit", submit(bufferedRequest, type, coloExpr));
results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr));
- return consolidateResult(results);
+ return consolidateResult(results, APIResult.class);
}
@POST
@@ -451,7 +450,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
new APIResult(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage()));
}
}
- APIResult finalResult = consolidateResult(results);
+ APIResult finalResult = consolidateResult(results, APIResult.class);
if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) {
throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST);
} else {
@@ -465,27 +464,4 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
protected abstract APIResult doExecute(String colo) throws FalconException;
}
-
- private APIResult consolidateResult(Map<String, APIResult> results) {
- if (results == null || results.size() == 0) {
- return null;
- }
-
- StringBuilder buffer = new StringBuilder();
- StringBuilder requestIds = new StringBuilder();
- int statusCount = 0;
- for (Entry<String, APIResult> entry : results.entrySet()) {
- String colo = entry.getKey();
- APIResult result = entry.getValue();
- buffer.append(colo).append('/').append(result.getMessage()).append('\n');
- requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
- statusCount += result.getStatus().ordinal();
- }
-
- APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED
- : ((statusCount == results.size() * 2) ? APIResult.Status.FAILED : APIResult.Status.PARTIAL);
- APIResult result = new APIResult(status, buffer.toString());
- result.setRequestId(requestIds.toString());
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index 7a7d993..d4e0ae0 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -115,6 +115,20 @@ public class InstanceManager extends AbstractInstanceManager {
}
@GET
+ @Path("listing/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-listing")
+ @Override
+ public FeedInstanceResult getListing(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String start,
+ @Dimension("end-time") @QueryParam("end") String end,
+ @Dimension("colo") @QueryParam("colo") String colo) {
+ return super.getListing(type, entity, start, end, colo);
+ }
+
+ @GET
@Path("logs/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-logs")
@@ -150,7 +164,6 @@ public class InstanceManager extends AbstractInstanceManager {
return super.getInstanceParams(type, entity, start, colo, lifeCycles);
}
-
@POST
@Path("kill/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9f722b3d/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 0943103..6694af1 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -404,6 +404,11 @@ public class FalconCLIIT {
+ " -start " + SchemaHelper.getDateFormat().format(new Date())));
Assert.assertEquals(0,
+ executeWithURL("instance -listing -type feed -name "
+ + overlay.get("outputFeedName")
+ + " -start " + SchemaHelper.getDateFormat().format(new Date())));
+
+ Assert.assertEquals(0,
executeWithURL("instance -status -type process -name "
+ overlay.get("processName")
+ " -start " + START_INSTANCE));