You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ar...@apache.org on 2014/08/21 19:30:55 UTC
[13/18] git commit: FALCON-622 Fix ProcessUpdate and update at specific time test contributed by Samarth Gupta
FALCON-622 Fix ProcessUpdate and update at specific time test contributed by Samarth Gupta
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a349aeda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a349aeda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a349aeda
Branch: refs/heads/FALCON-585
Commit: a349aeda7b7455929fee51da56b78688239488d5
Parents: b991e81
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Wed Aug 20 14:32:19 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Wed Aug 20 14:32:19 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../falcon/regression/core/util/OozieUtil.java | 12 +++++--
.../falcon/regression/ProcessLibPathTest.java | 8 +++++
.../prism/NewPrismProcessUpdateTest.java | 34 +++++++++-----------
.../prism/UpdateAtSpecificTimeTest.java | 6 ++--
5 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6409032..3521216 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -64,6 +64,8 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-622 Fix ProcessUpdate and update at specific time test
+ (Samarthg)
FALCON-616 cluster submission should fail when shared libs copy fail
(Shwetha GS via Suhas Vasu)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index e22416a..806bbd3 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -395,7 +395,7 @@ public final class OozieUtil {
initialNominalTimes.removeAll(nominalTimesOriginalAndNew);
if (initialNominalTimes.size() != 0) {
- LOGGER.debug("Missing instance are : " + initialNominalTimes);
+ LOGGER.info("Missing instance are : " + initialNominalTimes);
LOGGER.debug("Original Bundle ID : " + originalBundleId);
LOGGER.debug("New Bundle ID : " + newBundleId);
@@ -456,8 +456,14 @@ public final class OozieUtil {
String entityName, int bundleNumber)
throws OozieClientException, IOException {
String bundleID = InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber);
+ createMissingDependenciesForBundle(helper, bundleID);
+
+ }
+
+ public static void createMissingDependenciesForBundle(ColoHelper helper, String bundleId)
+ throws OozieClientException, IOException {
OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
- List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
+ List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleId).getCoordinators();
for (CoordinatorJob coord : coords) {
CoordinatorJob temp = oozieClient.getCoordJobInfo(coord.getId());
@@ -465,7 +471,7 @@ public final class OozieUtil {
instanceNumber++) {
CoordinatorAction instance = temp.getActions().get(instanceNumber);
InstanceUtil.createHDFSFolders(helper,
- Arrays.asList(instance.getMissingDependencies().split("#")));
+ Arrays.asList(instance.getMissingDependencies().split("#")));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index 01e58c9..fc8e4a8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -18,6 +18,7 @@
package org.apache.falcon.regression;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.regression.core.helpers.ColoHelper;
@@ -25,6 +26,7 @@ import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -109,6 +111,9 @@ public class ProcessLibPathTest extends BaseTestClass {
bundles[0].setProcessWorkflow(workflowDir);
logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
+ .getProcessData()), 0);
InstanceUtil
.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
}
@@ -128,6 +133,9 @@ public class ProcessLibPathTest extends BaseTestClass {
bundles[0].setProcessWorkflow(workflowDir);
logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
+ .getProcessData()), 0);
InstanceUtil
.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
index f9f37cb..f4553df 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
@@ -90,12 +90,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
LOGGER.info("test name: " + method.getName());
Bundle b = BundleUtil.readUpdateBundle();
bundles[0] = new Bundle(b, cluster1);
+ bundles[0].generateUniqueBundle();
bundles[1] = new Bundle(b, cluster2);
+ bundles[1].generateUniqueBundle();
bundles[2] = new Bundle(b, cluster3);
+ bundles[2].generateUniqueBundle();
setBundleWFPath(bundles[0], bundles[1], bundles[2]);
bundles[1].addClusterToBundle(bundles[2].getClusters().get(0),
ClusterType.TARGET, null, null);
- usualGrind(cluster3, bundles[1]);
+ usualGrind(bundles[1]);
Util.restartService(cluster3.getClusterHelper());
}
@@ -220,8 +223,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
dualComparison(prism, cluster3, bundles[1].getProcessData());
- OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
- bundles[1].getProcessName(), 0);
waitingForBundleFinish(cluster3, oldBundleId, 15);
//ensure that the running process has new coordinators created; while the submitted
// one is updated correctly.
@@ -371,8 +372,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(),
Util.getProcessObject(updatedProcess).getFrequency());
dualComparison(prism, cluster3, bundles[1].getProcessData());
- waitingForBundleFinish(cluster3, oldBundleId);
-
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -458,6 +457,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
boolean doesExist = false;
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
&&
status != Job.Status.DONEWITHERROR) {
@@ -555,8 +555,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
- OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
- bundles[1].getProcessName(), 0);
waitingForBundleFinish(cluster3, oldBundleId);
//ensure that the running process has new coordinators created; while the submitted
// one is updated
@@ -631,6 +629,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
boolean doesExist = false;
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
&&
status != Job.Status.DONEWITHERROR) {
@@ -649,10 +648,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
}
Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!");
-
- OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS,
- bundles[1].getProcessName(), 0);
-
waitingForBundleFinish(cluster3, oldBundleId);
int finalNumberOfInstances =
@@ -732,6 +727,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS);
boolean doesExist = false;
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
while (status != Job.Status.SUCCEEDED && status != Job.Status.FAILED
&&
status != Job.Status.DONEWITHERROR) {
@@ -753,7 +749,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil
.getLatestBundleID(cluster3,
Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS),
- oldNominalTimes, Util.readEntityName(bundles[1].getProcessData()), false,
+ oldNominalTimes, bundles[1].getProcessData(), false,
true
);
@@ -979,7 +975,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
@Test(groups = { "multiCluster" }, timeOut = 1200000)
public void updateProcessAddNewInputInEachColoWithOneProcessSuspended() throws Exception {
- String startTime = TimeUtil.getTimeWrtSystemTime(2);
+ String startTime = TimeUtil.getTimeWrtSystemTime(1);
String endTime = TimeUtil.getTimeWrtSystemTime(6);
bundles[1].setProcessValidity(startTime, endTime);
@@ -1371,7 +1367,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
));
waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING);
-
+ OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0);
AssertUtil.assertSucceeded(
prism.getProcessHelper()
.update(bundles[1].getProcessData(), bundles[1].getProcessData()));
@@ -1486,8 +1482,6 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
bundles[1].verifyDependencyListing(cluster2);
dualComparison(prism, cluster3, bundles[1].getProcessData());
- waitingForBundleFinish(cluster3, oldBundleId);
-
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
@@ -1621,11 +1615,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
}
}
- private Bundle usualGrind(ColoHelper prism, Bundle b) throws Exception {
+ private Bundle usualGrind(Bundle b) throws Exception {
b.setInputFeedDataPath(inputFeedPath);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
- HadoopUtil.lateDataReplenish(cluster1FS, 60, 1, prefix, null);
final String starTime = TimeUtil.getTimeWrtSystemTime(3);
String endTime = TimeUtil.getTimeWrtSystemTime(7);
b.setProcessPeriodicity(1, TimeUnit.minutes);
@@ -1682,8 +1675,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass {
throws Exception {
int wait = 0;
while (!OozieUtil.isBundleOver(coloHelper, bundleId)) {
+ //create missing dependencies if new instances have come up
+ OozieUtil.createMissingDependenciesForBundle(coloHelper, bundleId);
+
//keep waiting
- LOGGER.info("bundle not over .. waiting");
+ LOGGER.info("bundle " + bundleId + " not over .. waiting");
TimeUtil.sleepSeconds(60);
wait++;
if (wait == minutes) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a349aeda/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index fac89d7..a017c79 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@ -417,7 +417,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
Util.readEntityName(feed), EntityType.FEED);
//send update again
- r = prism.getFeedHelper().update(feed, updatedFeed, updateTime);
+ r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
AssertUtil.assertSucceeded(r);
//verify new bundle creation on cluster_2 and no new bundle on cluster_1
@@ -429,7 +429,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
.verifyNewBundleCreation(cluster_1, newBundle_cluster1, oldNominalTimes_cluster1,
feed, false, false);
//wait till update time is reached
- TimeUtil.sleepTill(updateTime);
+ TimeUtil.sleepTill(TimeUtil.getTimeWrtSystemTime(5));
//verify new bundle creation with instance matching
OozieUtil
@@ -542,7 +542,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass {
logger.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed));
logger.info("Update Time : " + updateTime);
- r = prism.getFeedHelper().update(feed, updatedFeed, updateTime);
+ r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null);
AssertUtil.assertSucceeded(r);
InstanceUtil.waitTillInstancesAreCreated(cluster_1, feed, 1);