You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:20 UTC

[21/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
new file mode 100644
index 0000000..789a256
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -0,0 +1,1321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.ACL;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Retention;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesSummaryResult;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.request.BaseRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.http.HttpResponse;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * util functions related to instanceTest.
+ */
+public final class InstanceUtil {
+
+    /**
+     * Utility class, not meant to be instantiated: the constructor always throws an
+     * {@link AssertionError}.
+     */
+    private InstanceUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /** Log4j logger of this class. */
+    private static final Logger LOGGER = Logger.getLogger(InstanceUtil.class);
+    /** Oozie job statuses RUNNING, PREP and SUCCEEDED grouped into one set. */
+    private static final EnumSet<Status> RUNNING_PREP_SUCCEEDED = EnumSet.of(Status.RUNNING,
+        Status.PREP, Status.SUCCEEDED);
+
+    /**
+     * Sends a process instance request to the given url. The http method is the one which
+     * {@code Util.getMethodType(url)} returns for that url.
+     *
+     * @param url  url to be hit
+     * @param user user passed to the request
+     * @return result from API
+     */
+    public static APIResult sendRequestProcessInstance(String url, String user)
+        throws IOException, URISyntaxException, AuthenticationException {
+        final String method = Util.getMethodType(url);
+        return hitUrl(url, method, user);
+    }
+
+    /**
+     * Timeout value: 20 when {@code OSUtil.IS_WINDOWS} is set, 10 otherwise.
+     * NOTE(review): judging by the name it bounds the wait for instance creation; the time unit
+     * and the usage are not visible here - confirm against the callers.
+     */
+    public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10;
+
+    /**
+     * Sends a request to the given url and converts the response body into an API result.
+     * <p/>
+     * Urls containing "/summary/" produce an {@link InstancesSummaryResult}, all other urls an
+     * {@link InstancesResult}. A few well-known error responses are not parsed as JSON: for
+     * them an otherwise empty result which carries only the matching status code is returned.
+     *
+     * @param url    url to be hit
+     * @param method http method passed to the request
+     * @param user   user passed to the request
+     * @return result built from the response
+     * @throws URISyntaxException
+     * @throws IOException
+     * @throws AuthenticationException
+     */
+    public static APIResult hitUrl(String url,
+            String method, String user) throws URISyntaxException,
+            IOException, AuthenticationException {
+        BaseRequest request = new BaseRequest(url, method, user);
+        HttpResponse response = request.run();
+        String jsonString = readBodyAsString(response);
+        LOGGER.info("The web service response is:\n" + Util.prettyPrintXmlOrJson(jsonString));
+        final boolean isSummary = url.contains("/summary/");
+        APIResult r = null;
+        try {
+            if (isSummary) {
+                r = newResultInstance(InstancesSummaryResult.class);
+            } else {
+                r = newResultInstance(InstancesResult.class);
+            }
+        } catch (IllegalAccessException e) {
+            Assert.fail("Could not create InstancesSummaryResult or "
+                    +
+                    "InstancesResult constructor\n" + ExceptionUtils.getStackTrace(e));
+        } catch (InstantiationException e) {
+            Assert.fail("Could not create InstancesSummaryResult or "
+                    +
+                    "InstancesResult constructor\n" + ExceptionUtils.getStackTrace(e));
+        } catch (InvocationTargetException e) {
+            Assert.fail("Could not create InstancesSummaryResult or "
+                    +
+                    "InstancesResult constructor\n" + ExceptionUtils.getStackTrace(e));
+        }
+        Assert.assertNotNull(r, "APIResult is null");
+        if (jsonString.contains("(PROCESS) not found")) {
+            r.setStatusCode(ResponseKeys.PROCESS_NOT_FOUND);
+            return r;
+        }
+        if (jsonString.contains("Parameter start is empty")
+                ||
+                jsonString.contains("Unparseable date:")) {
+            r.setStatusCode(ResponseKeys.UNPARSEABLE_DATE);
+            return r;
+        }
+        if (response.getStatusLine().getStatusCode() == 400
+                &&
+                isKnownBadRequestMessage(jsonString)) {
+            r.setStatusCode(400);
+            return r;
+        }
+        if (isSummary) {
+            r = new GsonBuilder().setPrettyPrinting().create()
+                    .fromJson(jsonString, InstancesSummaryResult.class);
+        } else {
+            r = new GsonBuilder().create()
+                    .fromJson(jsonString, InstancesResult.class);
+        }
+        // fail with a readable message instead of a bare NPE in the logging below
+        Assert.assertNotNull(r, "Could not parse the web service response: " + jsonString);
+
+        LOGGER.info("r.getMessage(): " + r.getMessage());
+        LOGGER.info("r.getStatusCode(): " + r.getStatusCode());
+        LOGGER.info("r.getStatus() " + r.getStatus());
+        return r;
+    }
+
+    /**
+     * Reads the whole response body as UTF-8 text, terminating every line with "\n", and
+     * closes the underlying stream afterwards.
+     */
+    private static String readBodyAsString(HttpResponse response) throws IOException {
+        BufferedReader reader = new BufferedReader(
+                new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
+        try {
+            StringBuilder body = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                body.append(line).append("\n");
+            }
+            return body.toString();
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Creates a result object through the declared no-argument constructor of the given class,
+     * returns null if the class declares no such constructor.
+     */
+    private static <T extends APIResult> T newResultInstance(Class<T> resultClass)
+        throws IllegalAccessException, InstantiationException, InvocationTargetException {
+        //Order is not guaranteed in the getDeclaredConstructors() call
+        for (Constructor<?> constructor : resultClass.getDeclaredConstructors()) {
+            //we want to invoke the constructor that has no parameters
+            if (constructor.getParameterTypes().length == 0) {
+                constructor.setAccessible(true);
+                return resultClass.cast(constructor.newInstance());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Tells whether the response body contains one of the messages which, together with http
+     * status 400, are reported as a plain 400 result.
+     */
+    private static boolean isKnownBadRequestMessage(String responseBody) {
+        final String[] knownMessages = {
+            "(FEED) not found",
+            "is beforePROCESS  start",
+            "is after end date",
+            "is after PROCESS's end",
+            "is before PROCESS's  start",
+        };
+        for (String message : knownMessages) {
+            if (responseBody.contains(message)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Asserts that the API call succeeded and that the number of instances having the expected
+     * status equals the process concurrency of the bundle.
+     *
+     * @param r  response from API which should contain information about instances
+     * @param b  bundle whose process concurrency is the expected number of instances
+     * @param ws expected status of instances
+     */
+    public static void validateSuccess(InstancesResult r, Bundle b,
+            InstancesResult.WorkflowStatus ws) {
+        Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED);
+        final int instancesWithStatus = runningInstancesInResult(r, ws);
+        Assert.assertEquals(instancesWithStatus, b.getProcessConcurrency());
+    }
+
+    /**
+     * Counts the instances in the response which have the expected status. Every instance is
+     * logged together with its status.
+     *
+     * @param r  response from API which should contain information about instances
+     * @param ws expected status of instances
+     * @return number of instances which have the expected status, 0 if the response carries no
+     *     instances at all
+     */
+    public static int runningInstancesInResult(InstancesResult r,
+            InstancesResult.WorkflowStatus ws) {
+        InstancesResult.Instance[] instances = r.getInstances();
+        LOGGER.info("pArray: " + Arrays.toString(instances));
+        if (instances == null) {
+            // a response without instances simply has no instance in the expected status
+            return 0;
+        }
+        int matchingCount = 0;
+        int instanceIndex = 0;
+        for (InstancesResult.Instance instance : instances) {
+            LOGGER.info(
+                    "pArray[" + instanceIndex + "]: " + instance.getStatus() + " , "
+                            +
+                            instance.getInstance()
+            );
+            if (instance.getStatus() == ws) {
+                matchingCount++;
+            }
+            instanceIndex++;
+        }
+        return matchingCount;
+    }
+
+    /**
+     * Asserts that the API call succeeded and that the response carries no instances.
+     *
+     * @param r response from API
+     */
+    public static void validateSuccessWOInstances(InstancesResult r) {
+        AssertUtil.assertSucceeded(r);
+        final InstancesResult.Instance[] instances = r.getInstances();
+        Assert.assertNull(instances, "Unexpected :" + Arrays.toString(instances));
+    }
+
+    /**
+     * Asserts that the status code of the response equals the expected one.
+     *
+     * @param r                 response from API
+     * @param expectedErrorCode status code the response should carry
+     */
+    public static void validateSuccessWithStatusCode(InstancesResult r,
+            int expectedErrorCode) {
+        final String failureMessage = "Parameter start is empty should have the response";
+        Assert.assertEquals(r.getStatusCode(), expectedErrorCode, failureMessage);
+    }
+
+    /**
+     * Checks that the API action succeeded and that exactly one instance in the response has
+     * the expected status.
+     *
+     * @param r  response from API which should contain information about the instance
+     * @param ws expected status of the instance
+     */
+    public static void validateSuccessOnlyStart(InstancesResult r,
+            InstancesResult.WorkflowStatus ws) {
+        Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED);
+        // TestNG takes (actual, expected): keep that order so failure messages read correctly
+        Assert.assertEquals(runningInstancesInResult(r, ws), 1);
+    }
+
+    /**
+     * Verifies the total number of instances in the response and how many of them are running,
+     * suspended, waiting and killed. An instance in any other status fails the check.
+     *
+     * @param r                  response from API which should contain information about
+     *                           instances
+     * @param totalInstances     expected total number of instances
+     * @param runningInstances   expected number of running instances
+     * @param suspendedInstances expected number of suspended instances
+     * @param waitingInstances   expected number of waiting instances
+     * @param killedInstances    expected number of killed instances
+     */
+    public static void validateResponse(InstancesResult r, int totalInstances,
+            int runningInstances,
+            int suspendedInstances, int waitingInstances,
+            int killedInstances) {
+
+        final InstancesResult.Instance[] instances = r.getInstances();
+        LOGGER.info("pArray: " + Arrays.toString(instances));
+        Assert.assertNotNull(instances, "pArray should be not null");
+        Assert.assertEquals(instances.length, totalInstances, "Total Instances");
+
+        int running = 0;
+        int suspended = 0;
+        int waiting = 0;
+        int killed = 0;
+        int position = 0;
+        for (InstancesResult.Instance instance : instances) {
+            LOGGER.info(
+                    "pArray[" + position + "]: " + instance.getStatus() + " , "
+                            +
+                            instance.getInstance());
+            position++;
+
+            switch (instance.getStatus()) {
+            case RUNNING:
+                running++;
+                break;
+            case SUSPENDED:
+                suspended++;
+                break;
+            case WAITING:
+                waiting++;
+                break;
+            case KILLED:
+                killed++;
+                break;
+            default:
+                Assert.fail("Unexpected status=" + instance.getStatus());
+            }
+        }
+        Assert.assertEquals(running, runningInstances, "Running Instances");
+        Assert.assertEquals(suspended, suspendedInstances, "Suspended Instances");
+        Assert.assertEquals(waiting, waitingInstances, "Waiting Instances");
+        Assert.assertEquals(killed, killedInstances, "Killed Instances");
+    }
+
+    /**
+     * Asserts that the API call succeeded and that the number of FAILED instances in the
+     * response equals the expected one.
+     *
+     * @param r         response from API which should contain information about instances
+     * @param failCount number of instances which should be failed
+     */
+    public static void validateFailedInstances(InstancesResult r, int failCount) {
+        AssertUtil.assertSucceeded(r);
+        final InstancesResult.Instance[] instances = r.getInstances();
+        int failed = 0;
+        for (int i = 0; i < instances.length; i++) {
+            if (instances[i].getStatus() == InstancesResult.WorkflowStatus.FAILED) {
+                failed++;
+            }
+        }
+        final String mismatch = "Actual number of failed instances does not "
+                +
+                "match expected number of failed instances.";
+        Assert.assertEquals(failed, failCount, mismatch);
+    }
+
+    /**
+     * Collects ids of the workflow jobs which belong to the first bundle returned for the
+     * process, optionally filtered by workflow status.
+     *
+     * @param prismHelper helper whose oozie clients are used for the lookup
+     * @param processName name of the process
+     * @param ws          statuses to be accepted; if none is given every workflow is returned
+     * @return ids of matching workflows, every id at most once
+     * @throws OozieClientException
+     */
+    public static List<String> getWorkflows(ColoHelper prismHelper, String processName,
+            WorkflowJob.Status... ws) throws OozieClientException {
+
+        String bundleID = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
+                processName, EntityType.PROCESS).get(0);
+        OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
+
+        List<String> workflows = OozieUtil.getWorkflowJobs(prismHelper, bundleID);
+
+        List<String> toBeReturned = new ArrayList<String>();
+        for (String jobID : workflows) {
+            WorkflowJob wfJob = oozieClient.getJobInfo(jobID);
+            // label matches what is actually printed: the job id and its start time
+            LOGGER.info("wfJob.getId(): " + wfJob.getId() + " wfJob.getStartTime():  "
+                    +
+                    wfJob.getStartTime());
+            LOGGER.info("wf id: " + jobID + "  wf status: " + wfJob.getStatus());
+            if (ws.length == 0) {
+                toBeReturned.add(jobID);
+                continue;
+            }
+            for (WorkflowJob.Status status : ws) {
+                if (wfJob.getStatus() == status) {
+                    toBeReturned.add(jobID);
+                    // a status listed twice must not add the same job twice
+                    break;
+                }
+            }
+        }
+        return toBeReturned;
+    }
+
+    /**
+     * Tells whether the status of the given workflow job reads "RUNNING".
+     *
+     * @param oozieClient client used to fetch the job info
+     * @param workflowID  id of the workflow job
+     * @return true if the workflow is running
+     * @throws OozieClientException
+     */
+    public static boolean isWorkflowRunning(OozieClient oozieClient, String workflowID) throws
+            OozieClientException {
+        final WorkflowJob job = oozieClient.getJobInfo(workflowID);
+        return "RUNNING".equals(job.getStatus().toString());
+    }
+
+    /**
+     * Asserts how many of the given workflows exist in total and how many of them are running,
+     * killed and succeeded. Passing -1 as an expected number skips that particular check.
+     *
+     * @param oozieClient        client used to fetch the job infos
+     * @param wfIDs              ids of workflows to be checked
+     * @param totalWorkflows     expected total number of workflows, or -1
+     * @param runningWorkflows   expected number of running workflows, or -1
+     * @param killedWorkflows    expected number of killed workflows, or -1
+     * @param succeededWorkflows expected number of succeeded workflows, or -1
+     * @throws OozieClientException
+     */
+    public static void areWorkflowsRunning(OozieClient oozieClient, List<String> wfIDs,
+            int totalWorkflows,
+            int runningWorkflows, int killedWorkflows,
+            int succeededWorkflows) throws OozieClientException {
+
+        final List<WorkflowJob> jobs = new ArrayList<WorkflowJob>();
+        for (String workflowId : wfIDs) {
+            jobs.add(oozieClient.getJobInfo(workflowId));
+        }
+        if (totalWorkflows != -1) {
+            Assert.assertEquals(jobs.size(), totalWorkflows);
+        }
+        LOGGER.info("wfJobs: " + jobs);
+
+        int running = 0;
+        int killed = 0;
+        int succeeded = 0;
+        int position = 0;
+        for (WorkflowJob job : jobs) {
+            LOGGER.info("was.get(" + position + ").getStatus(): "
+                    +
+                    job.getStatus());
+            position++;
+
+            final String statusName = job.getStatus().toString();
+            if ("RUNNING".equals(statusName)) {
+                running++;
+            } else if ("KILLED".equals(statusName)) {
+                killed++;
+            } else if ("SUCCEEDED".equals(statusName)) {
+                succeeded++;
+            }
+        }
+        if (runningWorkflows != -1) {
+            Assert.assertEquals(running, runningWorkflows);
+        }
+        if (killedWorkflows != -1) {
+            Assert.assertEquals(killed, killedWorkflows);
+        }
+        if (succeededWorkflows != -1) {
+            Assert.assertEquals(succeeded, succeededWorkflows);
+        }
+    }
+
+    /**
+     * Retrieves the actions of the coordinator which {@code getLatestCoordinatorID} returns
+     * for the entity.
+     *
+     * @param coloHelper  helper whose process oozie client is used
+     * @param processName name of the entity
+     * @param entityType  type of the entity
+     * @return actions of that coordinator
+     * @throws OozieClientException
+     */
+    public static List<CoordinatorAction> getProcessInstanceList(ColoHelper coloHelper,
+            String processName,
+            EntityType entityType)
+        throws OozieClientException {
+
+        final OozieClient client = coloHelper.getProcessHelper().getOozieClient();
+        final String coordinatorId = getLatestCoordinatorID(client, processName, entityType);
+        LOGGER.info("default coordID: " + coordinatorId);
+        final CoordinatorJob coordinator = client.getCoordJobInfo(coordinatorId);
+        return coordinator.getActions();
+    }
+
+    /**
+     * Retrieves the default coordinator ID of the latest bundle of the entity.
+     *
+     * @param oozieClient client used for the lookup
+     * @param processName name of the entity
+     * @param entityType  type of the entity
+     * @return coordinator ID
+     * @throws OozieClientException
+     */
+    public static String getLatestCoordinatorID(OozieClient oozieClient, String processName,
+            EntityType entityType)
+        throws OozieClientException {
+        final String latestBundleId = getLatestBundleID(oozieClient, processName, entityType);
+        return getDefaultCoordIDFromBundle(oozieClient, latestBundleId);
+    }
+
+    /**
+     * Waits for the coordinators of the bundle to be created and returns the ID of the
+     * coordinator which has the lowest sequence number, i.e. the lowest number in front of the
+     * first '-' of its ID.
+     *
+     * @param oozieClient client used for the lookup
+     * @param bundleId    ID of the bundle
+     * @return ID of the coordinator with the lowest sequence number, empty string if the bundle
+     *     has no coordinators
+     * @throws OozieClientException
+     */
+    public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId)
+        throws OozieClientException {
+
+        OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId);
+        BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId);
+        List<CoordinatorJob> coords = bundleInfo.getCoordinators();
+        // no fixed upper bound: the first coordinator always becomes the initial candidate
+        int min = Integer.MAX_VALUE;
+        String minString = "";
+        for (CoordinatorJob coord : coords) {
+            String strID = coord.getId();
+            int sequence = Integer.parseInt(strID.substring(0, strID.indexOf('-')));
+            if (minString.isEmpty() || sequence < min) {
+                min = sequence;
+                minString = strID;
+            }
+        }
+        LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString);
+        return minString;
+    }
+
+    /**
+     * Counts the instances returned by {@code getProcessInstanceList} which have the given
+     * status.
+     *
+     * @param coloHelper  helper used for the lookup
+     * @param processName name of the entity
+     * @param status      status to be counted
+     * @param entityType  type of the entity
+     * @return number of instances having the status
+     * @throws OozieClientException
+     */
+    public static int getInstanceCountWithStatus(ColoHelper coloHelper, String processName,
+            org.apache.oozie.client.CoordinatorAction.Status
+                    status,
+            EntityType entityType)
+        throws OozieClientException {
+        int matching = 0;
+        for (CoordinatorAction action
+                : getProcessInstanceList(coloHelper, processName, entityType)) {
+            if (action.getStatus().equals(status)) {
+                matching++;
+            }
+        }
+        return matching;
+    }
+
+    /**
+     * Retrieves the status of the default coordinator of one of the bundles of the process.
+     *
+     * @param colohelper   helper used for the lookup
+     * @param processName  name of the process
+     * @param bundleNumber ordinal number of the bundle
+     * @return status of the coordinator job
+     * @throws OozieClientException
+     */
+    public static Status getDefaultCoordinatorStatus(ColoHelper colohelper, String processName,
+            int bundleNumber) throws OozieClientException {
+        final OozieClient client = colohelper.getProcessHelper().getOozieClient();
+        final String coordinatorId =
+                getDefaultCoordinatorFromProcessName(colohelper, processName, bundleNumber);
+        final CoordinatorJob coordinator = client.getCoordJobInfo(coordinatorId);
+        return coordinator.getStatus();
+    }
+
+    /**
+     * Retrieves the default coordinator ID of one of the bundles of the process.
+     *
+     * @param coloHelper   helper used for the lookup
+     * @param processName  name of the process
+     * @param bundleNumber ordinal number of the bundle
+     * @return coordinator ID
+     * @throws OozieClientException
+     */
+    public static String getDefaultCoordinatorFromProcessName(
+            ColoHelper coloHelper, String processName, int bundleNumber) throws OozieClientException {
+        final String sequenceBundleId =
+                getSequenceBundleID(coloHelper, processName, EntityType.PROCESS, bundleNumber);
+        final OozieClient clusterClient = coloHelper.getClusterHelper().getOozieClient();
+        return getDefaultCoordIDFromBundle(clusterClient, sequenceBundleId);
+    }
+
+    /**
+     * Retrieves all coordinators of a bundle.
+     *
+     * @param bundleID ID of the bundle
+     * @param helper   entity helper whose oozie client is used for the lookup
+     * @return list of coordinators of the bundle
+     * @throws OozieClientException
+     */
+    public static List<CoordinatorJob> getBundleCoordinators(String bundleID,
+            IEntityManagerHelper helper)
+        throws OozieClientException {
+        final BundleJob bundle = helper.getOozieClient().getBundleJobInfo(bundleID);
+        return bundle.getCoordinators();
+    }
+
+    /**
+     * Retrieves the latest bundle ID using the oozie client of the feed helper of the colo.
+     *
+     * @param coloHelper colo helper whose feed helper provides the oozie client
+     * @param entityName name of entity job is related to
+     * @param entityType type of entity - feed or process expected
+     * @return latest bundle ID
+     * @throws OozieClientException
+     */
+    public static String getLatestBundleID(ColoHelper coloHelper,
+            String entityName, EntityType entityType)
+        throws OozieClientException {
+        final OozieClient feedClient = coloHelper.getFeedHelper().getOozieClient();
+        return getLatestBundleID(feedClient, entityName, entityType);
+    }
+
+    /**
+     * Retrieves the latest bundle ID, i.e. the ID which has the highest number in front of its
+     * first '-'.
+     *
+     * @param oozieClient where job is running
+     * @param entityName  name of entity job is related to
+     * @param entityType  type of entity - feed or process expected
+     * @return latest bundle ID, "0" if there are no bundles
+     * @throws OozieClientException
+     */
+    public static String getLatestBundleID(OozieClient oozieClient,
+            String entityName, EntityType entityType)
+        throws OozieClientException {
+        String latestId = "0";
+        int highestSequence = -1;
+        for (String bundleId : OozieUtil.getBundles(oozieClient, entityName, entityType)) {
+            final int sequence = Integer.parseInt(bundleId.substring(0, bundleId.indexOf('-')));
+            if (sequence > highestSequence) {
+                highestSequence = sequence;
+                latestId = bundleId;
+            }
+        }
+        return latestId;
+    }
+
+    /**
+     * Retrieves ID of bundle related to some process/feed using its ordinal number. The oozie
+     * client of the cluster helper is used for the lookup.
+     *
+     * @param prismHelper  helper whose cluster helper provides the oozie client
+     * @param entityName   name of entity bundle is related to
+     * @param entityType   feed or process
+     * @param bundleNumber ordinal number of bundle
+     * @return bundle ID
+     * @throws OozieClientException
+     */
+    public static String getSequenceBundleID(ColoHelper prismHelper, String entityName,
+            EntityType entityType, int bundleNumber)
+        throws OozieClientException {
+        final OozieClient clusterClient = prismHelper.getClusterHelper().getOozieClient();
+        return getSequenceBundleID(clusterClient, entityName, entityType, bundleNumber);
+    }
+
+    /**
+     * Retrieves ID of bundle related to some process/feed using its ordinal number. Bundles are
+     * ordered by the number in front of the first '-' of their IDs, counting starts from 0.
+     *
+     * @param oozieClient  client used for the lookup
+     * @param entityName   name of entity bundle is related to
+     * @param entityType   feed or process
+     * @param bundleNumber ordinal number of bundle
+     * @return bundle ID, null if there is no bundle with such ordinal number
+     * @throws OozieClientException
+     */
+    public static String getSequenceBundleID(OozieClient oozieClient, String entityName,
+            EntityType entityType, int bundleNumber)
+        throws OozieClientException {
+
+        // sorted by sequence number of the bundle
+        final Map<Integer, String> bundlesBySequence = new TreeMap<Integer, String>();
+        for (String bundleId : OozieUtil.getBundles(oozieClient, entityName, entityType)) {
+            LOGGER.info("getSequenceBundleID: " + bundleId);
+            final int sequence = Integer.parseInt(bundleId.substring(0, bundleId.indexOf('-')));
+            bundlesBySequence.put(sequence, bundleId);
+        }
+        for (Map.Entry<Integer, String> bundle : bundlesBySequence.entrySet()) {
+            LOGGER.info("Key = " + bundle.getKey() + ", Value = " + bundle.getValue());
+        }
+        int position = 0;
+        for (String bundleId : bundlesBySequence.values()) {
+            if (position == bundleNumber) {
+                return bundleId;
+            }
+            position++;
+        }
+        return null;
+    }
+
+    /**
+     * Retrieves status of one instance.
+     *
+     * @param coloHelper     - server from which instance status will be retrieved.
+     * @param processName    - name of process which mentioned instance belongs to.
+     * @param bundleNumber   - ordinal number of one of the bundle which are related to that
+     *                       process.
+     * @param instanceNumber - ordinal number of instance which state will be returned.
+     * @return - state of mentioned instance, null if the bundle, the coordinator or the
+     *     instance with that ordinal number is not available.
+     * @throws OozieClientException
+     */
+    public static CoordinatorAction.Status getInstanceStatus(ColoHelper coloHelper,
+            String processName,
+            int bundleNumber, int
+            instanceNumber) throws OozieClientException {
+        String bundleID = InstanceUtil
+                .getSequenceBundleID(coloHelper, processName, EntityType.PROCESS, bundleNumber);
+        if (StringUtils.isEmpty(bundleID)) {
+            return null;
+        }
+        String coordID = InstanceUtil.getDefaultCoordIDFromBundle(
+                coloHelper.getClusterHelper().getOozieClient(), bundleID);
+        if (StringUtils.isEmpty(coordID)) {
+            return null;
+        }
+        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+        CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID);
+        if (coordInfo == null) {
+            return null;
+        }
+        LOGGER.info("coordInfo = " + coordInfo);
+        List<CoordinatorAction> actions = coordInfo.getActions();
+        if (actions == null || actions.isEmpty()) {
+            return null;
+        }
+        LOGGER.info("actions = " + actions);
+        // an instance which has not been materialized yet is reported like any other missing
+        // piece: with null instead of an IndexOutOfBoundsException
+        if (instanceNumber < 0 || instanceNumber >= actions.size()) {
+            return null;
+        }
+        return actions.get(instanceNumber).getStatus();
+    }
+
+
+    /**
+     * Creates the given folders on the file system whose address is provided by the feed helper
+     * of the colo. Null and empty folder names are skipped.
+     *
+     * @param helper     colo helper whose feed helper provides the hadoop url
+     * @param folderList folders to be created
+     * @throws IOException
+     */
+    public static void createHDFSFolders(ColoHelper helper, List<String> folderList)
+        throws IOException {
+        LOGGER.info("creating folders.....");
+        final Configuration configuration = new Configuration();
+        final String defaultFs = "hdfs://" + helper.getFeedHelper().getHadoopURL();
+        configuration.set("fs.default.name", defaultFs);
+        final FileSystem fileSystem = FileSystem.get(configuration);
+        for (String folder : folderList) {
+            if (StringUtils.isEmpty(folder)) {
+                continue;
+            }
+            fileSystem.mkdirs(new Path(folder));
+        }
+        LOGGER.info("created folders.....");
+    }
+
+    /**
+     * Adds one more cluster, which has no catalog table, to the feed.
+     *
+     * @param feed          feed which is to be modified
+     * @param feedValidity  validity of the feed on the cluster
+     * @param feedRetention retention of the feed on the cluster
+     * @param clusterName   cluster name, if null all clusters are erased from the feed
+     * @param clusterType   cluster type
+     * @param partition     partition of the feed on the cluster
+     * @param locations     paths of the feed on the cluster
+     * @return string representation of the modified feed
+     */
+    public static String setFeedCluster(String feed, Validity feedValidity, Retention feedRetention,
+            String clusterName,
+            ClusterType clusterType, String partition,
+            String... locations) {
+        final String noTableUri = null;
+        return setFeedClusterWithTable(feed, feedValidity, feedRetention, clusterName, clusterType,
+                partition, noTableUri, locations);
+    }
+
+    /**
+     * Adds one more cluster to the feed, or erases all clusters of the feed if the cluster
+     * name is null.
+     *
+     * @param feed          feed which is to be modified
+     * @param feedValidity  validity of the feed on the cluster
+     * @param feedRetention retention of the feed on the cluster
+     * @param clusterName   cluster name, if null all clusters are erased from the feed
+     * @param clusterType   cluster type
+     * @param partition     partition of the feed on the cluster
+     * @param tableUri      uri of the catalog table of the cluster
+     * @param locations     paths of the feed on the cluster
+     * @return string representation of the modified feed
+     */
+    public static String setFeedClusterWithTable(String feed, Validity feedValidity,
+            Retention feedRetention, String clusterName,
+            ClusterType clusterType, String partition,
+            String tableUri, String... locations) {
+        final Feed feedObject = (Feed) Entity.fromString(EntityType.FEED, feed);
+        if (clusterName == null) {
+            feedObject.getClusters().getClusters().clear();
+            return feedObject.toString();
+        }
+        final Cluster newCluster = createFeedCluster(feedValidity, feedRetention, clusterName,
+                clusterType, partition, tableUri, locations);
+        feedObject.getClusters().getClusters().add(newCluster);
+        return feedObject.toString();
+    }
+
+    /**
+     * Creates a catalog table which has the given uri.
+     */
+    private static CatalogTable getCatalogTable(String tableUri) {
+        final CatalogTable table = new CatalogTable();
+        table.setUri(tableUri);
+        return table;
+    }
+
+    /**
+     * Builds a feed cluster out of the given properties. Cluster type, partition, table and
+     * locations are set only if they are provided. The type of every location is determined by
+     * its position: data, stats, meta, tmp. More than four locations fail the test.
+     */
+    private static Cluster createFeedCluster(
+            Validity feedValidity, Retention feedRetention, String clusterName, ClusterType clusterType,
+            String partition, String tableUri, String[] locations) {
+
+        final Cluster feedCluster = new Cluster();
+        feedCluster.setName(clusterName);
+        feedCluster.setRetention(feedRetention);
+        if (clusterType != null) {
+            feedCluster.setType(clusterType);
+        }
+        feedCluster.setValidity(feedValidity);
+        if (partition != null) {
+            feedCluster.setPartition(partition);
+        }
+
+        // if table uri is not empty or null then set it.
+        if (StringUtils.isNotEmpty(tableUri)) {
+            feedCluster.setTable(getCatalogTable(tableUri));
+        }
+        if (ArrayUtils.isEmpty(locations)) {
+            return feedCluster;
+        }
+
+        // index of a path in the array determines the type of the location
+        final LocationType[] typeByPosition = {
+            LocationType.DATA, LocationType.STATS, LocationType.META, LocationType.TMP,
+        };
+        final Locations feedLocations = new Locations();
+        for (int i = 0; i < locations.length; i++) {
+            final Location location = new Location();
+            location.setPath(locations[i]);
+            if (i < typeByPosition.length) {
+                location.setType(typeByPosition[i]);
+            } else {
+                Assert.fail("unexpected value of locations: " + Arrays.toString(locations));
+            }
+            feedLocations.getLocations().add(location);
+        }
+        feedCluster.setLocations(feedLocations);
+        return feedCluster;
+    }
+
+    /**
+     * Collects ids of the replication coordinators which belong to the given bundle.
+     *
+     * @param bundlID id of the bundle which coordinators are inspected
+     * @param helper  helper which is used to retrieve coordinators of the bundle
+     * @return ids of coordinators which application name contains "FEED_REPLICATION"
+     * @throws OozieClientException
+     */
+    public static List<String> getReplicationCoordID(String bundlID,
+            IEntityManagerHelper helper)
+        throws OozieClientException {
+        final List<String> replicationIds = new ArrayList<String>();
+        for (CoordinatorJob coordinator : InstanceUtil.getBundleCoordinators(bundlID, helper)) {
+            final boolean isReplication = coordinator.getAppName().contains("FEED_REPLICATION");
+            if (isReplication) {
+                replicationIds.add(coordinator.getId());
+            }
+        }
+        return replicationIds;
+    }
+
+    /**
+     * Forms and sends process instance request based on url of action to be performed and its
+     * parameters.
+     * If params are supplied, a non-empty colo is appended after them as one more parameter: its
+     * first character is dropped and "&amp;" is put in front of the rest. Without params colo is
+     * appended to the url as is. A null colo is handled like an empty one.
+     *
+     * @param url    - url of action to be performed
+     * @param params - parameters which are appended to url, may be null
+     * @param colo   - servers on which action should be performed, may be null or empty
+     * @param user   - whose credentials will be used for this action
+     * @return result from API
+     */
+    public static APIResult createAndSendRequestProcessInstance(
+            String url, String params, String colo, String user)
+        throws IOException, URISyntaxException, AuthenticationException {
+        // null colo used to cause NPE (with params) or a literal "null" in the url (without)
+        final String coloPart = colo == null ? "" : colo;
+        final String requestUrl;
+        if (params == null) {
+            requestUrl = url + coloPart;
+        } else if (coloPart.isEmpty()) {
+            requestUrl = url + params;
+        } else {
+            requestUrl = url + params + "&" + coloPart.substring(1);
+        }
+        return InstanceUtil.sendRequestProcessInstance(requestUrl, user);
+    }
+
+    /**
+     * Retrieves prefix (main sub-folders) of feed data path: the part of the path of the first
+     * feed location which precedes the first '$'. A path without '$' is returned as a whole.
+     *
+     * @param feed feed definition
+     * @return prefix of the path of the first feed location
+     */
+    public static String getFeedPrefix(String feed) {
+        final Feed feedElement = (Feed) Entity.fromString(EntityType.FEED, feed);
+        final String path = feedElement.getLocations().getLocations().get(0).getPath();
+        final int variableStart = path.indexOf('$');
+        // indexOf gives -1 if there is no '$'; substring(0, -1) would throw in that case
+        return variableStart < 0 ? path : path.substring(0, variableStart);
+    }
+
+    /**
+     * Sets one more cluster to process definition.
+     * If clusterName is null no cluster is added; the first cluster entry of the process is
+     * replaced with null instead.
+     *
+     * @param process     - process definition string representation
+     * @param clusterName - name of cluster
+     * @param validity    - cluster validity
+     * @return - string representation of modified process
+     */
+    public static String setProcessCluster(String process, String clusterName,
+            org.apache.falcon.entity.v0.process.Validity validity) {
+        final org.apache.falcon.entity.v0.process.Cluster processCluster =
+                new org.apache.falcon.entity.v0.process.Cluster();
+        processCluster.setName(clusterName);
+        processCluster.setValidity(validity);
+
+        final Process processElement = (Process) Entity.fromString(EntityType.PROCESS, process);
+        final List<org.apache.falcon.entity.v0.process.Cluster> processClusters =
+                processElement.getClusters().getClusters();
+        if (clusterName != null) {
+            processClusters.add(processCluster);
+        } else {
+            // NOTE(review): nulls the first entry rather than removing it - confirm it is intended
+            processClusters.set(0, null);
+        }
+        return processElement.toString();
+    }
+
+    /**
+     * Adds one input into process. Start, end and partition of the new input are copied from the
+     * first input of the process.
+     *
+     * @param process  - where input should be inserted
+     * @param feed     - feed which will be used as input feed
+     * @param feedName - name of the new input
+     * @return - string representation of process definition
+     */
+    public static String addProcessInputFeed(String process, String feed, String feedName) {
+        final Process processElement = (Process) Entity.fromString(EntityType.PROCESS, process);
+        final Input template = processElement.getInputs().getInputs().get(0);
+
+        final Input addedInput = new Input();
+        addedInput.setName(feedName);
+        addedInput.setFeed(feed);
+        addedInput.setStart(template.getStart());
+        addedInput.setEnd(template.getEnd());
+        addedInput.setPartition(template.getPartition());
+
+        processElement.getInputs().getInputs().add(addedInput);
+        return processElement.toString();
+    }
+
+    /**
+     * Retrieves status of the workflow job which belongs to an action of the coordinator.
+     *
+     * @param ua1            colo helper which process helper provides the oozie client
+     * @param coordID        id of the coordinator
+     * @param instanceNumber index of the action in the list of coordinator actions
+     * @return status of the workflow job, null if the action has no external id
+     * @throws OozieClientException
+     */
+    public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord(
+            ColoHelper ua1, String coordID, int instanceNumber) throws OozieClientException {
+        final OozieClient client = ua1.getProcessHelper().getOozieClient();
+        final CoordinatorAction action =
+                client.getCoordJobInfo(coordID).getActions().get(instanceNumber);
+        final String jobId = action.getExternalId();
+        LOGGER.info("jobId = " + jobId);
+        return jobId == null ? null : client.getJobInfo(jobId).getStatus();
+    }
+
+    /**
+     * Retrieves input folders from the configuration of the workflow job which belongs to the
+     * coordinator action "coordID@instanceNumber".
+     *
+     * @param coloHelper     colo helper which process helper provides the oozie client
+     * @param coordID        id of the coordinator
+     * @param instanceNumber number of the coordinator action
+     * @return entries of the falconInPaths property of the workflow configuration
+     * @throws OozieClientException
+     */
+    public static List<String> getInputFoldersForInstanceForReplication(
+            ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
+        final OozieClient client = coloHelper.getProcessHelper().getOozieClient();
+        final String actionId = coordID + "@" + instanceNumber;
+        final String workflowId = client.getCoordActionInfo(actionId).getExternalId();
+        final String runConf = client.getJobInfo(workflowId).getConf();
+        return InstanceUtil.getReplicationFolderFromInstanceRunConf(runConf);
+    }
+
+    /**
+     * Extracts the value of the falconInPaths property from a workflow configuration and splits
+     * it by comma.
+     *
+     * @param runConf workflow configuration; the text {@code falconInPaths</name>} is expected to
+     *                be followed by a {@code <value>} element
+     * @return modifiable list of the comma separated entries of the value
+     */
+    public static List<String> getReplicationFolderFromInstanceRunConf(String runConf) {
+        final String nameMarker = "falconInPaths</name>";
+        final String valueOpen = "<value>";
+        final String afterName =
+                runConf.substring(runConf.indexOf(nameMarker) + nameMarker.length());
+        final String afterOpen =
+                afterName.substring(afterName.indexOf(valueOpen) + valueOpen.length());
+        final String value = afterOpen.substring(0, afterOpen.indexOf("</value>"));
+
+        final List<String> folders = new ArrayList<String>();
+        for (String folder : value.split(",")) {
+            folders.add(folder);
+        }
+        return folders;
+    }
+
+    /**
+     * Retrieves run number of the workflow job which belongs to an action of the coordinator.
+     *
+     * @param colo           colo helper which process helper provides the oozie client
+     * @param coordID        id of the coordinator
+     * @param instanceNumber index of the action in the list of coordinator actions
+     * @return value of getRun() of the workflow job
+     * @throws OozieClientException
+     */
+    public static int getInstanceRunIdFromCoord(ColoHelper colo, String coordID, int instanceNumber)
+        throws OozieClientException {
+        final OozieClient client = colo.getProcessHelper().getOozieClient();
+        final CoordinatorAction action =
+                client.getCoordJobInfo(coordID).getActions().get(instanceNumber);
+        final WorkflowJob workflow = client.getJobInfo(action.getExternalId());
+        return workflow.getRun();
+    }
+
+
+    /**
+     * Replaces path of the first location of the feed.
+     *
+     * @param feed feed which is to be modified
+     * @param path new path of the first feed location
+     * @return modified feed
+     */
+    public static String setFeedFilePath(String feed, String path) {
+        final Feed feedElement = (Feed) Entity.fromString(EntityType.FEED, feed);
+        final Location firstLocation = feedElement.getLocations().getLocations().get(0);
+        firstLocation.setPath(path);
+        return feedElement.toString();
+    }
+
+    /**
+     * Counts coordinators of particular type among all bundles of the feed.
+     * For every bundle the method waits for coordinator job creation before coordinators are
+     * inspected.
+     *
+     * @param helper    helper which provides the oozie client
+     * @param feedName  name of the feed
+     * @param coordType text which application name of a counted coordinator has to contain
+     * @return number of matching coordinators, 0 if feed has no bundles
+     * @throws OozieClientException
+     */
+    public static int checkIfFeedCoordExist(IEntityManagerHelper helper,
+            String feedName, String coordType) throws OozieClientException {
+        LOGGER.info("feedName: " + feedName);
+
+        // bundles are requested from oozie once and reused for both the emptiness check and the loop
+        final List<String> bundleID =
+                OozieUtil.getBundles(helper.getOozieClient(), feedName, EntityType.FEED);
+        if (bundleID.isEmpty()) {
+            return 0;
+        }
+        LOGGER.info("bundleID: " + bundleID);
+
+        int numberOfCoord = 0;
+        for (String aBundleID : bundleID) {
+            LOGGER.info("aBundleID: " + aBundleID);
+            OozieUtil.waitForCoordinatorJobCreation(helper.getOozieClient(), aBundleID);
+            List<CoordinatorJob> coords =
+                    InstanceUtil.getBundleCoordinators(aBundleID, helper);
+            LOGGER.info("coords: " + coords);
+            for (CoordinatorJob coord : coords) {
+                if (coord.getAppName().contains(coordType)) {
+                    numberOfCoord++;
+                }
+            }
+        }
+        return numberOfCoord;
+    }
+
+    /**
+     * Replaces frequency of the process.
+     *
+     * @param process   process definition
+     * @param frequency new frequency of the process
+     * @return modified process definition
+     */
+    public static String setProcessFrequency(String process,
+            Frequency frequency) {
+        final Process processElement = (Process) Entity.fromString(EntityType.PROCESS, process);
+        processElement.setFrequency(frequency);
+        return processElement.toString();
+    }
+
+    /**
+     * Replaces name of the process.
+     *
+     * @param process process definition
+     * @param newName new name of the process
+     * @return modified process definition
+     */
+    public static String setProcessName(String process, String newName) {
+        final Process processElement = (Process) Entity.fromString(EntityType.PROCESS, process);
+        processElement.setName(newName);
+        return processElement.toString();
+    }
+
+    /**
+     * Replaces start and end of validity on every cluster of the process.
+     *
+     * @param process   process entity to be modified
+     * @param startTime start of process validity, converted by TimeUtil.oozieDateToDate
+     * @param endTime   end of process validity, converted by TimeUtil.oozieDateToDate
+     * @return modified process definition
+     */
+    public static String setProcessValidity(String process,
+            String startTime, String endTime) {
+        final Process processElement = (Process) Entity.fromString(EntityType.PROCESS, process);
+
+        for (org.apache.falcon.entity.v0.process.Cluster processCluster
+                : processElement.getClusters().getClusters()) {
+            final org.apache.falcon.entity.v0.process.Validity validity =
+                    processCluster.getValidity();
+            validity.setStart(TimeUtil.oozieDateToDate(startTime).toDate());
+            validity.setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
+        }
+        return processElement.toString();
+    }
+
+    /**
+     * Collects actions of all coordinators of all bundles of the entity.
+     * Bundle ids are looked up through the oozie client of the feed helper, bundles and
+     * coordinators are read through the oozie client of the process helper. Id of the latest
+     * coordinator is only logged.
+     *
+     * @param coloHelper  colo helper which provides the oozie clients
+     * @param processName name of the entity
+     * @param entityType  type of the entity
+     * @return actions of all coordinators of all bundles
+     * @throws OozieClientException
+     */
+    public static List<CoordinatorAction> getProcessInstanceListFromAllBundles(
+            ColoHelper coloHelper, String processName, EntityType entityType)
+        throws OozieClientException {
+        OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient();
+        List<CoordinatorAction> list = new ArrayList<CoordinatorAction>();
+        // bundles are requested from oozie once and reused for both logging and iteration
+        final List<String> bundleIds = OozieUtil.getBundles(
+                coloHelper.getFeedHelper().getOozieClient(), processName, entityType);
+        LOGGER.info("bundle size for process is " + bundleIds.size());
+        for (String bundleId : bundleIds) {
+            BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId);
+            List<CoordinatorJob> coords = bundleInfo.getCoordinators();
+            LOGGER.info("number of coords in bundle " + bundleId + "=" + coords.size());
+            for (CoordinatorJob coord : coords) {
+                List<CoordinatorAction> actions =
+                        oozieClient.getCoordJobInfo(coord.getId()).getActions();
+                LOGGER.info("number of actions in coordinator " + coord.getId() + " is "
+                        + actions.size());
+                list.addAll(actions);
+            }
+        }
+        String coordId = getLatestCoordinatorID(coloHelper.getClusterHelper().getOozieClient(), processName,
+                entityType);
+        LOGGER.info("default coordID: " + coordId);
+        return list;
+    }
+
+    /**
+     * Retrieves replication target folder from the configuration of the workflow job which
+     * belongs to an action of the coordinator.
+     *
+     * @param coloHelper     colo helper which process helper provides the oozie client
+     * @param coordID        id of the coordinator
+     * @param instanceNumber index of the action in the list of coordinator actions
+     * @return value of distcpTargetPaths followed by the last 7 characters of the first input path
+     * @throws OozieClientException
+     */
+    public static String getOutputFolderForInstanceForReplication(ColoHelper coloHelper,
+            String coordID,
+            int instanceNumber)
+        throws OozieClientException {
+        final OozieClient client = coloHelper.getProcessHelper().getOozieClient();
+        final CoordinatorAction action =
+                client.getCoordJobInfo(coordID).getActions().get(instanceNumber);
+        final WorkflowJob workflow = client.getJobInfo(action.getExternalId());
+        return InstanceUtil.getReplicatedFolderFromInstanceRunConf(workflow.getConf());
+    }
+
+    private static String getReplicatedFolderFromInstanceRunConf(
+            String runConf) {
+        String inputPathExample =
+                InstanceUtil.getReplicationFolderFromInstanceRunConf(runConf).get(0);
+        String postFix = inputPathExample
+                .substring(inputPathExample.length() - 7, inputPathExample.length());
+        return getReplicatedFolderBaseFromInstanceRunConf(runConf) + postFix;
+    }
+
+    /**
+     * Retrieves value of distcpTargetPaths from the configuration of the workflow job which
+     * belongs to an action of the coordinator.
+     *
+     * @param coloHelper     colo helper which process helper provides the oozie client
+     * @param coordID        id of the coordinator
+     * @param instanceNumber index of the action in the list of coordinator actions
+     * @return value of the distcpTargetPaths property
+     * @throws OozieClientException
+     */
+    public static String getOutputFolderBaseForInstanceForReplication(
+            ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException {
+        final OozieClient client = coloHelper.getProcessHelper().getOozieClient();
+        final CoordinatorAction action =
+                client.getCoordJobInfo(coordID).getActions().get(instanceNumber);
+        final String runConf = client.getJobInfo(action.getExternalId()).getConf();
+        return InstanceUtil.getReplicatedFolderBaseFromInstanceRunConf(runConf);
+    }
+
+    private static String getReplicatedFolderBaseFromInstanceRunConf(String runConf) {
+        String conf = runConf.substring(runConf.indexOf("distcpTargetPaths</name>") + 24);
+        conf = conf.substring(conf.indexOf("<value>") + 7);
+        conf = conf.substring(0, conf.indexOf("</value>"));
+        return conf;
+    }
+
+    /**
+     * Waits till supplied number of instances of process/feed reach expected state during
+     * specific time.
+     * The bundle with the max id among bundles of the entity is used. For a process the
+     * coordinator with the min id is polled, for any other entity type the first coordinator
+     * which application name contains "FEED_REPLICATION". Coordinator is polled up to 50 times,
+     * the test fails if expected state is not reached.
+     *
+     * @param client             oozie client to retrieve info about instances
+     * @param entityName         name of feed or process
+     * @param instancesNumber    minimal number of instances which have to be in expected status
+     * @param expectedStatus     expected status we are waiting for
+     * @param entityType         type of entity - feed or process expected
+     * @param totalMinutesToWait time in minutes for which instance state should be polled
+     * @throws OozieClientException
+     */
+    public static void waitTillInstanceReachState(OozieClient client, String entityName,
+            int instancesNumber,
+            CoordinatorAction.Status expectedStatus,
+            EntityType entityType, int totalMinutesToWait)
+        throws OozieClientException {
+        String filter;
+        // get the bundle ids
+        if (entityType.equals(EntityType.FEED)) {
+            filter = "name=FALCON_FEED_" + entityName;
+        } else {
+            filter = "name=FALCON_PROCESS_" + entityName;
+        }
+        // bundle lookup is retried up to 20 times with 5 seconds between attempts
+        List<BundleJob> bundleJobs = new ArrayList<BundleJob>();
+        for (int retries = 0; retries < 20; ++retries) {
+            bundleJobs = OozieUtil.getBundles(client, filter, 0, 10);
+            if (bundleJobs.size() > 0) {
+                break;
+            }
+            TimeUtil.sleepSeconds(5);
+        }
+        if (bundleJobs.isEmpty()) {
+            Assert.fail("Could not retrieve bundles");
+        }
+        List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs);
+        String bundleId = OozieUtil.getMaxId(bundleIds);
+        LOGGER.info(String.format("Using bundle %s", bundleId));
+        final String coordId;
+        final Status bundleStatus = client.getBundleJobInfo(bundleId).getStatus();
+        Assert.assertTrue(RUNNING_PREP_SUCCEEDED.contains(bundleStatus),
+                String.format("Bundle job %s should be prep/running but is %s", bundleId,
+                        bundleStatus));
+        OozieUtil.waitForCoordinatorJobCreation(client, bundleId);
+        List<CoordinatorJob> coords = client.getBundleJobInfo(bundleId).getCoordinators();
+        List<String> cIds = new ArrayList<String>();
+        if (entityType == EntityType.PROCESS) {
+            for (CoordinatorJob coord : coords) {
+                cIds.add(coord.getId());
+            }
+            coordId = OozieUtil.getMinId(cIds);
+        } else {
+            for (CoordinatorJob coord : coords) {
+                if (coord.getAppName().contains("FEED_REPLICATION")) {
+                    cIds.add(coord.getId());
+                }
+            }
+            // fail with a readable message instead of IndexOutOfBoundsException from get(0)
+            Assert.assertFalse(cIds.isEmpty(),
+                    String.format("Bundle %s has no FEED_REPLICATION coordinator", bundleId));
+            coordId = cIds.get(0);
+        }
+        LOGGER.info(String.format("Using coordinator id: %s", coordId));
+        int maxTries = 50;
+        int totalSleepTime = totalMinutesToWait * 60;
+        int sleepTime = totalSleepTime / maxTries;
+        LOGGER.info(String.format("Sleep for %d seconds", sleepTime));
+        for (int i = 0; i < maxTries; i++) {
+            LOGGER.info(String.format("Try %d of %d", (i + 1), maxTries));
+            CoordinatorJob coordinatorJob = client.getCoordJobInfo(coordId);
+            final Status coordinatorStatus = coordinatorJob.getStatus();
+            Assert.assertTrue(RUNNING_PREP_SUCCEEDED.contains(coordinatorStatus),
+                    String.format("Coordinator %s should be running/prep but is %s.", coordId,
+                            coordinatorStatus));
+            List<CoordinatorAction> coordinatorActions = coordinatorJob.getActions();
+            int instanceWithStatus = 0;
+            for (CoordinatorAction coordinatorAction : coordinatorActions) {
+                LOGGER.info(String.format("Coordinator Action %s status is %s on oozie %s",
+                    coordinatorAction.getId(), coordinatorAction.getStatus(), client.getOozieUrl()));
+                if (expectedStatus == coordinatorAction.getStatus()) {
+                    instanceWithStatus++;
+                }
+            }
+
+            if (instanceWithStatus >= instancesNumber) {
+                return;
+            }
+            // nothing is checked after the last try, so sleeping there would only delay the failure
+            if (i < maxTries - 1) {
+                TimeUtil.sleepSeconds(sleepTime);
+            }
+        }
+        Assert.fail("expected state of instance was never reached");
+    }
+
+    /**
+     * Waits till supplied number of instances of process/feed reach expected state.
+     * Time to wait is derived from entity type and expected status.
+     *
+     * @param client           oozie client to retrieve info about instances
+     * @param entityName       name of feed or process
+     * @param numberOfInstance number of instances which status we are waiting for
+     * @param expectedStatus   expected status we are waiting for
+     * @param entityType       type of entity - feed or process expected
+     * @throws OozieClientException
+     */
+    public static void waitTillInstanceReachState(OozieClient client, String entityName,
+            int numberOfInstance,
+            CoordinatorAction.Status expectedStatus,
+            EntityType entityType)
+        throws OozieClientException {
+        waitTillInstanceReachState(client, entityName, numberOfInstance, expectedStatus,
+                entityType, getMinutesToWait(entityType, expectedStatus));
+    }
+
+    /**
+     * Waits till bundle job reaches expected status.
+     * Time to wait is derived from expected status.
+     *
+     * @param coloHelper     colo helper of cluster job is running on
+     * @param processName    name of process which job is being analyzed
+     * @param expectedStatus job status we are waiting for
+     * @throws OozieClientException
+     */
+    public static void waitForBundleToReachState(ColoHelper coloHelper,
+            String processName, Job.Status expectedStatus) throws
+            OozieClientException {
+        waitForBundleToReachState(coloHelper, processName, expectedStatus,
+                getMinutesToWait(expectedStatus));
+    }
+
+    /**
+     * Waits till the latest bundle job of the process reaches expected status during specific
+     * time. Status is polled every 20 seconds. Method returns silently if the status was not
+     * reached in time.
+     * Use it directly in test cases when timeouts are different from trivial, in other cases use
+     * waitForBundleToReachState(ColoHelper, String, Status)
+     *
+     * @param coloHelper         colo helper of cluster job is running on
+     * @param processName        name of process which job is being analyzed
+     * @param expectedStatus     job status we are waiting for
+     * @param totalMinutesToWait specific time to wait expected state
+     * @throws OozieClientException
+     */
+    public static void waitForBundleToReachState(ColoHelper coloHelper,
+            String processName, Job.Status expectedStatus, int totalMinutesToWait) throws
+            OozieClientException {
+
+        final int pollSeconds = 20;
+        final int attempts = totalMinutesToWait * 60 / pollSeconds;
+        int attempt = 0;
+        while (attempt < attempts) {
+            // latest bundle is looked up on every attempt
+            final String bundleId =
+                    InstanceUtil.getLatestBundleID(coloHelper, processName, EntityType.PROCESS);
+            final BundleJob bundle =
+                    coloHelper.getProcessHelper().getOozieClient().getBundleJobInfo(bundleId);
+            if (bundle.getStatus() == expectedStatus) {
+                return;
+            }
+            TimeUtil.sleepSeconds(pollSeconds);
+            attempt++;
+        }
+    }
+
+    /**
+     * Generates time which is presumably needed for process/feed instances to reach particular
+     * state.
+     * Feed instances are running faster then process, so feed timeouts are less then process.
+     *
+     * @param entityType     type of entity which instances status we are waiting for
+     * @param expectedStatus expected status we are waiting for
+     * @return minutes to wait for expected status
+     */
+    private static int getMinutesToWait(EntityType entityType,
+            CoordinatorAction.Status expectedStatus) {
+        switch (expectedStatus) {
+        case RUNNING:
+            if (entityType == EntityType.PROCESS) {
+                return OSUtil.IS_WINDOWS ? 20 : 10;
+            } else if (entityType == EntityType.FEED) {
+                return OSUtil.IS_WINDOWS ? 10 : 5;
+            }
+        case WAITING:
+            return OSUtil.IS_WINDOWS ? 6 : 3;
+        case SUCCEEDED:
+            if (entityType == EntityType.PROCESS) {
+                return OSUtil.IS_WINDOWS ? 25 : 15;
+            } else if (entityType == EntityType.FEED) {
+                return OSUtil.IS_WINDOWS ? 20 : 10;
+            }
+        case KILLED:
+        case TIMEDOUT:
+            return OSUtil.IS_WINDOWS ? 40 : 20;
+        default:
+            return OSUtil.IS_WINDOWS ? 30 : 15;
+        }
+    }
+
+    /**
+     * Generates time which is presumably needed for bundle job to reach particular state.
+     *
+     * @param expectedStatus status which we are expect to get from bundle job
+     * @return minutes to wait for expected status
+     */
+    private static int getMinutesToWait(Job.Status expectedStatus) {
+        switch (expectedStatus) {
+        case DONEWITHERROR:
+        case SUCCEEDED:
+            return OSUtil.IS_WINDOWS ? 40 : 20;
+        case KILLED:
+            return OSUtil.IS_WINDOWS ? 30 : 15;
+        default:
+            return OSUtil.IS_WINDOWS ? 60 : 30;
+        }
+    }
+
+    /**
+     * Replaces frequency of the feed.
+     *
+     * @param feed feed definition
+     * @param f    new frequency of the feed
+     * @return modified feed
+     */
+    public static String setFeedFrequency(String feed, Frequency f) {
+        final Feed parsedFeed = (Feed) Entity.fromString(EntityType.FEED, feed);
+        parsedFeed.setFrequency(f);
+        return parsedFeed.toString();
+    }
+
+    /**
+     * Waits till instances of specific job will be created.
+     * Name and type are read from the entity definition and the call is handed over to the
+     * overload which accepts them.
+     * Use this method directly in unusual test cases where timeouts are different from trivial.
+     * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int)
+     *
+     * @param coloHelper         colo helper of cluster job is running on
+     * @param entity             definition of entity which describes job
+     * @param bundleSeqNo        bundle number if update has happened.
+     * @param totalMinutesToWait passed on unchanged; despite its name the receiving overload uses
+     *                           it as max number of polls with 5 seconds between them
+     * @throws OozieClientException
+     */
+    public static void waitTillInstancesAreCreated(ColoHelper coloHelper,
+            String entity,
+            int bundleSeqNo,
+            int totalMinutesToWait
+    ) throws OozieClientException {
+        waitTillInstancesAreCreated(coloHelper, Util.readEntityName(entity),
+                Util.getEntityType(entity), bundleSeqNo, totalMinutesToWait);
+    }
+
+    /**
+     * Waits till the default coordinator of the bundle has at least one action.
+     * Coordinator is polled every 5 seconds. Method returns silently if no action appeared.
+     * Use this method directly in unusual test cases where timeouts are different from trivial.
+     * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int)
+     *
+     * @param coloHelper         colo helper of cluster job is running on
+     * @param entityName         name of entity job is related to
+     * @param type               type of entity
+     * @param bundleSeqNo        bundle number if update has happened.
+     * @param totalMinutesToWait despite its name it is used as max number of polls,
+     *                           so total wait is up to 5 seconds per unit
+     * @throws OozieClientException
+     */
+    public static void waitTillInstancesAreCreated(ColoHelper coloHelper,
+            String entityName,
+            EntityType type,
+            int bundleSeqNo,
+            int totalMinutesToWait
+    ) throws OozieClientException {
+        final String bundleId = getSequenceBundleID(coloHelper, entityName, type, bundleSeqNo);
+        final String coordId =
+                getDefaultCoordIDFromBundle(coloHelper.getClusterHelper().getOozieClient(), bundleId);
+        int poll = 0;
+        while (poll < totalMinutesToWait) {
+            final CoordinatorJob coordinator =
+                    coloHelper.getProcessHelper().getOozieClient().getCoordJobInfo(coordId);
+            if (coordinator.getActions().size() > 0) {
+                return;
+            }
+            final String oozieUrl = coloHelper.getProcessHelper().getOozieClient().getOozieUrl();
+            LOGGER.info("Coord " + coordinator.getId()
+                    + " still doesn't have instance created on oozie: " + oozieUrl);
+            TimeUtil.sleepSeconds(5);
+            poll++;
+        }
+    }
+
+    /**
+     * Waits till instances of specific job will be created during timeout.
+     * Timeout is common for most of usual test cases: INSTANCES_CREATED_TIMEOUT minutes are
+     * converted into a number of 5 second units before they are passed on.
+     *
+     * @param coloHelper  colo helper of cluster job is running on
+     * @param entity      definition of entity which describes job
+     * @param bundleSeqNo bundle number if update has happened.
+     * @throws OozieClientException
+     */
+    public static void waitTillInstancesAreCreated(ColoHelper coloHelper,
+            String entity,
+            int bundleSeqNo
+    ) throws OozieClientException {
+        final int fiveSecondUnits = INSTANCES_CREATED_TIMEOUT * 60 / 5;
+        waitTillInstancesAreCreated(coloHelper, entity, bundleSeqNo, fiveSecondUnits);
+    }
+
+    /**
+     * Sets owner and group of the feed ACL.
+     * MerlinConstants.ACL_OWNER and MerlinConstants.ACL_GROUP are used unless other values are
+     * supplied.
+     *
+     * @param feed       feed definition
+     * @param ownerGroup optional values: the first one replaces the owner, the second one replaces
+     *                   the group only if exactly two values are supplied
+     * @return modified feed
+     */
+    public static String setFeedACL(String feed, String... ownerGroup) {
+        final FeedMerlin feedObject = new FeedMerlin(feed);
+        final ACL acl = feedObject.getACL();
+        String owner = MerlinConstants.ACL_OWNER;
+        String group = MerlinConstants.ACL_GROUP;
+        if (ownerGroup.length > 0) {
+            owner = ownerGroup[0];
+            if (ownerGroup.length == 2) {
+                group = ownerGroup[1];
+            }
+        }
+        acl.setOwner(owner);
+        acl.setGroup(group);
+        feedObject.setACL(acl);
+        return feedObject.toString();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java
new file mode 100644
index 0000000..71d728e
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.testng.Assert;
+import org.apache.log4j.Logger;
+
+/**
+ * Helper which switches Kerberos credentials of the tests.
+ */
+public final class KerberosHelper {
+    private static final Logger LOGGER = Logger.getLogger(KerberosHelper.class);
+
+    private KerberosHelper() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /**
+     * Runs kinit with the keytab of the user and fails the test if exit code is not 0.
+     * Nothing is executed if MerlinConstants.IS_SECURE is false.
+     *
+     * @param user user to log in, MerlinConstants.CURRENT_USER_NAME is used if null
+     */
+    public static void loginFromKeytab(String user) {
+        if (MerlinConstants.IS_SECURE) {
+            final String principal = user != null ? user : MerlinConstants.CURRENT_USER_NAME;
+            final String keytab = MerlinConstants.getKeytabForUser(principal);
+            final int exitVal = ExecUtil.executeCommandGetExitCode(
+                    String.format("kinit -kt %s %s", keytab, principal));
+            Assert.assertEquals(exitVal, 0, "Switching Kerberos credential did not succeed.");
+        } else {
+            LOGGER.info("Kerberos is disabled, hence no user switching.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
new file mode 100644
index 0000000..86d4d47
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * OS dependent constants (file separator, paths of test resources) and path helper.
+ */
+public final class OSUtil {
+    private OSUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /** True if value of os.name system property starts with "windows" (case is ignored). */
+    public static final boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().startsWith("windows");
+    /** Value of file.separator system property, "/" if it is not set. */
+    public static final String SEPARATOR = System.getProperty("file.separator", "/");
+    /** Relative path src/test/resources/ built with {@link #SEPARATOR}. */
+    public static final String RESOURCES =
+        "src" + SEPARATOR + "test" + SEPARATOR + "resources" + SEPARATOR;
+    public static final String RESOURCES_OOZIE = RESOURCES + "oozie" + SEPARATOR;
+    public static final String OOZIE_EXAMPLE_INPUT_DATA =
+        RESOURCES + "OozieExampleInputData" + SEPARATOR;
+    /** Unlike other folder constants this one has no trailing separator. */
+    public static final String OOZIE_EXAMPLE_INPUT_LATE_INPUT =
+        OOZIE_EXAMPLE_INPUT_DATA + "lateData";
+    public static final String NORMAL_INPUT =
+        OOZIE_EXAMPLE_INPUT_DATA + "normalInput" + SEPARATOR;
+    public static final String SINGLE_FILE =
+        OOZIE_EXAMPLE_INPUT_DATA + "SingleFile" + SEPARATOR;
+
+    /**
+     * Joins path parts with {@link #SEPARATOR}.
+     *
+     * @param pathParts parts of the path
+     * @return joined path
+     */
+    public static String getPath(String... pathParts) {
+        return StringUtils.join(pathParts, SEPARATOR);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
new file mode 100644
index 0000000..e22416a
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.oozie.client.AuthOozieClient;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.XOozieClient;
+import org.joda.time.DateTime;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Helper methods for Oozie.
+ */
+public final class OozieUtil {
+    private static final Logger LOGGER = Logger.getLogger(OozieUtil.class);
+
+    /** Not meant to be instantiated: always throws. */
+    private OozieUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /**
+     * Creates a new {@link AuthOozieClient} for the given url.
+     *
+     * @param url url passed to the client constructor
+     * @return the newly created client
+     */
+    public static AuthOozieClient getClient(String url) {
+        final AuthOozieClient authClient = new AuthOozieClient(url);
+        return authClient;
+    }
+
+    /**
+     * Logs the oozie url of the client and fetches bundle jobs matching the filter.
+     *
+     * @param client oozie client to query
+     * @param filter filter passed to {@code getBundleJobsInfo}
+     * @param start start value passed to {@code getBundleJobsInfo}
+     * @param len length value passed to {@code getBundleJobsInfo}
+     * @return bundle jobs returned by the client
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<BundleJob> getBundles(OozieClient client, String filter, int start, int len)
+        throws OozieClientException {
+        final String oozieUrl = client.getOozieUrl();
+        LOGGER.info("Connecting to oozie: " + oozieUrl);
+        final List<BundleJob> bundleJobs = client.getBundleJobsInfo(filter, start, len);
+        return bundleJobs;
+    }
+
+    /**
+     * Fetches bundle jobs matching the filter and returns their ids.
+     *
+     * @param client oozie client to query
+     * @param filter filter used to select bundles
+     * @param start start value of the query
+     * @param len length value of the query
+     * @return ids of the fetched bundles
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<String> getBundleIds(OozieClient client, String filter, int start, int len)
+        throws OozieClientException {
+        final List<BundleJob> fetchedBundles = getBundles(client, filter, start, len);
+        return getBundleIds(fetchedBundles);
+    }
+
+    /**
+     * Collects (and logs) the ids of the given bundles, keeping their order.
+     *
+     * @param bundles bundles to read the ids from
+     * @return a new list holding the id of every bundle
+     */
+    public static List<String> getBundleIds(List<BundleJob> bundles) {
+        final List<String> bundleIds = new ArrayList<String>();
+        for (BundleJob job : bundles) {
+            final String id = job.getId();
+            LOGGER.info("Bundle Id: " + id);
+            bundleIds.add(id);
+        }
+        return bundleIds;
+    }
+
+    /**
+     * Fetches bundle jobs matching the filter and returns their statuses.
+     *
+     * @param client oozie client to query
+     * @param filter filter used to select bundles
+     * @param start start value of the query
+     * @param len length value of the query
+     * @return statuses of the fetched bundles
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<Job.Status> getBundleStatuses(OozieClient client, String filter, int start,
+                                                     int len) throws OozieClientException {
+        final List<BundleJob> fetchedBundles = getBundles(client, filter, start, len);
+        return getBundleStatuses(fetchedBundles);
+    }
+
+    /**
+     * Collects the statuses of the given bundles, logging every bundle on the way.
+     *
+     * @param bundles bundles to read the statuses from
+     * @return a new list holding the status of every bundle, in the same order
+     */
+    public static List<Job.Status> getBundleStatuses(List<BundleJob> bundles) {
+        final List<Job.Status> bundleStatuses = new ArrayList<Job.Status>();
+        for (BundleJob job : bundles) {
+            LOGGER.info("bundle: " + job);
+            final Job.Status jobStatus = job.getStatus();
+            bundleStatuses.add(jobStatus);
+        }
+        return bundleStatuses;
+    }
+
+    /**
+     * Finds the id with the largest leading number.
+     * <p/>
+     * Only the integer in front of the first '-' of each id is compared. When several ids
+     * share the largest number, the first of them is returned.
+     *
+     * @param ids non-empty list of ids, each starting with an integer followed by '-'
+     * @return the id whose leading number is the largest
+     * @throws IndexOutOfBoundsException if {@code ids} is empty
+     * @throws NumberFormatException if the leading part of an id is not an integer
+     */
+    public static String getMaxId(List<String> ids) {
+        String maxId = ids.get(0);
+        int maxInt = Integer.parseInt(maxId.split("-")[0]);
+        for (int i = 1; i < ids.size(); i++) {
+            String currentId = ids.get(i);
+            int currInt = Integer.parseInt(currentId.split("-")[0]);
+            if (currInt > maxInt) {
+                // track the running maximum too, otherwise every later id would only be
+                // compared against the first element
+                maxInt = currInt;
+                maxId = currentId;
+            }
+        }
+        return maxId;
+    }
+
+    /**
+     * Finds the id with the smallest leading number.
+     * <p/>
+     * Only the integer in front of the first '-' of each id is compared. When several ids
+     * share the smallest number, the first of them is returned.
+     *
+     * @param ids non-empty list of ids, each starting with an integer followed by '-'
+     * @return the id whose leading number is the smallest
+     * @throws IndexOutOfBoundsException if {@code ids} is empty
+     * @throws NumberFormatException if the leading part of an id is not an integer
+     */
+    public static String getMinId(List<String> ids) {
+        String minId = ids.get(0);
+        int minInt = Integer.parseInt(minId.split("-")[0]);
+        for (int i = 1; i < ids.size(); i++) {
+            String currentId = ids.get(i);
+            int currInt = Integer.parseInt(currentId.split("-")[0]);
+            if (currInt < minInt) {
+                // track the running minimum too, otherwise every later id would only be
+                // compared against the first element
+                minInt = currInt;
+                minId = currentId;
+            }
+        }
+        return minId;
+    }
+
+    /**
+     * @param bundleID bundle number
+     * @param oozieClient oozie client
+     * @return list of action ids of the succeeded retention workflow
+     * @throws OozieClientException
+     */
+    public static List<String> waitForRetentionWorkflowToSucceed(String bundleID,
+                                                                 OozieClient oozieClient)
+        throws OozieClientException {
+        LOGGER.info("Connecting to oozie: " + oozieClient.getOozieUrl());
+        List<String> jobIds = new ArrayList<String>();
+        LOGGER.info("using bundleId:" + bundleID);
+        waitForCoordinatorJobCreation(oozieClient, bundleID);
+        final String coordinatorId =
+            oozieClient.getBundleJobInfo(bundleID).getCoordinators().get(0).getId();
+        LOGGER.info("using coordinatorId: " + coordinatorId);
+
+        for (int i = 0;
+             i < 120 && oozieClient.getCoordJobInfo(coordinatorId).getActions().isEmpty(); ++i) {
+            TimeUtil.sleepSeconds(4);
+        }
+        Assert.assertFalse(oozieClient.getCoordJobInfo(coordinatorId).getActions().isEmpty(),
+            "Coordinator actions should have got created by now.");
+
+        final List<CoordinatorAction> actions =
+            oozieClient.getCoordJobInfo(coordinatorId).getActions();
+        LOGGER.info("actions: " + actions);
+
+        for (CoordinatorAction action : actions) {
+            for (int i = 0; i < 180; ++i) {
+                CoordinatorAction actionInfo = oozieClient.getCoordActionInfo(action.getId());
+                LOGGER.info("actionInfo: " + actionInfo);
+                if (EnumSet.of(CoordinatorAction.Status.SUCCEEDED, CoordinatorAction.Status.KILLED,
+                    CoordinatorAction.Status.FAILED).contains(actionInfo.getStatus())) {
+                    break;
+                }
+                TimeUtil.sleepSeconds(10);
+            }
+            Assert.assertEquals(
+                oozieClient.getCoordActionInfo(action.getId()).getStatus(),
+                CoordinatorAction.Status.SUCCEEDED,
+                "Action did not succeed.");
+            jobIds.add(action.getId());
+
+        }
+
+        return jobIds;
+
+    }
+
+    public static void waitForCoordinatorJobCreation(OozieClient oozieClient, String bundleID)
+        throws OozieClientException {
+        LOGGER.info("Connecting to oozie: " + oozieClient.getOozieUrl());
+        for (int i = 0;
+             i < 60 && oozieClient.getBundleJobInfo(bundleID).getCoordinators().isEmpty(); ++i) {
+            TimeUtil.sleepSeconds(2);
+        }
+        Assert.assertFalse(oozieClient.getBundleJobInfo(bundleID).getCoordinators().isEmpty(),
+            "Coordinator job should have got created by now.");
+    }
+
+    /**
+     * Returns the status of the first bundle found for the filter
+     * {@code name=FALCON_<entityType>_<processName>}.
+     *
+     * @param client oozie client to query
+     * @param processName entity name used in the filter
+     * @param entityType entity type used in the filter
+     * @return status of the first matching bundle, or null when no bundle matches
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static Job.Status getOozieJobStatus(OozieClient client, String processName,
+                                               EntityType entityType)
+        throws OozieClientException {
+        final String bundleFilter = "name=FALCON_" + entityType + "_" + processName;
+        final List<Job.Status> found = getBundleStatuses(client, bundleFilter, 0, 10);
+        return found.isEmpty() ? null : found.get(0);
+    }
+
+    /**
+     * Returns the ids of the bundles found for the filter
+     * {@code name=FALCON_<entityType>_<entityName>} (start 0, length 10).
+     *
+     * @param client oozie client to query
+     * @param entityName entity name used in the filter
+     * @param entityType entity type used in the filter
+     * @return ids of the matching bundles
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<String> getBundles(OozieClient client, String entityName,
+                                          EntityType entityType)
+        throws OozieClientException {
+        final String bundleFilter = String.format("name=FALCON_%s_%s", entityType, entityName);
+        final List<String> bundleIds = getBundleIds(client, bundleFilter, 0, 10);
+        return bundleIds;
+    }
+
+    /**
+     * Returns the sorted creation times (in UTC) of the actions of the first coordinator of
+     * the bundle whose app name contains "DEFAULT" and which has at least one action.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleID id of the bundle to inspect
+     * @return sorted creation times of the actions, or null when no such coordinator exists
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<DateTime> getStartTimeForRunningCoordinators(ColoHelper prismHelper,
+                                                                    String bundleID)
+        throws OozieClientException {
+        final List<DateTime> createdTimes = new ArrayList<DateTime>();
+        final XOozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();
+        final BundleJob bundle = oozieClient.getBundleJobInfo(bundleID);
+
+        for (CoordinatorJob coordinator : bundle.getCoordinators()) {
+            if (!coordinator.getAppName().contains("DEFAULT")) {
+                continue;
+            }
+            final CoordinatorJob details = oozieClient.getCoordJobInfo(coordinator.getId());
+            for (CoordinatorAction action : details.getActions()) {
+                final DateTime createdAt =
+                    new DateTime(action.getCreatedTime(), DateTimeZone.UTC);
+                LOGGER.info(createdAt);
+                createdTimes.add(createdAt);
+            }
+            // a coordinator without actions contributes nothing, keep looking
+            if (!createdTimes.isEmpty()) {
+                Collections.sort(createdTimes);
+                return createdTimes;
+            }
+        }
+
+        return null;
+    }
+
+    public static boolean verifyOozieJobStatus(OozieClient client, String processName,
+                                               EntityType entityType, Job.Status expectedStatus)
+        throws OozieClientException {
+        for (int seconds = 0; seconds < 100; seconds+=5) {
+            Job.Status status = getOozieJobStatus(client, processName, entityType);
+            LOGGER.info("Current status: " + status);
+            if (status == expectedStatus) {
+                return true;
+            }
+            TimeUtil.sleepSeconds(5);
+        }
+        return false;
+    }
+
+    /**
+     * Returns the missing dependencies of the first action of the first coordinator of the
+     * bundle, split on '#'.
+     *
+     * @param helper helper providing the oozie client
+     * @param bundleID id of the bundle to inspect
+     * @return the missing dependencies, or null when the coordinator has no actions
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<String> getMissingDependencies(ColoHelper helper, String bundleID)
+        throws OozieClientException {
+        final BundleJob bundle =
+            helper.getClusterHelper().getOozieClient().getBundleJobInfo(bundleID);
+        final XOozieClient coordClient = helper.getClusterHelper().getOozieClient();
+        final CoordinatorJob firstCoord =
+            coordClient.getCoordJobInfo(bundle.getCoordinators().get(0).getId());
+        final List<CoordinatorAction> coordActions = firstCoord.getActions();
+
+        if (coordActions.isEmpty()) {
+            return null;
+        }
+        final String rawDependencies = coordActions.get(0).getMissingDependencies();
+        LOGGER.info("conf from event: " + rawDependencies);
+
+        return new ArrayList<String>(Arrays.asList(rawDependencies.split("#")));
+    }
+
+    /**
+     * Waits for the bundle to have a coordinator and returns the external ids of the
+     * actions of its first coordinator.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleID id of the bundle to inspect
+     * @return external id of every action of the first coordinator
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<String> getWorkflowJobs(ColoHelper prismHelper, String bundleID)
+        throws OozieClientException {
+        final XOozieClient client = prismHelper.getClusterHelper().getOozieClient();
+        waitForCoordinatorJobCreation(client, bundleID);
+        final List<CoordinatorJob> coordinators =
+            client.getBundleJobInfo(bundleID).getCoordinators();
+        final String firstCoordId = coordinators.get(0).getId();
+        final List<CoordinatorAction> actions = client.getCoordJobInfo(firstCoordId).getActions();
+
+        final List<String> externalIds = new ArrayList<String>();
+        for (CoordinatorAction action : actions) {
+            externalIds.add(action.getExternalId());
+        }
+        return externalIds;
+    }
+
+    /**
+     * Returns the nominal time of the first action of the first coordinator of the bundle.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleID id of the bundle to inspect
+     * @return nominal time of the first action
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static Date getNominalTime(ColoHelper prismHelper, String bundleID)
+        throws OozieClientException {
+        final XOozieClient client = prismHelper.getClusterHelper().getOozieClient();
+        final BundleJob bundle = client.getBundleJobInfo(bundleID);
+        final String firstCoordId = bundle.getCoordinators().get(0).getId();
+        final CoordinatorAction firstAction =
+            client.getCoordJobInfo(firstCoordId).getActions().get(0);
+        return firstAction.getNominalTime();
+    }
+
+    /**
+     * Looks up the first coordinator of the bundle whose app name contains "DEFAULT" (for
+     * {@link EntityType#PROCESS}) or "REPLICATION" (for {@link EntityType#FEED}).
+     * A message is logged for every coordinator that does not match.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleId id of the bundle to inspect
+     * @param type entity type deciding which coordinator is wanted
+     * @return info of the first matching coordinator, or null when none matches
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static CoordinatorJob getDefaultOozieCoord(ColoHelper prismHelper, String bundleId,
+                                                      EntityType type)
+        throws OozieClientException {
+        final XOozieClient client = prismHelper.getClusterHelper().getOozieClient();
+        final BundleJob bundle = client.getBundleJobInfo(bundleId);
+
+        for (CoordinatorJob candidate : bundle.getCoordinators()) {
+            final String appName = candidate.getAppName();
+            final boolean processCoord = appName.contains("DEFAULT") && EntityType.PROCESS == type;
+            final boolean feedCoord = appName.contains("REPLICATION") && EntityType.FEED == type;
+            if (!processCoord && !feedCoord) {
+                LOGGER.info("Desired coord does not exists on " + client.getOozieUrl());
+                continue;
+            }
+            return client.getCoordJobInfo(candidate.getId());
+        }
+
+        return null;
+    }
+
+    /**
+     * Counts the actions of the coordinator that {@code getDefaultOozieCoord} returns for
+     * {@link EntityType#PROCESS}.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleId id of the bundle to inspect
+     * @return number of actions of that coordinator
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static int getNumberOfWorkflowInstances(ColoHelper prismHelper, String bundleId)
+        throws OozieClientException {
+        final CoordinatorJob processCoord =
+            getDefaultOozieCoord(prismHelper, bundleId, EntityType.PROCESS);
+        return processCoord.getActions().size();
+    }
+
+    /**
+     * Returns the nominal times of the coordinator actions as strings, in the key order of
+     * the map built by {@code getActionsNominalTimeAndStatus}.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleId id of the bundle to inspect
+     * @param type entity type deciding which coordinator is used
+     * @return {@code toString()} of every nominal time
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static List<String> getActionsNominalTime(ColoHelper prismHelper,
+                                                     String bundleId,
+                                                     EntityType type)
+        throws OozieClientException {
+        final Map<Date, CoordinatorAction.Status> statusByNominalTime =
+            getActionsNominalTimeAndStatus(prismHelper, bundleId, type);
+        final List<String> nominalTimes = new ArrayList<String>();
+        for (Map.Entry<Date, CoordinatorAction.Status> entry : statusByNominalTime.entrySet()) {
+            nominalTimes.add(entry.getKey().toString());
+        }
+        return nominalTimes;
+    }
+    /**
+     * Maps the nominal time of every action of the coordinator returned by
+     * {@code getDefaultOozieCoord} to the status of that action.
+     *
+     * @param prismHelper helper providing the oozie client
+     * @param bundleId id of the bundle to inspect
+     * @param type entity type deciding which coordinator is used
+     * @return a {@link TreeMap} from nominal time to action status
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(ColoHelper prismHelper,
+            String bundleId, EntityType type) throws OozieClientException {
+        final CoordinatorJob coord = getDefaultOozieCoord(prismHelper, bundleId, type);
+        final List<CoordinatorAction> coordActions = coord.getActions();
+        final Map<Date, CoordinatorAction.Status> statusByNominalTime =
+            new TreeMap<Date, CoordinatorAction.Status>();
+        for (CoordinatorAction coordAction : coordActions) {
+            statusByNominalTime.put(coordAction.getNominalTime(), coordAction.getStatus());
+        }
+        return statusByNominalTime;
+    }
+
+    /**
+     * Tells whether the bundle is DONEWITHERROR, FAILED, SUCCEEDED or KILLED.
+     * When it is not, the method sleeps 20 seconds before returning false.
+     *
+     * @param coloHelper helper providing the oozie client
+     * @param bundleId id of the bundle to check
+     * @return true when the bundle is in one of the listed states, false otherwise
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static boolean isBundleOver(ColoHelper coloHelper, String bundleId)
+        throws OozieClientException {
+        final XOozieClient client = coloHelper.getClusterHelper().getOozieClient();
+        final BundleJob bundle = client.getBundleJobInfo(bundleId);
+
+        final boolean over = EnumSet.of(BundleJob.Status.DONEWITHERROR, BundleJob.Status.FAILED,
+            BundleJob.Status.SUCCEEDED, BundleJob.Status.KILLED).contains(bundle.getStatus());
+        if (!over) {
+            TimeUtil.sleepSeconds(20);
+        }
+        return over;
+    }
+
+    /**
+     * Asserts that the latest bundle of the entity differs from (or equals) the original
+     * bundle, depending on {@code shouldBeCreated}.
+     *
+     * @param cluster helper of the cluster to check
+     * @param originalBundleId id of the bundle known before
+     * @param initialNominalTimes nominal times recorded before, used when matching instances
+     * @param entity entity definition the name and type are read from
+     * @param shouldBeCreated whether a new bundle is expected
+     * @param matchInstances whether the nominal times are validated as well (only when a new
+     *                       bundle is expected)
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static void verifyNewBundleCreation(ColoHelper cluster,
+                                               String originalBundleId,
+                                               List<String>
+                                                   initialNominalTimes,
+                                               String entity,
+                                               boolean shouldBeCreated,
+
+                                               boolean matchInstances) throws OozieClientException {
+        final String name = Util.readEntityName(entity);
+        final EntityType type = Util.getEntityType(entity);
+        final String latestBundleId = InstanceUtil.getLatestBundleID(cluster, name, type);
+
+        if (!shouldBeCreated) {
+            Assert.assertEquals(latestBundleId,
+                originalBundleId, "eeks! new bundle is getting created!!!!");
+            return;
+        }
+
+        Assert.assertTrue(!latestBundleId.equalsIgnoreCase(originalBundleId),
+            "eeks! new bundle is not getting created!!!!");
+        LOGGER.info("old bundleId=" + originalBundleId + " on oozie: "
+            + cluster.getProcessHelper().getOozieClient().getOozieUrl());
+        LOGGER.info("new bundleId=" + latestBundleId + " on oozie: "
+            + cluster.getProcessHelper().getOozieClient().getOozieUrl());
+        if (matchInstances) {
+            validateNumberOfWorkflowInstances(cluster,
+                initialNominalTimes, originalBundleId, latestBundleId, type);
+        }
+    }
+
+    /**
+     * Fails when one of the initial nominal times is found neither among the actions of the
+     * original bundle nor among the actions of the new bundle.
+     *
+     * @param cluster helper of the cluster to check
+     * @param initialNominalTimes nominal times recorded before; left untouched
+     * @param originalBundleId id of the original bundle
+     * @param newBundleId id of the new bundle
+     * @param type entity type deciding which coordinator is used
+     * @throws OozieClientException propagated from the oozie client
+     */
+    private static void validateNumberOfWorkflowInstances(ColoHelper cluster,
+                                                          List<String> initialNominalTimes,
+                                                          String originalBundleId,
+                                                          String newBundleId, EntityType type)
+        throws OozieClientException {
+
+        List<String> nominalTimesOriginalAndNew = getActionsNominalTime(cluster,
+                originalBundleId, type);
+
+        nominalTimesOriginalAndNew.addAll(getActionsNominalTime(cluster,
+            newBundleId, type));
+
+        // Work on a copy: removing from the caller's list would empty it as a side effect
+        // (a later check with the same list would then pass vacuously) and would throw on
+        // lists that do not support removal.
+        List<String> missingInstances = new ArrayList<String>(initialNominalTimes);
+        missingInstances.removeAll(nominalTimesOriginalAndNew);
+
+        if (!missingInstances.isEmpty()) {
+            LOGGER.debug("Missing instance are : " + missingInstances);
+            LOGGER.debug("Original Bundle ID   : " + originalBundleId);
+            LOGGER.debug("New Bundle ID        : " + newBundleId);
+
+            // the missing instances are part of the message as debug logging may be disabled
+            Assert.fail("some instances have gone missing after update: " + missingInstances);
+        }
+    }
+
+    /**
+     * Returns the start time of the coordinator found by {@code getDefaultOozieCoord} for the
+     * given bundle of the entity, converted with {@code TimeUtil.dateToOozieDate}.
+     *
+     * @param colo helper of the colo to query
+     * @param entity entity definition the name and type are read from
+     * @param bundleNo number passed to {@code InstanceUtil.getSequenceBundleID}
+     * @return the converted start time of the coordinator
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static String getCoordStartTime(ColoHelper colo, String entity,
+                                           int bundleNo)
+        throws OozieClientException {
+        final String entityName = Util.readEntityName(entity);
+        final String sequenceBundleId = InstanceUtil.getSequenceBundleID(colo,
+            entityName, Util.getEntityType(entity), bundleNo);
+
+        final CoordinatorJob defaultCoord = getDefaultOozieCoord(colo, sequenceBundleId,
+            Util.getEntityType(entity));
+
+        return TimeUtil.dateToOozieDate(defaultCoord.getStartTime());
+    }
+
+    /**
+     * Builds a formatter for the pattern {@code yyyy'-'MM'-'dd'T'HH':'mm'Z'}.
+     *
+     * @return formatter created from that pattern
+     */
+    public static DateTimeFormatter getOozieDateTimeFormatter() {
+        final String oozieDatePattern = "yyyy'-'MM'-'dd'T'HH':'mm'Z'";
+        return DateTimeFormat.forPattern(oozieDatePattern);
+    }
+
+    /**
+     * Counts the bundles found for the entity, using the oozie client of the feed helper.
+     *
+     * @param helper helper providing the oozie client
+     * @param type entity type
+     * @param entityName entity name
+     * @return number of bundle ids returned by {@code getBundles}
+     * @throws OozieClientException propagated from the oozie client
+     */
+    public static int getNumberOfBundle(ColoHelper helper, EntityType type, String entityName)
+        throws OozieClientException {
+        final List<String> bundleIds = OozieUtil.getBundles(
+            helper.getFeedHelper().getOozieClient(), entityName, type);
+        return bundleIds.size();
+    }
+
+    /**
+     * Creates HDFS folders for the missing dependencies of one instance, gathered over all
+     * coordinators of the bundle.
+     *
+     * @param helper helper providing the oozie client
+     * @param type entity type
+     * @param entityName entity name
+     * @param bundleNumber number passed to {@code InstanceUtil.getSequenceBundleID}
+     * @param instanceNumber index of the action within each coordinator
+     * @throws OozieClientException propagated from the oozie client
+     * @throws IOException propagated from {@code InstanceUtil.createHDFSFolders}
+     */
+    public static void createMissingDependencies(ColoHelper helper, EntityType type,
+                                                 String entityName, int bundleNumber,
+                                                 int instanceNumber)
+        throws OozieClientException, IOException {
+        final String sequenceBundleId =
+            InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber);
+        final OozieClient client = helper.getClusterHelper().getOozieClient();
+        final List<CoordinatorJob> coordinators =
+            client.getBundleJobInfo(sequenceBundleId).getCoordinators();
+        final List<String> missingPaths =
+            getMissingDependenciesForInstance(client, coordinators, instanceNumber);
+        InstanceUtil.createHDFSFolders(helper, missingPaths);
+    }
+
+    private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient,
+            List<CoordinatorJob> coords, int instanceNumber)
+        throws OozieClientException {
+        ArrayList<String> missingPaths = new ArrayList<String>();
+        for (CoordinatorJob coord : coords) {
+
+            CoordinatorJob temp = oozieClient.getCoordJobInfo(coord.getId());
+            CoordinatorAction instance = temp.getActions().get(instanceNumber);
+            missingPaths.addAll(Arrays.asList(instance.getMissingDependencies().split("#")));
+        }
+        return missingPaths;
+    }
+
+    /**
+     * Creates HDFS folders for the missing dependencies of every action of every coordinator
+     * of the bundle.
+     *
+     * @param helper helper providing the oozie client
+     * @param type entity type
+     * @param entityName entity name
+     * @param bundleNumber number passed to {@code InstanceUtil.getSequenceBundleID}
+     * @throws OozieClientException propagated from the oozie client
+     * @throws IOException propagated from {@code InstanceUtil.createHDFSFolders}
+     */
+    public static void createMissingDependencies(ColoHelper helper, EntityType type,
+                                                 String entityName, int bundleNumber)
+        throws OozieClientException, IOException {
+        final String sequenceBundleId =
+            InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber);
+        final OozieClient client = helper.getClusterHelper().getOozieClient();
+        final List<CoordinatorJob> coordinators =
+            client.getBundleJobInfo(sequenceBundleId).getCoordinators();
+        for (CoordinatorJob coordinator : coordinators) {
+            final CoordinatorJob coordInfo = client.getCoordJobInfo(coordinator.getId());
+            for (CoordinatorAction instance : coordInfo.getActions()) {
+                final List<String> missingPaths =
+                    Arrays.asList(instance.getMissingDependencies().split("#"));
+                InstanceUtil.createHDFSFolders(helper, missingPaths);
+            }
+        }
+    }
+}