You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ar...@apache.org on 2014/08/21 19:30:55 UTC

[13/18] git commit: FALCON-622 Fix ProcessUpdate and update at specific time test contributed by Samarth Gupta

FALCON-622 Fix ProcessUpdate and update at specific time test contributed by Samarth Gupta


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a349aeda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a349aeda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a349aeda

Branch: refs/heads/FALCON-585
Commit: a349aeda7b7455929fee51da56b78688239488d5
Parents: b991e81
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Wed Aug 20 14:32:19 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Wed Aug 20 14:32:19 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/regression/core/util/OozieUtil.java  | 12 +++++--
 .../falcon/regression/ProcessLibPathTest.java   |  8 +++++
 .../prism/NewPrismProcessUpdateTest.java        | 34 +++++++++-----------
 .../prism/UpdateAtSpecificTimeTest.java         |  6 ++--
 5 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6409032..3521216 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -64,6 +64,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-622 Fix ProcessUpdate and update at specific time test
+   (Samarthg)
    FALCON-616 cluster submission should fail when shared libs copy fail
    (Shwetha GS via Suhas Vasu)   
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index e22416a..806bbd3 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -395,7 +395,7 @@ public final class OozieUtil {
         initialNominalTimes.removeAll(nominalTimesOriginalAndNew);
 
         if (initialNominalTimes.size() != 0) {
-            LOGGER.debug("Missing instance are : " + initialNominalTimes);
+            LOGGER.info("Missing instance are : " + initialNominalTimes);
             LOGGER.debug("Original Bundle ID   : " + originalBundleId);
             LOGGER.debug("New Bundle ID        : " + newBundleId);
 
@@ -456,8 +456,14 @@ public final class OozieUtil {
                                                  String entityName, int bundleNumber)
         throws OozieClientException, IOException {
         String bundleID = InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber);
+        createMissingDependenciesForBundle(helper, bundleID);
+
+    }
+
+    public static void createMissingDependenciesForBundle(ColoHelper helper, String bundleId)
+            throws OozieClientException, IOException {
         OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
-        List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
+        List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleId).getCoordinators();
         for (CoordinatorJob coord : coords) {
 
             CoordinatorJob temp = oozieClient.getCoordJobInfo(coord.getId());
@@ -465,7 +471,7 @@ public final class OozieUtil {
                  instanceNumber++) {
                 CoordinatorAction instance = temp.getActions().get(instanceNumber);
                 InstanceUtil.createHDFSFolders(helper,
-                    Arrays.asList(instance.getMissingDependencies().split("#")));
+                        Arrays.asList(instance.getMissingDependencies().split("#")));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index 01e58c9..fc8e4a8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.regression;
 
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
@@ -25,6 +26,7 @@ import org.apache.falcon.regression.core.util.BundleUtil;
 import org.apache.falcon.regression.core.util.HadoopUtil;
 import org.apache.falcon.regression.core.util.InstanceUtil;
 import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -109,6 +111,9 @@ public class ProcessLibPathTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(workflowDir);
         logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
         bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
+                .getProcessData()), 0);
         InstanceUtil
             .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
     }
@@ -128,6 +133,9 @@ public class ProcessLibPathTest extends BaseTestClass {
         bundles[0].setProcessWorkflow(workflowDir);
         logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
         bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
+                .getProcessData()), 0);
         InstanceUtil
             .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index f9f37cb..f4553df 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -90,12 +90,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         LOGGER.info("test name: " + method.getName());
         Bundle b = BundleUtil.readUpdateBundle();
         bundles[0] = new Bundle(b, cluster1);
+        bundles[0].generateUniqueBundle();
         bundles[1] = new Bundle(b, cluster2);
+        bundles[1].generateUniqueBundle();
         bundles[2] = new Bundle(b, cluster3);
+        bundles[2].generateUniqueBundle();
         setBundleWFPath(bundles[0], bundles[1], bundles[2]);
         bundles[1].addClusterToBundle(bundles[2].getClusters().get(0),
                 ClusterType.TARGET, null, null);
-        usualGrind(cluster3, bundles[1]);
+        usualGrind(bundles[1]);
         Util.restartService(cluster3.getClusterHelper());
     }
 
@@ -220,8 +223,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
         dualComparison(prism, cluster3, bundles[1].getProcessData());
 
-        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
-                bundles[1].getProcessName(), 0);
         waitingForBundleFinish(cluster3, oldBundleId, 15);
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated correctly.
@@ -371,8 +372,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
                 Util.getProcessObject(updatedProcess).getFrequency());
         dualComparison(prism, cluster3, bundles[1].getProcessData());
-        waitingForBundleFinish(cluster3, oldBundleId);
-
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
 
@@ -458,6 +457,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
 
         boolean doesExist = false;
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
         while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
                 &&
                 status != Job.Status.DONEWITHERROR) {
@@ -555,8 +555,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         bundles[1].verifyDependencyListing(cluster2);
 
         dualComparison(prism, cluster3, bundles[1].getProcessData());
-        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
-                bundles[1].getProcessName(), 0);
         waitingForBundleFinish(cluster3, oldBundleId);
         //ensure that the running process has new coordinators created; while the submitted
         // one is updated
@@ -631,6 +629,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                 Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
 
         boolean doesExist = false;
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
         while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
                 &&
                 status != Job.Status.DONEWITHERROR) {
@@ -649,10 +648,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         }
 
         Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
-
-        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
-                bundles[1].getProcessName(), 0);
-
         waitingForBundleFinish(cluster3, oldBundleId);
 
         int finalNumberOfInstances =
@@ -732,6 +727,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
                         Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
 
         boolean doesExist = false;
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
         while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
                 &&
                 status != Job.Status.DONEWITHERROR) {
@@ -753,7 +749,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
                         .getLatestBundleID(cluster3,
                                 Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
-                oldNominalTimes, Util.readEntityName(bundles[1].getProcessData()), false,
+                oldNominalTimes, bundles[1].getProcessData(), false,
                 true
         );
 
@@ -979,7 +975,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
 
     @Test(groups = { "multiCluster" }, timeOut = 1200000)
     public void updateProcessAddNewInputInEachColoWithOneProcessSuspended() throws Exception {
-        String startTime = TimeUtil.getTimeWrtSystemTime(2);
+        String startTime = TimeUtil.getTimeWrtSystemTime(1);
         String endTime = TimeUtil.getTimeWrtSystemTime(6);
         bundles[1].setProcessValidity(startTime, endTime);
 
@@ -1371,7 +1367,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         ));
 
         waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-
+        OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
         AssertUtil.assertSucceeded(
                 prism.getProcessHelper()
                         .update(bundles[1].getProcessData(), bundles[1].getProcessData()));
@@ -1486,8 +1482,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         bundles[1].verifyDependencyListing(cluster2);
 
         dualComparison(prism, cluster3, bundles[1].getProcessData());
-        waitingForBundleFinish(cluster3, oldBundleId);
-
         AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
     }
 
@@ -1621,11 +1615,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         }
     }
 
-    private Bundle usualGrind(ColoHelper prism, Bundle b) throws Exception {
+    private Bundle usualGrind(Bundle b) throws Exception {
         b.setInputFeedDataPath(inputFeedPath);
         String prefix = b.getFeedDataPathPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
-        HadoopUtil.lateDataReplenish(cluster1FS, 60, 1, prefix, null);
         final String starTime = TimeUtil.getTimeWrtSystemTime(3);
         String endTime = TimeUtil.getTimeWrtSystemTime(7);
         b.setProcessPeriodicity(1, TimeUnit.minutes);
@@ -1682,8 +1675,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
         throws Exception {
         int wait = 0;
         while (!OozieUtil.isBundleOver(coloHelper, bundleId)) {
+            //create missing dependencies if new instances have come up
+            OozieUtil.createMissingDependenciesForBundle(coloHelper, bundleId);
+
             //keep waiting
-            LOGGER.info("bundle not over .. waiting");
+            LOGGER.info("bundle " + bundleId + " not over .. waiting");
             TimeUtil.sleepSeconds(60);
             wait++;
             if (wait == minutes) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index fac89d7..a017c79 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -417,7 +417,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
                 Util.readEntityName(feed), EntityType.FEED);
 
             //send update again
-            r = prism.getFeedHelper().update(feed, updatedFeed, updateTime);
+            r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
             AssertUtil.assertSucceeded(r);
 
             //verify new bundle creation on cluster_2 and no new bundle on cluster_1
@@ -429,7 +429,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
                 .verifyNewBundleCreation(cluster_1, newBundle_cluster1, oldNominalTimes_cluster1,
                     feed, false, false);
             //wait till update time is reached
-            TimeUtil.sleepTill(updateTime);
+            TimeUtil.sleepTill(TimeUtil.getTimeWrtSystemTime(5));
 
             //verify new bundle creation with instance matching
             OozieUtil
@@ -542,7 +542,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
         logger.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed));
         logger.info("Update Time : " + updateTime);
 
-        r = prism.getFeedHelper().update(feed, updatedFeed, updateTime);
+        r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
         AssertUtil.assertSucceeded(r);
         InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 1);