You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/01/28 20:48:35 UTC

falcon git commit: FALCON-763 Support feed listing for CatalogStorage. Contributed by Balu Vellanki

Repository: falcon
Updated Branches:
  refs/heads/master 33d72f77a -> 8c52a9add


FALCON-763 Support feed listing for CatalogStorage. Contributed by Balu Vellanki


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8c52a9ad
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8c52a9ad
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8c52a9ad

Branch: refs/heads/master
Commit: 8c52a9add412cf91143d5fd4c4285f66d88a04d6
Parents: 33d72f7
Author: bvellanki <bv...@hortonworks.com>
Authored: Thu Jan 28 11:48:30 2016 -0800
Committer: bvellanki <bv...@hortonworks.com>
Committed: Thu Jan 28 11:48:30 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/falcon/catalog/CatalogPartition.java |  20 ++-
 .../falcon/catalog/HiveCatalogService.java      |  12 ++
 .../apache/falcon/entity/CatalogStorage.java    |  73 +++++++-
 .../org/apache/falcon/entity/FeedHelper.java    |  18 ++
 .../apache/falcon/entity/FileSystemStorage.java |   6 +-
 .../apache/falcon/entity/FeedHelperTest.java    |  12 ++
 .../apache/falcon/catalog/CatalogStorageIT.java | 170 +++++++++++++++++++
 8 files changed, 304 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b92d88d..d904624 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,10 @@ Trunk
     FALCON-1495 In instance status list, show all runs for instances when requested by user(Narayan Periwal via Ajay Yadava)
 
     FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava)
+
   IMPROVEMENTS
+    FALCON-763 Support feed listing for CatalogStorage (Balu Vellanki)
+
     FALCON-1764 Remove temporary folder "localhost" created during tests(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1756 Remove PID files on service stop(Deepak Barr via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index 9e35782..71194c7 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -34,6 +34,7 @@ public class CatalogPartition {
     private String outputFormat;
     private String location;
     private String serdeInfo;
+    private long size = -1;
 
     protected CatalogPartition() {
     }
@@ -74,6 +75,8 @@ public class CatalogPartition {
         this.serdeInfo = serdeInfo;
     }
 
+    public void setSize(long size) { this.size = size; }
+
     /**
      * Gets the database name.
      *
@@ -156,14 +159,21 @@ public class CatalogPartition {
         return this.values;
     }
 
+    /**
+     * Gets the partition size; stays at the initial -1 until {@link #setSize(long)} is called.
+     *
+     * @return the size, or -1 if no size has been set
+     */
+    public long getSize() { return size; }
+
     @Override
     public String toString() {
         return "CatalogPartition ["
-            + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null")
-            + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null")
-            + (values != null ? "values=" + values + ", " : "values=null")
-            + "createTime=" + createTime + ", lastAccessTime="
-            + lastAccessTime + ", " + "]";
+            + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null, ")
+            + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null, ")
+            + (values != null ? "values=" + values + ", " : "values=null, ")
+            + "size=" + size + ", " + "createTime=" + createTime + ", lastAccessTime="
+            + lastAccessTime + "]";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index b988c3e..872f91f 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.catalog;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
@@ -47,6 +48,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * An implementation of CatalogService that uses Hive Meta Store (HCatalog)
@@ -57,6 +59,7 @@ public class HiveCatalogService extends AbstractCatalogService {
     private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
     public static final String CREATE_TIME = "falcon.create_time";
     public static final String UPDATE_TIME = "falcon.update_time";
+    public static final String PARTITION_DOES_NOT_EXIST = "Partition does not exist";
 
 
     public static HiveConf createHiveConf(Configuration conf,
@@ -291,6 +294,13 @@ public class HiveCatalogService extends AbstractCatalogService {
         catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib());
         catalogPartition.setCreateTime(hCatPartition.getCreateTime());
         catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
+        Map<String, String> params = hCatPartition.getParameters();
+        if (params != null) {
+            String size = hCatPartition.getParameters().get("totalSize");
+            if (StringUtils.isNotBlank(size)) {
+                catalogPartition.setSize(Long.parseLong(size));
+            }
+        }
         return catalogPartition;
     }
 
@@ -337,6 +347,8 @@ public class HiveCatalogService extends AbstractCatalogService {
             HiveMetaStoreClient client = createClient(conf, catalogUrl);
             Partition hCatPartition = client.getPartition(database, tableName, partitionValues);
             return createCatalogPartition(hCatPartition);
+        } catch (NoSuchObjectException nsoe) {
+            throw new FalconException(PARTITION_DOES_NOT_EXIST + ":" + nsoe.getMessage(), nsoe);
         } catch (Exception e) {
             throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 143d9b4..c5860c9 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -23,6 +23,7 @@ import org.apache.falcon.Pair;
 import org.apache.falcon.catalog.AbstractCatalogService;
 import org.apache.falcon.catalog.CatalogPartition;
 import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.catalog.HiveCatalogService;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -381,15 +382,79 @@ public class CatalogStorage extends Configured implements Storage {
     }
 
     @Override
-    public List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType,
+    public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
                                                Date start, Date end) throws FalconException {
-        throw new UnsupportedOperationException("getListing");
+        try {
+            List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
+            Date feedStart = FeedHelper.getFeedValidityStart(feed, clusterName);
+            Date alignedDate = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(),
+                    feed.getTimezone(), start);
+
+            while (!end.before(alignedDate)) {
+                List<String> partitionValues = getCatalogPartitionValues(alignedDate);
+                try {
+                    CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+                            getConf(), getCatalogUrl(), getDatabase(), getTable(), partitionValues);
+                    instances.add(getFeedInstanceFromCatalogPartition(partition));
+                } catch (FalconException e) {
+                    if (e.getMessage().startsWith(HiveCatalogService.PARTITION_DOES_NOT_EXIST)) {
+                        // Partition missing
+                        FeedInstanceStatus instanceStatus = new FeedInstanceStatus(null);
+                        instanceStatus.setInstance(partitionValues.toString());
+                        instances.add(instanceStatus);
+                    } else {
+                        throw e;
+                    }
+                }
+                alignedDate = FeedHelper.getNextFeedInstanceDate(alignedDate, feed);
+            }
+            return instances;
+        } catch (Exception e) {
+            LOG.error("Unable to retrieve listing for {}:{} -- {}", locationType, catalogUrl, e.getMessage());
+            throw new FalconException("Unable to retrieve listing for (URI " + catalogUrl + ")", e);
+        }
+    }
+
+    private List<String> getCatalogPartitionValues(Date alignedDate) throws FalconException {
+        // One resolved value per entry of getPartitions(), in the map's iteration order.
+        List<String> resolvedValues = new ArrayList<String>();
+        for (String rawValue : getPartitions().values()) {
+            if (!FeedDataPath.PATTERN.matcher(rawValue).find()) {
+                resolvedValues.add(rawValue); // no pattern match: the value is used as written
+                continue;
+            }
+            // Pattern match: evaluate the expression with alignedDate as the reference date.
+            ExpressionHelper.setReferenceDate(alignedDate);
+            resolvedValues.add(ExpressionHelper.get().evaluateFullExpression(rawValue, String.class));
+        }
+        return resolvedValues;
+    }
+
+    private FeedInstanceStatus getFeedInstanceFromCatalogPartition(CatalogPartition partition) {
+        FeedInstanceStatus instanceStatus = new FeedInstanceStatus(partition.getLocation());
+        instanceStatus.setCreationTime(partition.getCreateTime());
+        instanceStatus.setInstance(partition.getValues().toString());
+        final long partitionSize = partition.getSize();
+        instanceStatus.setSize(partitionSize);
+        if (partitionSize > 0) {
+            instanceStatus.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
+        } else if (partitionSize == 0) {
+            instanceStatus.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+        } else { // negative size, e.g. the -1 default when no size was recorded on the partition
+            instanceStatus.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
+        }
+        return instanceStatus;
     }
 
     @Override
     public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
-                                         LocationType locationType, Date instancetime) throws FalconException {
-        throw new UnsupportedOperationException("getInstanceAvailabilityStatus"); //TODO to be implemented later
+                                         LocationType locationType, Date instanceTime) throws FalconException {
+        List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
+        if (result.isEmpty()) {
+            return FeedInstanceStatus.AvailabilityStatus.MISSING;
+        } else {
+            return result.get(0).getStatus();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 8aa97ec..b3aaaab 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -408,6 +408,24 @@ public final class FeedHelper {
         return null;
     }
 
+    /** Returns the feed's validity start on the given cluster; throws if the feed has no such cluster. */
+    public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
+        if (feedCluster == null) {
+            throw new FalconException("No matching cluster " + clusterName
+                    + " found for feed " + feed.getName());
+        }
+        return feedCluster.getValidity().getStart();
+    }
+
+    public static Date getNextFeedInstanceDate(Date alignedDate, Feed feed) {
+        Calendar calendar = (feed.getTimezone() == null) // step in the feed's zone, not the JVM default
+                ? Calendar.getInstance() : Calendar.getInstance(feed.getTimezone());
+        calendar.setTime(alignedDate);
+        calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(), feed.getFrequency().getFrequencyAsInt());
+        return calendar.getTime();
+    }
+
     /**
      * Returns various policies applicable for a feed.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 200f71f..ece8b5d 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -466,7 +466,11 @@ public class FileSystemStorage extends Configured implements Storage {
                                                                    Date instanceTime) throws FalconException {
 
         List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
-        return result.get(0).getStatus();
+        if (result.isEmpty()) {
+            return FeedInstanceStatus.AvailabilityStatus.MISSING;
+        } else {
+            return result.get(0).getStatus();
+        }
     }
 
     public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index d565f94..95d10c4 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -53,6 +53,7 @@ import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.falcon.service.LifecyclePolicyMap;
+import org.apache.falcon.util.DateUtil;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -198,6 +199,17 @@ public class FeedHelperTest extends AbstractTestBase {
     }
 
     @Test
+    public void testGetFeedValidityStartAndNextInstance() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Date date = FeedHelper.getFeedValidityStart(feed, cluster.getName());
+        Assert.assertEquals(DateUtil.getDateFormatFromTime(date.getTime()), "2011-02-28T10:00Z");
+        Date nextDate = FeedHelper.getNextFeedInstanceDate(date, feed);
+        Assert.assertEquals(DateUtil.getDateFormatFromTime(nextDate.getTime()), "2011-02-28T10:05Z");
+    }
+
+
+    @Test
     public void testGetConsumersFirstInstance() throws Exception {
         Cluster cluster = publishCluster();
         Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");

http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java b/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java
new file mode 100644
index 0000000..bf3b2ec
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedInstanceStatus;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TimeZone;
+
+
+/**
+ * Tests feed instance listing and availability checks of CatalogStorage against a Hive metastore.
+ */
+public class CatalogStorageIT {
+
+    private static final String METASTORE_URL = "thrift://localhost:49083";
+    private static final String DATABASE_NAME = "CatalogStorageITDB";
+    private static final String TABLE_NAME = "CatalogStorageITTable";
+
+    private HCatClient client;
+    private Feed feed = new Feed();
+    private org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster();
+    private DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+    private CatalogStorage storage;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        // setup a logged in user
+        CurrentUser.authenticate(TestContext.REMOTE_USER);
+        client = TestContext.getHCatClient(METASTORE_URL);
+
+        HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+        List<String> partitionKeys = new ArrayList<String>();
+        partitionKeys.add("ds");
+        partitionKeys.add("region");
+        HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
+        addPartitions();
+        addClusterAndFeed();
+    }
+
+    private void addClusterAndFeed() throws Exception {
+        cluster.setName("testCluster");
+        Interfaces interfaces = new Interfaces();
+        Interface registry = new Interface();
+        registry.setType(Interfacetype.REGISTRY);
+        registry.setEndpoint(METASTORE_URL);
+        interfaces.getInterfaces().add(registry);
+        cluster.setInterfaces(interfaces);
+
+        feed.setName("feed");
+        Frequency f = new Frequency("days(1)");
+        feed.setFrequency(f);
+        feed.setTimezone(TimeZone.getTimeZone("UTC"));
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setType(ClusterType.SOURCE);
+        fCluster.setName("testCluster");
+        Validity validity = new Validity();
+        validity.setStart(format.parse("2013-09-01 00:00 UTC"));
+        validity.setEnd(format.parse("2013-09-06 00:00 UTC"));
+        fCluster.setValidity(validity);
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+
+        initCatalogService();
+    }
+
+    private void initCatalogService() throws Exception {
+        CatalogTable table = new CatalogTable();
+        String uri = "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}${MONTH}${DAY};region=us";
+        table.setUri(uri);
+        feed.setTable(table);
+
+        storage = new CatalogStorage(cluster, table);
+        Configuration configuration = HiveCatalogService.createHiveConf(new Configuration(), storage.getCatalogUrl());
+        storage.setConf(configuration);
+    }
+
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        dropTable(TABLE_NAME);
+        dropDatabase();
+        TestContext.deleteEntitiesFromStore();
+    }
+
+    private void dropTable(String tableName) throws Exception {
+        client.dropTable(DATABASE_NAME, tableName, true);
+    }
+
+    private void dropDatabase() throws Exception {
+        client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
+    }
+
+    private void addPartitions() throws Exception {
+        putPartition("20130901", "us");
+        putPartition("20130902", "us");
+        putPartition("20130904", "us");
+        putPartition("20130905", "us");
+    }
+
+    private void putPartition(String date, String region) throws HCatException {
+        Map<String, String> partition = new HashMap<String, String>();
+        partition.put("ds", date); // yyyyMMdd
+        partition.put("region", region);
+        HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
+                DATABASE_NAME, TABLE_NAME, null, partition).build();
+        client.addPartition(addPtn);
+    }
+
+    @Test
+    public void testGetListing() throws Exception {
+        // Three daily instances fall in the range; the one without a partition (09-03) is still listed.
+        List<FeedInstanceStatus> instanceStatuses = storage.getListing(feed, cluster.getName(),
+                LocationType.DATA, format.parse("2013-09-02 00:00 UTC"), format.parse("2013-09-04 00:00 UTC"));
+        Assert.assertEquals(instanceStatuses.size(), 3);
+    }
+
+    @Test
+    public void testGetInstanceAvailabilityStatus() throws Exception {
+        FeedInstanceStatus.AvailabilityStatus availabilityStatus = storage.getInstanceAvailabilityStatus(
+                feed, cluster.getName(), LocationType.DATA, format.parse("2013-09-03 00:00 UTC")); // no partition
+        Assert.assertEquals(availabilityStatus, FeedInstanceStatus.AvailabilityStatus.MISSING);
+    }
+
+}