You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/04/14 16:01:35 UTC
[5/5] falcon git commit: FALCON-1151 Migrate oozie related methods
from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d0c9850e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d0c9850e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d0c9850e
Branch: refs/heads/master
Commit: d0c9850e5abf56b3e3c500ee744e96e634ba7691
Parents: 9d5429a
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Tue Apr 14 17:00:25 2015 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Tue Apr 14 17:00:25 2015 +0300
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../regression/core/util/InstanceUtil.java | 382 +++-----------
.../falcon/regression/core/util/OozieUtil.java | 332 +++++++++---
.../falcon/regression/AuthorizationTest.java | 86 ++--
.../regression/ELExpCurrentAndLastWeekTest.java | 19 +-
.../falcon/regression/ELValidationsTest.java | 18 +-
.../regression/EmbeddedPigScriptTest.java | 3 +-
.../falcon/regression/ExternalFSTest.java | 11 +-
.../regression/FeedClusterUpdateTest.java | 510 +++++--------------
.../regression/FeedInstanceListingTest.java | 19 +-
.../falcon/regression/FeedLateRerunTest.java | 32 +-
.../falcon/regression/FeedReplicationTest.java | 28 +-
.../regression/FeedSubmitAndScheduleTest.java | 6 +-
.../falcon/regression/InstanceParamTest.java | 6 +-
.../falcon/regression/InstanceSummaryTest.java | 2 +-
.../apache/falcon/regression/LogMoverTest.java | 4 +-
.../apache/falcon/regression/NewRetryTest.java | 19 +-
.../regression/ProcessInstanceKillsTest.java | 26 +-
.../regression/ProcessInstanceRerunTest.java | 36 +-
.../regression/ProcessInstanceResumeTest.java | 12 +-
.../regression/ProcessInstanceRunningTest.java | 10 +-
.../regression/ProcessInstanceStatusTest.java | 50 +-
.../regression/ProcessInstanceSuspendTest.java | 10 +-
.../falcon/regression/ProcessLateRerunTest.java | 20 +-
.../regression/ProcessLibPathLoadTest.java | 14 +-
.../falcon/regression/ProcessLibPathTest.java | 10 +-
.../regression/TouchAPIPrismAndServerTest.java | 46 +-
.../regression/hcat/HCatFeedOperationsTest.java | 6 +-
.../regression/hcat/HCatReplicationTest.java | 16 +-
.../regression/lineage/EntitySummaryTest.java | 15 +-
.../lineage/LineageApiProcessInstanceTest.java | 5 +-
.../lineage/ListFeedInstancesTest.java | 2 +-
.../lineage/ListProcessInstancesTest.java | 2 +-
.../falcon/regression/prism/FeedDelayTest.java | 24 +-
.../prism/NewPrismProcessUpdateTest.java | 328 ++++++------
.../regression/prism/OptionalInputTest.java | 2 +-
.../prism/PrismFeedLateReplicationTest.java | 107 ++--
.../PrismFeedReplicationPartitionExpTest.java | 98 +---
.../prism/PrismFeedReplicationUpdateTest.java | 30 +-
.../regression/prism/PrismFeedUpdateTest.java | 6 +-
.../prism/PrismProcessScheduleTest.java | 10 +-
.../RescheduleProcessInFinalStatesTest.java | 40 +-
.../falcon/regression/prism/RetentionTest.java | 2 +-
.../prism/UpdateAtSpecificTimeTest.java | 150 +++---
.../falcon/regression/ui/ProcessUITest.java | 8 +-
45 files changed, 1063 insertions(+), 1502 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 891576c..492814b 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,9 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java
+ (Paul Isaychuk via Ruslan Ostafiychuk)
+
FALCON-1138: JDK requirement for merlin should be 1.7 (Raghav Kumar Gautam)
FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 6c90256..723ea89 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
import org.apache.falcon.request.BaseRequest;
import org.apache.falcon.resource.APIResult;
@@ -41,7 +40,6 @@ import org.apache.log4j.Logger;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
@@ -58,29 +56,26 @@ import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
/**
* util functions related to instanceTest.
*/
public final class InstanceUtil {
- private InstanceUtil() {
- throw new AssertionError("Instantiating utility class...");
- }
-
+ public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10;
private static final Logger LOGGER = Logger.getLogger(InstanceUtil.class);
private static final EnumSet<Status> RUNNING_PREP_SUCCEEDED = EnumSet.of(Status.RUNNING,
Status.PREP, Status.SUCCEEDED);
+ private InstanceUtil() {
+ throw new AssertionError("Instantiating utility class...");
+ }
+
public static APIResult sendRequestProcessInstance(String url, String user)
throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
return hitUrl(url, Util.getMethodType(url), user);
}
- public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10;
-
public static APIResult hitUrl(String url,
String method, String user) throws URISyntaxException,
IOException, AuthenticationException, InterruptedException {
@@ -169,11 +164,20 @@ public final class InstanceUtil {
return Collections.frequency(statuses, workflowStatus);
}
+ /**
+ * Validates that response doesn't contain instances.
+ * @param r response
+ */
public static void validateSuccessWOInstances(InstancesResult r) {
AssertUtil.assertSucceeded(r);
Assert.assertNull(r.getInstances(), "Unexpected :" + Arrays.toString(r.getInstances()));
}
+ /**
+ * Validates that failed response contains specific error message.
+ * @param instancesResult response
+ * @param error expected error
+ */
public static void validateError(InstancesResult instancesResult, ResponseErrors error) {
Assert.assertTrue(instancesResult.getMessage().contains(error.getError()),
"Error should contains '" + error.getError() + "'");
@@ -205,7 +209,6 @@ public final class InstanceUtil {
LOGGER.info("status: " + status + ", instance: " + instance.getInstance());
statuses.add(status);
}
-
Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.RUNNING),
runningCount, "Running Instances");
Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.SUSPENDED),
@@ -216,22 +219,27 @@ public final class InstanceUtil {
killedCount, "Killed Instances");
}
+ /**
+ * Retrieves workflow IDs from every instance in the response.
+ * @param instancesResult response
+ * @return list of workflow IDs
+ */
public static List<String> getWorkflowJobIds(InstancesResult instancesResult) {
InstancesResult.Instance[] instances = instancesResult.getInstances();
- LOGGER.info("instances: " + Arrays.toString(instances));
- Assert.assertNotNull(instances, "instances should be not null");
- List<String> wfids = new ArrayList<String>();
+ LOGGER.info("Instances: " + Arrays.toString(instances));
+ Assert.assertNotNull(instances, "Instances should be not null");
+ List<String> wfIds = new ArrayList<String>();
for (InstancesResult.Instance instance : instances) {
- LOGGER.warn("instance: " + instance + " , status: "
- + instance.getStatus() + ", logs : " + instance.getLogFile());
+ LOGGER.warn(String.format(
+ "instance: %s, status: %s, logs : %s", instance, instance.getStatus(), instance.getLogFile()));
if (instance.getStatus().name().equals("RUNNING") || instance.getStatus().name().equals("SUCCEEDED")) {
- wfids.add(instance.getLogFile());
+ wfIds.add(instance.getLogFile());
}
if (instance.getStatus().name().equals("KILLED") || instance.getStatus().name().equals("WAITING")) {
Assert.assertNull(instance.getLogFile());
}
}
- return wfids;
+ return wfIds;
}
/**
@@ -250,21 +258,27 @@ public final class InstanceUtil {
}
}
Assert.assertEquals(counter, failCount, "Actual number of failed instances does not "
- + "match expected number of failed instances.");
+ + "match to expected number of failed instances.");
}
- public static List<String> getWorkflows(ColoHelper prismHelper, String processName,
+ /**
+ * Gets process workflows by given statuses.
+ * @param oozieClient oozie client of cluster where process is running
+ * @param processName process name
+ * @param statuses statuses workflows will be selected by
+ * @return list of matching workflows
+ * @throws OozieClientException
+ */
+ public static List<String> getWorkflows(OozieClient oozieClient, String processName,
WorkflowJob.Status... statuses) throws OozieClientException {
- OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
String bundleID = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS).get(0);
- List<String> workflowJobIds = OozieUtil.getWorkflowJobs(prismHelper, bundleID);
+ List<String> workflowJobIds = OozieUtil.getWorkflowJobs(oozieClient, bundleID);
List<String> toBeReturned = new ArrayList<String>();
for (String jobId : workflowJobIds) {
WorkflowJob wfJob = oozieClient.getJobInfo(jobId);
LOGGER.info("wfJob.getId(): " + wfJob.getId() + " wfJob.getStartTime(): "
- + wfJob.getStartTime()
- + "jobId: " + jobId + " wfJob.getStatus(): " + wfJob.getStatus());
+ + wfJob.getStartTime() + "jobId: " + jobId + " wfJob.getStatus(): " + wfJob.getStatus());
if (statuses.length == 0 || Arrays.asList(statuses).contains(wfJob.getStatus())) {
toBeReturned.add(jobId);
}
@@ -304,44 +318,17 @@ public final class InstanceUtil {
}
}
- public static List<CoordinatorAction> getProcessInstanceList(ColoHelper coloHelper,
+ public static List<CoordinatorAction> getProcessInstanceList(OozieClient oozieClient,
String processName, EntityType entityType) throws OozieClientException {
- OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
- String coordId = getLatestCoordinatorID(oozieClient, processName, entityType);
+ String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType);
//String coordId = getDefaultCoordinatorFromProcessName(processName);
LOGGER.info("default coordID: " + coordId);
return oozieClient.getCoordJobInfo(coordId).getActions();
}
- public static String getLatestCoordinatorID(OozieClient oozieClient, String processName,
- EntityType entityType) throws OozieClientException {
- final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType);
- return getDefaultCoordIDFromBundle(oozieClient, latestBundleID);
- }
-
- public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId)
- throws OozieClientException {
- OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
- BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId);
- List<CoordinatorJob> coords = bundleInfo.getCoordinators();
- int min = 100000;
- String minString = "";
- for (CoordinatorJob coord : coords) {
- String strID = coord.getId();
- if (min > Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
- min = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
- minString = coord.getId();
- }
- }
- LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString);
- return minString;
- }
-
- public static int getInstanceCountWithStatus(ColoHelper coloHelper, String processName,
- org.apache.oozie.client.CoordinatorAction.Status status, EntityType entityType)
- throws OozieClientException {
- List<CoordinatorAction> coordActions = getProcessInstanceList(coloHelper, processName,
- entityType);
+ public static int getInstanceCountWithStatus(OozieClient oozieClient, String processName,
+ CoordinatorAction.Status status, EntityType entityType) throws OozieClientException {
+ List<CoordinatorAction> coordActions = getProcessInstanceList(oozieClient, processName, entityType);
List<CoordinatorAction.Status> statuses = new ArrayList<CoordinatorAction.Status>();
for (CoordinatorAction action : coordActions) {
statuses.add(action.getStatus());
@@ -349,109 +336,10 @@ public final class InstanceUtil {
return Collections.frequency(statuses, status);
}
- public static Status getDefaultCoordinatorStatus(ColoHelper colohelper, String processName,
- int bundleNumber) throws OozieClientException {
- OozieClient oozieClient = colohelper.getProcessHelper().getOozieClient();
- String bundleID =
- getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
- String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID);
- return oozieClient.getCoordJobInfo(coordId).getStatus();
- }
-
- /**
- * Retrieves all coordinators of bundle.
- *
- * @param oozieClient Oozie client to use for fetching info.
- * @param bundleID specific bundle ID
- * @return list of bundle coordinators
- * @throws OozieClientException
- */
- public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient,
- String bundleID) throws OozieClientException {
- BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID);
- return bundleInfo.getCoordinators();
- }
-
- /**
- * Retrieves the latest bundle ID.
- *
- * @param coloHelper colo helper of cluster job is running on
- * @param entityName name of entity job is related to
- * @param entityType type of entity - feed or process expected
- * @return latest bundle ID
- * @throws OozieClientException
- */
- public static String getLatestBundleID(ColoHelper coloHelper,
- String entityName, EntityType entityType)
- throws OozieClientException {
- final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient();
- return getLatestBundleID(oozieClient, entityName, entityType);
- }
-
- /**
- * Retrieves the latest bundle ID.
- *
- * @param oozieClient where job is running
- * @param entityName name of entity job is related to
- * @param entityType type of entity - feed or process expected
- * @return latest bundle ID
- * @throws OozieClientException
- */
- public static String getLatestBundleID(OozieClient oozieClient,
- String entityName, EntityType entityType) throws OozieClientException {
- List<String> bundleIds = OozieUtil.getBundles(oozieClient, entityName, entityType);
- String max = "0";
- int maxID = -1;
- for (String strID : bundleIds) {
- if (maxID < Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
- maxID = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
- max = strID;
- }
- }
- return max;
- }
-
- /**
- * Retrieves ID of bundle related to some process/feed using its ordinal number.
- *
- * @param entityName - name of entity bundle is related to
- * @param entityType - feed or process
- * @param bundleNumber - ordinal number of bundle
- * @return bundle ID
- * @throws OozieClientException
- */
- public static String getSequenceBundleID(OozieClient oozieClient, String entityName,
- EntityType entityType, int bundleNumber)
- throws OozieClientException {
-
- //sequence start from 0
- List<String> bundleIds = OozieUtil.getBundles(oozieClient,
- entityName, entityType);
- Map<Integer, String> bundleMap = new TreeMap<Integer, String>();
- String bundleID;
- for (String strID : bundleIds) {
- LOGGER.info("getSequenceBundleID: " + strID);
- int key = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
- bundleMap.put(key, strID);
- }
- for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
- LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue());
- }
- int i = 0;
- for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
- bundleID = entry.getValue();
- if (i == bundleNumber) {
- return bundleID;
- }
- i++;
- }
- return null;
- }
-
/**
* Retrieves status of one instance.
*
- * @param coloHelper - server from which instance status will be retrieved.
+ * @param oozieClient - oozie client of the server from which instance status will be retrieved.
* @param processName - name of process which mentioned instance belongs to.
* @param bundleNumber - ordinal number of one of the bundle which are related to that
* process.
@@ -459,17 +347,13 @@ public final class InstanceUtil {
* @return - state of mentioned instance.
* @throws OozieClientException
*/
- public static CoordinatorAction.Status getInstanceStatus(ColoHelper coloHelper,
- String processName,
- int bundleNumber, int
- instanceNumber) throws OozieClientException {
- final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
- String bundleID =
- getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
+ public static CoordinatorAction.Status getInstanceStatus(OozieClient oozieClient, String processName,
+ int bundleNumber, int instanceNumber) throws OozieClientException {
+ String bundleID = OozieUtil.getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
if (StringUtils.isEmpty(bundleID)) {
return null;
}
- String coordID = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
+ String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
if (StringUtils.isEmpty(coordID)) {
return null;
}
@@ -487,22 +371,6 @@ public final class InstanceUtil {
}
/**
- * Retrieves replication coordinatorID from bundle of coordinators.
- */
- public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper)
- throws OozieClientException {
- final OozieClient oozieClient = helper.getOozieClient();
- List<CoordinatorJob> coords = InstanceUtil.getBundleCoordinators(oozieClient, bundleId);
- List<String> replicationCoordID = new ArrayList<String>();
- for (CoordinatorJob coord : coords) {
- if (coord.getAppName().contains("FEED_REPLICATION")) {
- replicationCoordID.add(coord.getId());
- }
- }
- return replicationCoordID;
- }
-
- /**
* Forms and sends process instance request based on url of action to be performed and it's
* parameters.
*
@@ -510,8 +378,7 @@ public final class InstanceUtil {
* @param user - whose credentials will be used for this action
* @return result from API
*/
- public static APIResult createAndSendRequestProcessInstance(
- String url, String params, String colo, String user)
+ public static APIResult createAndSendRequestProcessInstance(String url, String params, String colo, String user)
throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
if (params != null && !colo.equals("")) {
url = url + params + "&" + colo.substring(1);
@@ -520,12 +387,11 @@ public final class InstanceUtil {
} else {
url = url + colo;
}
- return InstanceUtil.sendRequestProcessInstance(url, user);
+ return sendRequestProcessInstance(url, user);
}
public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord(
- ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
- OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+ OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException {
CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
String jobId = coordInfo.getActions().get(instanceNumber).getExternalId();
LOGGER.info("jobId = " + jobId);
@@ -537,15 +403,14 @@ public final class InstanceUtil {
}
public static List<String> getInputFoldersForInstanceForReplication(
- ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
- OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+ OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException {
CoordinatorAction x = oozieClient.getCoordActionInfo(coordID + "@" + instanceNumber);
String jobId = x.getExternalId();
WorkflowJob wfJob = oozieClient.getJobInfo(jobId);
- return InstanceUtil.getReplicationFolderFromInstanceRunConf(wfJob.getConf());
+ return getReplicationFolderFromInstanceRunConf(wfJob.getConf());
}
- public static List<String> getReplicationFolderFromInstanceRunConf(String runConf) {
+ private static List<String> getReplicationFolderFromInstanceRunConf(String runConf) {
String conf;
conf = runConf.substring(runConf.indexOf("falconInPaths</name>") + 20);
conf = conf.substring(conf.indexOf("<value>") + 7);
@@ -553,13 +418,10 @@ public final class InstanceUtil {
return new ArrayList<String>(Arrays.asList(conf.split(",")));
}
- public static int getInstanceRunIdFromCoord(ColoHelper colo, String coordID, int instanceNumber)
+ public static int getInstanceRunIdFromCoord(OozieClient oozieClient, String coordID, int instanceNumber)
throws OozieClientException {
- OozieClient oozieClient = colo.getProcessHelper().getOozieClient();
CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
-
- WorkflowJob actionInfo =
- oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId());
+ WorkflowJob actionInfo = oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId());
return actionInfo.getRun();
}
@@ -575,11 +437,11 @@ public final class InstanceUtil {
List<String> bundleIds = OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED);
LOGGER.info("bundleIds: " + bundleIds);
- for (String aBundleId : bundleIds) {
- LOGGER.info("aBundleId: " + aBundleId);
- OozieUtil.waitForCoordinatorJobCreation(oozieClient, aBundleId);
+ for (String bundleId : bundleIds) {
+ LOGGER.info("bundleId: " + bundleId);
+ OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
List<CoordinatorJob> coords =
- InstanceUtil.getBundleCoordinators(oozieClient, aBundleId);
+ OozieUtil.getBundleCoordinators(oozieClient, bundleId);
LOGGER.info("coords: " + coords);
for (CoordinatorJob coord : coords) {
if (coord.getAppName().contains(coordType)) {
@@ -591,9 +453,8 @@ public final class InstanceUtil {
}
public static List<CoordinatorAction> getProcessInstanceListFromAllBundles(
- ColoHelper coloHelper, String processName, EntityType entityType)
+ OozieClient oozieClient, String processName, EntityType entityType)
throws OozieClientException {
- OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
List<CoordinatorAction> list = new ArrayList<CoordinatorAction>();
final List<String> bundleIds = OozieUtil.getBundles(oozieClient, processName, entityType);
LOGGER.info("bundle size for process is " + bundleIds.size());
@@ -609,37 +470,31 @@ public final class InstanceUtil {
list.addAll(actions);
}
}
- String coordId = getLatestCoordinatorID(oozieClient, processName, entityType);
+ String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType);
LOGGER.info("default coordID: " + coordId);
return list;
}
- public static String getOutputFolderForInstanceForReplication(ColoHelper coloHelper,
+ public static String getOutputFolderForInstanceForReplication(OozieClient oozieClient,
String coordID, int instanceNumber) throws OozieClientException {
- OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber);
final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf();
- return InstanceUtil.getReplicatedFolderFromInstanceRunConf(actionConf);
+ return getReplicatedFolderFromInstanceRunConf(actionConf);
}
- private static String getReplicatedFolderFromInstanceRunConf(
- String runConf) {
- String inputPathExample =
- InstanceUtil.getReplicationFolderFromInstanceRunConf(runConf).get(0);
- String postFix = inputPathExample
- .substring(inputPathExample.length() - 7, inputPathExample.length());
+ private static String getReplicatedFolderFromInstanceRunConf(String runConf) {
+ String inputPathExample = getReplicationFolderFromInstanceRunConf(runConf).get(0);
+ String postFix = inputPathExample.substring(inputPathExample.length() - 7, inputPathExample.length());
return getReplicatedFolderBaseFromInstanceRunConf(runConf) + postFix;
}
public static String getOutputFolderBaseForInstanceForReplication(
- ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
- OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+ OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException {
CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
-
final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber);
final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf();
- return InstanceUtil.getReplicatedFolderBaseFromInstanceRunConf(actionConf);
+ return getReplicatedFolderBaseFromInstanceRunConf(actionConf);
}
private static String getReplicatedFolderBaseFromInstanceRunConf(String runConf) {
@@ -661,10 +516,8 @@ public final class InstanceUtil {
* @param totalMinutesToWait time in minutes for which instance state should be polled
* @throws OozieClientException
*/
- public static void waitTillInstanceReachState(OozieClient client, String entityName,
- int instancesNumber,
- CoordinatorAction.Status expectedStatus,
- EntityType entityType, int totalMinutesToWait)
+ public static void waitTillInstanceReachState(OozieClient client, String entityName, int instancesNumber,
+ CoordinatorAction.Status expectedStatus, EntityType entityType, int totalMinutesToWait)
throws OozieClientException {
String filter;
// get the bundle ids
@@ -682,7 +535,7 @@ public final class InstanceUtil {
TimeUtil.sleepSeconds(5);
}
if (bundleJobs.size() == 0) {
- Assert.assertTrue(false, "Could not retrieve bundles");
+ Assert.fail("Could not retrieve bundles");
}
List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs);
String bundleId = OozieUtil.getMaxId(bundleIds);
@@ -735,7 +588,7 @@ public final class InstanceUtil {
TimeUtil.sleepSeconds(sleepTime);
}
}
- Assert.assertTrue(false, "expected state of instance was never reached");
+ Assert.fail("expected state of instance was never reached");
}
/**
@@ -759,54 +612,6 @@ public final class InstanceUtil {
}
/**
- * Waits till bundle job will reach expected status.
- * Generates time according to expected status.
- *
- * @param coloHelper colo helper of cluster job is running on
- * @param processName name of process which job is being analyzed
- * @param expectedStatus job status we are waiting for
- * @throws OozieClientException
- */
- public static void waitForBundleToReachState(ColoHelper coloHelper,
- String processName, Job.Status expectedStatus) throws
- OozieClientException {
- int totalMinutesToWait = getMinutesToWait(expectedStatus);
- waitForBundleToReachState(coloHelper, processName, expectedStatus, totalMinutesToWait);
- }
-
- /**
- * Waits till bundle job will reach expected status during specific time.
- * Use it directly in test cases when timeouts are different from trivial, in other cases use
- * waitForBundleToReachState(ColoHelper, String, Status)
- *
- * @param coloHelper colo helper of cluster job is running on
- * @param processName name of process which job is being analyzed
- * @param expectedStatus job status we are waiting for
- * @param totalMinutesToWait specific time to wait expected state
- * @throws OozieClientException
- */
- public static void waitForBundleToReachState(ColoHelper coloHelper,
- String processName, Job.Status expectedStatus, int totalMinutesToWait) throws
- OozieClientException {
-
- int sleep = totalMinutesToWait * 60 / 20;
- for (int sleepCount = 0; sleepCount < sleep; sleepCount++) {
- String bundleID =
- InstanceUtil.getLatestBundleID(coloHelper, processName, EntityType.PROCESS);
- OozieClient oozieClient =
- coloHelper.getProcessHelper().getOozieClient();
- BundleJob j = oozieClient.getBundleJobInfo(bundleID);
- LOGGER.info(sleepCount + ". Current status: " + j.getStatus()
- + "; expected: " + expectedStatus);
- if (j.getStatus() == expectedStatus) {
- return;
- }
- TimeUtil.sleepSeconds(20);
- }
- Assert.fail("State " + expectedStatus + " wasn't reached in " + totalMinutesToWait + " mins");
- }
-
- /**
* Generates time which is presumably needed for process/feed instances to reach particular
* state.
* Feed instances are running faster then process, so feed timeouts are less then process.
@@ -815,8 +620,7 @@ public final class InstanceUtil {
* @param expectedStatus expected status we are waiting for
* @return minutes to wait for expected status
*/
- private static int getMinutesToWait(EntityType entityType,
- CoordinatorAction.Status expectedStatus) {
+ private static int getMinutesToWait(EntityType entityType, CoordinatorAction.Status expectedStatus) {
switch (expectedStatus) {
case RUNNING:
if (entityType == EntityType.PROCESS) {
@@ -841,43 +645,22 @@ public final class InstanceUtil {
}
/**
- * Generates time which is presumably needed for bundle job to reach particular state.
- *
- * @param expectedStatus status which we are expect to get from bundle job
- * @return minutes to wait for expected status
- */
- private static int getMinutesToWait(Job.Status expectedStatus) {
- switch (expectedStatus) {
- case DONEWITHERROR:
- case SUCCEEDED:
- return OSUtil.IS_WINDOWS ? 40 : 20;
- case KILLED:
- return OSUtil.IS_WINDOWS ? 30 : 15;
- default:
- return OSUtil.IS_WINDOWS ? 60 : 30;
- }
- }
-
- /**
* Waits till instances of specific job will be created during specific time.
* Use this method directly in unusual test cases where timeouts are different from trivial.
- * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int)
+ * In other cases use waitTillInstancesAreCreated(OozieClient,String,int)
*
* @param oozieClient oozie client of the cluster on which job is running
* @param entity definition of entity which describes job
* @param bundleSeqNo bundle number if update has happened.
* @throws OozieClientException
*/
- public static void waitTillInstancesAreCreated(OozieClient oozieClient,
- String entity,
- int bundleSeqNo,
- int totalMinutesToWait
- ) throws OozieClientException {
+ public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo,
+ int totalMinutesToWait) throws OozieClientException {
String entityName = Util.readEntityName(entity);
EntityType type = Util.getEntityType(entity);
- String bundleID = getSequenceBundleID(oozieClient, entityName,
+ String bundleID = OozieUtil.getSequenceBundleID(oozieClient, entityName,
type, bundleSeqNo);
- String coordID = getDefaultCoordIDFromBundle(oozieClient, bundleID);
+ String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
for (int sleepCount = 0; sleepCount < totalMinutesToWait; sleepCount++) {
CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
@@ -894,17 +677,14 @@ public final class InstanceUtil {
* Waits till instances of specific job will be created during timeout.
* Timeout is common for most of usual test cases.
*
- * @param coloHelper colo helper of cluster job is running on
+ * @param oozieClient oozie client of the cluster the job is running on
* @param entity definition of entity which describes job
* @param bundleSeqNo bundle number if update has happened.
* @throws OozieClientException
*/
- public static void waitTillInstancesAreCreated(ColoHelper coloHelper,
- String entity,
- int bundleSeqNo
+ public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo
) throws OozieClientException {
int sleep = INSTANCES_CREATED_TIMEOUT * 60 / 5;
- final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
waitTillInstancesAreCreated(oozieClient, entity, bundleSeqNo, sleep);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index baa69c7..138b45f 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -20,6 +20,7 @@ package org.apache.falcon.regression.core.util;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
import org.apache.oozie.client.AuthOozieClient;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
@@ -237,11 +238,10 @@ public final class OozieUtil {
}
- public static List<String> getMissingDependencies(ColoHelper helper, String bundleID)
+ public static List<String> getMissingDependencies(OozieClient oozieClient, String bundleID)
throws OozieClientException {
CoordinatorJob jobInfo;
jobInfo = null;
- OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID);
List<CoordinatorJob> coordinatorJobList = bundleJob.getCoordinators();
if (coordinatorJobList.size() > 1) {
@@ -285,9 +285,8 @@ public final class OozieUtil {
return new ArrayList<String>(Arrays.asList(missingDependencies));
}
- public static List<String> getWorkflowJobs(ColoHelper prismHelper, String bundleID)
+ public static List<String> getWorkflowJobs(OozieClient oozieClient, String bundleID)
throws OozieClientException {
- OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
waitForCoordinatorJobCreation(oozieClient, bundleID);
List<String> workflowIds = new ArrayList<String>();
List<CoordinatorJob> coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
@@ -299,12 +298,11 @@ public final class OozieUtil {
return workflowIds;
}
- public static List<String> getWorkflow(ColoHelper coloHelper, String bundleID)
+ public static List<String> getWorkflow(OozieClient oozieClient, String bundleID)
throws OozieClientException {
- OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
waitForCoordinatorJobCreation(oozieClient, bundleID);
List<String> workflowIds = new ArrayList<String>();
- String coordId = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
+ String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID);
CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordId);
for (CoordinatorAction action : coordJobInfo.getActions()) {
if (action.getStatus().name().equals("RUNNING") || action.getStatus().name().equals("SUCCEEDED")) {
@@ -317,59 +315,51 @@ public final class OozieUtil {
return workflowIds;
}
- public static Date getNominalTime(ColoHelper prismHelper, String bundleID)
+ public static Date getNominalTime(OozieClient oozieClient, String bundleID)
throws OozieClientException {
- OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID);
CoordinatorJob jobInfo =
oozieClient.getCoordJobInfo(bundleJob.getCoordinators().get(0).getId());
List<CoordinatorAction> actions = jobInfo.getActions();
-
return actions.get(0).getNominalTime();
-
}
- public static CoordinatorJob getDefaultOozieCoord(ColoHelper prismHelper, String bundleId,
+ public static CoordinatorJob getDefaultOozieCoord(OozieClient oozieClient, String bundleId,
EntityType type)
throws OozieClientException {
- OozieClient client = prismHelper.getClusterHelper().getOozieClient();
- BundleJob bundlejob = client.getBundleJobInfo(bundleId);
-
+ BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId);
for (CoordinatorJob coord : bundlejob.getCoordinators()) {
if ((coord.getAppName().contains("DEFAULT") && EntityType.PROCESS == type)
||
(coord.getAppName().contains("REPLICATION") && EntityType.FEED == type)) {
- return client.getCoordJobInfo(coord.getId());
+ return oozieClient.getCoordJobInfo(coord.getId());
} else {
- LOGGER.info("Desired coord does not exists on " + client.getOozieUrl());
+ LOGGER.info("Desired coord does not exists on " + oozieClient.getOozieUrl());
}
}
-
return null;
}
- public static int getNumberOfWorkflowInstances(ColoHelper prismHelper, String bundleId)
+ public static int getNumberOfWorkflowInstances(OozieClient oozieClient, String bundleId)
throws OozieClientException {
- return getDefaultOozieCoord(prismHelper, bundleId,
- EntityType.PROCESS).getActions().size();
+ return getDefaultOozieCoord(oozieClient, bundleId, EntityType.PROCESS).getActions().size();
}
- public static List<String> getActionsNominalTime(ColoHelper prismHelper,
- String bundleId,
- EntityType type)
+ public static List<String> getActionsNominalTime(OozieClient oozieClient,
+ String bundleId, EntityType type)
throws OozieClientException {
- Map<Date, CoordinatorAction.Status> actions = getActionsNominalTimeAndStatus(prismHelper, bundleId, type);
+ Map<Date, CoordinatorAction.Status> actions = getActionsNominalTimeAndStatus(oozieClient, bundleId, type);
List<String> nominalTime = new ArrayList<String>();
for (Date date : actions.keySet()) {
nominalTime.add(date.toString());
}
return nominalTime;
}
- public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(ColoHelper prismHelper,
+
+ public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(OozieClient oozieClient,
String bundleId, EntityType type) throws OozieClientException {
Map<Date, CoordinatorAction.Status> result = new TreeMap<Date, CoordinatorAction.Status>();
- List<CoordinatorAction> actions = getDefaultOozieCoord(prismHelper,
- bundleId, type).getActions();
+ List<CoordinatorAction> actions = getDefaultOozieCoord(oozieClient, bundleId, type).getActions();
for (CoordinatorAction action : actions) {
result.put(action.getNominalTime(), action.getStatus());
}
@@ -386,79 +376,54 @@ public final class OozieUtil {
BundleJob.Status.SUCCEEDED, BundleJob.Status.KILLED).contains(bundleJob.getStatus())) {
return true;
}
-
TimeUtil.sleepSeconds(20);
return false;
}
- public static void verifyNewBundleCreation(ColoHelper cluster,
- String originalBundleId,
- List<String>
- initialNominalTimes,
- String entity,
- boolean shouldBeCreated,
-
- boolean matchInstances) throws OozieClientException {
+ public static void verifyNewBundleCreation(OozieClient oozieClient, String originalBundleId,
+ List<String> initialNominalTimes, String entity,
+ boolean shouldBeCreated, boolean matchInstances)
+ throws OozieClientException {
String entityName = Util.readEntityName(entity);
EntityType entityType = Util.getEntityType(entity);
- String newBundleId = InstanceUtil.getLatestBundleID(cluster, entityName,
- entityType);
+ String newBundleId = getLatestBundleID(oozieClient, entityName, entityType);
if (shouldBeCreated) {
Assert.assertTrue(!newBundleId.equalsIgnoreCase(originalBundleId),
"eeks! new bundle is not getting created!!!!");
- LOGGER.info("old bundleId=" + originalBundleId + " on oozie: "
- +
- "" + cluster.getProcessHelper().getOozieClient().getOozieUrl());
- LOGGER.info("new bundleId=" + newBundleId + " on oozie: "
- +
- "" + cluster.getProcessHelper().getOozieClient().getOozieUrl());
+ LOGGER.info("old bundleId=" + originalBundleId + " on oozie: " + oozieClient);
+ LOGGER.info("new bundleId=" + newBundleId + " on oozie: " + oozieClient);
if (matchInstances) {
- validateNumberOfWorkflowInstances(cluster,
+ validateNumberOfWorkflowInstances(oozieClient,
initialNominalTimes, originalBundleId, newBundleId, entityType);
}
} else {
- Assert.assertEquals(newBundleId,
- originalBundleId, "eeks! new bundle is getting created!!!!");
+ Assert.assertEquals(newBundleId, originalBundleId, "eeks! new bundle is getting created!!!!");
}
}
- private static void validateNumberOfWorkflowInstances(ColoHelper cluster,
+ private static void validateNumberOfWorkflowInstances(OozieClient oozieClient,
List<String> initialNominalTimes,
String originalBundleId,
String newBundleId, EntityType type)
throws OozieClientException {
-
- List<String> nominalTimesOriginalAndNew = getActionsNominalTime(cluster,
- originalBundleId, type);
-
- nominalTimesOriginalAndNew.addAll(getActionsNominalTime(cluster,
- newBundleId, type));
-
+ List<String> nominalTimesOriginalAndNew = getActionsNominalTime(oozieClient, originalBundleId, type);
+ nominalTimesOriginalAndNew.addAll(getActionsNominalTime(oozieClient, newBundleId, type));
initialNominalTimes.removeAll(nominalTimesOriginalAndNew);
-
if (initialNominalTimes.size() != 0) {
LOGGER.info("Missing instance are : " + initialNominalTimes);
LOGGER.debug("Original Bundle ID : " + originalBundleId);
LOGGER.debug("New Bundle ID : " + newBundleId);
-
- Assert.assertFalse(true, "some instances have gone missing after "
- +
- "update");
+ Assert.fail("some instances have gone missing after update");
}
}
- public static String getCoordStartTime(ColoHelper colo, String entity,
- int bundleNo)
+ public static String getCoordStartTime(OozieClient oozieClient, String entity, int bundleNo)
throws OozieClientException {
- final OozieClient oozieClient = colo.getClusterHelper().getOozieClient();
- String bundleID = InstanceUtil.getSequenceBundleID(oozieClient,
+ String bundleID = getSequenceBundleID(oozieClient,
Util.readEntityName(entity), Util.getEntityType(entity), bundleNo);
-
- CoordinatorJob coord = getDefaultOozieCoord(colo, bundleID,
+ CoordinatorJob coord = getDefaultOozieCoord(oozieClient, bundleID,
Util.getEntityType(entity));
-
- return TimeUtil.dateToOozieDate(coord.getStartTime()
- );
+ return TimeUtil.dateToOozieDate(coord.getStartTime());
}
public static DateTimeFormatter getOozieDateTimeFormatter() {
@@ -475,11 +440,10 @@ public final class OozieUtil {
int instanceNumber)
throws OozieClientException, IOException {
final OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
- String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName,
- type, bundleNumber);
+ String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber);
List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
- HadoopUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords,
- instanceNumber));
+ HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(),
+ getMissingDependenciesForInstance(oozieClient, coords, instanceNumber));
}
private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient,
@@ -487,7 +451,6 @@ public final class OozieUtil {
throws OozieClientException {
ArrayList<String> missingPaths = new ArrayList<String>();
for (CoordinatorJob coord : coords) {
-
CoordinatorJob temp = oozieClient.getCoordJobInfo(coord.getId());
CoordinatorAction instance = temp.getActions().get(instanceNumber);
missingPaths.addAll(Arrays.asList(instance.getMissingDependencies().split("#")));
@@ -499,10 +462,8 @@ public final class OozieUtil {
String entityName, int bundleNumber)
throws OozieClientException, IOException {
final OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
- String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName, type,
- bundleNumber);
- List<List<String>> missingDependencies = createMissingDependenciesForBundle(helper, bundleID);
- return missingDependencies;
+ String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber);
+ return createMissingDependenciesForBundle(helper, bundleID);
}
public static List<List<String>> createMissingDependenciesForBundle(ColoHelper helper, String bundleId)
@@ -511,7 +472,8 @@ public final class OozieUtil {
List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleId).getCoordinators();
List<List<String>> missingDependencies = getMissingDependenciesForBundle(oozieClient, coords);
for (List<String> missingDependencyPerInstance : missingDependencies) {
- HadoopUtil.createHDFSFolders(helper, missingDependencyPerInstance);
+ HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(),
+ missingDependencyPerInstance);
}
return missingDependencies;
}
@@ -531,12 +493,216 @@ public final class OozieUtil {
return missingDependencies;
}
- public static void validateRetryAttempts(ColoHelper helper, String bundleId, EntityType type,
+ public static void validateRetryAttempts(OozieClient oozieClient, String bundleId, EntityType type,
int attempts) throws OozieClientException {
- OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
- CoordinatorJob coord = getDefaultOozieCoord(helper, bundleId, type);
+ CoordinatorJob coord = getDefaultOozieCoord(oozieClient, bundleId, type);
int actualRun = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId()).getRun();
LOGGER.info("Actual run count: " + actualRun); // wrt 0
Assert.assertEquals(actualRun, attempts, "Rerun attempts did not match");
}
+
+ public static int checkIfFeedCoordExist(OozieClient oozieClient,
+ String feedName, String coordType) throws OozieClientException {
+ LOGGER.info("feedName: " + feedName);
+ int numberOfCoord = 0;
+ if (getBundles(oozieClient, feedName, EntityType.FEED).size() == 0) {
+ return 0;
+ }
+ List<String> bundleIds = getBundles(oozieClient, feedName, EntityType.FEED);
+ LOGGER.info("bundleIds: " + bundleIds);
+
+ for (String aBundleId : bundleIds) {
+ LOGGER.info("aBundleId: " + aBundleId);
+ waitForCoordinatorJobCreation(oozieClient, aBundleId);
+ List<CoordinatorJob> coords =
+ getBundleCoordinators(oozieClient, aBundleId);
+ LOGGER.info("coords: " + coords);
+ for (CoordinatorJob coord : coords) {
+ if (coord.getAppName().contains(coordType)) {
+ numberOfCoord++;
+ }
+ }
+ }
+ return numberOfCoord;
+ }
+
+ /**
+ * Retrieves the IDs of all replication coordinators that belong to the given bundle.
+ */
+ public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper)
+ throws OozieClientException {
+ final OozieClient oozieClient = helper.getOozieClient();
+ List<CoordinatorJob> coords = getBundleCoordinators(oozieClient, bundleId);
+ List<String> replicationCoordID = new ArrayList<String>();
+ for (CoordinatorJob coord : coords) {
+ if (coord.getAppName().contains("FEED_REPLICATION")) {
+ replicationCoordID.add(coord.getId());
+ }
+ }
+ return replicationCoordID;
+ }
+
+ /**
+ * Retrieves ID of bundle related to some process/feed using its ordinal number.
+ *
+ * @param entityName - name of entity bundle is related to
+ * @param entityType - feed or process
+ * @param bundleNumber - ordinal number of bundle
+ * @return bundle ID
+ * @throws org.apache.oozie.client.OozieClientException
+ */
+ public static String getSequenceBundleID(OozieClient oozieClient, String entityName,
+ EntityType entityType, int bundleNumber) throws OozieClientException {
+ //sequence starts from 0
+ List<String> bundleIds = getBundles(oozieClient,
+ entityName, entityType);
+ Map<Integer, String> bundleMap = new TreeMap<Integer, String>();
+ String bundleID;
+ for (String strID : bundleIds) {
+ LOGGER.info("getSequenceBundleID: " + strID);
+ int key = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+ bundleMap.put(key, strID);
+ }
+ for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
+ LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue());
+ }
+ int i = 0;
+ for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) {
+ bundleID = entry.getValue();
+ if (i == bundleNumber) {
+ return bundleID;
+ }
+ i++;
+ }
+ return null;
+ }
+
+ /**
+ * Retrieves the latest bundle ID.
+ *
+ * @param oozieClient where job is running
+ * @param entityName name of entity job is related to
+ * @param entityType type of entity - feed or process expected
+ * @return latest bundle ID
+ * @throws org.apache.oozie.client.OozieClientException
+ */
+ public static String getLatestBundleID(OozieClient oozieClient,
+ String entityName, EntityType entityType) throws OozieClientException {
+ List<String> bundleIds = getBundles(oozieClient, entityName, entityType);
+ String max = "0";
+ int maxID = -1;
+ for (String strID : bundleIds) {
+ if (maxID < Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
+ maxID = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+ max = strID;
+ }
+ }
+ return max;
+ }
+
+ /**
+ * Retrieves all coordinators of bundle.
+ *
+ * @param oozieClient Oozie client to use for fetching info.
+ * @param bundleID specific bundle ID
+ * @return list of bundle coordinators
+ * @throws org.apache.oozie.client.OozieClientException
+ */
+ public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient, String bundleID)
+ throws OozieClientException {
+ BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID);
+ return bundleInfo.getCoordinators();
+ }
+
+ public static Job.Status getDefaultCoordinatorStatus(OozieClient oozieClient, String processName,
+ int bundleNumber) throws OozieClientException {
+ String bundleID = getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber);
+ String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID);
+ return oozieClient.getCoordJobInfo(coordId).getStatus();
+ }
+
+ public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId)
+ throws OozieClientException {
+ waitForCoordinatorJobCreation(oozieClient, bundleId);
+ BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId);
+ List<CoordinatorJob> coords = bundleInfo.getCoordinators();
+ int min = 100000;
+ String minString = "";
+ for (CoordinatorJob coord : coords) {
+ String strID = coord.getId();
+ if (min > Integer.parseInt(strID.substring(0, strID.indexOf('-')))) {
+ min = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+ minString = coord.getId();
+ }
+ }
+ LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString);
+ return minString;
+ }
+
+ public static String getLatestCoordinatorID(OozieClient oozieClient, String processName,
+ EntityType entityType) throws OozieClientException {
+ final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType);
+ return getDefaultCoordIDFromBundle(oozieClient, latestBundleID);
+ }
+
+ /**
+ * Waits until the bundle job reaches the expected status.
+ * The timeout is chosen according to the expected status.
+ *
+ * @param oozieClient oozie client of the cluster the job is running on
+ * @param processName name of the process whose job is being analyzed
+ * @param expectedStatus job status we are waiting for
+ * @throws org.apache.oozie.client.OozieClientException
+ */
+ public static void waitForBundleToReachState(OozieClient oozieClient,
+ String processName, Job.Status expectedStatus) throws OozieClientException {
+ int totalMinutesToWait = getMinutesToWait(expectedStatus);
+ waitForBundleToReachState(oozieClient, processName, expectedStatus, totalMinutesToWait);
+ }
+
+ /**
+ * Waits until the bundle job reaches the expected status within the given time.
+ * Use it directly in test cases that need a non-default timeout; in other cases use
+ * waitForBundleToReachState(OozieClient, String, Status).
+ *
+ * @param oozieClient oozie client of cluster job is running on
+ * @param processName name of process which job is being analyzed
+ * @param expectedStatus job status we are waiting for
+ * @param totalMinutesToWait number of minutes to wait for the expected state
+ * @throws org.apache.oozie.client.OozieClientException
+ */
+ public static void waitForBundleToReachState(OozieClient oozieClient, String processName,
+ Job.Status expectedStatus, int totalMinutesToWait) throws OozieClientException {
+ int sleep = totalMinutesToWait * 60 / 20;
+ for (int sleepCount = 0; sleepCount < sleep; sleepCount++) {
+ String bundleID =
+ getLatestBundleID(oozieClient, processName, EntityType.PROCESS);
+ BundleJob j = oozieClient.getBundleJobInfo(bundleID);
+ LOGGER.info(sleepCount + ". Current status: " + j.getStatus()
+ + "; expected: " + expectedStatus);
+ if (j.getStatus() == expectedStatus) {
+ return;
+ }
+ TimeUtil.sleepSeconds(20);
+ }
+ Assert.fail("State " + expectedStatus + " wasn't reached in " + totalMinutesToWait + " mins");
+ }
+
+ /**
+ * Returns the time that is presumably needed for a bundle job to reach a particular state.
+ *
+ * @param expectedStatus status which we expect to get from the bundle job
+ * @return minutes to wait for expected status
+ */
+ private static int getMinutesToWait(Job.Status expectedStatus) {
+ switch (expectedStatus) {
+ case DONEWITHERROR:
+ case SUCCEEDED:
+ return OSUtil.IS_WINDOWS ? 40 : 20;
+ case KILLED:
+ return OSUtil.IS_WINDOWS ? 30 : 15;
+ default:
+ return OSUtil.IS_WINDOWS ? 60 : 30;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
index 02280f3..24af21f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -590,14 +590,13 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//get old process details
- String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- String oldProcessUser =
- getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessBundleId = OozieUtil
+ .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
//get old feed details
- String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED);
- String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
+ String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED);
+ String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED);
//update feed definition
FeedMerlin newFeed = new FeedMerlin(feed);
@@ -609,15 +608,15 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should be created by U1
- OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false);
- String newFeedUser =
- getBundleUser(cluster, newFeed.getName(), EntityType.FEED);
+ OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false);
+ String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED);
Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same");
//new process bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+ OozieUtil.verifyNewBundleCreation(
+ clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
String newProcessUser =
- getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
}
@@ -645,14 +644,13 @@ public class AuthorizationTest extends BaseTestClass {
newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//get old process details
- String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- String oldProcessUser =
- getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessBundleId = OozieUtil
+ .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
//get old feed details
- String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED);
- String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
+ String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED);
+ String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED);
//update feed by U2
serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
@@ -660,15 +658,15 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false);
- String newFeedUser = getBundleUser(cluster, newFeed.getName(), EntityType.FEED);
+ OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false);
+ String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED);
Assert.assertNotEquals(oldFeedUser, newFeedUser, "User should not be the same");
Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser);
//new process bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
- String newProcessUser =
- getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ OozieUtil.verifyNewBundleCreation(
+ clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+ String newProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
}
@@ -691,14 +689,12 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//get old process details
- String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- String oldProcessUser =
- getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessBundleId = OozieUtil
+ .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
//get old feed details
- String oldFeedBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
+ String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED);
//update process by U1
ProcessMerlin processObj = bundles[0].getProcessObject();
@@ -707,12 +703,12 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should not be created
- OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
+ OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false);
//new process bundle should be created by U1
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
- String newProcessUser =
- getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
+ OozieUtil.verifyNewBundleCreation(
+ clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+ String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS);
Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
}
@@ -735,14 +731,12 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING);
//get old process details
- String oldProcessBundleId = InstanceUtil
- .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
- String oldProcessUser =
- getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessBundleId = OozieUtil
+ .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+ String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
//get old feed details
- String oldFeedBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
+ String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED);
//update process by U2
ProcessMerlin processObj = bundles[0].getProcessObject();
@@ -752,22 +746,20 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should not be created
- OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false);
+ OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false);
//new process bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
- String newProcessUser =
- getBundleUser(cluster, processObj.getName(), EntityType.PROCESS);
+ OozieUtil.verifyNewBundleCreation(
+ clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false);
+ String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS);
Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same");
Assert.assertEquals(MerlinConstants.USER2_NAME, newProcessUser);
}
- private String getBundleUser(ColoHelper coloHelper, String entityName, EntityType entityType)
+ private String getBundleUser(OozieClient oozieClient, String entityName, EntityType entityType)
throws OozieClientException {
- String newBundleId = InstanceUtil.getLatestBundleID(coloHelper, entityName,
- entityType);
- BundleJob newBundleJob =
- coloHelper.getClusterHelper().getOozieClient().getBundleJobInfo(newBundleId);
+ String newBundleId = OozieUtil.getLatestBundleID(oozieClient, entityName, entityType);
+ BundleJob newBundleJob = oozieClient.getBundleJobInfo(newBundleId);
CoordinatorJob coordinatorJob = null;
for (CoordinatorJob coord : newBundleJob.getCoordinators()) {
if ((entityType == EntityType.PROCESS && coord.getAppName().contains("DEFAULT"))
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
index 33b0e77..8c5d330 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
@@ -24,7 +24,6 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.*;
import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
@@ -44,7 +43,6 @@ import java.util.List;
public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
private ColoHelper cluster = servers.get(0);
- private FileSystem clusterFS = serverFS.get(0);
private OozieClient clusterOC = serverOC.get(0);
private String baseTestDir = cleanAndGetTestDir();
private String aggregateWorkflowDir = baseTestDir + "/aggregator";
@@ -97,13 +95,11 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
public void currentAndLastWeekTest(String startInstance, String endInstance,
String firstDep, String endDep) throws Exception {
bundles[0].setDatasetInstances(startInstance, endInstance);
-
bundles[0].submitFeedsScheduleProcess(prism);
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
-
- List<String> missingDependencies = getMissingDependencies(cluster, bundles[0]);
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+ List<String> missingDependencies = getMissingDependencies(clusterOC, bundles[0].getProcessName());
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, bundles[0].getProcessName(), 0);
InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -146,15 +142,14 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
return true;
}
- private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException {
- List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- bundle.getProcessName(), EntityType.PROCESS);
+ public List<String> getMissingDependencies(OozieClient oozieClient,
+ String processName) throws OozieClientException {
+ List<String> bundles = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS);
String coordID = bundles.get(0);
- List<String> missingDependencies =
- OozieUtil.getMissingDependencies(prismHelper, coordID);
+ List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
for (int i = 0; i < 10 && missingDependencies == null; ++i) {
TimeUtil.sleepSeconds(30);
- missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID);
+ missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
}
Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
return missingDependencies;
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
index 07292e1..37f1149 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
@@ -29,6 +29,7 @@ import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.log4j.Logger;
+import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.TestNGException;
import org.testng.annotations.DataProvider;
@@ -139,7 +140,7 @@ public class ELValidationsTest extends BaseTestClass {
LOGGER.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData()));
TimeUtil.sleepSeconds(45);
if (isMatch) {
- getAndMatchDependencies(cluster, bundle);
+ getAndMatchDependencies(serverOC.get(0), bundle);
}
return submitResponse;
} catch (Exception e) {
@@ -151,12 +152,11 @@ public class ELValidationsTest extends BaseTestClass {
}
}
- private void getAndMatchDependencies(ColoHelper prismHelper, Bundle bundle) {
+ private void getAndMatchDependencies(OozieClient oozieClient, Bundle bundle) {
try {
List<String> bundles = null;
for (int i = 0; i < 10; ++i) {
- bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- bundle.getProcessName(), EntityType.PROCESS);
+ bundles = OozieUtil.getBundles(oozieClient, bundle.getProcessName(), EntityType.PROCESS);
if (bundles.size() > 0) {
break;
}
@@ -165,17 +165,16 @@ public class ELValidationsTest extends BaseTestClass {
Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
String coordID = bundles.get(0);
LOGGER.info("coord id: " + coordID);
- List<String> missingDependencies =
- OozieUtil.getMissingDependencies(prismHelper, coordID);
+ List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
for (int i = 0; i < 10 && missingDependencies == null; ++i) {
TimeUtil.sleepSeconds(30);
- missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID);
+ missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
}
Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
for (String dependency : missingDependencies) {
LOGGER.info("dependency from job: " + dependency);
}
- Date jobNominalTime = OozieUtil.getNominalTime(prismHelper, coordID);
+ Date jobNominalTime = OozieUtil.getNominalTime(oozieClient, coordID);
Calendar time = Calendar.getInstance();
time.setTime(jobNominalTime);
LOGGER.info("nominalTime:" + jobNominalTime);
@@ -224,8 +223,7 @@ public class ELValidationsTest extends BaseTestClass {
}
private List<String> getQADepedencyList(Calendar nominalTime, Date startRef,
- Date endRef, int frequency,
- Bundle bundle) {
+ Date endRef, int frequency, Bundle bundle) {
LOGGER.info("start ref:" + startRef);
LOGGER.info("end ref:" + endRef);
Calendar initialTime = Calendar.getInstance();
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
index 69be47a..4fb3c4a 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -163,7 +164,7 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
int counter = OSUtil.IS_WINDOWS ? 100 : 50;
- InstanceUtil.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
+ OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
r = prism.getProcessHelper().getRunningInstance(processName);
InstanceUtil.validateSuccessWOInstances(r);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
index 8eff8e4..05a2b0b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
@@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.MatrixUtil;
import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
@@ -130,7 +131,6 @@ public class ExternalFSTest extends BaseTestClass{
}
-
@Test(dataProvider = "getData")
public void replicateToExternalFS(final FileSystem externalFS,
final String separator, final boolean withData) throws Exception {
@@ -181,13 +181,10 @@ public class ExternalFSTest extends BaseTestClass{
Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
//check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster, feed.toString(), 0);
-
- Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed.toString()),
- "REPLICATION"), 1);
-
+ InstanceUtil.waitTillInstancesAreCreated(clusterOC, feed.toString(), 0);
+ Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(clusterOC, feed.getName(), "REPLICATION"), 1);
TimeUtil.sleepSeconds(10);
+
//replication should start, wait while it ends
InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);