Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:18 UTC

[19/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/pages/ProcessPage.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/pages/ProcessPage.java b/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/pages/ProcessPage.java
new file mode 100644
index 0000000..9dd5be2
--- /dev/null
+++ b/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/pages/ProcessPage.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.ui.pages;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.log4j.Logger;
+import org.openqa.selenium.By;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
+import org.openqa.selenium.Point;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
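+/**
+ * Page object for a Falcon process entity page and its lineage dialog.
+ * Illustrative usage (the entity name and nominal time below are examples only):
+ * <pre>
+ *     ProcessPage page = new ProcessPage(driver, cluster, "sample-process");
+ *     page.openLineage("2010-01-02T01:00Z");
+ *     String title = page.getLineageTitle();
+ *     page.closeLineage();
+ * </pre>
+ */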
+public class ProcessPage extends EntityPage<Process> {
+
+    private static final Logger logger = Logger.getLogger(ProcessPage.class);
+    private boolean isLineageOpened = false;
+
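+    //Instances panel xpaths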
+    private static final String INSTANCES_PANEL = "//div[@id='panel-instance']//span";
+    private static final String INSTANCE_STATUS_TEMPLATE = INSTANCES_PANEL + "[contains(..,'%s')]";
+    private static final String LINEAGE_LINK_TEMPLATE =
+        "//a[@class='lineage-href' and @data-instance-name='%s']";
+
+    //Lineage information xpaths
+    private static final String CLOSE_LINEAGE_LINK_TEMPLATE =
+        "//body[@class='modal-open']//button[contains(., 'Close')]";
+    private static final String LINEAGE_MODAL = "//div[@id='lineage-modal']";
+    private static final String SVG_ELEMENT = "//*[name() = 'svg']/*[name()='g']/*[name()='g']";
+    private static final String VERTICES_BLOCKS = SVG_ELEMENT + "[not(@class='lineage-link')]";
+    private static final String VERTICES_TEXT = VERTICES_BLOCKS +
+        "//div[@class='lineage-node-text']";
+    private static final String EDGE = SVG_ELEMENT + "[@class='lineage-link']//*[name()='path']";
+    private static final String CIRCLE = "//*[name() = 'circle']";
+    private static final String VERTICES = VERTICES_BLOCKS + CIRCLE;
+    private static final String VERTEX_BLOCK_TEMPLATE = VERTICES_BLOCKS + "[contains(., '%s')]";
+    private static final String VERTEX_TEMPLATE = VERTEX_BLOCK_TEMPLATE + CIRCLE;
+
+    private static final String LINEAGE_INFO_PANEL_LIST = "//div[@id='lineage-info-panel']" +
+        "//div[@class='col-md-3']";
+
+    private static final String LINEAGE_TITLE = LINEAGE_MODAL + "//div[@class='modal-header']/h4";
+
+    private static final String LINEAGE_LEGENDS_BLOCK = LINEAGE_MODAL +
+        "//div[@class='modal-body']/div[ul[@class='lineage-legend']]";
+    private static final String LINEAGE_LEGENDS_TITLE = LINEAGE_LEGENDS_BLOCK + "/h4";
+    private static final String LINEAGE_LEGENDS_ELEMENTS = LINEAGE_LEGENDS_BLOCK + "/ul/li";
+
+    public ProcessPage(WebDriver driver, ColoHelper helper, String entityName) {
+        super(driver, helper, EntityType.PROCESS, Process.class, entityName);
+    }
+
+    /**
+     * @param nominalTime particular instance of the process, identified by its nominal start time
+     */
+    public void openLineage(String nominalTime) {
+        waitForElement(String.format(LINEAGE_LINK_TEMPLATE, nominalTime), DEFAULT_TIMEOUT,
+            "Lineage button didn't appear");
+        logger.info("Working with instance: " + nominalTime);
+        WebElement lineage =
+            driver.findElement(By.xpath(String.format(LINEAGE_LINK_TEMPLATE, nominalTime)));
+        logger.info("Opening lineage...");
+        lineage.click();
+        waitForElement(VERTICES, DEFAULT_TIMEOUT, "Circles not found");
+        waitForDisplayed(LINEAGE_TITLE, DEFAULT_TIMEOUT, "Lineage title not found");
+        isLineageOpened = true;
+    }
+
+    public void closeLineage() {
+        logger.info("Closing lineage...");
+        if (isLineageOpened) {
+            WebElement close = driver.findElement(By.xpath(CLOSE_LINEAGE_LINK_TEMPLATE));
+            close.click();
+            isLineageOpened = false;
+            waitForDisappear(CLOSE_LINEAGE_LINK_TEMPLATE, DEFAULT_TIMEOUT,
+                "Lineage didn't disappear");
+        }
+    }
+
+    @Override
+    public void refresh() {
+        super.refresh();
+        isLineageOpened = false;
+    }
+
+    /**
+     * @return map of instance names to their nominal start times
+     */
+    public HashMap<String, List<String>> getAllVertices() {
+        logger.info("Getting all vertices from lineage graph...");
+        HashMap<String, List<String>> map = null;
+        if (isLineageOpened) {
+            waitForElement(VERTICES_TEXT, DEFAULT_TIMEOUT,
+                "Vertices blocks with names not found");
+            List<WebElement> blocks = driver.findElements(By.xpath(VERTICES_TEXT));
+            logger.info(blocks.size() + " elements found");
+            map = new HashMap<String, List<String>>();
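+            // each vertex label has the form "entityName/nominalTime"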
+            for (WebElement block : blocks) {
+                waitForElement(block, ".[contains(.,'/')]", DEFAULT_TIMEOUT,
+                    "Expecting text to contain '/' :" + block.getText());
+                String text = block.getText();
+                logger.info("Vertex: " + text);
+                String[] separate = text.split("/");
+                String name = separate[0];
+                String nominalTime = separate[1];
+                if (map.containsKey(name)) {
+                    map.get(name).add(nominalTime);
+                } else {
+                    List<String> instances = new ArrayList<String>();
+                    instances.add(nominalTime);
+                    map.put(name, instances);
+                }
+            }
+        }
+        return map;
+    }
+
+    /**
+     * @return list of all vertex names
+     */
+    public List<String> getAllVerticesNames() {
+        logger.info("Getting all vertices names from lineage graph...");
+        List<String> list = new ArrayList<String>();
+        if (isLineageOpened) {
+            waitForElement(CLOSE_LINEAGE_LINK_TEMPLATE, DEFAULT_TIMEOUT,
+                "Close Lineage button not found");
+            waitForElement(VERTICES_BLOCKS, DEFAULT_TIMEOUT,
+                "Vertices not found");
+            List<WebElement> blocks = driver.findElements(By.xpath(VERTICES_BLOCKS));
+            logger.info(blocks.size() + " elements found");
+            for (WebElement block : blocks) {
+                list.add(block.getText());
+            }
+        }
+        logger.info("Vertices: " + list);
+        return list;
+    }
+
+    /**
+     * A vertex is identified by its entity name and the nominal time of a particular instance.
+     */
+    public void clickOnVertex(String entityName, String nominalTime) {
+        logger.info("Clicking on vertex " + entityName + '/' + nominalTime);
+        if (isLineageOpened) {
+            WebElement circle = driver.findElement(By.xpath(String.format(VERTEX_TEMPLATE,
+                entityName + '/' + nominalTime)));
+            Actions builder = new Actions(driver);
+            builder.click(circle).build().perform();
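+            // short pause to let the lineage info panel react to the click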
+            TimeUtil.sleepSeconds(0.5);
+        }
+    }
+
+    /**
+     * @return map of parameters from info panel and their values
+     */
+    public HashMap<String, String> getPanelInfo() {
+        logger.info("Getting info panel values...");
+        HashMap<String, String> map = null;
+        if (isLineageOpened) {
+            //check if vertex was clicked
+            waitForElement(LINEAGE_INFO_PANEL_LIST, DEFAULT_TIMEOUT, "Info panel not found");
+            List<WebElement> infoBlocks = driver.findElements(By.xpath(LINEAGE_INFO_PANEL_LIST));
+            logger.info(infoBlocks.size() + " values found");
+            map = new HashMap<String, String>();
+            for (WebElement infoBlock : infoBlocks) {
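+                // each block holds a parameter name and its value separated by a newline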
+                String text = infoBlock.getText();
+                String[] values = text.split("\n");
+                map.put(values[0], values[1]);
+            }
+        }
+        logger.info("Values: " + map);
+        return map;
+    }
+
+    /**
+     * @return map with legend element classes as keys and their on-screen labels as values
+     */
+    public HashMap<String, String> getLegends() {
+        HashMap<String, String> map = null;
+        if (isLineageOpened) {
+            map = new HashMap<String, String>();
+            List<WebElement> legends = driver.findElements(By.xpath(LINEAGE_LEGENDS_ELEMENTS));
+            for (WebElement legend : legends) {
+                String value = legend.getText();
+                String elementClass = legend.getAttribute("class");
+                map.put(elementClass, value);
+            }
+        }
+        return map;
+    }
+
+    /**
+     * @return the main title of the Lineage UI
+     */
+    public String getLineageTitle() {
+        logger.info("Getting Lineage title...");
+        if (isLineageOpened) {
+            return driver.findElement(By.xpath(LINEAGE_TITLE)).getText();
+        }
+        return null;
+    }
+
+    /**
+     * @return the title of the legends block
+     */
+    public String getLegendsTitle() {
+        logger.info("Getting Legends title...");
+        if (isLineageOpened) {
+            return driver.findElement(By.xpath(LINEAGE_LEGENDS_TITLE)).getText();
+        }
+        return null;
+    }
+
+    /**
+     * @return list of all edges present on the UI. Each edge is represented as two 2D points:
+     * the beginning and the end of the edge.
+     */
+    public List<Point[]> getEdgesFromGraph() {
+        List<Point[]> pathsEndpoints = null;
+        logger.info("Getting edges from lineage graph...");
+        if (isLineageOpened) {
+            pathsEndpoints = new ArrayList<Point[]>();
+            List<WebElement> paths = driver.findElements(By.xpath(EDGE));
+            logger.info(paths.size() + " edges found");
+            for (WebElement path : paths) {
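+                // the path definition, e.g. "M x1,y1 C ... x2,y2", is split on the
+                // M/L/C commands and commas; the first numeric pair is the start point
+                // and the last pair is the end point of the edge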
+                String[] coordinates = path.getAttribute("d").split("[MLC,]");
+                int x = 0, y, i = 0;
+                while (i < coordinates.length) {
+                    if (!coordinates[i].isEmpty()) {
+                        x = (int) Double.parseDouble(coordinates[i]);
+                        break;
+                    } else {
+                        i++;
+                    }
+                }
+                y = (int) Double.parseDouble(coordinates[i + 1]);
+                Point startPoint = new Point(x, y);
+                x = (int) Math.round(Double.parseDouble(coordinates[coordinates.length - 2]));
+                y = (int) Math.round(Double.parseDouble(coordinates[coordinates.length - 1]));
+                Point endPoint = new Point(x, y);
+                logger.info("Edge " + startPoint + '→' + endPoint);
+                pathsEndpoints.add(new Point[]{startPoint, endPoint});
+            }
+        }
+        return pathsEndpoints;
+    }
+
+    /**
+     * @return common value for radius of every vertex (circle) on the graph
+     */
+    public int getCircleRadius() {
+        logger.info("Getting value of vertex radius...");
+        WebElement circle = driver.findElements(By.xpath(VERTICES)).get(0);
+        return Integer.parseInt(circle.getAttribute("r"));
+    }
+
+    /**
+     * Finds a vertex on the graph by its name and evaluates its coordinates as a 2D point.
+     * @param vertex the name of the vertex whose point is needed
+     * @return Point(x,y) object
+     */
+    public Point getVertexEndpoint(String vertex) {
+        // locate the vertex block on the graph
+        logger.info("Getting vertex coordinates...");
+        WebElement block = driver.findElement(By.xpath(String.format(VERTEX_BLOCK_TEMPLATE, vertex)));
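+        // the block position comes from its transform attribute, e.g. "translate(x,y)";
+        // strip letters and parentheses, then parse the two coordinates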
+        String attribute = block.getAttribute("transform");
+        attribute = attribute.replaceAll("[a-zA-Z]", "");
+        String[] numbers = attribute.replaceAll("[()]", "").split(",");
+        return new Point(Integer.parseInt(numbers[0]), Integer.parseInt(numbers[1]));
+    }
+
+    /**
+     * Returns status of instance from instances panel
+     * @param instanceDate date stamp of instance
+     * @return status of instance from instances panel
+     */
+    public String getInstanceStatus(String instanceDate) {
+        waitForInstancesPanel();
+        logger.info("Getting status of " + instanceDate + " instance");
+        List<WebElement> status =
+            driver.findElements(By.xpath(String.format(INSTANCE_STATUS_TEMPLATE, instanceDate)));
+        if (status.isEmpty()) {
+            return null;
+        } else {
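+            // the status is the suffix of the class, e.g. "instance-icons instance-link-RUNNING"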
+            return status.get(0).getAttribute("class").replace("instance-icons instance-link-", "");
+        }
+    }
+
+    /**
+     * Checks if 'Lineage' link is present on instances panel
+     * @param instanceDate date stamp of instance
+     * @return true if link is present
+     */
+    public boolean isLineageLinkPresent(String instanceDate) {
+        waitForInstancesPanel();
+        logger.info("Checking if 'Lineage' link is present for " + instanceDate);
+        List<WebElement> lineage =
+            driver.findElements(By.xpath(String.format(LINEAGE_LINK_TEMPLATE, instanceDate)));
+        return !lineage.isEmpty();
+    }
+
+    private void waitForInstancesPanel() {
+        waitForElement(INSTANCES_PANEL, DEFAULT_TIMEOUT, "Instances panel didn't appear");
+    }
+
+    /**
+     * Checks whether vertex is terminal or not
+     * @param vertexName name of vertex
+     * @return whether it is terminal or not
+     */
+    public boolean isTerminal(String vertexName) {
+        logger.info("Checking if " + vertexName + " is 'terminal' instance");
+        waitForElement(String.format(VERTEX_TEMPLATE, vertexName), DEFAULT_TIMEOUT,
+            "Vertex not found");
+        WebElement vertex = driver.findElement(By.xpath(String.format(VERTEX_TEMPLATE, vertexName)));
+        String vertexClass = vertex.getAttribute("class");
+        return vertexClass.contains("lineage-node-terminal");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/main/resources/errorMapping.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/resources/errorMapping.properties b/falcon-regression/merlin/src/main/resources/errorMapping.properties
new file mode 100644
index 0000000..3ed3cea
--- /dev/null
+++ b/falcon-regression/merlin/src/main/resources/errorMapping.properties
@@ -0,0 +1,25 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+NoRetrySpecified.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 54; columnNumber: 67; cvc-complex-type.2.4.a: Invalid content was found starting with element 'late-process'. One of '{retry}' is expected.]
+noConcurrencyParam.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 29; columnNumber: 16; cvc-complex-type.2.4.a: Invalid content was found starting with element 'execution'. One of '{concurrency}' is expected.]
+noExecutionSpecified.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 29; columnNumber: 16; cvc-complex-type.2.4.a: Invalid content was found starting with element 'frequency'. One of '{execution}' is expected.]
+NoWorkflowParams.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 52; columnNumber: 71; cvc-complex-type.2.4.a: Invalid content was found starting with element 'retry'. One of '{workflow}' is expected.]
+process-invalid.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 2; columnNumber: 72; cvc-elt.1: Cannot find the declaration of element 'Process'.]
+inValid01_sameName.xml=inValid01_sameName already exists
+inValid02_sameName.xml=inValid02_sameName already exists
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/main/resources/log4testng.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/resources/log4testng.properties b/falcon-regression/merlin/src/main/resources/log4testng.properties
new file mode 100644
index 0000000..fd6f966
--- /dev/null
+++ b/falcon-regression/merlin/src/main/resources/log4testng.properties
@@ -0,0 +1,28 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# log4testng will log its own behavior (generally used for debugging this package only).
+log4testng.debug=false
+
+# Specifies the root Logger's logging level. Will log DEBUG level and above
+log4testng.rootLogger=DEBUG
+
+# The org.testng.reporters.EmailableReporter Logger will log TRACE level and above
+log4testng.logger.org.testng.reporters.EmailableReporter=TRACE
+
+# All Loggers in packages below org.testng will log INFO level and above
+log4testng.logger.org.testng=INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
new file mode 100644
index 0000000..dd901fb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.KerberosHelper;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.log4j.Logger;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Tests for authorization in Falcon.
+ */
+@Test(groups = "embedded")
+public class AuthorizationTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(AuthorizationTest.class);
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
+    private String baseTestDir = baseHDFSDir + "/AuthorizationTest";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedInputPath = baseTestDir + "/input" + datePattern;
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        Bundle bundle = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundle, cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    /**
+     * U2Delete test cases.
+     */
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SubmitU2DeleteCluster() throws Exception {
+        bundles[0].submitClusters(prism);
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getClusterHelper().delete(
+            Util.URLS.DELETE_URL, bundles[0].getClusters().get(0), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Entity submitted by first user should not be deletable by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SubmitU2DeleteProcess() throws Exception {
+        bundles[0].submitClusters(prism);
+        bundles[0].submitProcess(true);
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().delete(
+            Util.URLS.DELETE_URL, bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Entity submitted by first user should not be deletable by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SubmitU2DeleteFeed() throws Exception {
+        bundles[0].submitClusters(prism);
+        bundles[0].submitFeed();
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().delete(
+            Util.URLS.DELETE_URL, bundles[0].getDataSets().get(0), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Entity submitted by first user should not be deletable by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2DeleteProcess() throws Exception {
+        //submit, schedule process by U1
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        //try to delete process by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().delete(Util.URLS
+            .DELETE_URL, bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Process scheduled by first user should not be deleted by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2DeleteFeed() throws Exception {
+        String feed = bundles[0].getInputFeedFromBundle();
+        //submit, schedule feed by U1
+        bundles[0].submitClusters(prism);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+        //delete feed by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().delete(Util.URLS
+            .DELETE_URL, feed, MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Feed scheduled by first user should not be deleted by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SuspendU2DeleteProcess() throws Exception {
+        //submit, schedule, suspend process by U1
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL,
+            bundles[0].getProcessData()));
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.SUSPENDED);
+        //try to delete process by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().delete(Util.URLS
+            .DELETE_URL, bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Process suspended by first user should not be deleted by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SuspendU2DeleteFeed() throws Exception {
+        String feed = bundles[0].getInputFeedFromBundle();
+        //submit, schedule, suspend feed by U1
+        bundles[0].submitClusters(prism);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+        //delete feed by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().delete(Util.URLS
+            .DELETE_URL, feed, MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Feed scheduled by first user should not be deleted by second user");
+    }
+
+    /**
+     * U2Suspend test cases.
+     */
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2SuspendFeed() throws Exception {
+        String feed = bundles[0].getInputFeedFromBundle();
+        //submit, schedule by U1
+        bundles[0].submitClusters(prism);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+        //try to suspend by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().suspend(Util.URLS
+            .SUSPEND_URL, feed, MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Feed scheduled by first user should not be suspended by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2SuspendProcess() throws Exception {
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        //try to suspend process by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().suspend(Util.URLS
+            .SUSPEND_URL, bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Process scheduled by first user should not be suspended by second user");
+    }
+
+    /**
+     * U2Resume test cases.
+     */
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SuspendU2ResumeFeed() throws Exception {
+        String feed = bundles[0].getInputFeedFromBundle();
+        //submit, schedule and then suspend feed by User1
+        bundles[0].submitClusters(prism);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(Util.URLS.SUSPEND_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+        //try to resume feed by User2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().resume(Util.URLS
+            .RESUME_URL, feed, MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Feed suspended by first user should not be resumed by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SuspendU2ResumeProcess() throws Exception {
+        //submit, schedule, suspend process by U1
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(Util.URLS.SUSPEND_URL,
+            bundles[0].getProcessData()));
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.SUSPENDED);
+        //try to resume process by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().resume(Util.URLS
+            .RESUME_URL, bundles[0].getProcessData(), MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Process suspended by first user should not be resumed by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SuspendU2ResumeProcessInstances() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        String midTime = TimeUtil.addMinsToTime(startTime, 2);
+        LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime);
+
+        //prepare process definition
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessInput("now(0,0)", "now(0,4)");
+
+        //provide necessary data for first 3 instances to run
+        LOGGER.info("Creating necessary data...");
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.addMinsToTime(startTime, -2), endTime, 0);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+
+        //submit, schedule process by U1
+        LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        //check that there are 3 running instances
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+
+        //check that there are 2 waiting instances
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+
+        //3 instances should be running, the other 2 should be waiting
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime);
+        InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
+
+        //suspend 3 running instances
+        r = prism.getProcessHelper().getProcessInstanceSuspend(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + midTime);
+        InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
+
+        //try to resume suspended instances by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        r = prism.getProcessHelper().getProcessInstanceResume(Util.readEntityName(bundles[0]
+                .getProcessData()), "?start=" + startTime + "&end=" + midTime,
+            MerlinConstants.USER2_NAME);
+
+        //the state of above 3 instances should still be suspended
+        InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
+
+        //check the status of all instances
+        r = prism.getProcessHelper().getProcessInstanceStatus(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime);
+        InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0);
+    }
+
+    /**
+     * U2Kill test cases.
+     */
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2KillProcessInstances() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime);
+
+        //prepare process definition
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessInput("now(0,0)", "now(0,4)");
+
+        //provide necessary data for first 3 instances to run
+        LOGGER.info("Creating necessary data...");
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.addMinsToTime(startTime, -2), endTime, 0);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+
+        //submit, schedule process by U1
+        LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        //check that there are 3 running instances
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+
+        //3 instances should be running, the other 2 should be waiting
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime);
+        InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
+
+        //try to kill all instances by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        r = prism.getProcessHelper().getProcessInstanceKill(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME);
+
+        //number of instances should be the same as before
+        InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SuspendU2KillProcessInstances() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        String midTime = TimeUtil.addMinsToTime(startTime, 2);
+        LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime);
+
+        //prepare process definition
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessInput("now(0,0)", "now(0,4)");
+
+        //provide necessary data for first 3 instances to run
+        LOGGER.info("Creating necessary data...");
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.addMinsToTime(startTime, -2), endTime, 0);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+
+        //submit, schedule process by U1
+        LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        //check that there are 3 running instances
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+
+        //check that there are 2 waiting instances
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+
+        //3 instances should be running, the other 2 should be waiting
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime);
+        InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0);
+
+        //suspend 3 running instances
+        r = prism.getProcessHelper().getProcessInstanceSuspend(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + midTime);
+        InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0);
+
+        //try to kill all instances by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        r = prism.getProcessHelper().getProcessInstanceKill(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME);
+
+        //3 should still be suspended, 2 should be waiting
+        InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0);
+    }
+
+    /**
+     * U2Rerun test cases.
+     */
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1KillSomeU2RerunAllProcessInstances()
+        throws IOException, JAXBException, AuthenticationException,
+        URISyntaxException, OozieClientException {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        String midTime = TimeUtil.addMinsToTime(startTime, 2);
+        LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime);
+
+        //prepare process definition
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessInput("now(0,0)", "now(0,3)");
+
+        //provide necessary data for first 4 instances to run
+        LOGGER.info("Creating necessary data...");
+        String prefix = bundles[0].getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+            TimeUtil.addMinsToTime(startTime, -2), endTime, 0);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+
+        //submit, schedule process by U1
+        LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        //check that there are 4 running instances
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 4, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+
+        //4 instances should be running, 1 should be waiting
+        InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util
+                .readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + endTime);
+        InstanceUtil.validateResponse(r, 5, 4, 0, 1, 0);
+
+        //kill 3 running instances
+        r = prism.getProcessHelper().getProcessInstanceKill(
+            Util.readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + midTime);
+        InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+
+        //now 3 instances should be killed, 1 running and 1 waiting
+
+        //try to rerun instances by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        r = prism.getProcessHelper().getProcessInstanceRerun(
+            Util.readEntityName(bundles[0].getProcessData()),
+            "?start=" + startTime + "&end=" + midTime, MerlinConstants.USER2_NAME);
+
+        //instances should still be killed
+        InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+    }
+
+    /**
+     * U2Update test cases.
+     */
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SubmitU2UpdateFeed()
+        throws URISyntaxException, IOException, AuthenticationException, JAXBException {
+        String feed = bundles[0].getInputFeedFromBundle();
+        //submit feed
+        bundles[0].submitClusters(prism);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(Util.URLS.SUBMIT_URL, feed));
+        String definition = prism.getFeedHelper()
+            .getEntityDefinition(Util.URLS.GET_ENTITY_DEFINITION, feed).getMessage();
+        Assert.assertTrue(definition.contains(Util.readEntityName(feed))
+            && !definition.contains("(feed) not found"), "Feed should already be submitted");
+        //update feed definition
+        String newFeed = Util.setFeedPathValue(feed,
+            baseHDFSDir + "/randomPath/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+        //try to update feed by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+            TimeUtil.getTimeWrtSystemTime(0),
+            MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Feed submitted by first user should not be updated by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2UpdateFeed() throws Exception {
+        String feed = bundles[0].getInputFeedFromBundle();
+        //submit and schedule feed
+        bundles[0].submitClusters(prism);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(
+            Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+        //update feed definition
+        String newFeed = Util.setFeedPathValue(feed,
+            baseHDFSDir + "/randomPath/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+        //try to update feed by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+            TimeUtil.getTimeWrtSystemTime(0),
+            MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Feed scheduled by first user should not be updated by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1SubmitU2UpdateProcess() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        String processName = bundles[0].getProcessName();
+        //submit process
+        bundles[0].submitBundle(prism);
+        String definition = prism.getProcessHelper()
+            .getEntityDefinition(Util.URLS.GET_ENTITY_DEFINITION,
+                bundles[0].getProcessData()).getMessage();
+        Assert.assertTrue(definition.contains(processName)
+            && !definition.contains("(process) not found"),
+            "Process should already be submitted");
+        //update process definition
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2020-01-02T01:04Z");
+        //try to update process by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().update(bundles[0]
+                .getProcessData(), bundles[0].getProcessData(),
+            TimeUtil.getTimeWrtSystemTime(0),
+            MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Process submitted by first user should not be updated by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleU2UpdateProcess() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        //submit, schedule process by U1
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        //update process definition
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2020-01-02T01:04Z");
+        //try to update process by U2
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        final ServiceResponse serviceResponse = prism.getProcessHelper().update(bundles[0]
+                .getProcessData(), bundles[0].getProcessData(),
+            TimeUtil.getTimeWrtSystemTime(0),
+            MerlinConstants.USER2_NAME);
+        AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
+            "Process scheduled by first user should not be updated by second user");
+    }
+
+    //disabled since Falcon does not have authorization,
+    //see https://issues.apache.org/jira/browse/FALCON-388
+    @Test(enabled = false)
+    public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Exception {
+        String feed = bundles[0].getInputFeedFromBundle();
+        String process = bundles[0].getProcessData();
+        //submit both feeds
+        bundles[0].submitClusters(prism);
+        bundles[0].submitFeeds(prism);
+        //schedule input feed by U1
+        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(
+            Util.URLS.SCHEDULE_URL, feed));
+        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+
+        //schedule by U2 a process dependent on the feed scheduled by U1
+        KerberosHelper.loginFromKeytab(MerlinConstants.USER2_NAME);
+        ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(Util
+            .URLS.SUBMIT_AND_SCHEDULE_URL, process, MerlinConstants.USER2_NAME);
+        AssertUtil.assertSucceeded(serviceResponse);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+
+        //get old process details
+        String oldProcessBundleId = InstanceUtil
+            .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS);
+
+        String oldProcessUser =
+            getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+
+        //get old feed details
+        String oldFeedBundleId = InstanceUtil
+            .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
+
+        //update feed definition
+        String newFeed = Util.setFeedPathValue(feed,
+            baseHDFSDir + "/randomPath/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
+
+        //update feed by U1
+        KerberosHelper.loginFromKeytab(MerlinConstants.CURRENT_USER_NAME);
+        serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+            TimeUtil.getTimeWrtSystemTime(0), MerlinConstants.CURRENT_USER_NAME);
+        AssertUtil.assertSucceeded(serviceResponse);
+
+        //new feed bundle should be created by U1
+        OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, true, false);
+
+        //new process bundle should be created by U2
+        OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false);
+        String newProcessUser =
+            getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+        Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same");
+    }
+
+    private String getBundleUser(ColoHelper coloHelper, String entityName, EntityType entityType)
+        throws OozieClientException {
+        String newProcessBundleId = InstanceUtil.getLatestBundleID(coloHelper, entityName,
+            entityType);
+        BundleJob newProcessBundlejob =
+            coloHelper.getClusterHelper().getOozieClient().getBundleJobInfo(newProcessBundleId);
+        CoordinatorJob coordinatorJob = null;
+        for (CoordinatorJob coord : newProcessBundlejob.getCoordinators()) {
+            if (coord.getAppName().contains("DEFAULT")) {
+                coordinatorJob = coord;
+            }
+        }
+        Assert.assertNotNull(coordinatorJob);
+        return coordinatorJob.getUser();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        KerberosHelper.loginFromKeytab(MerlinConstants.CURRENT_USER_NAME);
+        removeBundles();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
new file mode 100644
index 0000000..4fe3ebb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Tests for the latest() and future() EL expressions used in feed instance ranges.
+ */
+@Test(groups = "embedded")
+public class ELExp_FutureAndLatestTest extends BaseTestClass {
+
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    OozieClient clusterOC = serverOC.get(0);
+    private String prefix;
+    private String baseTestDir = baseHDFSDir + "/ELExp_FutureAndLatest";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private static final Logger logger = Logger.getLogger(ELExp_FutureAndLatestTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        logger.info("in @BeforeClass");
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+        Bundle b = BundleUtil.readELBundle();
+        b.generateUniqueBundle();
+        b = new Bundle(b, cluster);
+
+        String startDate = TimeUtil.getTimeWrtSystemTime(-150);
+        String endDate = TimeUtil.getTimeWrtSystemTime(100);
+
+        b.setInputFeedDataPath(
+            baseTestDir + "/ELExp_latest/testData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        b.setProcessWorkflow(aggregateWorkflowDir);
+        prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 1);
+
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(
+            baseTestDir + "/ELExp_latest/testData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setInputFeedValidity("2010-04-01T00:00Z", "2015-04-01T00:00Z");
+        String processStart = TimeUtil.getTimeWrtSystemTime(-3);
+        String processEnd = TimeUtil.getTimeWrtSystemTime(8);
+        logger.info("processStart: " + processStart + " processEnd: " + processEnd);
+        bundles[0].setProcessValidity(processStart, processEnd);
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
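+    /**
+     * Schedules a process whose input range is latest(-3)..latest(0) and expects
+     * three instances to succeed.
+     */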
+    @Test(groups = {"singleCluster"})
+    public void latestTest() throws Exception {
+        bundles[0].setDatasetInstances("latest(-3)", "latest(0)");
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+    }
+
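+    /**
+     * Schedules a process whose input range is future(0,10)..future(3,10) and expects
+     * three instances to succeed.
+     */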
+    @Test(groups = {"singleCluster"})
+    public void futureTest() throws Exception {
+        bundles[0].setDatasetInstances("future(0,10)", "future(3,10)");
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        logger.info("in @AfterClass");
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
new file mode 100644
index 0000000..5425360
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.OozieUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.TestNGException;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.text.DecimalFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+
+/**
+ * Tests validation of EL start/end instance expressions and the coordinator
+ * dependencies they produce.
+ */
+@Test(groups = "embedded")
+public class ELValidationsTest extends BaseTestClass {
+
+    ColoHelper cluster = servers.get(0);
+    private static final Logger logger = Logger.getLogger(ELValidationsTest.class);
+    String aggregateWorkflowDir = baseHDFSDir + "/ELTest/aggregator";
+
+    //tests for instances where the process timeline is a subset of the feed timeline
+    @BeforeMethod(alwaysRun = true)
+    public void testName(Method method) {
+        logger.info("test name: " + method.getName());
+    }
+
+    @Test(groups = {"0.1", "0.2"})
+    public void startInstBeforeFeedStart_today02() throws Exception {
+        String response =
+            testWith(prism, cluster, "2009-02-02T20:00Z", "2011-12-31T00:00Z", "2009-02-02T20:00Z",
+                "2011-12-31T00:00Z", "now(-40,0)", "currentYear(20,30,24,20)", false);
+        validate(response);
+    }
+
+    @Test(groups = {"singleCluster"})
+    public void startInstAfterFeedEnd() throws Exception {
+        String response = testWith(prism, cluster, null, null, null, null,
+            "currentYear(10,0,22,0)", "now(4,20)", false);
+        validate(response);
+    }
+
+    @Test(groups = {"singleCluster"})
+    public void bothInstReverse() throws Exception {
+        String response = testWith(prism, cluster, null, null, null, null,
+            "now(0,0)", "now(-100,0)", false);
+        validate(response);
+    }
+
+    @Test(groups = {"singleCluster"}, dataProvider = "EL-DP")
+    public void expressionLanguageTest(String startInstance, String endInstance) throws Exception {
+        testWith(prism, cluster, null, null, null, null, startInstance, endInstance, true);
+    }
+
+    @DataProvider(name = "EL-DP")
+    public Object[][] getELData(Method m) {
+        return new Object[][]{
+            {"now(-3,0)", "now(4,20)"},
+            {"yesterday(22,0)", "now(4,20)"},
+            {"currentMonth(0,22,0)", "now(4,20)"},
+            {"lastMonth(30,22,0)", "now(4,20)"},
+            {"currentYear(0,0,22,0)", "currentYear(1,1,22,0)"},
+            {"currentMonth(0,22,0)", "currentMonth(1,22,20)"},
+            {"lastMonth(30,22,0)", "lastMonth(60,2,40)"},
+            {"lastYear(12,0,22,0)", "lastYear(13,1,22,0)"}
+        };
+    }
+
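+    /**
+     * Passes only if the response carries one of the expected EL validation error messages;
+     * fails otherwise.
+     */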
+    private void validate(String response) {
+        if ((response.contains("End instance ") || response.contains("Start instance"))
+            && (response.contains("for feed") || response.contains("of feed"))
+            && (response.contains("is before the start of feed") ||
+            response.contains("is after the end of feed"))) {
+            return;
+        }
+        if (response.contains("End instance") &&
+            response.contains("is before the start instance")) {
+            return;
+        }
+        Assert.fail("Response is not valid");
+    }
+
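+    /**
+     * Submits feeds and schedules a process with the given validities and EL instance range,
+     * optionally matching the coordinator's missing dependencies against the expected list,
+     * and returns the submit/schedule response. The bundle is deleted in all cases.
+     */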
+    private String testWith(ColoHelper prismHelper, ColoHelper server, String feedStart,
+                            String feedEnd, String processStart,
+                            String processEnd,
+                            String startInstance, String endInstance, boolean isMatch)
+        throws IOException, JAXBException, ParseException, URISyntaxException {
+        HadoopUtil.uploadDir(server.getClusterHelper().getHadoopFS(),
+            aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle = new Bundle(bundle, server.getPrefix());
+        bundle.generateUniqueBundle();
+        bundle.setProcessWorkflow(aggregateWorkflowDir);
+        if (feedStart != null && feedEnd != null) {
+            bundle.setFeedValidity(feedStart, feedEnd, bundle.getInputFeedNameFromBundle());
+        }
+        if (processStart != null && processEnd != null) {
+            bundle.setProcessValidity(processStart, processEnd);
+        }
+        try {
+            bundle.setInvalidData();
+            bundle.setDatasetInstances(startInstance, endInstance);
+            String submitResponse = bundle.submitFeedsScheduleProcess(prismHelper);
+            logger.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData()));
+            TimeUtil.sleepSeconds(45);
+            if (isMatch) {
+                getAndMatchDependencies(server, bundle);
+            }
+            return submitResponse;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e);
+        } finally {
+            logger.info("deleting entity:");
+            bundle.deleteBundle(prismHelper);
+        }
+    }
+
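+    /**
+     * Waits for the process bundle to appear in Oozie, reads its missing dependencies and
+     * nominal time, and asserts that they match the locally computed dependency list.
+     */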
+    private void getAndMatchDependencies(ColoHelper prismHelper, Bundle bundle) {
+        try {
+            List<String> bundleIds = null;
+            for (int i = 0; i < 10; ++i) {
+                bundleIds = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
+                    Util.getProcessName(bundle.getProcessData()), EntityType.PROCESS);
+                if (bundleIds.size() > 0) {
+                    break;
+                }
+                TimeUtil.sleepSeconds(30);
+            }
+            Assert.assertTrue(bundleIds != null && bundleIds.size() > 0, "Bundle job not created.");
+            String coordID = bundleIds.get(0);
+            logger.info("coord id: " + coordID);
+            List<String> missingDependencies =
+                OozieUtil.getMissingDependencies(prismHelper, coordID);
+            for (int i = 0; i < 10 && missingDependencies == null; ++i) {
+                TimeUtil.sleepSeconds(30);
+                missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID);
+            }
+            Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
+            for (String dependency : missingDependencies) {
+                logger.info("dependency from job: " + dependency);
+            }
+            Date jobNominalTime = OozieUtil.getNominalTime(prismHelper, coordID);
+            Calendar time = Calendar.getInstance();
+            time.setTime(jobNominalTime);
+            logger.info("nominalTime:" + jobNominalTime);
+            SimpleDateFormat df = new SimpleDateFormat("dd MMM yyyy HH:mm:ss");
+            df.setTimeZone(TimeZone.getTimeZone("GMT"));
+            logger.info("nominalTime in GMT string: " + df.format(jobNominalTime) + " GMT");
+            TimeZone z = time.getTimeZone();
+            int offset = z.getRawOffset();
+            int offsetHrs = offset / 1000 / 60 / 60;
+            int offsetMins = offset / 1000 / 60 % 60;
+
+            logger.info("offset: " + offsetHrs);
+            logger.info("offset: " + offsetMins);
+
+            time.add(Calendar.HOUR_OF_DAY, (-offsetHrs));
+            time.add(Calendar.MINUTE, (-offsetMins));
+
+            logger.info("GMT Time: " + time.getTime());
+
+            int frequency = bundle.getInitialDatasetFrequency();
+            List<String> qaDependencyList =
+                getQADependencyList(time, bundle.getStartInstanceProcess(time),
+                    bundle.getEndInstanceProcess(time), frequency, bundle);
+            for (String qaDependency : qaDependencyList) {
+                logger.info("qa dependency: " + qaDependency);
+            }
+
+            Assert.assertTrue(matchDependencies(missingDependencies, qaDependencyList));
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e);
+        }
+    }
+
+    private boolean matchDependencies(List<String> fromJob, List<String> qaList) {
+        if (fromJob.size() != qaList.size()) {
+            return false;
+        }
+        for (int index = 0; index < fromJob.size(); index++) {
+            if (!fromJob.get(index).contains(qaList.get(index))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
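+    /**
+     * Builds the expected list of dependency paths from startRef up to endRef, stepping by
+     * the dataset frequency in minutes, then reverses it so the newest instance comes first.
+     */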
+    private List<String> getQADependencyList(Calendar nominalTime, Date startRef,
+                                             Date endRef, int frequency, Bundle bundle) {
+        logger.info("start ref:" + startRef);
+        logger.info("end ref:" + endRef);
+        Calendar initialTime = Calendar.getInstance();
+        initialTime.setTime(startRef);
+        Calendar finalTime = Calendar.getInstance();
+        finalTime.setTime(endRef);
+        String path = bundle.getDatasetPath();
+
+        TimeZone tz = TimeZone.getTimeZone("GMT");
+        nominalTime.setTimeZone(tz);
+        logger.info("nominalTime: " + initialTime.getTime());
+        logger.info("finalTime: " + finalTime.getTime());
+        List<String> returnList = new ArrayList<String>();
+        while (!initialTime.getTime().equals(finalTime.getTime())) {
+            logger.info("initialTime: " + initialTime.getTime());
+            returnList.add(getPath(path, initialTime));
+            initialTime.add(Calendar.MINUTE, frequency);
+        }
+        returnList.add(getPath(path, initialTime));
+        Collections.reverse(returnList);
+        return returnList;
+    }
+
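+    /**
+     * Substitutes the ${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE} placeholders in the feed
+     * path with zero-padded values taken from the given calendar.
+     */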
+    private String getPath(String path, Calendar time) {
+        if (path.contains("${YEAR}")) {
+            path = path.replaceAll("\\$\\{YEAR\\}", Integer.toString(time.get(Calendar.YEAR)));
+        }
+        if (path.contains("${MONTH}")) {
+            path = path.replaceAll("\\$\\{MONTH\\}", intToString(time.get(Calendar.MONTH) + 1, 2));
+        }
+        if (path.contains("${DAY}")) {
+            path = path.replaceAll("\\$\\{DAY\\}", intToString(time.get(Calendar.DAY_OF_MONTH), 2));
+        }
+        if (path.contains("${HOUR}")) {
+            path = path.replaceAll("\\$\\{HOUR\\}", intToString(time.get(Calendar.HOUR_OF_DAY), 2));
+        }
+        if (path.contains("${MINUTE}")) {
+            path = path.replaceAll("\\$\\{MINUTE\\}", intToString(time.get(Calendar.MINUTE), 2));
+        }
+        return path;
+    }
+
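+    /**
+     * Left-pads the number with zeros to the requested number of digits.
+     */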
+    private String intToString(int num, int digits) {
+        assert digits > 0 : "Invalid number of digits";
+
+        // create variable length array of zeros
+        char[] zeros = new char[digits];
+        Arrays.fill(zeros, '0');
+
+        // format number as String
+        DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
+        return df.format(num);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
new file mode 100644
index 0000000..bf0e8d3
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Properties;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Tests instance APIs for a process whose workflow is an embedded Pig script.
+ */
+@Test(groups = "embedded")
+public class EmbeddedPigScriptTest extends BaseTestClass {
+
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    OozieClient clusterOC = serverOC.get(0);
+    private String prefix;
+    String pigTestDir = baseHDFSDir + "/EmbeddedPigScriptTest";
+    String pigScriptDir = pigTestDir + "/EmbeddedPigScriptTest/pig";
+    String pigScriptLocation = pigScriptDir + "/id.pig";
+    String inputPath = pigTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private static final Logger logger = Logger.getLogger(EmbeddedPigScriptTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+
+        logger.info("in @BeforeClass");
+        //copy pig script
+        HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.RESOURCES + "pig");
+
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle.generateUniqueBundle();
+        bundle = new Bundle(bundle, cluster);
+
+        String startDate = "2010-01-02T00:40Z";
+        String endDate = "2010-01-02T01:10Z";
+
+        bundle.setInputFeedDataPath(inputPath);
+        prefix = bundle.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates =
+            TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(inputPath);
+        bundles[0].setOutputFeedLocationData(
+            pigTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setProcessWorkflow(pigScriptLocation);
+        bundles[0].setProcessInputNames("INPUT");
+        bundles[0].setProcessOutputNames("OUTPUT");
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:10Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+
+        final Process processElement = bundles[0].getProcessObject();
+        final Properties properties = new Properties();
+        final Property property = new Property();
+        property.setName("queueName");
+        property.setValue("default");
+        properties.getProperties().add(property);
+        processElement.setProperties(properties);
+        processElement.getWorkflow().setEngine(EngineType.PIG);
+        bundles[0].setProcessData(processElement.toString());
+        bundles[0].submitFeedsScheduleProcess(prism);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
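+    /**
+     * Suspends and then resumes the process; once it is RUNNING again, the
+     * running-instances API should list its instances.
+     */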
+    @Test(groups = {"singleCluster"})
+    public void getResumedProcessInstance() throws Exception {
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+        TimeUtil.sleepSeconds(15);
+        ServiceResponse status =
+            prism.getProcessHelper().getStatus(URLS.STATUS_URL, bundles[0].getProcessData());
+        Assert.assertTrue(status.getMessage().contains("SUSPENDED"), "Process not suspended.");
+        prism.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData());
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+    }
+
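+    /**
+     * Suspends the process; the running-instances API should return no instances.
+     */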
+    @Test(groups = {"singleCluster"})
+    public void getSuspendedProcessInstance() throws Exception {
+        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.SUSPENDED);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccessWOInstances(r);
+    }
+
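+    /**
+     * Checks that a RUNNING process reports its running instances.
+     */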
+    @Test(groups = {"singleCluster"})
+    public void getRunningProcessInstance() throws Exception {
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+    }
+
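+    /**
+     * Deletes the process; querying its running instances should return PROCESS_NOT_FOUND.
+     */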
+    @Test(groups = {"singleCluster"})
+    public void getKilledProcessInstance() throws Exception {
+        prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+            "Unexpected status code");
+    }
+
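+    /**
+     * Waits for the process bundle to succeed; afterwards the running-instances API
+     * should return no instances.
+     */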
+    @Test(groups = {"singleCluster"})
+    public void getSucceededProcessInstance() throws Exception {
+        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+
+        int counter = OSUtil.IS_WINDOWS ? 100 : 50;
+        InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
+            .getProcessData()), Job.Status.SUCCEEDED, counter);
+        r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccessWOInstances(r);
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        logger.info("in @AfterClass");
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+}