You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/08/17 03:57:37 UTC

falcon git commit: FALCON-2096 Feed instance listing doesn't give instance status of all the clusters

Repository: falcon
Updated Branches:
  refs/heads/master 2bea7f474 -> 7354f870b


FALCON-2096 Feed instance listing doesn't give instance status of all the clusters

Author: sandeep <sa...@gmail.com>

Reviewers: @pallavi-rao, @PraveenAdlakha

Closes #261 from sandeepSamudrala/FALCON-2096 and squashes the following commits:

c9030e6 [sandeep] FALCON-2096. Fixed checkstyle issues
4c9c655 [sandeep] FALCON-2096. Incorporated review comments. Extracted frequently used constants
9f0d751 [sandeep] Falcon 2096. Feed instance listing doesn't give instance status of all the clusters
97d2a8f [sandeep] Falcon 2096. Feed instance listing doesn't give instance status of all the clusters
d853c76 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2096
fa8f8ad [sandeep] FALCON-2096. Feed instance listing doesn't give instance status of all the clusters.
89def80 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2096
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7354f870
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7354f870
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7354f870

Branch: refs/heads/master
Commit: 7354f870b2bff7b5f0451a792cb06d2eee00ff38
Parents: 2bea7f4
Author: sandeep <sa...@gmail.com>
Authored: Wed Aug 17 09:27:31 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Aug 17 09:27:31 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/FeedHelper.java    |   9 +-
 .../falcon/entity/FileSystemStorageTest.java    | 163 ++++++++++++-------
 2 files changed, 111 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7354f870/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index ea34d34..757359f 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -803,12 +803,11 @@ public final class FeedHelper {
                                                             Date start, Date end) throws FalconException {
         Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
         FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success");
+        List<FeedInstanceResult.Instance> allInstances = new ArrayList<FeedInstanceResult.Instance>();
         for (String cluster : clusters) {
             Feed feed = (Feed) entityObject;
             Storage storage = createStorage(cluster, feed);
             List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end);
-            FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()];
-            int index = 0;
             for (FeedInstanceStatus feedStatus : feedListing) {
                 FeedInstanceResult.Instance instance = new
                         FeedInstanceResult.Instance(cluster, feedStatus.getInstance(),
@@ -817,10 +816,12 @@ public final class FeedHelper {
                 instance.uri = feedStatus.getUri();
                 instance.size = feedStatus.getSize();
                 instance.sizeH = feedStatus.getSizeH();
-                instances[index++] = instance;
+                allInstances.add(instance);
             }
-            result.setInstances(instances);
         }
+        FeedInstanceResult.Instance[] resultInstances = allInstances.toArray(
+                new FeedInstanceResult.Instance[allInstances.size()]);
+        result.setInstances(resultInstances);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7354f870/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 30edd94..98b0f8e 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -34,6 +34,7 @@ import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.FalconTestUtil;
 import org.apache.hadoop.fs.FileStatus;
@@ -60,6 +61,8 @@ import java.util.TimeZone;
 public class FileSystemStorageTest {
 
     private static final String USER = FalconTestUtil.TEST_USER_1;
+    private static final String TEST_FEED_LISTING = "TestFeedListing";
+    private static final String TEST_FEED_INSTANCE_LISTING = "TestFeedInstanceListing";
 
     @BeforeClass
     public void setUp() {
@@ -424,7 +427,7 @@ public class FileSystemStorageTest {
     @Test (dataProvider = "testListingDataProvider")
     public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone,
                             Date start, Date end) throws Exception {
-        EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false);
+        EmbeddedCluster cluster = EmbeddedCluster.newCluster(TEST_FEED_LISTING, false);
         FileSystem fs = cluster.getFileSystem();
         ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster());
         try {
@@ -433,79 +436,125 @@ public class FileSystemStorageTest {
             FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem().
                     getUri().toString(), feed.getLocations());
             List<FeedInstanceStatus> actual = fileSystemStorage.
-                    getListing(feed, "TestFeedListing", LocationType.DATA, start, end);
+                    getListing(feed, TEST_FEED_LISTING, LocationType.DATA, start, end);
             Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match");
         } finally {
             ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName());
         }
     }
 
+    /** Verifies that feed instance listing returns instances from all of the feed's clusters (FALCON-2096). */
+    @Test (dataProvider = "testListingDataProvider")
+    public void testInstanceListing(String availabilityFlag, Frequency frequency, TimeZone timeZone,
+                                    Date start, Date end) throws Exception {
+        EmbeddedCluster firstCluster = EmbeddedCluster.newCluster(TEST_FEED_LISTING, false);
+        FileSystem fs = firstCluster.getFileSystem();
+        ConfigurationStore.get().publish(EntityType.CLUSTER, firstCluster.getCluster());
+        // A second cluster, so the listing has to merge results from more than one cluster.
+        EmbeddedCluster secondCluster = EmbeddedCluster.newCluster(TEST_FEED_INSTANCE_LISTING, false);
+        ConfigurationStore.get().publish(EntityType.CLUSTER, secondCluster.getCluster());
+
+        try {
+            Feed feed = getFeed(availabilityFlag, frequency, timeZone);
+            // Attach the second cluster to the feed with its own validity window and data location.
+            Cluster cluster = new Cluster();
+            cluster.setName(TEST_FEED_INSTANCE_LISTING);
+            feed.getClusters().getClusters().add(cluster);
+            Validity validity = new Validity();
+            cluster.setValidity(validity);
+            validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000)));
+            validity.setEnd(new Date(System.currentTimeMillis() - (1000L * 21 * 3600000)));
+            Locations locations = new Locations();
+            Location dataLocation = new Location();
+            dataLocation.setPath("/" + TEST_FEED_INSTANCE_LISTING + "/data/${YEAR}/${MONTH}/${DAY}"
+                    + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE");
+            dataLocation.setType(LocationType.DATA);
+            locations.getLocations().add(dataLocation);
+            cluster.setLocations(locations);
+
+            // prepareData generates expected instances for every cluster attached to the feed.
+            List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end);
+            FeedInstanceResult actual = FeedHelper.getFeedInstanceListing(feed, start, end);
+            Assert.assertEquals(actual.getInstances().length, expected.size(),
+                    "Feed instance listing should include instances from all clusters");
+        } finally {
+            ConfigurationStore.get().remove(EntityType.CLUSTER, firstCluster.getCluster().getName());
+            ConfigurationStore.get().remove(EntityType.CLUSTER, secondCluster.getCluster().getName());
+        }
+    }
+
     @SuppressWarnings("MagicConstant")
     private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed,
                                                  Date start, Date end) throws Exception {
-        fs.delete(new Path("/TestFeedListing"), true);
+        fs.delete(new Path("/" + TEST_FEED_LISTING), true);
         Random random = new Random();
         List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
         String basePath = feed.getLocations().getLocations().get(0).getPath();
         Frequency frequency = feed.getFrequency();
         TimeZone tz = feed.getTimezone();
-        Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(),
-                feed.getFrequency(), tz, new Date(start.getTime()));
-        Date dataEnd = new Date(end.getTime());
-        while (dataStart.before(dataEnd)) {
-            Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz);
-            String path = ExpressionHelper.substitute(basePath, properties);
-            FeedInstanceStatus instance = new FeedInstanceStatus(path);
-            instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
-            instance.setSize(-1);
-            instance.setCreationTime(0);
-            Date date = FeedHelper.getDate(basePath, new Path(path), tz);
-            instance.setInstance(SchemaHelper.formatDateUTC(date));
-            Calendar cal = Calendar.getInstance();
-            cal.setTime(dataStart);
-            cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
-            dataStart.setTime(cal.getTimeInMillis());
-            if (random.nextBoolean()) {
-                OutputStream out = fs.create(new Path(path, "file"));
-                out.write("Hello World\n".getBytes());
-                out.close();
-                instance.setSize(12);
-                if (feed.getAvailabilityFlag() == null
-                        || (feed.getAvailabilityFlag() != null && random.nextBoolean())) {
-                    //If availability is not present or if ok to create availability file, mark as available
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
-                    if (feed.getAvailabilityFlag() != null) {
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            Date dataStart = EntityUtil.getNextStartTime(cluster.getValidity().getStart(),
+                    feed.getFrequency(), tz, new Date(start.getTime()));
+            String clusterBasePath = basePath; // cluster-level data location overrides the feed-level one
+            if (cluster.getLocations() != null && cluster.getLocations().getLocations().get(0).getPath() != null) {
+                clusterBasePath = cluster.getLocations().getLocations().get(0).getPath();
+            }
+            Date dataEnd = new Date(end.getTime());
+            while (dataStart.before(dataEnd)) {
+                Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz);
+                String path = ExpressionHelper.substitute(clusterBasePath, properties);
+                FeedInstanceStatus instance = new FeedInstanceStatus(path);
+                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
+                instance.setSize(-1);
+                instance.setCreationTime(0);
+                Date date = FeedHelper.getDate(clusterBasePath, new Path(path), tz);
+                instance.setInstance(SchemaHelper.formatDateUTC(date));
+                Calendar cal = Calendar.getInstance();
+                cal.setTime(dataStart);
+                cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
+                dataStart.setTime(cal.getTimeInMillis());
+                if (random.nextBoolean()) {
+                    OutputStream out = fs.create(new Path(path, "file"));
+                    out.write("Hello World\n".getBytes());
+                    out.close();
+                    instance.setSize(12);
+                    if (feed.getAvailabilityFlag() == null
+                            || (feed.getAvailabilityFlag() != null && random.nextBoolean())) {
+                        //If availability is not present or if ok to create availability file, mark as available
+                        instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
+                        if (feed.getAvailabilityFlag() != null) {
+                            fs.create(new Path(path, feed.getAvailabilityFlag())).close();
+                        }
+                    } else if (feed.getAvailabilityFlag() != null) {
+                        //If availability is present and not ok to create availability file, mark as partial
+                        fs.mkdirs(new Path(path));
+                        instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+                    }
+                } else {
+                    if (feed.getAvailabilityFlag() == null && random.nextBoolean()) {
+                        //If availability is not present and ok to create dir, mark as empty
+                        fs.mkdirs(new Path(path));
+                        instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+                        instance.setSize(0);
+                    } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) {
+                        //If availability is present and ok to create dir, mark as partial
+                        fs.mkdirs(new Path(path));
+                        instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+                    } else if (feed.getAvailabilityFlag() != null) {
+                        //If availability is present and ok to create empty instance
                         fs.create(new Path(path, feed.getAvailabilityFlag())).close();
+                        instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+                        instance.setSize(0);
                     }
-                } else if (feed.getAvailabilityFlag() != null) {
-                    //If availability is present or not ok to create availability file, mark as partial
-                    fs.mkdirs(new Path(path));
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
                 }
-            } else {
-                if (feed.getAvailabilityFlag() == null && random.nextBoolean()) {
-                    //If availability is not present or ok to create dir, mark as empty
-                    fs.mkdirs(new Path(path));
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
-                    instance.setSize(0);
-                } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) {
-                    //If availability is present and ok to create dir, mark as partial
-                    fs.mkdirs(new Path(path));
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
-                } else if (feed.getAvailabilityFlag() != null)  {
-                    //If availability is present and ok to create empty instance
-                    fs.create(new Path(path, feed.getAvailabilityFlag())).close();
-                    instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
-                    instance.setSize(0);
+                try {
+                    FileStatus fileStatus = fs.getFileStatus(new Path(path));
+                    instance.setCreationTime(fileStatus.getModificationTime());
+                } catch (IOException e) {
+                    //ignore: path is not created for MISSING instances, so creation time stays 0
                 }
+                instances.add(instance);
             }
-            try {
-                FileStatus fileStatus = fs.getFileStatus(new Path(path));
-                instance.setCreationTime(fileStatus.getModificationTime());
-            } catch (IOException e) {
-                //ignore
-            }
-            instances.add(instance);
         }
         return instances;
     }
@@ -518,12 +567,12 @@ public class FileSystemStorageTest {
         feed.setLocations(new Locations());
         Location dataLocation = new Location();
         feed.getLocations().getLocations().add(dataLocation);
-        dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}"
+        dataLocation.setPath("/" + TEST_FEED_LISTING + "/data/${YEAR}/${MONTH}/${DAY}"
                 + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE");
         dataLocation.setType(LocationType.DATA);
         feed.setClusters(new Clusters());
         Cluster cluster = new Cluster();
-        cluster.setName("TestFeedListing");
+        cluster.setName(TEST_FEED_LISTING);
         feed.getClusters().getClusters().add(cluster);
         Validity validity = new Validity();
         cluster.setValidity(validity);