You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:13 UTC

[14/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
new file mode 100644
index 0000000..e77d534
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Regression tests for the process instance suspend action.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceSuspendTest extends BaseTestClass {
+
+    private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceSuspendTest";
+    private String feedInputPath = baseTestHDFSDir +
+        "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputPath =
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceSuspendTest.class);
+
+    /** Uploads the workflow and creates the input feed data used by the tests. */
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle = new Bundle(bundle, cluster);
+        String startDate = "2010-01-01T23:40Z";
+        String endDate = "2010-01-02T01:40Z";
+        bundle.setInputFeedDataPath(feedInputPath);
+        String prefix = bundle.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    /** Builds a fresh bundle for each test, using the shared input path and workflow. */
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Schedule process. Suspend instances using start/end parameters which are wider than
+     * the process validity range. Status query over that range should return code 400.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendLargeRange() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
+        InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+        InstanceUtil.validateSuccessWithStatusCode(result, 400);
+    }
+
+    /**
+     * Schedule single-instance process. Wait till the instance succeeds. Try to suspend the
+     * succeeded instance. Action should be performed successfully as an idempotent action.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendSucceeded() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
+            .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, 0);
+    }
+
+    /**
+     * Schedule process. Check that all instances are running. Suspend them. Check that all are
+     * suspended. A valid time range is used in every action.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendAll() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0);
+    }
+
+    /**
+     * Schedule process and try to perform -suspend action without date range parameters.
+     * Attempt should fail with the UNPARSEABLE_DATE status code.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendWoParams() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(2);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()), null);
+        InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+    }
+
+    /**
+     * Schedule process with 3 running and 2 waiting instances expected. Suspend the ones which
+     * are running. Check that now 3 are suspended and 2 are still waiting.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendStartAndEnd() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+        InstanceUtil.validateResponse(result, 5, 3, 0, 2, 0);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+        InstanceUtil.validateResponse(result, 5, 0, 3, 2, 0);
+    }
+
+    /**
+     * Try to suspend an instance of a process name which does not exist. Action should fail.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendNonExistent() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r =
+            prism.getProcessHelper()
+                .getProcessInstanceSuspend("invalidName", "?start=2010-01-02T01:20Z");
+        Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+            "Suspending an instance of an unknown process should return PROCESS_NOT_FOUND");
+    }
+
+    /**
+     * Schedule process. Perform -suspend action using only -start parameter which points to the
+     * start time of the process. Check that only 1 instance is suspended then.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendOnlyStart() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED);
+        prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+    }
+
+    /**
+     * Schedule process with a number of instances running. Perform -suspend action using only
+     * -start parameter with a value which points to the expected last instance time. Check that
+     * only the last instance is suspended.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceSuspendSuspendLast() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:20Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateResponse(result, 5, 4, 1, 0, 0);
+    }
+
+    /** Removes the input feed data directory that createTestData() populated. */
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        LOGGER.info("in @AfterClass");
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle = new Bundle(bundle, cluster);
+        bundle.setInputFeedDataPath(feedInputPath);
+        String prefix = bundle.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
new file mode 100644
index 0000000..ef6860b
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job.Status;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Tests with process lib folder detached from workflow.xml.
+ */
+@Test(groups = "embedded")
+public class ProcessLibPathTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String testLibDir = baseHDFSDir + "/ProcessLibPath/TestLib";
+    private static final Logger logger = Logger.getLogger(ProcessLibPathTest.class);
+
+    /** Uploads the shared lib folder and creates the input feed data used by the tests. */
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        logger.info("in @BeforeClass");
+        //common lib for both test cases
+        HadoopUtil.uploadDir(clusterFS, testLibDir, OSUtil.RESOURCES_OOZIE + "lib");
+
+        Bundle b = BundleUtil.readELBundle();
+        b.generateUniqueBundle();
+        b = new Bundle(b, cluster);
+
+        String startDate = "2010-01-01T22:00Z";
+        String endDate = "2010-01-02T03:00Z";
+        b.setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    /** Builds a fresh single-instance bundle for each test, with lib path set to testLibDir. */
+    @BeforeMethod(alwaysRun = true)
+    public void testName(Method method) throws Exception {
+        logger.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(
+            baseHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].setProcessLibPath(testLibDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Tests a process whose workflow location has no lib folder.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void setDifferentLibPathWithNoLibFolderInWorkflowfLocaltion() throws Exception {
+        String workflowDir = testLibDir + "/aggregatorLib1/";
+        HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
+        HadoopUtil.deleteDirIfExists(workflowDir + "/lib", clusterFS);
+        bundles[0].setProcessWorkflow(workflowDir); // run the workflow copy prepared above
+        logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+    }
+
+    /**
+     * Tests a process whose workflow location has a wrong jar in its lib folder.
+     *
+     * @throws Exception on any unexpected failure while running the scenario
+     */
+    @Test(groups = {"singleCluster"})
+    public void setDifferentLibPathWithWrongJarInWorkflowLib() throws Exception {
+        String workflowDir = testLibDir + "/aggregatorLib2/";
+        HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
+        HadoopUtil.recreateDir(clusterFS, workflowDir + "/lib");
+        HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib",
+            OSUtil.RESOURCES + "ivory-oozie-lib-0.1.jar");
+        bundles[0].setProcessWorkflow(workflowDir); // run the workflow copy prepared above
+        logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil
+            .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java
new file mode 100644
index 0000000..64febac
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.testHelper.BaseUITestClass;
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.openqa.selenium.OutputType;
+import org.openqa.selenium.TakesScreenshot;
+import org.testng.ITestContext;
+import org.testng.ITestListener;
+import org.testng.ITestResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Logs test start/end and, when a UI driver is present, saves a screenshot on failure. */
+public class TestngListener implements ITestListener {
+    private static final Logger logger = Logger.getLogger(TestngListener.class);
+
+    @Override
+    public void onTestStart(ITestResult result) {
+        logLine();
+        logger.info(
+            String.format("Testing going to start for: %s.%s %s", result.getTestClass().getName(),
+                result.getName(), Arrays.toString(result.getParameters())));
+        NDC.push(result.getName()); // popped again in logEndOfTest()
+    }
+
+    private void logLine() {
+        logger.info(
+            "-----------------------------------------------------------------------------------------------");
+    }
+
+    private void logEndOfTest(ITestResult result, String outcome) {
+        logger.info(
+            String.format("Testing going to end for: %s.%s(%s) %s", result.getTestClass().getName(),
+                result.getName(), Arrays.toString(result.getParameters()), outcome));
+        NDC.pop(); // matches the push done in onTestStart()
+        logLine();
+    }
+
+    @Override
+    public void onTestSuccess(ITestResult result) {
+        logEndOfTest(result, "SUCCESS");
+    }
+
+    @Override
+    public void onTestFailure(ITestResult result) {
+        logEndOfTest(result, "FAILED");
+        if (BaseUITestClass.getDRIVER() != null) {
+            byte[] scrFile = ((TakesScreenshot) BaseUITestClass.getDRIVER())
+                .getScreenshotAs(OutputType.BYTES);
+            try {
+                String filename = OSUtil.getPath("target", "surefire-reports", "screenshots", String.format("%s.%s.png",
+                        result.getTestClass().getRealClass().getSimpleName(), result.getName()));
+                FileUtils.writeByteArrayToFile(new File(filename), scrFile);
+            } catch (IOException e) {
+                logger.warn("Saving screenshot FAILED: " + e.getMessage(), e);
+            }
+        }
+        logger.info(ExceptionUtils.getStackTrace(result.getThrowable()));
+        logLine();
+    }
+
+    @Override
+    public void onTestSkipped(ITestResult result) {
+        logEndOfTest(result, "SKIPPED");
+    }
+
+    @Override
+    public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
+        logEndOfTest(result, "TestFailedButWithinSuccessPercentage");
+    }
+
+    @Override
+    public void onStart(ITestContext context) {
+    }
+
+    @Override
+    public void onFinish(ITestContext context) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
new file mode 100644
index 0000000..32062ba
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
@@ -0,0 +1,649 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "embedded")
+public class HCatProcessTest extends BaseTestClass {
+    private static final Logger logger = Logger.getLogger(HCatProcessTest.class);
+    // All tests in this class run against the first entry of the inherited server lists.
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    OozieClient clusterOC = serverOC.get(0);
+    // HCatalog client; (re)assigned in setUp() before every test method.
+    HCatClient clusterHC;
+
+    // HDFS layout used by the tests; everything lives under baseTestHDFSDir, which
+    // setUp() deletes before uploading the scripts again.
+    final String testDir = "/HCatProcessTest";
+    final String baseTestHDFSDir = baseHDFSDir + testDir;
+    String hiveScriptDir = baseTestHDFSDir + "/hive";
+    // Default workflow script set on the bundle in setUp().
+    String hiveScriptFile = hiveScriptDir + "/script.hql";
+    String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    // Alternative scripts, selected by individual tests via setProcessWorkflow().
+    String hiveScriptFileNonHCatInput = hiveScriptDir + "/script_non_hcat_input.hql";
+    String hiveScriptFileNonHCatOutput = hiveScriptDir + "/script_non_hcat_output.hql";
+    String hiveScriptTwoHCatInputOneHCatOutput =
+        hiveScriptDir + "/script_two_hcat_input_one_hcat_output.hql";
+    String hiveScriptOneHCatInputTwoHCatOutput =
+        hiveScriptDir + "/script_one_hcat_input_two_hcat_output.hql";
+    String hiveScriptTwoHCatInputTwoHCatOutput =
+        hiveScriptDir + "/script_two_hcat_input_two_hcat_output.hql";
+    // Locations backing the external input/output tables.
+    final String inputHDFSDir = baseTestHDFSDir + "/input";
+    final String inputHDFSDir2 = baseTestHDFSDir + "/input2";
+    final String outputHDFSDir = baseTestHDFSDir + "/output";
+    final String outputHDFSDir2 = baseTestHDFSDir + "/output2";
+
+    // HCatalog database and table names; the four tables are dropped in setUp().
+    final String dbName = "default";
+    final String inputTableName = "hcatprocesstest_input_table";
+    final String inputTableName2 = "hcatprocesstest_input_table2";
+    final String outputTableName = "hcatprocesstest_output_table";
+    final String outputTableName2 = "hcatprocesstest_output_table2";
+    // Column names of the test tables: two string data columns and one partition column.
+    public static final String col1Name = "id";
+    public static final String col2Name = "value";
+    public static final String partitionColumn = "dt";
+
+    // Local (source tree) resources: sample data and the hive scripts uploaded in setUp().
+    private static final String hcatDir = OSUtil.getPath("src", "test", "resources", "hcat");
+    private static final String localHCatData = OSUtil.getPath(hcatDir, "data");
+    private static final String hiveScript = OSUtil.getPath(hcatDir, "hivescript");
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        clusterHC = cluster.getClusterHelper().getHCatClient();
+        bundles[0] = BundleUtil.readHCatBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(hiveScriptFile, EngineType.HIVE);
+        bundles[0].setClusterInterface(Interfacetype.REGISTRY,
+            cluster.getClusterHelper().getHCatEndpoint());
+
+        HadoopUtil.deleteDirIfExists(baseTestHDFSDir, clusterFS);
+        HadoopUtil.uploadDir(clusterFS, hiveScriptDir, hiveScript);
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        HadoopUtil.recreateDir(clusterFS, outputHDFSDir);
+        HadoopUtil.recreateDir(clusterFS, outputHDFSDir2);
+        clusterHC.dropTable(dbName, inputTableName, true);
+        clusterHC.dropTable(dbName, inputTableName2, true);
+        clusterHC.dropTable(dbName, outputTableName, true);
+        clusterHC.dropTable(dbName, outputTableName2, true);
+    }
+
+    /**
+     * Supplies the separators used to join the date parts of a partition value.
+     * Only "-" is provided for now.
+     *
+     * @return one row per separator, each row holding a single string
+     */
+    @DataProvider
+    public String[][] generateSeparators() {
+        //disabling till FALCON-372 is fixed
+        //return new String[][] {{"-"}, {"/"}};
+        final String[][] separators = {{"-"}};
+        return separators;
+    }
+
+    /**
+     * Schedules a hive process with one HCatalog (table) input feed and one HCatalog
+     * (table) output feed and verifies the output produced for the first date.
+     *
+     * Flow: upload local data for the generated dates, create external partitioned
+     * input and output tables, register the uploaded folders as input partitions,
+     * point the bundle's feeds at the tables through catalog URIs, schedule the process,
+     * wait for the instance to succeed and compare the first input folder with the
+     * matching output partition folder.
+     *
+     * @param separator string used to join the year, month, day and hour date parts
+     * @throws Exception if any HDFS, HCatalog, Falcon or Oozie interaction fails
+     */
+    @Test(dataProvider = "generateSeparators")
+    public void OneHCatInputOneHCatOutput(String separator) throws Exception {
+        /* upload data and create partition */
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2010-01-02T04:00Z";
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // one formatted date every 60 minutes
+        List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+        final List<String> dataset = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+        // table schema: two string columns, partitioned by one string column
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        // external input table located at inputHDFSDir
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir)
+            .build());
+
+        // external output table located at outputHDFSDir
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir)
+            .build());
+
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+
+        // e.g. "#dt=${YEAR}-${MONTH}-${DAY}-${HOUR}" for separator "-"
+        final String tableUriPartitionFragment = StringUtils.join(
+            new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String inputTableUri =
+            "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+        bundles[0].setInputFeedTableUri(inputTableUri);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        String outputTableUri =
+            "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+        bundles[0].setOutputFeedTableUri(outputTableUri);
+        bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setOutputFeedValidity(startDate, endDate);
+
+        // hourly process over the same validity window as the feeds
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].submitFeedsScheduleProcess();
+
+        // NOTE(review): the literal 1 is presumably the number of instances to wait for
+        InstanceUtil.waitTillInstanceReachState(
+            clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+            EntityType.PROCESS);
+
+        // first input folder vs. the output partition folder for the same date
+        AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+            outputHDFSDir + "/dt=" + dataDates.get(0), clusterFS);
+    }
+
+    /**
+     * Schedules a hive process with two HCatalog (table) input feeds and one HCatalog
+     * (table) output feed and verifies the size of the output for the first date.
+     *
+     * The second input feed is a renamed copy of the bundle's input feed that points at
+     * the second input table. After the awaited instance succeeds, the length of the
+     * output partition folder must equal the sum of the lengths of both input folders
+     * for the first date.
+     *
+     * @param separator string used to join the year, month, day and hour date parts
+     * @throws Exception if any HDFS, HCatalog, Falcon or Oozie interaction fails
+     */
+    @Test(dataProvider = "generateSeparators")
+    public void TwoHCatInputOneHCatOutput(String separator) throws Exception {
+        /* upload data and create partition */
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2010-01-02T04:00Z";
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // one formatted date every 60 minutes
+        List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+        // the same local data is uploaded to both input locations
+        final List<String> dataset = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+        final List<String> dataset2 = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir2, dataDates);
+
+        // table schema: two string columns, partitioned by one string column
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        // first external input table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir)
+            .build());
+
+        // second external input table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName2, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir2)
+            .build());
+
+        // external output table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir)
+            .build());
+
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+        addPartitionsToTable(dataDates, dataset2, "dt", dbName, inputTableName2);
+
+        // e.g. "#dt=${YEAR}-${MONTH}-${DAY}-${HOUR}" for separator "-"
+        final String tableUriPartitionFragment = StringUtils.join(
+            new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String inputTableUri =
+            "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+        String inputTableUri2 =
+            "catalog:" + dbName + ":" + inputTableName2 + tableUriPartitionFragment;
+        bundles[0].setInputFeedTableUri(inputTableUri);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        // clone the already configured input feed to build the second input feed
+        final String inputFeed1 = bundles[0].getInputFeedFromBundle();
+        final String inputFeed2Name = "second-" + Util.readEntityName(inputFeed1);
+
+        FeedMerlin feedObj = new FeedMerlin(inputFeed1);
+        feedObj.setName(inputFeed2Name);
+        feedObj.getTable().setUri(inputTableUri2);
+
+        String inputFeed2 = feedObj.toString();
+        bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+
+        String outputTableUri =
+            "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+        bundles[0].setOutputFeedTableUri(outputTableUri);
+        bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setOutputFeedValidity(startDate, endDate);
+
+        // hourly process using the two-inputs/one-output hive script
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].setProcessWorkflow(hiveScriptTwoHCatInputOneHCatOutput, EngineType.HIVE);
+        bundles[0].submitFeedsScheduleProcess();
+
+        // NOTE(review): the literal 1 is presumably the number of instances to wait for
+        InstanceUtil.waitTillInstanceReachState(
+            clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+            EntityType.PROCESS);
+
+        // output length for the first date must equal the combined length of both inputs
+        final ContentSummary inputContentSummary =
+            clusterFS.getContentSummary(new Path(inputHDFSDir + "/" + dataDates.get(0)));
+        final ContentSummary inputContentSummary2 =
+            clusterFS.getContentSummary(new Path(inputHDFSDir2 + "/" + dataDates.get(0)));
+        final ContentSummary outputContentSummary =
+            clusterFS.getContentSummary(new Path(outputHDFSDir + "/dt=" + dataDates.get(0)));
+        logger.info("inputContentSummary = " + inputContentSummary.toString(false));
+        logger.info("inputContentSummary2 = " + inputContentSummary2.toString(false));
+        logger.info("outputContentSummary = " + outputContentSummary.toString(false));
+        Assert.assertEquals(inputContentSummary.getLength() + inputContentSummary2.getLength(),
+            outputContentSummary.getLength(),
+            "Unexpected size of the output.");
+    }
+
+    /**
+     * Schedules a hive process with one HCatalog (table) input feed and two HCatalog
+     * (table) output feeds and verifies both outputs for the first date.
+     *
+     * The second output feed is a renamed copy of the bundle's output feed that points
+     * at the second output table. After the awaited instance succeeds, the first input
+     * folder is compared with the matching partition folder of each output table.
+     *
+     * @param separator string used to join the year, month, day and hour date parts
+     * @throws Exception if any HDFS, HCatalog, Falcon or Oozie interaction fails
+     */
+    @Test(dataProvider = "generateSeparators")
+    public void OneHCatInputTwoHCatOutput(String separator) throws Exception {
+        /* upload data and create partition */
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2010-01-02T04:00Z";
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // one formatted date every 60 minutes
+        List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+        final List<String> dataset = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+        // table schema: two string columns, partitioned by one string column
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        // external input table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir)
+            .build());
+
+        // first external output table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir)
+            .build());
+
+        // second external output table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName2, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir2)
+            .build());
+
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+
+        // e.g. "#dt=${YEAR}-${MONTH}-${DAY}-${HOUR}" for separator "-"
+        final String tableUriPartitionFragment = StringUtils.join(
+            new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String inputTableUri =
+            "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+        bundles[0].setInputFeedTableUri(inputTableUri);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        String outputTableUri =
+            "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+        String outputTableUri2 =
+            "catalog:" + dbName + ":" + outputTableName2 + tableUriPartitionFragment;
+        bundles[0].setOutputFeedTableUri(outputTableUri);
+        bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setOutputFeedValidity(startDate, endDate);
+        // Clone the already configured OUTPUT feed to build the second output feed.
+        // This used to clone the input feed, unlike TwoHCatInputTwoHCatOutput.
+        final String outputFeed1 = bundles[0].getOutputFeedFromBundle();
+        final String outputFeed2Name = "second-" + Util.readEntityName(outputFeed1);
+        FeedMerlin feedObj = new FeedMerlin(outputFeed1);
+        feedObj.setName(outputFeed2Name);
+        feedObj.getTable().setUri(outputTableUri2);
+        bundles[0].addOutputFeedToBundle("outputData2", feedObj.toString(), 0);
+
+        // hourly process using the one-input/two-outputs hive script
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].setProcessWorkflow(hiveScriptOneHCatInputTwoHCatOutput, EngineType.HIVE);
+        bundles[0].submitFeedsScheduleProcess();
+
+        // NOTE(review): the literal 1 is presumably the number of instances to wait for
+        InstanceUtil.waitTillInstanceReachState(
+            clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+            EntityType.PROCESS);
+
+        // first input folder vs. the matching partition folder of each output table
+        AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+            outputHDFSDir + "/dt=" + dataDates.get(0), clusterFS);
+        AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+            outputHDFSDir2 + "/dt=" + dataDates.get(0), clusterFS);
+    }
+
+
+    /**
+     * Schedules a hive process with two HCatalog (table) input feeds and two HCatalog
+     * (table) output feeds and verifies the size of both outputs for the first date.
+     *
+     * The second input and output feeds are renamed copies of the bundle's input and
+     * output feeds that point at the second tables. After the awaited instance succeeds,
+     * the length of each output partition folder must equal the sum of the lengths of
+     * both input folders for the first date.
+     *
+     * @param separator string used to join the year, month, day and hour date parts
+     * @throws Exception if any HDFS, HCatalog, Falcon or Oozie interaction fails
+     */
+    @Test(dataProvider = "generateSeparators")
+    public void TwoHCatInputTwoHCatOutput(String separator) throws Exception {
+        /* upload data and create partition */
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2010-01-02T04:00Z";
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // one formatted date every 60 minutes
+        List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+        // the same local data is uploaded to both input locations
+        final List<String> dataset = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+        final List<String> dataset2 = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir2, dataDates);
+
+        // table schema: two string columns, partitioned by one string column
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        // first external input table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir)
+            .build());
+
+        // second external input table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName2, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir2)
+            .build());
+
+        // first external output table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir)
+            .build());
+
+        // second external output table
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName2, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir2)
+            .build());
+
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+        addPartitionsToTable(dataDates, dataset2, "dt", dbName, inputTableName2);
+
+        // e.g. "#dt=${YEAR}-${MONTH}-${DAY}-${HOUR}" for separator "-"
+        final String tableUriPartitionFragment = StringUtils.join(
+            new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String inputTableUri =
+            "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+        String inputTableUri2 =
+            "catalog:" + dbName + ":" + inputTableName2 + tableUriPartitionFragment;
+        bundles[0].setInputFeedTableUri(inputTableUri);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        // clone the already configured input feed to build the second input feed
+        final String inputFeed1 = bundles[0].getInputFeedFromBundle();
+        final String inputFeed2Name = "second-" + Util.readEntityName(inputFeed1);
+        FeedMerlin feedObj = new FeedMerlin(inputFeed1);
+        feedObj.setName(inputFeed2Name);
+        feedObj.getTable().setUri(inputTableUri2);
+        String inputFeed2 = feedObj.toString();
+        bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+
+        String outputTableUri =
+            "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+        String outputTableUri2 =
+            "catalog:" + dbName + ":" + outputTableName2 + tableUriPartitionFragment;
+        bundles[0].setOutputFeedTableUri(outputTableUri);
+        bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setOutputFeedValidity(startDate, endDate);
+        // clone the already configured output feed to build the second output feed
+        final String outputFeed1 = bundles[0].getOutputFeedFromBundle();
+        final String outputFeed2Name = "second-" + Util.readEntityName(outputFeed1);
+        FeedMerlin feedObj2 = new FeedMerlin(outputFeed1);
+        feedObj2.setName(outputFeed2Name);
+        feedObj2.getTable().setUri(outputTableUri2);
+        String outputFeed2 = feedObj2.toString();
+        bundles[0].addOutputFeedToBundle("outputData2", outputFeed2, 0);
+        // hourly process using the two-inputs/two-outputs hive script
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].setProcessWorkflow(hiveScriptTwoHCatInputTwoHCatOutput, EngineType.HIVE);
+        bundles[0].submitFeedsScheduleProcess();
+
+        // NOTE(review): the literal 1 is presumably the number of instances to wait for
+        InstanceUtil.waitTillInstanceReachState(
+            clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+            EntityType.PROCESS);
+
+        // each output length for the first date must equal the combined length of both inputs
+        final ContentSummary inputContentSummary =
+            clusterFS.getContentSummary(new Path(inputHDFSDir + "/" + dataDates.get(0)));
+        final ContentSummary inputContentSummary2 =
+            clusterFS.getContentSummary(new Path(inputHDFSDir2 + "/" + dataDates.get(0)));
+        final ContentSummary outputContentSummary =
+            clusterFS.getContentSummary(new Path(outputHDFSDir + "/dt=" + dataDates.get(0)));
+        final ContentSummary outputContentSummary2 =
+            clusterFS.getContentSummary(new Path(outputHDFSDir2 + "/dt=" + dataDates.get(0)));
+        logger.info("inputContentSummary = " + inputContentSummary.toString(false));
+        logger.info("inputContentSummary2 = " + inputContentSummary2.toString(false));
+        logger.info("outputContentSummary = " + outputContentSummary.toString(false));
+        logger.info("outputContentSummary2 = " + outputContentSummary2.toString(false));
+        Assert.assertEquals(inputContentSummary.getLength() + inputContentSummary2.getLength(),
+            outputContentSummary.getLength(),
+            "Unexpected size of the output.");
+        Assert.assertEquals(inputContentSummary.getLength() + inputContentSummary2.getLength(),
+            outputContentSummary2.getLength(),
+            "Unexpected size of the output.");
+    }
+
+
+    /**
+     * Schedules a hive process with one HCatalog (table) input feed and one output feed
+     * that is not table based, and verifies the output for the first date.
+     *
+     * The bundle's output feed is replaced by the output feed of the EL bundle (renamed
+     * and bound to the bundle's single cluster) whose data location is a dated folder
+     * under outputHDFSDir. After the awaited instance succeeds, the first input folder
+     * is compared with the output folder for the same date.
+     *
+     * @param separator string used to join the year, month, day and hour date parts
+     * @throws Exception if any HDFS, HCatalog, Falcon or Oozie interaction fails
+     */
+    @Test(dataProvider = "generateSeparators")
+    public void OneHCatInputOneNonHCatOutput(String separator) throws Exception {
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2010-01-02T04:00Z";
+        /* upload data and create partition */
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // one formatted date every 60 minutes
+        List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+        final List<String> dataset = HadoopUtil
+            .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+        // table schema: two string columns, partitioned by one string column
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        // external input table; no output table is needed in this test
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, inputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(inputHDFSDir)
+            .build());
+
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+
+        // e.g. "#dt=${YEAR}-${MONTH}-${DAY}-${HOUR}" for separator "-"
+        final String tableUriPartitionFragment = StringUtils.join(
+            new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String inputTableUri =
+            "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+        bundles[0].setInputFeedTableUri(inputTableUri);
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+
+        // replace the bundle's output feed with the EL bundle's output feed, keeping the
+        // original feed name and pointing it at this bundle's only cluster
+        String nonHCatFeed = BundleUtil.readELBundle().getOutputFeedFromBundle();
+        final String outputFeedName = bundles[0].getOutputFeedNameFromBundle();
+        nonHCatFeed = Util.setFeedName(nonHCatFeed, outputFeedName);
+        final List<String> clusterNames = bundles[0].getClusterNames();
+        Assert.assertEquals(clusterNames.size(), 1, "Expected only one cluster in the bundle.");
+        nonHCatFeed = Util.setClusterNameInFeed(nonHCatFeed, clusterNames.get(0), 0);
+        bundles[0].writeFeedElement(nonHCatFeed, outputFeedName);
+        bundles[0].setOutputFeedLocationData(outputHDFSDir + "/" +
+            StringUtils.join(new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator));
+        bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setOutputFeedValidity(startDate, endDate);
+
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+
+        bundles[0].setProcessWorkflow(hiveScriptFileNonHCatOutput, EngineType.HIVE);
+        bundles[0].submitFeedsScheduleProcess();
+
+        // NOTE(review): the literal 1 is presumably the number of instances to wait for
+        InstanceUtil.waitTillInstanceReachState(
+            clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+            EntityType.PROCESS);
+
+        // the output is a plain dated folder here, so there is no "dt=" prefix
+        AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+            outputHDFSDir + "/" + dataDates.get(0), clusterFS);
+    }
+
+    /**
+     * Schedules a hive process with one input feed that is not table based and one
+     * HCatalog (table) output feed, and verifies the output for the first date.
+     *
+     * The bundle's input feed is replaced by the input feed of the EL bundle (renamed
+     * and bound to the bundle's single cluster) whose data path is a dated folder under
+     * inputHDFSDir. After the awaited instance succeeds, the first input folder is
+     * compared with the matching output partition folder.
+     *
+     * @param separator string used to join the year, month, day and hour date parts
+     * @throws Exception if any HDFS, HCatalog, Falcon or Oozie interaction fails
+     */
+    @Test(dataProvider = "generateSeparators")
+    public void OneNonCatInputOneHCatOutput(String separator) throws Exception {
+        /* upload data and create partition */
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2010-01-02T04:00Z";
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // one formatted date every 60 minutes
+        List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+        // uploaded data is read through the feed path, so no partitions are registered
+        final List<String> dataset = HadoopUtil.
+            flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+        // table schema: two string columns, partitioned by one string column
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        // external output table; no input table is needed in this test
+        clusterHC.createTable(HCatCreateTableDesc
+            .create(dbName, outputTableName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(outputHDFSDir)
+            .build());
+
+        // replace the bundle's input feed with the EL bundle's input feed, keeping the
+        // original feed name and pointing it at this bundle's only cluster
+        String nonHCatFeed = BundleUtil.readELBundle().getInputFeedFromBundle();
+        final String inputFeedName = bundles[0].getInputFeedNameFromBundle();
+        nonHCatFeed = Util.setFeedName(nonHCatFeed, inputFeedName);
+        final List<String> clusterNames = bundles[0].getClusterNames();
+        Assert.assertEquals(clusterNames.size(), 1, "Expected only one cluster in the bundle.");
+        nonHCatFeed = Util.setClusterNameInFeed(nonHCatFeed, clusterNames.get(0), 0);
+        bundles[0].writeFeedElement(nonHCatFeed, inputFeedName);
+        bundles[0].setInputFeedDataPath(inputHDFSDir + "/" +
+            StringUtils.join(new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator));
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+
+        // e.g. "#dt=${YEAR}-${MONTH}-${DAY}-${HOUR}" for separator "-"
+        final String tableUriPartitionFragment = StringUtils.join(
+            new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String outputTableUri =
+            "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+        bundles[0].setOutputFeedTableUri(outputTableUri);
+        bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setOutputFeedValidity(startDate, endDate);
+
+        bundles[0].setProcessWorkflow(hiveScriptFileNonHCatInput, EngineType.HIVE);
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].submitFeedsScheduleProcess();
+
+        // NOTE(review): the literal 1 is presumably the number of instances to wait for
+        InstanceUtil.waitTillInstanceReachState(
+            clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+            EntityType.PROCESS);
+
+        // first input folder vs. the output partition folder for the same date
+        AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+            outputHDFSDir + "/dt=" + dataDates.get(0), clusterFS);
+    }
+
+    /**
+     * Registers one partition per entry of {@code partitions} on the given table, using
+     * the location at the same index of {@code partitionLocations}, in a single
+     * addPartitions call.
+     *
+     * @param partitions         values of the partition column, one per partition
+     * @param partitionLocations location of each partition, parallel to partitions
+     * @param partitionCol       name of the partition column
+     * @param dbName             database holding the table
+     * @param tableName          table that receives the partitions
+     * @throws HCatException if a partition descriptor cannot be built or added
+     */
+    private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations,
+                                      String partitionCol,
+                                      String dbName, String tableName) throws HCatException {
+        // both lists are parallel, so they must have the same length
+        Assert.assertEquals(partitions.size(), partitionLocations.size(),
+            "Number of locations is not same as number of partitions.");
+        final List<HCatAddPartitionDesc> descriptors = new ArrayList<HCatAddPartitionDesc>();
+        int index = 0;
+        for (String partitionValue : partitions) {
+            final Map<String, String> partitionSpec = new HashMap<String, String>();
+            partitionSpec.put(partitionCol, partitionValue);
+            final String location = partitionLocations.get(index);
+            final HCatAddPartitionDesc descriptor =
+                HCatAddPartitionDesc.create(dbName, tableName, location, partitionSpec).build();
+            descriptors.add(descriptor);
+            index++;
+        }
+        clusterHC.addPartitions(descriptors);
+    }
+
+    /**
+     * Generates formatted dates starting at {@code startDate} and advancing in steps of
+     * {@code skipMinutes}.
+     *
+     * The first entry is always the start date. While the current date is not after
+     * the end date, the step is added and the resulting date is appended. When the
+     * start is not after the end, the last entry is therefore the first date strictly
+     * after {@code endDate}. NOTE(review): this trailing entry is kept as-is because
+     * other tests may rely on it; confirm before changing.
+     *
+     * @param startDate   first date, in the format accepted by TimeUtil.oozieDateToDate
+     * @param endDate     date bounding the generation, in the same format
+     * @param datePattern Joda-Time pattern used to format every returned date
+     * @param skipMinutes step between consecutive dates, in minutes; must be positive
+     *                    whenever the start date is not after the end date
+     * @return the formatted dates, in increasing order
+     * @throws IllegalArgumentException if the step is not positive while the start date
+     *                                  is not after the end date
+     */
+    public static List<String> getDatesList(String startDate, String endDate, String datePattern,
+                                            int skipMinutes) {
+        DateTime startDateJoda = new DateTime(TimeUtil.oozieDateToDate(startDate));
+        DateTime endDateJoda = new DateTime(TimeUtil.oozieDateToDate(endDate));
+        // A non-positive step never moves past endDate, so the loop below would spin
+        // forever and grow the list until memory runs out; fail fast instead.
+        if (skipMinutes <= 0 && !startDateJoda.isAfter(endDateJoda)) {
+            throw new IllegalArgumentException(
+                "skipMinutes must be positive, but was: " + skipMinutes);
+        }
+        DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+        logger.info("generating data between " + formatter.print(startDateJoda) + " and " +
+            formatter.print(endDateJoda));
+        List<String> dates = new ArrayList<String>();
+        dates.add(formatter.print(startDateJoda));
+        while (!startDateJoda.isAfter(endDateJoda)) {
+            startDateJoda = startDateJoda.plusMinutes(skipMinutes);
+            dates.add(formatter.print(startDateJoda));
+        }
+        return dates;
+    }
+
+    /**
+     * Runs after every test method (even after failures, because of alwaysRun) and
+     * delegates the cleanup to removeBundles().
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
new file mode 100644
index 0000000..860e680
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.format.DateTimeFormat;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "embedded")
+public class HCatReplicationTest extends BaseTestClass {
+
+    private static final Logger logger = Logger.getLogger(HCatReplicationTest.class);
+    ColoHelper cluster = servers.get(0);
+    FileSystem clusterFS = serverFS.get(0);
+    HCatClient clusterHC;
+
+    ColoHelper cluster2 = servers.get(1);
+    FileSystem cluster2FS = serverFS.get(1);
+    OozieClient cluster2OC = serverOC.get(1);
+    HCatClient cluster2HC;
+
+    ColoHelper cluster3 = servers.get(2);
+    FileSystem cluster3FS = serverFS.get(2);
+    OozieClient cluster3OC = serverOC.get(2);
+    HCatClient cluster3HC;
+
+    final String baseTestHDFSDir = baseHDFSDir + "/HCatReplicationTest";
+
+    final String dbName = "default";
+    private static final String localHCatData = OSUtil.getPath(OSUtil.RESOURCES, "hcat", "data");
+
+    @BeforeClass(alwaysRun = true)
+    public void beforeClass() throws IOException {
+        clusterHC = cluster.getClusterHelper().getHCatClient();
+        cluster2HC = cluster2.getClusterHelper().getHCatClient();
+        cluster3HC = cluster3.getClusterHelper().getHCatClient();
+        // create the base dir on all clusters.
+        HadoopUtil.recreateDir(serverFS, baseTestHDFSDir);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        Bundle bundle = BundleUtil.readHCatBundle();
+        bundles[0] = new Bundle(bundle, cluster.getPrefix());
+        bundles[0].generateUniqueBundle();
+        bundles[0].setClusterInterface(Interfacetype.REGISTRY,
+            cluster.getClusterHelper().getHCatEndpoint());
+
+        bundles[1] = new Bundle(bundle, cluster2.getPrefix());
+        bundles[1].generateUniqueBundle();
+        bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper()
+            .getHCatEndpoint());
+
+        bundles[2] = new Bundle(bundle, cluster3.getPrefix());
+        bundles[2].generateUniqueBundle();
+        bundles[2].setClusterInterface(Interfacetype.REGISTRY, cluster3.getClusterHelper()
+            .getHCatEndpoint());
+
+    }
+
+    @DataProvider
+    public String[][] generateSeparators() {
+        //disabling till FALCON-372 is fixed
+        //return new String[][] {{"-"}, {"/"}};
+        return new String[][]{{"-"},};
+    }
+
+    // make sure oozie changes mentioned FALCON-389 are done on the clusters. Otherwise the test
+    // will fail.
+    // Noticed with hive 0.13 we need the following issues resolved to work HIVE-6848 and
+    // HIVE-6868. Also oozie share libs need to have hive jars that have these jira's resolved and
+    // the maven depenendcy you are using to run the tests has to have hcat that has these fixed.
+    @Test(dataProvider = "generateSeparators")
+    public void oneSourceOneTarget(String separator) throws Exception {
+        String tcName = "HCatReplication_oneSourceOneTarget";
+        if (separator.equals("-")) {
+            tcName += "_hyphen";
+        } else {
+            tcName += "_slash";
+        }
+        String tblName = tcName;
+        String testHdfsDir = baseTestHDFSDir + "/" + tcName;
+        HadoopUtil.recreateDir(serverFS, testHdfsDir);
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2099-01-01T00:00Z";
+        final String dataEndDate = "2010-01-01T21:00Z";
+        final String tableUriPartitionFragment = StringUtils
+            .join(new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String tableUri = "catalog:" + dbName + ":" + tblName + tableUriPartitionFragment;
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // use the start date for both as this will only generate 2 partitions.
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, dataEndDate, 60,
+            DateTimeFormat.forPattern(datePattern));
+
+        final List<String> dataset = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
+            localHCatData, testHdfsDir, dataDates);
+        final String col1Name = "id";
+        final String col2Name = "value";
+        final String partitionColumn = "dt";
+
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        // create table on cluster 1 and add data to it.
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        createTable(clusterHC, dbName, tblName, cols, partitionCols, testHdfsDir);
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, tblName, clusterHC);
+
+        // create table on target cluster.
+        createTable(cluster2HC, dbName, tblName, cols, partitionCols, testHdfsDir);
+
+        Bundle.submitCluster(bundles[0], bundles[1]);
+
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        bundles[0].setInputFeedTableUri(tableUri);
+
+        String feed = bundles[0].getDataSets().get(0);
+        // set the cluster 2 as the target.
+        feed = InstanceUtil.setFeedClusterWithTable(feed,
+            XmlUtil.createValidity(startDate, endDate),
+            XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+            tableUri);
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                feed)
+        );
+        TimeUtil.sleepSeconds(15);
+        //check if all coordinators exist
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+
+        //replication should start, wait while it ends
+        // we will check for 2 instances so that both partitions are copied over.
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //check if data was replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS, new Path(testHdfsDir));
+        logger.info("Data on source cluster: " + cluster1ReplicatedData);
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir));
+        logger.info("Data on target cluster: " + cluster2ReplicatedData);
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+
+    }
+
+    // make sure oozie changes mentioned FALCON-389 are done on the clusters. Otherwise the test
+    // will fail.
+    // Noticed with hive 0.13 we need the following issues resolved to work HIVE-6848 and
+    // HIVE-6868. Also oozie share libs need to have hive jars that have these jira's resolved and
+    // the maven depenendcy you are using to run the tests has to have hcat that has these fixed.
+    // This test can fail randomly because of https://issues.apache.org/jira/browse/FALCON-401
+    @Test(dataProvider = "generateSeparators")
+    public void oneSourceTwoTarget(String separator) throws Exception {
+        String tcName = "HCatReplication_oneSourceTwoTarget";
+        if (separator.equals("-")) {
+            tcName += "_hyphen";
+        } else {
+            tcName += "_slash";
+        }
+        String tblName = tcName;
+        String testHdfsDir = baseTestHDFSDir + "/" + tcName;
+        HadoopUtil.recreateDir(serverFS, testHdfsDir);
+        final String startDate = "2010-01-01T20:00Z";
+        final String endDate = "2099-01-01T00:00Z";
+        final String dataEndDate = "2010-01-01T21:00Z";
+        final String tableUriPartitionFragment = StringUtils
+            .join(new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+        String tableUri = "catalog:" + dbName + ":" + tblName + tableUriPartitionFragment;
+        final String datePattern =
+            StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+        // use the start date for both as this will only generate 2 partitions.
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, dataEndDate, 60,
+            DateTimeFormat.forPattern(datePattern));
+
+        final List<String> dataset = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
+            localHCatData, testHdfsDir, dataDates);
+        final String col1Name = "id";
+        final String col2Name = "value";
+        final String partitionColumn = "dt";
+
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+        cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+        ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+        // create table on cluster 1 and add data to it.
+        partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+        createTable(clusterHC, dbName, tblName, cols, partitionCols, testHdfsDir);
+        addPartitionsToTable(dataDates, dataset, "dt", dbName, tblName, clusterHC);
+
+        // create table on target cluster.
+        createTable(cluster2HC, dbName, tblName, cols, partitionCols, testHdfsDir);
+        createTable(cluster3HC, dbName, tblName, cols, partitionCols, testHdfsDir);
+
+        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+        bundles[0].setInputFeedValidity(startDate, endDate);
+        bundles[0].setInputFeedTableUri(tableUri);
+
+        String feed = bundles[0].getDataSets().get(0);
+        // set the cluster 2 as the target.
+        feed = InstanceUtil.setFeedClusterWithTable(feed,
+            XmlUtil.createValidity(startDate, endDate),
+            XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+            tableUri);
+        // set the cluster 3 as the target.
+        feed = InstanceUtil.setFeedClusterWithTable(feed,
+            XmlUtil.createValidity(startDate, endDate),
+            XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+            Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.TARGET, null,
+            tableUri);
+
+        AssertUtil.assertSucceeded(
+            prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+                feed)
+        );
+        TimeUtil.sleepSeconds(15);
+        //check if all coordinators exist
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+
+        //check if all coordinators exist
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+                "REPLICATION"), 1);
+
+        //replication should start, wait while it ends
+        // we will check for 2 instances so that both partitions are copied over.
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //replication should start, wait while it ends
+        // we will check for 2 instances so that both partitions are copied over.
+        InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 2,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        //check if data was replicated correctly
+        List<Path> srcData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS, new Path(testHdfsDir));
+        logger.info("Data on source cluster: " + srcData);
+        List<Path> cluster2TargetData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir));
+        logger.info("Data on target cluster: " + cluster2TargetData);
+        AssertUtil.checkForListSizes(srcData, cluster2TargetData);
+        List<Path> cluster3TargetData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testHdfsDir));
+        logger.info("Data on target cluster: " + cluster3TargetData);
+        AssertUtil.checkForListSizes(srcData, cluster3TargetData);
+    }
+
+    //TODO: More tests need to be added such as
+    // Tests to make sure new partitions that are added are replicated
+    // Tests to make sure partitions that do no match the pattern are not copied
+
+    private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations,
+                                      String partitionCol,
+                                      String dbName, String tableName, HCatClient hc) throws
+        HCatException {
+        Assert.assertEquals(partitions.size(), partitionLocations.size(),
+            "Number of locations is not same as number of partitions.");
+        final List<HCatAddPartitionDesc> partitionDesc = new ArrayList<HCatAddPartitionDesc>();
+        for (int i = 0; i < partitions.size(); ++i) {
+            final String partition = partitions.get(i);
+            final Map<String, String> onePartition = new HashMap<String, String>();
+            onePartition.put(partitionCol, partition);
+            final String partitionLoc = partitionLocations.get(i);
+            partitionDesc
+                .add(HCatAddPartitionDesc.create(dbName, tableName, partitionLoc, onePartition)
+                    .build());
+        }
+        logger.info("adding partitions: " + partitionDesc);
+        hc.addPartitions(partitionDesc);
+    }
+
+    private static void createTable(HCatClient hcatClient, String dbName, String tblName,
+                                    List<HCatFieldSchema> cols, List<HCatFieldSchema> partitionCols,
+                                    String hdfsDir) throws HCatException {
+        hcatClient.dropTable(dbName, tblName, true);
+        hcatClient.createTable(HCatCreateTableDesc
+            .create(dbName, tblName, cols)
+            .partCols(partitionCols)
+            .ifNotExists(true)
+            .isTableExternal(true)
+            .location(hdfsDir)
+            .build());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+}