You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/07/13 08:50:16 UTC
falcon git commit: FALCON-1311 Instance dependency API produces
inconsistent results in some scenarios. Contributed by Pragya Mittal
Repository: falcon
Updated Branches:
refs/heads/master e5698fad3 -> b36a82394
FALCON-1311 Instance dependency API produces inconsistent results in some scenarios. Contributed by Pragya Mittal
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b36a8239
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b36a8239
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b36a8239
Branch: refs/heads/master
Commit: b36a82394998697934f282be22714222e4b2a39d
Parents: e5698fa
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jul 13 12:19:53 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jul 13 12:19:53 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../java/org/apache/falcon/entity/EntityUtil.java | 17 +++++++++++++++++
.../java/org/apache/falcon/entity/FeedHelper.java | 12 +++++++++---
.../org/apache/falcon/entity/FeedHelperTest.java | 16 ++++++++++++++++
4 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 132e064..e844a60 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -53,6 +53,8 @@ Trunk (Unreleased)
(Suhas Vasu)
BUG FIXES
+ FALCON-1311 Instance dependency API produces inconsistent results in some scenarios(Pragya Mittal via Ajay Yadava)
+
FALCON-1268 Instance Dependency API failure message is not intuitive in distributed mode (Ajay Yadava)
FALCON-1260 Instance dependency API produces incorrect results (Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 63dfb9d..25d9008 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -259,6 +259,23 @@ public final class EntityUtil {
return feed.getTimezone();
}
+ /**
+ * Returns true if the given instanceTime is a valid instanceTime on the basis of startTime and frequency of an
+ * entity.
+ *
+ * It doesn't check the instanceTime being after the validity of entity.
+ * @param startTime startTime of the entity
+ * @param frequency frequency of the entity.
+ * @param timezone timezone of the entity.
+ * @param instanceTime instanceTime to be checked for validity
+ * @return
+ */
+ public static boolean isValidInstanceTime(Date startTime, Frequency frequency, TimeZone timezone,
+ Date instanceTime) {
+ Date next = getNextStartTime(startTime, frequency, timezone, instanceTime);
+ return next.equals(instanceTime);
+ }
+
public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date referenceTime) {
if (startTime.after(referenceTime)) {
return startTime;
http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index eadc8d6..bb31de8 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -26,6 +26,7 @@ import org.apache.falcon.Tag;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
@@ -475,7 +476,7 @@ public final class FeedHelper {
* @return returns the instance of the process which produces the given feed
*/
public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime,
- org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
//validate the inputs
validateFeedInstance(feed, feedInstanceTime, cluster);
@@ -485,16 +486,21 @@ public final class FeedHelper {
cluster.getName());
Date pStart = processCluster.getValidity().getStart();
Date pEnd = processCluster.getValidity().getEnd();
+ Frequency pFrequency = process.getFrequency();
+ TimeZone pTz = process.getTimezone();
+
try {
Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
- if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd)) {
+ boolean isValid = EntityUtil.isValidInstanceTime(pStart, pFrequency, pTz, processInstanceTime);
+ if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd) || !isValid){
return null;
}
+
SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
processInstanceTime, EntityType.PROCESS);
producer.setTags(SchedulableEntityInstance.OUTPUT);
return producer;
- } catch (FalconException e) {
+ } catch (FalconException | IllegalArgumentException e) {
LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } "
+ " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName());
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index b15f023..c70cfcc 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -127,6 +127,22 @@ public class FeedHelperTest extends AbstractTestBase {
}
@Test
+ public void testInvalidProducerInstance() throws Exception {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+ Outputs outputs = new Outputs();
+ Output outFeed = new Output();
+ outFeed.setName("outputFeed");
+ outFeed.setFeed(feed.getName());
+ outFeed.setInstance("now(0,0)");
+ outputs.getOutputs().add(outFeed);
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+ Assert.assertNull(FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:40 UTC"), cluster));
+ }
+
+ @Test
public void testGetProducerOutOfValidity() throws FalconException, ParseException {
Cluster cluster = publishCluster();
Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");