You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/08/17 03:57:37 UTC
falcon git commit: FALCON-2096 Feed instance listing doesn't give instance status of all the clusters
Repository: falcon
Updated Branches:
refs/heads/master 2bea7f474 -> 7354f870b
FALCON-2096 Feed instance listing doesn't give instance status of all the clusters
Author: sandeep <sa...@gmail.com>
Reviewers: @pallavi-rao, @PraveenAdlakha
Closes #261 from sandeepSamudrala/FALCON-2096 and squashes the following commits:
c9030e6 [sandeep] FALCON-2096. Fixed checkstyle issues
4c9c655 [sandeep] FALCON-2096. Incorporated review comments. Extracted frequently used constants
9f0d751 [sandeep] Falcon 2096. Feed instance listing doesn't give instance status of all the clusters
97d2a8f [sandeep] Falcon 2096. Feed instance listing doesn't give instance status of all the clusters
d853c76 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2096
fa8f8ad [sandeep] FALCON-2096. Feed instance listing doesn't give instance status of all the clusters.
89def80 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2096
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7354f870
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7354f870
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7354f870
Branch: refs/heads/master
Commit: 7354f870b2bff7b5f0451a792cb06d2eee00ff38
Parents: 2bea7f4
Author: sandeep <sa...@gmail.com>
Authored: Wed Aug 17 09:27:31 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Aug 17 09:27:31 2016 +0530
----------------------------------------------------------------------
.../org/apache/falcon/entity/FeedHelper.java | 9 +-
.../falcon/entity/FileSystemStorageTest.java | 163 ++++++++++++-------
2 files changed, 111 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7354f870/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index ea34d34..757359f 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -803,12 +803,11 @@ public final class FeedHelper {
Date start, Date end) throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success");
+ List<FeedInstanceResult.Instance> allInstances = new ArrayList<FeedInstanceResult.Instance>();
for (String cluster : clusters) {
Feed feed = (Feed) entityObject;
Storage storage = createStorage(cluster, feed);
List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end);
- FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()];
- int index = 0;
for (FeedInstanceStatus feedStatus : feedListing) {
FeedInstanceResult.Instance instance = new
FeedInstanceResult.Instance(cluster, feedStatus.getInstance(),
@@ -817,10 +816,12 @@ public final class FeedHelper {
instance.uri = feedStatus.getUri();
instance.size = feedStatus.getSize();
instance.sizeH = feedStatus.getSizeH();
- instances[index++] = instance;
+ allInstances.add(instance);
}
- result.setInstances(instances);
}
+ FeedInstanceResult.Instance[] resultInstances = allInstances.toArray(
+ new FeedInstanceResult.Instance[allInstances.size()]);
+ result.setInstances(resultInstances);
return result;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7354f870/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 30edd94..98b0f8e 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -34,6 +34,7 @@ import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.Validity;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.FalconTestUtil;
import org.apache.hadoop.fs.FileStatus;
@@ -60,6 +61,8 @@ import java.util.TimeZone;
public class FileSystemStorageTest {
private static final String USER = FalconTestUtil.TEST_USER_1;
+ private static final String TEST_FEED_LISTING = "TestFeedListing";
+ private static final String TEST_FEED_INSTANCE_LISTING = "TestFeedInstanceListing";
@BeforeClass
public void setUp() {
@@ -424,7 +427,7 @@ public class FileSystemStorageTest {
@Test (dataProvider = "testListingDataProvider")
public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone,
Date start, Date end) throws Exception {
- EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false);
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster(TEST_FEED_LISTING, false);
FileSystem fs = cluster.getFileSystem();
ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster());
try {
@@ -433,79 +436,125 @@ public class FileSystemStorageTest {
FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem().
getUri().toString(), feed.getLocations());
List<FeedInstanceStatus> actual = fileSystemStorage.
- getListing(feed, "TestFeedListing", LocationType.DATA, start, end);
+ getListing(feed, TEST_FEED_LISTING, LocationType.DATA, start, end);
Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match");
} finally {
ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName());
}
}
+ @Test (dataProvider = "testListingDataProvider")
+ public void testInstanceListing(String availabilityFlag, Frequency frequency, TimeZone timeZone,
+ Date start, Date end) throws Exception {
+ EmbeddedCluster firstCluster = EmbeddedCluster.newCluster(TEST_FEED_LISTING, false);
+ FileSystem fs = firstCluster.getFileSystem();
+ ConfigurationStore.get().publish(EntityType.CLUSTER, firstCluster.getCluster());
+
+
+ EmbeddedCluster secondCluster = EmbeddedCluster.newCluster(TEST_FEED_INSTANCE_LISTING, false);
+ ConfigurationStore.get().publish(EntityType.CLUSTER, secondCluster.getCluster());
+
+ try {
+ Feed feed = getFeed(availabilityFlag, frequency, timeZone);
+ Cluster cluster = new Cluster();
+ cluster.setName(TEST_FEED_INSTANCE_LISTING);
+ feed.getClusters().getClusters().add(cluster);
+ Validity validity = new Validity();
+ cluster.setValidity(validity);
+ validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000)));
+ validity.setEnd(new Date(System.currentTimeMillis() - (1000L * 21 * 3600000)));
+ Locations locations = new Locations();
+ Location dataLocation = new Location();
+ dataLocation.setPath("/TestFeedInstanceListing/data/${YEAR}/${MONTH}/${DAY}"
+ + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE");
+ dataLocation.setType(LocationType.DATA);
+ locations.getLocations().add(dataLocation);
+ cluster.setLocations(locations);
+
+ List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end);
+
+ FeedInstanceResult actual = FeedHelper.getFeedInstanceListing(feed, start, end);
+ Assert.assertEquals(actual.getInstances().length, expected.size());
+ } finally {
+ ConfigurationStore.get().remove(EntityType.CLUSTER, firstCluster.getCluster().getName());
+ ConfigurationStore.get().remove(EntityType.CLUSTER, secondCluster.getCluster().getName());
+ }
+
+
+ }
+
@SuppressWarnings("MagicConstant")
private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed,
Date start, Date end) throws Exception {
- fs.delete(new Path("/TestFeedListing"), true);
+ fs.delete(new Path("/" + TEST_FEED_LISTING), true);
Random random = new Random();
List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
String basePath = feed.getLocations().getLocations().get(0).getPath();
Frequency frequency = feed.getFrequency();
TimeZone tz = feed.getTimezone();
- Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(),
- feed.getFrequency(), tz, new Date(start.getTime()));
- Date dataEnd = new Date(end.getTime());
- while (dataStart.before(dataEnd)) {
- Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz);
- String path = ExpressionHelper.substitute(basePath, properties);
- FeedInstanceStatus instance = new FeedInstanceStatus(path);
- instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
- instance.setSize(-1);
- instance.setCreationTime(0);
- Date date = FeedHelper.getDate(basePath, new Path(path), tz);
- instance.setInstance(SchemaHelper.formatDateUTC(date));
- Calendar cal = Calendar.getInstance();
- cal.setTime(dataStart);
- cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
- dataStart.setTime(cal.getTimeInMillis());
- if (random.nextBoolean()) {
- OutputStream out = fs.create(new Path(path, "file"));
- out.write("Hello World\n".getBytes());
- out.close();
- instance.setSize(12);
- if (feed.getAvailabilityFlag() == null
- || (feed.getAvailabilityFlag() != null && random.nextBoolean())) {
- //If availability is not present or if ok to create availability file, mark as available
- instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
- if (feed.getAvailabilityFlag() != null) {
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ Date dataStart = EntityUtil.getNextStartTime(cluster.getValidity().getStart(),
+ feed.getFrequency(), tz, new Date(start.getTime()));
+ String clusterLocationPath = null;
+ if (cluster.getLocations() != null && cluster.getLocations().getLocations().get(0).getPath() != null) {
+ basePath = clusterLocationPath == null ? basePath : clusterLocationPath;
+ }
+ Date dataEnd = new Date(end.getTime());
+ while (dataStart.before(dataEnd)) {
+ Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz);
+ String path = ExpressionHelper.substitute(basePath, properties);
+ FeedInstanceStatus instance = new FeedInstanceStatus(path);
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
+ instance.setSize(-1);
+ instance.setCreationTime(0);
+ Date date = FeedHelper.getDate(basePath, new Path(path), tz);
+ instance.setInstance(SchemaHelper.formatDateUTC(date));
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(dataStart);
+ cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
+ dataStart.setTime(cal.getTimeInMillis());
+ if (random.nextBoolean()) {
+ OutputStream out = fs.create(new Path(path, "file"));
+ out.write("Hello World\n".getBytes());
+ out.close();
+ instance.setSize(12);
+ if (feed.getAvailabilityFlag() == null
+ || (feed.getAvailabilityFlag() != null && random.nextBoolean())) {
+ //If availability is not present or if ok to create availability file, mark as available
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
+ if (feed.getAvailabilityFlag() != null) {
+ fs.create(new Path(path, feed.getAvailabilityFlag())).close();
+ }
+ } else if (feed.getAvailabilityFlag() != null) {
+ //If availability is present or not ok to create availability file, mark as partial
+ fs.mkdirs(new Path(path));
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+ }
+ } else {
+ if (feed.getAvailabilityFlag() == null && random.nextBoolean()) {
+ //If availability is not present or ok to create dir, mark as empty
+ fs.mkdirs(new Path(path));
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+ instance.setSize(0);
+ } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) {
+ //If availability is present and ok to create dir, mark as partial
+ fs.mkdirs(new Path(path));
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
+ } else if (feed.getAvailabilityFlag() != null) {
+ //If availability is present and ok to create empty instance
fs.create(new Path(path, feed.getAvailabilityFlag())).close();
+ instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
+ instance.setSize(0);
}
- } else if (feed.getAvailabilityFlag() != null) {
- //If availability is present or not ok to create availability file, mark as partial
- fs.mkdirs(new Path(path));
- instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
}
- } else {
- if (feed.getAvailabilityFlag() == null && random.nextBoolean()) {
- //If availability is not present or ok to create dir, mark as empty
- fs.mkdirs(new Path(path));
- instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
- instance.setSize(0);
- } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) {
- //If availability is present and ok to create dir, mark as partial
- fs.mkdirs(new Path(path));
- instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
- } else if (feed.getAvailabilityFlag() != null) {
- //If availability is present and ok to create empty instance
- fs.create(new Path(path, feed.getAvailabilityFlag())).close();
- instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY);
- instance.setSize(0);
+ try {
+ FileStatus fileStatus = fs.getFileStatus(new Path(path));
+ instance.setCreationTime(fileStatus.getModificationTime());
+ } catch (IOException e) {
+ //ignore
}
+ instances.add(instance);
}
- try {
- FileStatus fileStatus = fs.getFileStatus(new Path(path));
- instance.setCreationTime(fileStatus.getModificationTime());
- } catch (IOException e) {
- //ignore
- }
- instances.add(instance);
}
return instances;
}
@@ -518,12 +567,12 @@ public class FileSystemStorageTest {
feed.setLocations(new Locations());
Location dataLocation = new Location();
feed.getLocations().getLocations().add(dataLocation);
- dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}"
+ dataLocation.setPath("/" + TEST_FEED_LISTING + "/data/${YEAR}/${MONTH}/${DAY}"
+ (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE");
dataLocation.setType(LocationType.DATA);
feed.setClusters(new Clusters());
Cluster cluster = new Cluster();
- cluster.setName("TestFeedListing");
+ cluster.setName(TEST_FEED_LISTING);
feed.getClusters().getClusters().add(cluster);
Validity validity = new Validity();
cluster.setValidity(validity);