You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/06 11:53:14 UTC

[1/9] falcon git commit: FALCON-1709 FIFO order is not followed when scheduled using native scheduler (Pallavi Rao)

Repository: falcon
Updated Branches:
  refs/heads/0.9 dfb85abdd -> 092f70a9d


FALCON-1709 FIFO order is not followed when scheduled using native scheduler (Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/20bacfa1
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/20bacfa1
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/20bacfa1

Branch: refs/heads/0.9
Commit: 20bacfa134c7eb89b030dff22a5ba8aff0b4636b
Parents: c81db67
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Mon Jan 4 14:40:08 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Jan 4 14:40:08 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                               |  2 ++
 .../notification/service/impl/SchedulerService.java       | 10 ++++++++++
 2 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/20bacfa1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 018a6db..4adbdef 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1709 FIFO order is not followed when scheduled using native scheduler (Pallavi Rao)
+
     FALCON-1711 DependencyInstance twiki does not contain correct documentation	(Praveen Adlakha via Pallavi Rao)
 
     FALCON-1702 Exception thrown by workflow status listeners on success notification(Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/20bacfa1/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index 57a41c8..f5a7c86 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -101,6 +101,8 @@ public class SchedulerService implements FalconNotificationService, Notification
         if (obj != null) {
             instancesToIgnore.invalidate(request.getInstance().getId());
         }
+        LOG.debug("Received request to schedule instance {} with sequence {}.", request.getInstance().getId(),
+                request.getInstance().getInstanceSequence());
         runQueue.execute(new InstanceRunner(request));
     }
 
@@ -281,6 +283,10 @@ public class SchedulerService implements FalconNotificationService, Notification
             }
         }
 
+        public ExecutionInstance getInstance() {
+            return instance;
+        }
+
         @Override
         public void run() {
             try {
@@ -387,6 +393,10 @@ public class SchedulerService implements FalconNotificationService, Notification
     private static class PriorityComparator<T extends InstanceRunner> implements Comparator<T>, Serializable {
         @Override
         public int compare(T o1, T o2) {
+            // If both instances have same priority, go by instance sequence.
+            if (o1.getPriority() == o2.getPriority()) {
+                return o1.getInstance().getInstanceSequence() - o2.getInstance().getInstanceSequence();
+            }
             return o1.getPriority() - o2.getPriority();
         }
     }


[3/9] falcon git commit: FALCON-1697 Stabilization of scenarios which are based on instances lifecycle. Contributed by Paul Isaychuk.

Posted by pa...@apache.org.
FALCON-1697 Stabilization of scenarios which are based on instances lifecycle. Contributed by Paul Isaychuk.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1e3dcb73
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1e3dcb73
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1e3dcb73

Branch: refs/heads/0.9
Commit: 1e3dcb73fd30b93332ae81a075a7cecbc3dd3508
Parents: 6d0119b
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jan 4 18:16:47 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jan 4 18:16:47 2016 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../regression/core/util/InstanceUtil.java      |   8 +-
 .../regression/ProcessInstanceStatusTest.java   |   2 +
 .../regression/ProcessInstanceSuspendTest.java  |   7 +-
 .../lineage/ListFeedInstancesTest.java          | 159 ++++++++++---------
 .../lineage/ListProcessInstancesTest.java       |  40 +++--
 .../regression/searchUI/EntityPageTest.java     |   8 +-
 7 files changed, 124 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 19dc351..c083c2c 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1697 Stabilization of scenarios which are based on instances lifecycle(Paul Isaychuk via Ajay Yadava)
+
    FALCON-1700 Add new test cases to HiveDRTest(Paul Isaychuk & Murali Ramasami via Ajay Yadava)
 
    FALCON-1689 NoOutputProcessTest fails due to scheme missing in workflow.xml(Pragya Mittal)

http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 3d05ae9..b07e275 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -73,8 +73,8 @@ public final class InstanceUtil {
 
     public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10;
     private static final Logger LOGGER = Logger.getLogger(InstanceUtil.class);
-    private static final EnumSet<Status> RUNNING_PREP_SUCCEEDED = EnumSet.of(Status.RUNNING,
-        Status.PREP, Status.SUCCEEDED);
+    private static final EnumSet<Status> LIVE_STATUS = EnumSet.of(Status.RUNNING,
+        Status.PREP, Status.SUCCEEDED, Status.SUSPENDED);
 
     private InstanceUtil() {
         throw new AssertionError("Instantiating utility class...");
@@ -573,7 +573,7 @@ public final class InstanceUtil {
         for (String bundleId : bundleIds) {
             LOGGER.info(String.format("Using bundle %s", bundleId));
             final Status status = client.getBundleJobInfo(bundleId).getStatus();
-            Assert.assertTrue(RUNNING_PREP_SUCCEEDED.contains(status),
+            Assert.assertTrue(LIVE_STATUS.contains(status),
                 String.format("Bundle job %s is should be prep/running but is %s", bundleId, status));
             OozieUtil.waitForCoordinatorJobCreation(client, bundleId);
             List<CoordinatorJob> coords = client.getBundleJobInfo(bundleId).getCoordinators();
@@ -607,7 +607,7 @@ public final class InstanceUtil {
             CoordinatorJob coordinatorJob = client.getCoordJobInfo(coordId);
             final Status coordinatorStatus = coordinatorJob.getStatus();
             if (expectedStatus != CoordinatorAction.Status.TIMEDOUT){
-                Assert.assertTrue(RUNNING_PREP_SUCCEEDED.contains(coordinatorStatus),
+                Assert.assertTrue(LIVE_STATUS.contains(coordinatorStatus),
                         String.format("Coordinator %s should be running/prep but is %s.", coordId, coordinatorStatus));
             }
             List<CoordinatorAction> coordinatorActions = coordinatorJob.getActions();

http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 6493133..14ecfe4 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -339,6 +339,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
             Status.RUNNING, EntityType.PROCESS, 5);
         AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(process));
         AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, Status.SUSPENDED, EntityType.PROCESS, 3);
+        TimeUtil.sleepSeconds(TIMEOUT);
         AssertUtil.assertSucceeded(prism.getProcessHelper().resume(process));
         TimeUtil.sleepSeconds(TIMEOUT);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,

http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index f673314..4a27a0a 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -63,8 +63,9 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0] = new Bundle(bundles[0], cluster);
         bundles[0].generateUniqueBundle(this);
         bundles[0].setInputFeedDataPath(feedInputPath);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         processName = bundles[0].getProcessName();
     }
@@ -171,7 +172,9 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(3);
         bundles[0].submitFeedsScheduleProcess(prism);
         InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,

http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
index 7ad4c8e..17725ae 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
@@ -23,7 +23,6 @@ import org.apache.falcon.entity.v0.feed.ActionType;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.regression.Entities.FeedMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.AssertUtil;
 import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
@@ -42,8 +41,9 @@ import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.asserts.SoftAssert;
 
@@ -55,12 +55,13 @@ import java.util.Date;
 import java.util.List;
 
 /**
- * Testing the list instances api for feed.
+ * Testing the list instances api for feed. Testing is based on initial scenario and sets of
+ * expected instance statuses which are being compared with actual result of -list request
+ * with different parameters in different order, variation, etc.
  */
 @Test(groups = "embedded")
 public class ListFeedInstancesTest extends BaseTestClass {
     private static final Logger LOGGER = Logger.getLogger(ListFeedInstancesTest.class);
-    private ColoHelper cluster2 = servers.get(1);
     private OozieClient cluster2OC = serverOC.get(1);
     private String baseTestHDFSDir = cleanAndGetTestDir();
     private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
@@ -68,7 +69,8 @@ public class ListFeedInstancesTest extends BaseTestClass {
     private String feedDataLocation = sourcePath + MINUTE_DATE_PATTERN;
     private String targetPath = baseTestHDFSDir + "/target";
     private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
-    private String startTime, endTime;
+    private final String startTime = "2010-01-02T00:00Z";
+    private final String endTime = "2010-01-02T00:57Z";
     private String feedName;
 
     @BeforeClass(alwaysRun = true)
@@ -76,23 +78,21 @@ public class ListFeedInstancesTest extends BaseTestClass {
         throws IOException, OozieClientException, JAXBException, AuthenticationException,
         URISyntaxException, InterruptedException {
         uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        startTime = TimeUtil.getTimeWrtSystemTime(-55);
-        endTime = TimeUtil.getTimeWrtSystemTime(3);
-        LOGGER.info("Time range is between : " + startTime + " and " + endTime);
         Bundle bundle = BundleUtil.readELBundle();
         for (int i = 0; i < 2; i++) {
             bundles[i] = new Bundle(bundle, servers.get(i));
             bundles[i].generateUniqueBundle(this);
         }
-        prepareScenario();
     }
 
     /*
      * Prepares running feed with instances ordered (desc): 1 waiting, 1 running, 1 suspended,
-     * 3 waiting and 6 killed. Testing is based on expected instances statuses.
+     * 5 waiting, 2 killed, 2 waiting. Testing is based on expected sets of instance statuses.
+     * Variety of instance statuses increases accuracy of testing.
      */
+    @BeforeMethod(alwaysRun = true)
     private void prepareScenario() throws AuthenticationException, IOException, URISyntaxException,
-            JAXBException, OozieClientException, InterruptedException {
+        JAXBException, OozieClientException, InterruptedException {
         bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setInputFeedDataPath(feedDataLocation);
         String feed = bundles[0].getInputFeedFromBundle();
@@ -127,23 +127,23 @@ public class ListFeedInstancesTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 12,
             CoordinatorAction.Status.WAITING, EntityType.FEED);
 
-        //retrieve specific instances to rule them directly
+        //retrieve instances to rule them directly
         List<CoordinatorAction> actions = getReplicationInstances(cluster2OC, feedName);
         LOGGER.info(actions);
         Assert.assertNotNull(actions, "Required coordinator not found.");
         Assert.assertEquals(actions.size(), 12, "Unexpected number of actions.");
 
-        //killing first 6 instances
+        //killing the 3d and the 4th instances
         String range;
         InstancesResult r;
-        for (int i = 0; i < 6; i++) {
+        for (int i = 2; i <= 3; i++) {
             HadoopUtil.createFolders(serverFS.get(0), "", Arrays.asList(actions.get(i)
                 .getMissingDependencies().split("#")));
             //only running instance can be killed, so we should make it running and then kill it
             InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 1,
                 CoordinatorAction.Status.RUNNING, EntityType.FEED, 3);
-            range = "?start=" + TimeUtil.addMinsToTime(startTime, i * 5 - 1)
-                + "&end=" + TimeUtil.addMinsToTime(startTime, i * 5 + 1);
+            range = "?start=" + TimeUtil.addMinsToTime(startTime, i * 5 - 2)
+                + "&end=" + TimeUtil.addMinsToTime(startTime, i * 5 + 2);
             r = prism.getFeedHelper().getProcessInstanceKill(feedName, range);
             InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
         }
@@ -166,11 +166,17 @@ public class ListFeedInstancesTest extends BaseTestClass {
         //check that the scenario works as expected.
         r = prism.getFeedHelper().getProcessInstanceStatus(feedName,
             "?start=" + startTime + "&numResults=12");
-        InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6);
+        InstanceUtil.validateResponse(r, 12, 1, 1, 8, 2);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        removeTestClassEntities();
+        HadoopUtil.deleteDirIfExists(sourcePath, serverFS.get(0));
     }
 
     /*
-     * Retrieves replication coordinator instances.
+     * Retrieves replication coordinator actions (replication instances).
      * @param client target oozie client
      * @param fName feed name
      */
@@ -204,7 +210,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
         throws URISyntaxException, OozieClientException, JAXBException, AuthenticationException,
         IOException, InterruptedException {
         SoftAssert softAssert = new SoftAssert();
-        //orderBy start time
+        //orderBy start time, check an order
         InstancesResult r = prism.getFeedHelper().listInstances(feedName,
             "orderBy=startTime&sortOrder=desc", null);
         InstancesResult.Instance[] instances = r.getInstances();
@@ -217,10 +223,10 @@ public class ListFeedInstancesTest extends BaseTestClass {
                 previousDate = (Date) current.clone();
             }
         }
-        //orderBy status
+        //orderBy status, check an order
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&numResults=12&orderBy=status&sortOrder=desc", null);
-        InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6);
+        InstanceUtil.validateResponse(r, 12, 1, 1, 8, 2);
         instances = r.getInstances();
         InstancesResult.WorkflowStatus previousStatus = InstancesResult.WorkflowStatus.WAITING;
         for (InstancesResult.Instance instance : instances) {
@@ -229,7 +235,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
                 "Wrong order. Compared " + current + " and " + previousStatus + " statuses.");
             previousStatus = current;
         }
-        //sort by endTime
+        //sort by endTime, check an order
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&numResults=12&orderBy=endTime&sortOrder=desc", null);
         instances = r.getInstances();
@@ -246,7 +252,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
     }
 
     /**
-     * Test the list feed instance api using start/end parameters. Check instances number.
+     * List instances through api using start/end parameters. Check a number of instances.
      */
     @Test
     public void testFeedStartEnd()
@@ -255,60 +261,64 @@ public class ListFeedInstancesTest extends BaseTestClass {
         //actual start/end values.
         InstancesResult r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&end=" + endTime, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
 
-        //without params, the default start/end should be applied.
+        //without params, the default start/end should be applied. End is set to now,
+        // start is set to end - (10 * entityFrequency)
         r = prism.getFeedHelper().listInstances(feedName, null, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
 
-        //increasing a -start, the -end stays the same.
+        //increasing -start, -end stays the same.
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + TimeUtil.addMinsToTime(startTime, 6)
                 + "&end=" + TimeUtil.addMinsToTime(endTime, -5), null);
-        InstanceUtil.validateResponse(r, 9, 1, 1, 3, 4);
+        InstanceUtil.validateResponse(r, 9, 1, 1, 5, 2);
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + TimeUtil.addMinsToTime(startTime, 11)
                 + "&end=" + TimeUtil.addMinsToTime(endTime, -5), null);
-        InstanceUtil.validateResponse(r, 8, 1, 1, 3, 3);
+        InstanceUtil.validateResponse(r, 8, 1, 1, 5, 1);
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + TimeUtil.addMinsToTime(startTime, 16)
                 + "&end=" + TimeUtil.addMinsToTime(endTime, -5), null);
-        InstanceUtil.validateResponse(r, 7, 1, 1, 3, 2);
+        InstanceUtil.validateResponse(r, 7, 1, 1, 5, 0);
 
-        //one instance between start/end, use instances with different statuses.
+        //one instance between start/end, killed instance
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + TimeUtil.addMinsToTime(startTime, 12)
                 + "&end=" + TimeUtil.addMinsToTime(startTime, 16), null);
         InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
+
+        //one instance between start/end, waiting instance
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + TimeUtil.addMinsToTime(endTime, -5) + "&end=" + endTime, null);
         InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
 
-        //only start, actual feed startTime, should get 1-10 instances(end is automatically set to freq*10).
+        //only start, actual feed startTime, should get 1-10 instances(end is automatically set to now).
         r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null);
-        InstanceUtil.validateResponse(r, 10, 0, 1, 3, 6);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
 
         //only start, greater then the actual startTime.
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + TimeUtil.addMinsToTime(startTime, 16), null);
-        InstanceUtil.validateResponse(r, 8, 1, 1, 4, 2);
+        InstanceUtil.validateResponse(r, 8, 1, 1, 6, 0);
 
         //only end, 1 instance is expected
         r = prism.getFeedHelper().listInstances(feedName,
             "end=" + TimeUtil.addMinsToTime(startTime, 4), null);
-        InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
+        InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
 
-        //only the end, 10 the most recent instances are expected
+        //only end, actual value, 10 the most recent instances are expected
         r = prism.getFeedHelper().listInstances(feedName, "end=" + endTime, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
 
-        //only the end
+        //only end, first 6 instances
         r = prism.getFeedHelper().listInstances(feedName,
             "end=" + TimeUtil.addMinsToTime(endTime, -31), null);
-        InstanceUtil.validateResponse(r, 6, 0, 0, 0, 6);
+        InstanceUtil.validateResponse(r, 6, 0, 0, 4, 2);
+        //only end, first 8 instances
         r = prism.getFeedHelper().listInstances(feedName,
             "end=" + TimeUtil.addMinsToTime(endTime, -21), null);
-        InstanceUtil.validateResponse(r, 8, 0, 0, 2, 6);
+        InstanceUtil.validateResponse(r, 8, 0, 0, 6, 2);
     }
 
     /**
@@ -320,7 +330,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
         throws URISyntaxException, IOException, AuthenticationException, InterruptedException {
         //check the default value of the numResults param. Expecting 10 instances.
         InstancesResult r = prism.getFeedHelper().listInstances(feedName, null, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
 
         //changing a value to 6. 6 instances are expected
         r = prism.getFeedHelper().listInstances(feedName, "numResults=6", null);
@@ -328,17 +338,17 @@ public class ListFeedInstancesTest extends BaseTestClass {
 
         //use a start option without a numResults parameter. 10 instances are expected
         r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
 
         //use a start option with a numResults value which is smaller then the default.
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&numResults=8", null);
-        InstanceUtil.validateResponse(r, 8, 1, 1, 4, 2);
+        InstanceUtil.validateResponse(r, 8, 1, 1, 6, 0);
 
         //use a start option with a numResults value greater then the default.
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&numResults=12", null);
-        InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6);
+        InstanceUtil.validateResponse(r, 12, 1, 1, 8, 2);
 
         //get all instances
         InstancesResult.Instance[] allInstances = r.getInstances();
@@ -347,7 +357,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
         int offset = 3;
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&offset=" + offset + "&numResults=12", null);
-        InstanceUtil.validateResponse(r, 9, 0, 0, 3, 6);
+        InstanceUtil.validateResponse(r, 9, 0, 0, 7, 2);
 
         //check that expected instances were retrieved
         InstancesResult.Instance[] instances = r.getInstances();
@@ -359,7 +369,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
         offset = 6;
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&offset=" + offset + "&numResults=6", null);
-        InstanceUtil.validateResponse(r, 6, 0, 0, 0, 6);
+        InstanceUtil.validateResponse(r, 6, 0, 0, 4, 2);
 
         //check that expected instances are present in response
         instances = r.getInstances();
@@ -370,7 +380,7 @@ public class ListFeedInstancesTest extends BaseTestClass {
     }
 
     /**
-     * Test the list feed instances api using filterBy parameter.
+     * List feed instances with filterBy parameter.
      */
     @Test
     public void testFeedFilterBy()
@@ -380,18 +390,20 @@ public class ListFeedInstancesTest extends BaseTestClass {
         InstancesResult r = prism.getFeedHelper().listInstances(feedName,
             "filterBy=STATUS:RUNNING", null);
         InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0);
+        //end is set to now (actual end), start is set to (end - (10 * entityFrequency))
+        //filtered range is from 3rd till 12th instance
         r = prism.getFeedHelper().listInstances(feedName, "filterBy=STATUS:WAITING", null);
-        InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0);
+        InstanceUtil.validateResponse(r, 6, 0, 0, 6, 0);
 
         //get all instances.
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&numResults=12", null);
-        InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6);
+        InstanceUtil.validateResponse(r, 12, 1, 1, 8, 2);
 
         //use different statuses, filterBy among all instances.
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&filterBy=STATUS:KILLED", null);
-        InstanceUtil.validateResponse(r, 6, 0, 0, 0, 6);
+        InstanceUtil.validateResponse(r, 2, 0, 0, 0, 2);
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&filterBy=STATUS:SUSPENDED", null);
         InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0);
@@ -400,24 +412,17 @@ public class ListFeedInstancesTest extends BaseTestClass {
         InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0);
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&filterBy=STATUS:WAITING", null);
-        InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0);
+        InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0);
 
         //use additional filters.
-        String sourceCluster = bundles[0].getClusterNames().get(0);
         String clusterName = bundles[1].getClusterNames().get(0);
         r = prism.getFeedHelper().listInstances(feedName,
             "start=" + startTime + "&filterBy=CLUSTER:" + clusterName, null);
-        InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4);
-        r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime + "&numResults=12"
-                + "&filterBy=SOURCECLUSTER:" + sourceCluster, null);
-        InstanceUtil.validateResponse(r, 8, 1, 1, 0, 6);
-        r = prism.getFeedHelper().listInstances(feedName,
-            "filterBy=SOURCECLUSTER:" + sourceCluster, null);
-        InstanceUtil.validateResponse(r, 6, 1, 1, 0, 4);
+        InstanceUtil.validateResponse(r, 10, 1, 1, 6, 2);
     }
 
     /**
-     * Test list feed instances using custom filter. Expecting list of feed instances which
+     * List feed instances using custom filter. Expecting list of feed instances which
      * satisfy custom filters.
      */
     @Test
@@ -427,33 +432,39 @@ public class ListFeedInstancesTest extends BaseTestClass {
         InstancesResult r = prism.getFeedHelper().listInstances(feedName, params, null);
         InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0);
 
+        //expecting 0 instances, because RUNNING instance is out of range start + 10 instances
         params = "start=" + startTime + "&end=" + endTime + "&filterBy=status:RUNNING&offset=2";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
         InstanceUtil.validateSuccessWOInstances(r);
 
+        //offset is absent that's why whole range is filtered
         params = "start=" + startTime + "&end=" + endTime + "&filterBy=status:WAITING";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0);
+        InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0);
 
+        //filtered range is from 1st till 9th instances inclusively
         params = "start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 41)
             + "&filterBy=status:WAITING";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 3, 0, 0, 3, 0);
+        InstanceUtil.validateResponse(r, 7, 0, 0, 7, 0);
 
-        params = "start=" + startTime + "&offset=1&numResults=1&filterBy=status:WAITING";
+        //filtered range is within 3nd till 8th instance inclusively
+        params = "start=" + startTime + "&offset=4&numResults=6&filterBy=status:WAITING";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
+        InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0);
 
+        //filtered range is within 4th till 10th instances inclusively
         params = "start=" + TimeUtil.addMinsToTime(startTime, 16) + "&offset=2&numResults=12";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 6, 0, 1, 3, 2);
+        InstanceUtil.validateResponse(r, 6, 0, 1, 5, 0);
 
+        //use mix of params
         String sourceCluster = bundles[0].getClusterNames().get(0);
         String clusterName = bundles[1].getClusterNames().get(0);
         params = "start=" + startTime + "&filterBy=STATUS:KILLED,CLUSTER:"+ clusterName
             + "&numResults=5&orderBy=startTime&sortOrder=desc";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 5, 0, 0, 0, 5);
+        InstanceUtil.validateResponse(r, 2, 0, 0, 0, 2);
 
         //should be ordered by a start time
         SoftAssert softAssert = new SoftAssert();
@@ -468,21 +479,15 @@ public class ListFeedInstancesTest extends BaseTestClass {
         }
         softAssert.assertAll();
 
-        //missing 1st, 11th, 12th instances, all other instances should be retrieved.
+        // filtered range is within 2nd and 10th instance inclusively
         params = "start=" + TimeUtil.addMinsToTime(startTime, 2) + "&offset=2";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 9, 0, 1, 3, 5);
+        InstanceUtil.validateResponse(r, 9, 0, 1, 6, 2);
 
-        //missing the 1st, 11th, 12th instance, all instances which have progressed should be present:
-        //5 killed + 1 suspended, but numResults=5, so expecting 1 suspended and 4 killed instances.
+        //filtered range is within 7nd and 11th instances inclusively
         params = "start=" + TimeUtil.addMinsToTime(startTime, 2) + "&filterBy=SOURCECLUSTER:"
-            + sourceCluster + "&offset=1&numResults=5";
+            + sourceCluster + "&offset=1&numResults=5" + "&colo=*";
         r = prism.getFeedHelper().listInstances(feedName, params, null);
-        InstanceUtil.validateResponse(r, 5, 0, 1, 0, 4);
-    }
-
-    @AfterClass(alwaysRun = true)
-    public void tearDown() throws IOException {
-        removeTestClassEntities();
+        InstanceUtil.validateResponse(r, 5, 1, 1, 3, 0);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
index 43bdd87..a4022a8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java
@@ -19,9 +19,11 @@
 package org.apache.falcon.regression.lineage;
 
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
 import org.apache.falcon.regression.core.util.OozieUtil;
@@ -78,11 +80,14 @@ public class ListProcessInstancesTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output/" + suffix + MINUTE_DATE_PATTERN);
         bundles[0].setProcessValidity(startTime, endTime);
         bundles[0].setProcessConcurrency(3);
+        bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].submitAndScheduleProcess();
         processName = bundles[0].getProcessName();
         InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         //create data for processes to run and wait some time for instances to make progress
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
     }
@@ -90,6 +95,7 @@ public class ListProcessInstancesTest extends BaseTestClass {
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws IOException {
         removeTestClassEntities();
+        HadoopUtil.deleteDirIfExists(sourcePath, serverFS.get(0));
     }
 
     /**
@@ -97,9 +103,6 @@ public class ListProcessInstancesTest extends BaseTestClass {
      */
     @Test
     public void testProcessOrderBy() throws Exception {
-        //provide data for 4th and 5th instances (fyi: indexing starts from 0th instance)
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 3);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 4);
         SoftAssert softAssert = new SoftAssert();
         //orderBy startTime descending order
         InstancesResult r = prism.getProcessHelper().listInstances(processName,
@@ -122,14 +125,23 @@ public class ListProcessInstancesTest extends BaseTestClass {
         r = prism.getProcessHelper().getProcessInstanceKill(processName,
             "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 3));
         InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
-        //wait till instances status be stable
+        //wait till another instances succeed
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        //provide data for 4th, 5th and 6th instances (indexing starts from 0th instance)
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 3);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 4);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 5);
+
+        //wait for new 3 instances to run
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
 
         //orderBy status ascending order
         r = prism.getProcessHelper().listInstances(processName,
             "start=" + startTime + "&numResults=12&orderBy=status&sortOrder=desc", null);
-        InstanceUtil.validateResponse(r, 12, 3, 1, 7, 1);
+        InstanceUtil.validateResponse(r, 12, 3, 1, 6, 1);
         instances = r.getInstances();
         InstancesResult.WorkflowStatus previousStatus = InstancesResult.WorkflowStatus.WAITING;
         for (InstancesResult.Instance instance : instances) {
@@ -221,10 +233,6 @@ public class ListProcessInstancesTest extends BaseTestClass {
      */
     @Test
     public void testProcessFilterBy() throws Exception {
-        //provide data for 4th and 5th instances (fyi: indexing starts from 0th instance)
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 3);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 4);
-
         //test with simple filters
         InstancesResult r = prism.getProcessHelper().listInstances(processName,
             "filterBy=STATUS:RUNNING", null);
@@ -247,14 +255,14 @@ public class ListProcessInstancesTest extends BaseTestClass {
             "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 3));
         InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
 
-        //wait till new instances be RUNNING and total status count be stable
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
+        //check that running instance is still running
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3);
 
         //get all instances
         r = prism.getProcessHelper().listInstances(processName,
             "start=" + startTime + "&numResults=12", null);
-        InstanceUtil.validateResponse(r, 12, 3, 1, 7, 1);
+        InstanceUtil.validateResponse(r, 12, 1, 1, 9, 1);
 
         //use different statuses, filterBy among all instances
         r = prism.getProcessHelper().listInstances(processName,
@@ -265,10 +273,10 @@ public class ListProcessInstancesTest extends BaseTestClass {
         InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0);
         r = prism.getProcessHelper().listInstances(processName,
             "start=" + startTime + "&filterBy=STATUS:RUNNING", null);
-        InstanceUtil.validateResponse(r, 3, 3, 0, 0, 0);
+        InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0);
         r = prism.getProcessHelper().listInstances(processName,
             "start=" + startTime + "&filterBy=STATUS:WAITING", null);
-        InstanceUtil.validateResponse(r, 7, 0, 0, 7, 0);
+        InstanceUtil.validateResponse(r, 9, 0, 0, 9, 0);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/falcon/blob/1e3dcb73/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
index 4ad775e..6acec27 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java
@@ -85,6 +85,7 @@ public class EntityPageTest extends BaseUITestClass {
         bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
         bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
 
         openBrowser();
         final LoginPage loginPage = LoginPage.open(getDriver());
@@ -295,14 +296,15 @@ public class EntityPageTest extends BaseUITestClass {
 
         InstanceUtil.waitTillInstanceReachState(clusterOC, process.getName(), 1,
             CoordinatorAction.Status.WAITING, EntityType.PROCESS, 1);
-        final List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTime,
-            TimeUtil.addMinsToTime(endTime, -5), 5);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
 
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, process.getName(), 0, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, process.getName(), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
+
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, process.getName(), 0, 1);
         InstanceUtil.waitTillInstanceReachState(clusterOC, process.getName(), 1,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+
         //suspend the second instance
         prism.getProcessHelper().getProcessInstanceSuspend(process.getName(),
             "?start=" + TimeUtil.addMinsToTime(startTime, 5)


[6/9] falcon git commit: FALCON-1710 dependency API sets totalResults as 0 by default. Contributed by Praveen Adlakha.

Posted by pa...@apache.org.
FALCON-1710 dependency API sets totalResults as 0 by default. Contributed by Praveen Adlakha.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5e80dcd0
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5e80dcd0
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5e80dcd0

Branch: refs/heads/0.9
Commit: 5e80dcd0ddaace24582c4e07b99e6032f053c250
Parents: 054295a
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Jan 5 20:42:28 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Jan 5 23:31:26 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                                     | 2 ++
 client/src/main/java/org/apache/falcon/resource/EntityList.java | 5 +++++
 .../test/java/org/apache/falcon/resource/EntityManagerTest.java | 1 +
 3 files changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/5e80dcd0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 597b551..599efca 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava)
+
     FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled(Ajay Yadava)
 
     FALCON-1674 Fix the mapping of InstanceState status to workflow Status in InstancesResult (Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/5e80dcd0/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index f581691..b91bdbe 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -43,6 +43,10 @@ public class EntityList {
     public static final String INPUT_TAG = "Input";
     public static final String OUTPUT_TAG = "Output";
 
+    public int getTotalResults() {
+        return totalResults;
+    }
+
     @XmlElement
     private int totalResults;
 
@@ -141,6 +145,7 @@ public class EntityList {
 
     public EntityList(Entity[] dependentEntities, Entity entity) {
         int len = dependentEntities.length;
+        this.totalResults = len;
         EntityElement[] items = new EntityElement[len];
         for (int i = 0; i < len; i++) {
             Entity e = dependentEntities[i];

http://git-wip-us.apache.org/repos/asf/falcon/blob/5e80dcd0/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index 99a537d..0519cd2 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -151,6 +151,7 @@ public class EntityManagerTest extends AbstractEntityManager {
         EntityList entityList = this.getEntityList("", "", "", "process", "", "", "", "asc", 0, 10, "");
         Assert.assertNotNull(entityList.getElements());
         Assert.assertEquals(entityList.getElements().length, 1);
+        Assert.assertEquals(entityList.getTotalResults(), 1);
 
         /*
          * Both entities should be returned when the user is SuperUser.


[4/9] falcon git commit: FALCON-1674 Fix the mapping of InstanceState status to workflow Status in InstancesResult. Contributed by Pallavi Rao.

Posted by pa...@apache.org.
FALCON-1674 Fix the mapping of InstanceState status to workflow Status in InstancesResult. Contributed by Pallavi Rao.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/679167a2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/679167a2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/679167a2

Branch: refs/heads/0.9
Commit: 679167a24508994c5b4128162f856c2385de0df2
Parents: 1e3dcb7
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Jan 5 13:44:03 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Jan 5 13:44:03 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 docs/src/site/twiki/restapi/InstanceList.twiki  | 10 ++++
 .../workflow/engine/OozieWorkflowEngine.java    |  5 +-
 .../workflow/engine/FalconWorkflowEngine.java   | 49 ++++++++++++++++----
 4 files changed, 56 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/679167a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c59ed6..1d3aa02 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1674 Fix the mapping of InstanceState status to workflow Status in InstancesResult (Pallavi Rao via Ajay Yadava)
+
     FALCON-1709 FIFO order is not followed when scheduled using native scheduler (Pallavi Rao)
 
     FALCON-1711 DependencyInstance twiki does not contain correct documentation	(Praveen Adlakha via Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/679167a2/docs/src/site/twiki/restapi/InstanceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceList.twiki b/docs/src/site/twiki/restapi/InstanceList.twiki
index 229d6f9..5dee8a5 100644
--- a/docs/src/site/twiki/restapi/InstanceList.twiki
+++ b/docs/src/site/twiki/restapi/InstanceList.twiki
@@ -29,6 +29,16 @@ Get list of all instances of a given entity.
 ---++ Results
 List of instances of given entity.
 
+The possible instance status returned and its meaning are as follows:
+   * WAITING - The instance is waiting for the corresponding data(feed) instances to become available.
+   * READY - The instance is ready to be scheduled. But, is waiting for scheduling conditions to be met. For example, limitation on number of instances that can be run in parallel.
+   * RUNNING - The instance is running on the workflow engine.
+   * FAILED - The instance has failed during execution.
+   * KILLED - The instance has been killed either manually or by the system.
+   * SUCCEEDED - The instance has executed successfully.
+   * SKIPPED - This instance was not executed, but was skipped. For example, when the execution order is LAST_ONLY, the older instances are skipped.
+   * ERROR - There was error while executing this instance on the workflow engine.
+   * UNDEFINED - The status of the instance could not be determined.
 ---++ Examples
 ---+++ Rest Call
 <verbatim>

http://git-wip-us.apache.org/repos/asf/falcon/blob/679167a2/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index b486357..cf0e30d 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -953,8 +953,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String mapActionStatus(String status) {
-        if (CoordinatorAction.Status.READY.toString().equals(status)
-            || CoordinatorAction.Status.WAITING.toString().equals(status)
+        if (CoordinatorAction.Status.READY.toString().equals(status)) {
+            return InstancesResult.WorkflowStatus.READY.name();
+        } else if (CoordinatorAction.Status.WAITING.toString().equals(status)
             || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.WAITING.name();
         } else if (CoordinatorAction.Status.IGNORED.toString().equals(status)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/679167a2/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index bceab72..8306b34 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.EntityExecutor;
 import org.apache.falcon.execution.ExecutionInstance;
 import org.apache.falcon.execution.FalconExecutionService;
@@ -267,23 +268,19 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
         switch(action) {
         case KILL:
             executor.kill(instance);
-            instanceInfo.status = InstancesResult.WorkflowStatus.KILLED;
+            populateInstanceInfo(instanceInfo, instance);
             break;
         case SUSPEND:
             executor.suspend(instance);
-            instanceInfo.status = InstancesResult.WorkflowStatus.SUSPENDED;
+            populateInstanceInfo(instanceInfo, instance);
             break;
         case RESUME:
             executor.resume(instance);
-            instanceInfo.status =
-                    InstancesResult.WorkflowStatus.valueOf(STATE_STORE
-                            .getExecutionInstance(instance.getId()).getCurrentState().name());
+            populateInstanceInfo(instanceInfo, instance);
             break;
         case RERUN:
             executor.rerun(instance, userProps, isForced);
-            instanceInfo.status =
-                    InstancesResult.WorkflowStatus.valueOf(STATE_STORE
-                            .getExecutionInstance(instance.getId()).getCurrentState().name());
+            populateInstanceInfo(instanceInfo, instance);
             break;
         case STATUS:
             // Mask wfParams
@@ -293,6 +290,9 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
                         DAGEngineFactory.getDAGEngine(cluster).getJobDetails(instance.getExternalID());
                 instanceInfo.actions = instanceActions
                         .toArray(new InstancesResult.InstanceAction[instanceActions.size()]);
+            // If not scheduled externally yet, get details from state
+            } else {
+                populateInstanceInfo(instanceInfo, instance);
             }
             break;
 
@@ -313,6 +313,39 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
         return instanceInfo;
     }
 
+    // Populates the InstancesResult.Instance instance using ExecutionInstance
+    private void populateInstanceInfo(InstancesResult.Instance instanceInfo, ExecutionInstance instance)
+        throws StateStoreException {
+        instanceInfo.cluster = instance.getCluster();
+        InstanceState.STATE state = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
+        switch (state) {
+        case SUCCEEDED:
+            instanceInfo.status = InstancesResult.WorkflowStatus.SUCCEEDED;
+            break;
+        case FAILED:
+            instanceInfo.status = InstancesResult.WorkflowStatus.FAILED;
+            break;
+        case KILLED:
+            instanceInfo.status = InstancesResult.WorkflowStatus.KILLED;
+            break;
+        case READY:
+            instanceInfo.status = InstancesResult.WorkflowStatus.READY;
+            break;
+        case WAITING:
+            instanceInfo.status = InstancesResult.WorkflowStatus.WAITING;
+            break;
+        case SUSPENDED:
+            instanceInfo.status = InstancesResult.WorkflowStatus.SUSPENDED;
+            break;
+        case RUNNING:
+            instanceInfo.status = InstancesResult.WorkflowStatus.RUNNING;
+            break;
+        default:
+            instanceInfo.status = InstancesResult.WorkflowStatus.UNDEFINED;
+            break;
+        }
+    }
+
     @Override
     public InstancesResult killInstances(Entity entity, Date start, Date end,
                                          Properties props, List<LifeCycle> lifeCycles) throws FalconException {


[5/9] falcon git commit: FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled. Contributed by Ajay Yadava.

Posted by pa...@apache.org.
FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/054295a7
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/054295a7
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/054295a7

Branch: refs/heads/0.9
Commit: 054295a7de9af8b0b0f24f212a93c7bd2f27eae6
Parents: 679167a
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Jan 5 15:03:22 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Jan 5 15:03:22 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                            |  2 ++
 .../apache/falcon/handler/SLAMonitoringHandler.java    | 13 ++++++++-----
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/054295a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d3aa02..597b551 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled(Ajay Yadava)
+
     FALCON-1674 Fix the mapping of InstanceState status to workflow Status in InstancesResult (Pallavi Rao via Ajay Yadava)
 
     FALCON-1709 FIFO order is not followed when scheduled using native scheduler (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/054295a7/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
index bb3b8e0..df2a1e0 100644
--- a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
+++ b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.handler;
 
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
@@ -53,12 +54,14 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
                                      String[] outputFeedInstancePathsList) throws FalconException {
         Storage storage;
         for (int index=0; index<outputFeedNamesList.length; index++) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, outputFeedNamesList[index]);
-            storage = FeedHelper.createStorage(clusterName, feed);
-            String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
-            Date date = FeedHelper.getDate(templatePath, new Path(outputFeedInstancePathsList[index]),
+            if (!StringUtils.equals(outputFeedNamesList[index], "NONE")) {
+                Feed feed = EntityUtil.getEntity(EntityType.FEED, outputFeedNamesList[index]);
+                storage = FeedHelper.createStorage(clusterName, feed);
+                String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
+                Date date = FeedHelper.getDate(templatePath, new Path(outputFeedInstancePathsList[index]),
                     EntityUtil.getTimeZone(feed));
-            FeedSLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], clusterName, date);
+                FeedSLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], clusterName, date);
+            }
         }
     }
 


[7/9] falcon git commit: FALCON-1719 Retry does not update the state of the instance in the database

Posted by pa...@apache.org.
FALCON-1719 Retry does not update the state of the instance in the database


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7cde36c4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7cde36c4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7cde36c4

Branch: refs/heads/0.9
Commit: 7cde36c4104bcb43b913cbd1c22288daa773f488
Parents: 5e80dcd
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Jan 6 15:13:59 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Jan 6 15:13:59 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                          |  2 ++
 .../falcon/rerun/handler/AbstractRerunConsumer.java  | 11 ++++++++---
 .../falcon/rerun/handler/AbstractRerunHandler.java   | 15 +++++++++++++--
 .../falcon/rerun/handler/LateRerunConsumer.java      |  9 +++++----
 .../falcon/rerun/handler/LateRerunHandler.java       |  5 ++---
 .../apache/falcon/rerun/handler/RetryConsumer.java   |  4 ++--
 6 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 599efca..2ed1ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1719 Retry does not update the state of the instance in the database (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 582cb15..f60b927 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -26,6 +26,7 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,9 +75,12 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
 
                 // Login the user to access WfEngine as this user
                 CurrentUser.authenticate(message.getWorkflowUser());
-                String jobStatus = handler.getWfEngine().getWorkflowStatus(
+                AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
+                        message.getEntityName());
+                String jobStatus = wfEngine.getWorkflowStatus(
                         message.getClusterName(), message.getWfId());
-                handleRerun(message.getClusterName(), jobStatus, message);
+                handleRerun(message.getClusterName(), jobStatus, message,
+                        message.getEntityType(), message.getEntityName());
 
             } catch (Throwable e) {
                 LOG.error("Error in rerun consumer", e);
@@ -84,5 +88,6 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
         }
     }
 
-    protected abstract void handleRerun(String clusterName, String jobStatus, T message);
+    protected abstract void handleRerun(String clusterName, String jobStatus, T message,
+                                        String entityType, String entityName);
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 64c566e..bc1f7f2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -17,9 +17,11 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.Retry;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
@@ -58,8 +60,17 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
                                      String wfId, String workflowUser, long msgReceivedTime);
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    public AbstractWorkflowEngine getWfEngine() {
-        return wfEngine;
+    public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) {
+        if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) {
+            return wfEngine;
+        }
+        try {
+            Entity entity = EntityUtil.getEntity(EntityType.valueOf(entityType), entityName);
+            return WorkflowEngineFactory.getWorkflowEngine(entity);
+        } catch (FalconException e) {
+            // Just to make sure of backward compatibility in case of any exceptions.
+            return wfEngine;
+        }
     }
 
     public boolean offerToQueue(T event) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index ee31952..4297788 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -53,7 +53,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
     @Override
     protected void handleRerun(String clusterName, String jobStatus,
-                               LaterunEvent message) {
+                               LaterunEvent message, String entityType, String entityName) {
         try {
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
                     || jobStatus.equals("SUSPENDED")) {
@@ -77,7 +77,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
             LOG.info("Late changes detected in the following feeds: {}", detectLate);
 
-            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, true);
+            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, true);
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {
@@ -91,8 +91,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
     public String detectLate(LaterunEvent message) throws Exception {
         LateDataHandler late = new LateDataHandler();
-        Properties properties = handler.getWfEngine().getWorkflowProperties(
-                message.getClusterName(), message.getWfId());
+        AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
+                message.getEntityName());
+        Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId());
         String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
         String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
         String falconInputFeedStorageTypes =

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 64177a4..1d2ed37 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -65,9 +65,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             Long wait = getEventDelay(entity, nominalTime);
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
-
-                java.util.Properties properties =
-                        this.getWfEngine().getWorkflowProperties(cluster, wfId);
+                AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName);
+                java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
                 String logDir = properties.getProperty("logDir");
                 String srcClusterName = properties.getProperty("srcClusterName");
                 Path lateLogPath = this.getLateLogPath(logDir,

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 61aa3e1..96300d9 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -39,7 +39,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
 
     @Override
     protected void handleRerun(String clusterName, String jobStatus,
-                               RetryEvent message) {
+                               RetryEvent message, String entityType, String entityName) {
         try {
             if (!jobStatus.equals("KILLED")) {
                 LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
@@ -52,7 +52,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                     + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
-            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, false);
+            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false);
         } catch (Exception e) {
             int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
                     .getProperty("max.retry.failure.count", "1"));


[8/9] falcon git commit: FALCON-1720 Rerun API does not rerun succeeded instances (by Pavan Kolamuri)

Posted by pa...@apache.org.
FALCON-1720 Rerun API does not rerun succeeded instances (by Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccb6df38
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccb6df38
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccb6df38

Branch: refs/heads/0.9
Commit: ccb6df38bf086135d28bce9c0c92d6fd23e19459
Parents: 7cde36c
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Jan 6 15:35:31 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Jan 6 15:35:31 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/workflow/engine/OozieDAGEngine.java  | 24 +++++++++++++++-----
 .../AbstractSchedulerManagerJerseyIT.java       |  2 +-
 .../InstanceSchedulerManagerJerseyIT.java       | 12 +++++++++-
 .../local-process-noinputs-template.xml         |  2 +-
 5 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ed1ab4..5677175 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1720 Rerun API does not rerun succeeded instances (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1719 Retry does not update the state of the instance in the database (Pavan Kolamuri via Pallavi Rao)
 
     FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
index 1425a97..1d0e126 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -237,17 +237,29 @@ public class OozieDAGEngine implements DAGEngine {
         String jobId = instance.getExternalID();
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
-            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
-            if (props != null) {
-                jobprops.putAll(props);
+            if (props == null) {
+                props = new Properties();
             }
             //if user has set any of these oozie rerun properties then force rerun flag is ignored
-            if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+            if (!props.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
             }
+
+            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
+            jobprops.putAll(props);
+
             jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
             jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+            // In case if both props exists one should be removed otherwise it will fail.
+            // This case will occur when user runs workflow with skip-nodes property and
+            // try to do force rerun or rerun with fail-nodes property.
+            if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " +  OozieClient.RERUN_FAIL_NODES
+                        + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES);
+                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
+            }
             client.reRun(jobId, jobprops);
             assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED);
             LOG.info("Rerun job {} of entity {} of time {} on cluster {}", jobId, instance.getEntity().getName(),

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
index f053b76..0a3e984 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
@@ -62,7 +62,7 @@ public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase {
     private static final String DB_BASE_DIR = "target/test-data/falcondb";
     protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
     protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
-    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected static final String DB_SQL_FILE = dbLocation + File.separator + "out.sql";
     protected LocalFileSystem localFS = new LocalFileSystem();
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
index 1523b76..18c36ff 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -64,7 +64,7 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
     }
 
     @Test
-    public void testKillInstances() throws Exception {
+    public void testKillAndRerunInstances() throws Exception {
         UnitTestContext context = new UnitTestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
 
@@ -84,6 +84,16 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
                 processName, START_INSTANCE);
         Assert.assertEquals(status, InstancesResult.WorkflowStatus.KILLED);
 
+        result = falconUnitClient.rerunInstances(EntityType.PROCESS.toString(),
+                processName, START_INSTANCE, END_TIME, colo, null, null, null, null, true, null);
+        assertStatus(result);
+
+        waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
+                InstancesResult.WorkflowStatus.RUNNING);
+        status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+                processName, START_INSTANCE);
+        Assert.assertEquals(status, InstancesResult.WorkflowStatus.RUNNING);
+
 
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccb6df38/webapp/src/test/resources/local-process-noinputs-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/local-process-noinputs-template.xml b/webapp/src/test/resources/local-process-noinputs-template.xml
index aabdc6a..a7388d8 100644
--- a/webapp/src/test/resources/local-process-noinputs-template.xml
+++ b/webapp/src/test/resources/local-process-noinputs-template.xml
@@ -38,5 +38,5 @@
         <property name="sundayThisWeek" value="${currentWeek('SUN', 0, 0)}"/>
     </properties>
     <workflow engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
-    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+    <retry policy="periodic" delay="minutes(10)" attempts="1"/>
 </process>


[9/9] falcon git commit: Merge FALCON-1719 and FALCON-1720 with latest code base

Posted by pa...@apache.org.
Merge FALCON-1719 and FALCON-1720 with latest code base


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/092f70a9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/092f70a9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/092f70a9

Branch: refs/heads/0.9
Commit: 092f70a9d2b42983aaaa65fc5816a3d1a2df8245
Parents: ccb6df3 dfb85ab
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Jan 6 16:20:32 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Jan 6 16:20:32 2016 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/092f70a9/falcon-regression/CHANGES.txt
----------------------------------------------------------------------


[2/9] falcon git commit: FALCON-1717 Update CHANGES.txt for 0.9 Release. Contributed by Pallavi Rao.

Posted by pa...@apache.org.
FALCON-1717 Update CHANGES.txt for 0.9 Release. Contributed by Pallavi Rao.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6d0119b7
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6d0119b7
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6d0119b7

Branch: refs/heads/0.9
Commit: 6d0119b7b4ef23a36ca5888762900d64ef600ce8
Parents: 20bacfa
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jan 4 15:07:27 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jan 4 15:07:27 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6d0119b7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4adbdef..9c59ed6 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
 Apache Falcon Change log
-Trunk (Unreleased)
+Proposed Release Version: 0.9
     FALCON-1594 Update changes.txt to reflect 0.8 as released version(Sowmya Ramesh via Ajay Yadava)
 
     FALCON-1523 Update CHANGES.txt to change 0.8 branch to release(Sowmya Ramesh)