You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/01/28 20:48:35 UTC
falcon git commit: FALCON-763 Support feed listing for
CatalogStorage. Contributed by Balu Vellanki
Repository: falcon
Updated Branches:
refs/heads/master 33d72f77a -> 8c52a9add
FALCON-763 Support feed listing for CatalogStorage. Contributed by Balu Vellanki
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8c52a9ad
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8c52a9ad
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8c52a9ad
Branch: refs/heads/master
Commit: 8c52a9add412cf91143d5fd4c4285f66d88a04d6
Parents: 33d72f7
Author: bvellanki <bv...@hortonworks.com>
Authored: Thu Jan 28 11:48:30 2016 -0800
Committer: bvellanki <bv...@hortonworks.com>
Committed: Thu Jan 28 11:48:30 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/falcon/catalog/CatalogPartition.java | 20 ++-
.../falcon/catalog/HiveCatalogService.java | 12 ++
.../apache/falcon/entity/CatalogStorage.java | 73 +++++++-
.../org/apache/falcon/entity/FeedHelper.java | 18 ++
.../apache/falcon/entity/FileSystemStorage.java | 6 +-
.../apache/falcon/entity/FeedHelperTest.java | 12 ++
.../apache/falcon/catalog/CatalogStorageIT.java | 170 +++++++++++++++++++
8 files changed, 304 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b92d88d..d904624 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,10 @@ Trunk
FALCON-1495 In instance status list, show all runs for instances when requested by user(Narayan Periwal via Ajay Yadava)
FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava)
+
IMPROVEMENTS
+ FALCON-763 Support feed listing for CatalogStorage (Balu Vellanki)
+
FALCON-1764 Remove temporary folder "localhost" created during tests(Praveen Adlakha via Ajay Yadava)
FALCON-1756 Remove PID files on service stop(Deepak Barr via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index 9e35782..71194c7 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -34,6 +34,7 @@ public class CatalogPartition {
private String outputFormat;
private String location;
private String serdeInfo;
+ private long size = -1;
protected CatalogPartition() {
}
@@ -74,6 +75,8 @@ public class CatalogPartition {
this.serdeInfo = serdeInfo;
}
+ public void setSize(long size) { this.size = size; }
+
/**
* Gets the database name.
*
@@ -156,14 +159,21 @@ public class CatalogPartition {
return this.values;
}
+ /**
+ * Gets the size.
+ *
+ * @return the size
+ */
+ public long getSize() { return size; }
+
@Override
public String toString() {
return "CatalogPartition ["
- + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null")
- + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null")
- + (values != null ? "values=" + values + ", " : "values=null")
- + "createTime=" + createTime + ", lastAccessTime="
- + lastAccessTime + ", " + "]";
+ + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null, ")
+ + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null, ")
+ + (values != null ? "values=" + values + ", " : "values=null, ")
+ + "size=" + size + ", " + "createTime=" + createTime + ", lastAccessTime="
+ + lastAccessTime + "]";
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index b988c3e..872f91f 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -18,6 +18,7 @@
package org.apache.falcon.catalog;
+import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
@@ -47,6 +48,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* An implementation of CatalogService that uses Hive Meta Store (HCatalog)
@@ -57,6 +59,7 @@ public class HiveCatalogService extends AbstractCatalogService {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
public static final String CREATE_TIME = "falcon.create_time";
public static final String UPDATE_TIME = "falcon.update_time";
+ public static final String PARTITION_DOES_NOT_EXIST = "Partition does not exist";
public static HiveConf createHiveConf(Configuration conf,
@@ -291,6 +294,13 @@ public class HiveCatalogService extends AbstractCatalogService {
catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib());
catalogPartition.setCreateTime(hCatPartition.getCreateTime());
catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
+ Map<String, String> params = hCatPartition.getParameters();
+ if (params != null) {
+ String size = hCatPartition.getParameters().get("totalSize");
+ if (StringUtils.isNotBlank(size)) {
+ catalogPartition.setSize(Long.parseLong(size));
+ }
+ }
return catalogPartition;
}
@@ -337,6 +347,8 @@ public class HiveCatalogService extends AbstractCatalogService {
HiveMetaStoreClient client = createClient(conf, catalogUrl);
Partition hCatPartition = client.getPartition(database, tableName, partitionValues);
return createCatalogPartition(hCatPartition);
+ } catch (NoSuchObjectException nsoe) {
+ throw new FalconException(PARTITION_DOES_NOT_EXIST + ":" + nsoe.getMessage(), nsoe);
} catch (Exception e) {
throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 143d9b4..c5860c9 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -23,6 +23,7 @@ import org.apache.falcon.Pair;
import org.apache.falcon.catalog.AbstractCatalogService;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.catalog.HiveCatalogService;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -381,15 +382,79 @@ public class CatalogStorage extends Configured implements Storage {
}
@Override
- public List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType,
+ public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
Date start, Date end) throws FalconException {
- throw new UnsupportedOperationException("getListing");
+ try {
+ List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
+ Date feedStart = FeedHelper.getFeedValidityStart(feed, clusterName);
+ Date alignedDate = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(),
+ feed.getTimezone(), start);
+
+ while (!end.before(alignedDate)) {
+ List<String> partitionValues = getCatalogPartitionValues(alignedDate);
+ try {
+ CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+ getConf(), getCatalogUrl(), getDatabase(), getTable(), partitionValues);
+ instances.add(getFeedInstanceFromCatalogPartition(partition));
+ } catch (FalconException e) {
+ if (e.getMessage().startsWith(HiveCatalogService.PARTITION_DOES_NOT_EXIST)) {
+ // Partition missing
+ FeedInstanceStatus instanceStatus = new FeedInstanceStatus(null);
+ instanceStatus.setInstance(partitionValues.toString());
+ instances.add(instanceStatus);
+ } else {
+ throw e;
+ }
+ }
+ alignedDate = FeedHelper.getNextFeedInstanceDate(alignedDate, feed);
+ }
+ return instances;
+ } catch (Exception e) {
+ LOG.error("Unable to retrieve listing for {}:{} -- {}", locationType, catalogUrl, e.getMessage());
+ throw new FalconException("Unable to retrieve listing for (URI " + catalogUrl + ")", e);
+ }
+ }
+
+ private List<String> getCatalogPartitionValues(Date alignedDate) throws FalconException {
+ List<String> partitionValues = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
+ if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
+ ExpressionHelper.setReferenceDate(alignedDate);
+ ExpressionHelper expressionHelper = ExpressionHelper.get();
+ String instanceValue = expressionHelper.evaluateFullExpression(entry.getValue(), String.class);
+ partitionValues.add(instanceValue);
+ } else {
+ partitionValues.add(entry.getValue());
+ }
+ }
+ return partitionValues;
+ }
+
+ private FeedInstanceStatus getFeedInstanceFromCatalogPartition(CatalogPartition partition) {
+ FeedInstanceStatus feedInstanceStatus = new FeedInstanceStatus(partition.getLocation());
+ feedInstanceStatus.setCreationTime(partition.getCreateTime());
+ feedInstanceStatus.setInstance(partition.getValues().toString());
+ FeedInstanceStatus.AvailabilityStatus availabilityStatus = FeedInstanceStatus.AvailabilityStatus.MISSING;
+ long size = partition.getSize();
+ if (size == 0) {
+ availabilityStatus = FeedInstanceStatus.AvailabilityStatus.EMPTY;
+ } else if (size > 0) {
+ availabilityStatus = FeedInstanceStatus.AvailabilityStatus.AVAILABLE;
+ }
+ feedInstanceStatus.setSize(size);
+ feedInstanceStatus.setStatus(availabilityStatus);
+ return feedInstanceStatus;
}
@Override
public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
- LocationType locationType, Date instancetime) throws FalconException {
- throw new UnsupportedOperationException("getInstanceAvailabilityStatus"); //TODO to be implemented later
+ LocationType locationType, Date instanceTime) throws FalconException {
+ List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
+ if (result.isEmpty()) {
+ return FeedInstanceStatus.AvailabilityStatus.MISSING;
+ } else {
+ return result.get(0).getStatus();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 8aa97ec..b3aaaab 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -408,6 +408,24 @@ public final class FeedHelper {
return null;
}
+ public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
+ if (feedCluster != null) {
+ return feedCluster.getValidity().getStart();
+ } else {
+ throw new FalconException("No matching cluster " + clusterName
+ + "found for feed " + feed.getName());
+ }
+ }
+
+ public static Date getNextFeedInstanceDate(Date alignedDate, Feed feed) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(alignedDate);
+ calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(),
+ feed.getFrequency().getFrequencyAsInt());
+ return calendar.getTime();
+ }
+
/**
* Returns various policies applicable for a feed.
*
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 200f71f..ece8b5d 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -466,7 +466,11 @@ public class FileSystemStorage extends Configured implements Storage {
Date instanceTime) throws FalconException {
List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
- return result.get(0).getStatus();
+ if (result.isEmpty()) {
+ return FeedInstanceStatus.AvailabilityStatus.MISSING;
+ } else {
+ return result.get(0).getStatus();
+ }
}
public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index d565f94..95d10c4 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -53,6 +53,7 @@ import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.service.LifecyclePolicyMap;
+import org.apache.falcon.util.DateUtil;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -198,6 +199,17 @@ public class FeedHelperTest extends AbstractTestBase {
}
@Test
+ public void testGetFeedValidityStartAndNextInstance() throws Exception {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Date date = FeedHelper.getFeedValidityStart(feed, cluster.getName());
+ Assert.assertEquals(DateUtil.getDateFormatFromTime(date.getTime()), "2011-02-28T10:00Z");
+ Date nextDate = FeedHelper.getNextFeedInstanceDate(date, feed);
+ Assert.assertEquals(DateUtil.getDateFormatFromTime(nextDate.getTime()), "2011-02-28T10:05Z");
+ }
+
+
+ @Test
public void testGetConsumersFirstInstance() throws Exception {
Cluster cluster = publishCluster();
Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java b/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java
new file mode 100644
index 0000000..bf3b2ec
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedInstanceStatus;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TimeZone;
+
+
+/**
+ * Tests Hive Meta Store service.
+ */
+public class CatalogStorageIT {
+
+ private static final String METASTORE_URL = "thrift://localhost:49083";
+ private static final String DATABASE_NAME = "CatalogStorageITDB";
+ private static final String TABLE_NAME = "CatalogStorageITTable";
+
+ private HCatClient client;
+ private Feed feed = new Feed();
+ private org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster();
+ private DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+ private CatalogStorage storage;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ // setup a logged in user
+ CurrentUser.authenticate(TestContext.REMOTE_USER);
+ client = TestContext.getHCatClient(METASTORE_URL);
+
+ HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+ List<String> partitionKeys = new ArrayList<String>();
+ partitionKeys.add("ds");
+ partitionKeys.add("region");
+ HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
+ addPartitions();
+ addClusterAndFeed();
+ }
+
+ private void addClusterAndFeed() throws Exception {
+ cluster.setName("testCluster");
+ Interfaces interfaces = new Interfaces();
+ Interface registry = new Interface();
+ registry.setType(Interfacetype.REGISTRY);
+ registry.setEndpoint(METASTORE_URL);
+ interfaces.getInterfaces().add(registry);
+ cluster.setInterfaces(interfaces);
+
+ feed.setName("feed");
+ Frequency f = new Frequency("days(1)");
+ feed.setFrequency(f);
+ feed.setTimezone(TimeZone.getTimeZone("UTC"));
+ Clusters fClusters = new Clusters();
+ org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ fCluster.setType(ClusterType.SOURCE);
+ fCluster.setName("testCluster");
+ Validity validity = new Validity();
+ validity.setStart(format.parse("2013-09-01 00:00 UTC"));
+ validity.setEnd(format.parse("2013-09-06 00:00 UTC"));
+ fCluster.setValidity(validity);
+ fClusters.getClusters().add(fCluster);
+ feed.setClusters(fClusters);
+
+ initCatalogService();
+ }
+
+ private void initCatalogService() throws Exception {
+ CatalogTable table = new CatalogTable();
+ String uri = "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}${MONTH}${DAY};region=us";
+ table.setUri(uri);
+ feed.setTable(table);
+
+ storage = new CatalogStorage(cluster, table);
+ Configuration configuration = HiveCatalogService.createHiveConf(new Configuration(), storage.getCatalogUrl());
+ storage.setConf(configuration);
+ }
+
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ dropTable(TABLE_NAME);
+ dropDatabase();
+ TestContext.deleteEntitiesFromStore();
+ }
+
+ private void dropTable(String tableName) throws Exception {
+ client.dropTable(DATABASE_NAME, tableName, true);
+ }
+
+ private void dropDatabase() throws Exception {
+ client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
+ }
+
+ private void addPartitions() throws Exception {
+ putPartition("20130901", "us");
+ putPartition("20130902", "us");
+ putPartition("20130904", "us");
+ putPartition("20130905", "us");
+ }
+
+ private void putPartition(String date, String region) throws HCatException {
+ Map<String, String> partition = new HashMap<String, String>();
+ partition.put("ds", date); //yyyyMMDD
+ partition.put("region", region);
+ HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
+ DATABASE_NAME, TABLE_NAME, null, partition).build();
+ client.addPartition(addPtn);
+ }
+
+ @Test
+ public void testGetInstanceAvailabilityStatus() throws Exception {
+ List<FeedInstanceStatus> instanceStatuses = storage.getListing(feed, cluster.getName(),
+ LocationType.DATA, format.parse("2013-09-02 00:00 UTC"), format.parse("2013-09-04 00:00 UTC"));
+ Assert.assertEquals(instanceStatuses.size(), 3);
+ }
+
+ @Test
+ public void testGetListing() throws Exception {
+ FeedInstanceStatus.AvailabilityStatus availabilityStatus = storage.getInstanceAvailabilityStatus(
+ feed, cluster.getName(),
+ LocationType.DATA, format.parse("2013-09-03 00:00 UTC"));
+ Assert.assertEquals(availabilityStatus, FeedInstanceStatus.AvailabilityStatus.MISSING);
+ }
+
+}