You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:52 UTC

[06/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
deleted file mode 100644
index 514fd10..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-
-/**
- * Feed instance status tests.
- */
-@Test(groups = "embedded")
-public class FeedInstanceStatusTest extends BaseTestClass {
-
-    private String baseTestDir = cleanAndGetTestDir();
-    private String feedInputPath = baseTestDir + MINUTE_DATE_PATTERN;
-    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-
-    private ColoHelper cluster2 = servers.get(1);
-    private ColoHelper cluster3 = servers.get(2);
-    private FileSystem cluster2FS = serverFS.get(1);
-    private FileSystem cluster3FS = serverFS.get(2);
-    private static final Logger LOGGER = Logger.getLogger(FeedInstanceStatusTest.class);
-
-    @BeforeClass(alwaysRun = true)
-    public void uploadWorkflow() throws Exception {
-        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        Bundle bundle = BundleUtil.readELBundle();
-        for (int i = 0; i < 3; i++) {
-            bundles[i] = new Bundle(bundle, servers.get(i));
-            bundles[i].generateUniqueBundle(this);
-            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
-        }
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Goes through the whole feed replication workflow checking its instances status while.
-     * submitting feed, scheduling it, performing different combinations of actions like
-     * -submit, -resume, -kill, -rerun.
-     */
-    @Test(groups = {"multiCluster"})
-    public void feedInstanceStatusRunning() throws Exception {
-        bundles[0].setInputFeedDataPath(feedInputPath);
-
-        AssertUtil.assertSucceeded(prism.getClusterHelper()
-            .submitEntity(bundles[0].getClusters().get(0)));
-
-        AssertUtil.assertSucceeded(prism.getClusterHelper()
-            .submitEntity(bundles[1].getClusters().get(0)));
-
-        AssertUtil.assertSucceeded(prism.getClusterHelper()
-            .submitEntity(bundles[2].getClusters().get(0)));
-
-        String feed = bundles[0].getDataSets().get(0);
-        String feedName = Util.readEntityName(feed);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
-        String startTime = TimeUtil.getTimeWrtSystemTime(-50);
-        final String startPlus20Min = TimeUtil.addMinsToTime(startTime, 20);
-        final String startPlus40Min = TimeUtil.addMinsToTime(startTime, 40);
-        final String startPlus100Min = TimeUtil.addMinsToTime(startTime, 100);
-
-        feed = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(
-                Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("US/${cluster.colo}")
-                .build())
-            .toString();
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startPlus20Min,
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startPlus40Min,
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("feed: " + Util.prettyPrintXml(feed));
-
-        //status before submit
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus100Min
-                + "&end=" + TimeUtil.addMinsToTime(startTime, 120));
-
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed));
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + startPlus100Min);
-
-        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
-
-        // both replication instances
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + startPlus100Min);
-
-        // single instance at -30
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus20Min);
-
-        //single at -10
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
-        //single at 10
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
-        //single at 30
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
-        String postFix = "/US/" + cluster2.getClusterHelper().getColoName();
-        String prefix = bundles[0].getFeedDataPathPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
-        HadoopUtil.lateDataReplenish(cluster2FS, 80, 20, prefix, postFix);
-
-        postFix = "/UK/" + cluster3.getClusterHelper().getColoName();
-        prefix = bundles[0].getFeedDataPathPrefix();
-        HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
-        HadoopUtil.lateDataReplenish(cluster3FS, 80, 20, prefix, postFix);
-
-        // both replication instances
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + startPlus100Min);
-
-        // single instance at -30
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus20Min);
-
-        //single at -10
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
-        //single at 10
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
-        //single at 30
-        prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
-        LOGGER.info("Wait till feed goes into running ");
-
-        //suspend instances -10
-        prism.getFeedHelper().getProcessInstanceSuspend(feedName, "?start=" + startPlus40Min);
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-
-        //resuspend -10 and suspend -30 source specific
-        prism.getFeedHelper().getProcessInstanceSuspend(feedName,
-            "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-
-        //resume -10 and -30
-        prism.getFeedHelper().getProcessInstanceResume(feedName,
-            "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-
-        //get running instances
-        prism.getFeedHelper().getRunningInstance(feedName);
-
-        //rerun succeeded instance
-        prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + startPlus20Min);
-
-        //kill instance
-        prism.getFeedHelper().getProcessInstanceKill(feedName,
-            "?start=" + TimeUtil.addMinsToTime(startTime, 44));
-        prism.getFeedHelper().getProcessInstanceKill(feedName, "?start=" + startTime);
-
-        //end time should be less than end of validity i.e startTime + 110
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
-
-        //rerun killed instance
-        prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
-        prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
-
-        //kill feed
-        prism.getFeedHelper().delete(feed);
-        InstancesResult responseInstance = prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
-
-        LOGGER.info(responseInstance.getMessage());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
deleted file mode 100644
index 5bb5e6e..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.List;
-
-/**
- * This test submits and schedules feed and then check for replication.
- * On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time.
- * Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement.
- */
-@Test(groups = "embedded")
-public class FeedLateRerunTest extends BaseTestClass {
-
-    private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
-    private FileSystem cluster1FS = serverFS.get(0);
-    private FileSystem cluster2FS = serverFS.get(1);
-    private OozieClient cluster2OC = serverOC.get(1);
-    private String baseTestDir = cleanAndGetTestDir();
-    private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN;
-    private String targetPath = baseTestDir + "/target";
-    private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
-    private static final Logger LOGGER = Logger.getLogger(FeedLateRerunTest.class);
-    private String source = null;
-    private String target = null;
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws JAXBException, IOException {
-        Bundle bundle = BundleUtil.readFeedReplicationBundle();
-        bundles[0] = new Bundle(bundle, cluster1);
-        bundles[1] = new Bundle(bundle, cluster2);
-        bundles[0].generateUniqueBundle(this);
-        bundles[1].generateUniqueBundle(this);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    @Test(dataProvider = "dataFlagProvider")
-    public void testLateRerun(boolean dataFlag)
-        throws URISyntaxException, AuthenticationException, InterruptedException, IOException,
-        OozieClientException, JAXBException {
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 30);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
-        //configure feed
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.setFilePath(feedDataLocation);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set cluster1 as source
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        //set cluster2 as target
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
-        String entityName = feed.getName();
-
-        //submit and schedule feed
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
-        //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, entityName, "REPLICATION"), 1);
-
-        //Finding bundleId of replicated instance on target
-        String bundleId = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED);
-
-        //Finding and creating missing dependencies
-        List<String> missingDependencies = getAndCreateDependencies(
-            cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId, dataFlag, entityName);
-        int count = 1;
-        for (String location : missingDependencies) {
-            if (count==1) {
-                source = location;
-                count++;
-            }
-        }
-        source=splitPathFromIp(source, "8020");
-        LOGGER.info("source : " + source);
-        target = source.replace("source", "target");
-        LOGGER.info("target : " + target);
-        /* Sleep for some time ( as is defined in runtime property of server ).
-           Let the instance rerun and then it should succeed.*/
-        int sleepMins = 8;
-        for(int i=0; i < sleepMins; i++) {
-            LOGGER.info("Waiting...");
-            TimeUtil.sleepSeconds(60);
-        }
-        String bundleID = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED);
-        OozieUtil.validateRetryAttempts(cluster2OC, bundleID, EntityType.FEED, 1);
-
-        //check if data has been replicated correctly
-        List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
-        List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-    }
-
-    private String splitPathFromIp(String src, String port) {
-        String reqSrc, tempSrc = "";
-        if (src.contains(":")) {
-            String[] tempPath = src.split(":");
-            for (String aTempPath : tempPath) {
-                if (aTempPath.startsWith(port)) {
-                    tempSrc = aTempPath;
-                }
-            }
-        }
-        if (tempSrc.isEmpty()) {
-            reqSrc = src;
-        } else {
-            reqSrc=tempSrc.replace(port, "");
-        }
-        return reqSrc;
-    }
-
-    /* prismHelper1 - source colo, prismHelper2 - target colo */
-    private List<String> getAndCreateDependencies(FileSystem sourceFS, String prefix, OozieClient targetOC,
-            String bundleId, boolean dataFlag, String entityName) throws OozieClientException, IOException {
-        List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
-        for (int i = 0; i < 10 && missingDependencies == null; ++i) {
-            TimeUtil.sleepSeconds(30);
-            LOGGER.info("sleeping...");
-            missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
-        }
-        Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
-        //print missing dependencies
-        for (String dependency : missingDependencies) {
-            LOGGER.info("dependency from job: " + dependency);
-        }
-        // Creating missing dependencies
-        HadoopUtil.createFolders(sourceFS, prefix, missingDependencies);
-        //Adding data to empty folders depending on dataFlag
-        if (dataFlag) {
-            int tempCount = 1;
-            for (String location : missingDependencies) {
-                if (tempCount==1) {
-                    LOGGER.info("Transferring data to : " + location);
-                    HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
-                    tempCount++;
-                }
-            }
-        }
-        //replication should start, wait while it ends
-        InstanceUtil.waitTillInstanceReachState(targetOC, entityName, 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-        // Adding data for late rerun
-        int tempCounter = 1;
-        for (String dependency : missingDependencies) {
-            if (tempCounter==1) {
-                LOGGER.info("Transferring late data to : " + dependency);
-                HadoopUtil.copyDataToFolder(sourceFS, dependency,
-                    OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.properties"));
-            }
-            tempCounter++;
-        }
-        return missingDependencies;
-    }
-
-    @DataProvider(name = "dataFlagProvider")
-    private Object[][] dataFlagProvider() {
-        return new Object[][] {
-            new Object[] {true, },
-            new Object[] {false, },
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
deleted file mode 100644
index a936aa1..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ /dev/null
@@ -1,581 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.supportClasses.ExecResult;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * feed replication test.
- * Replicates empty directories as well as directories containing data.
- */
-@Test(groups = "embedded")
-public class FeedReplicationTest extends BaseTestClass {
-
-    private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
-    private ColoHelper cluster3 = servers.get(2);
-    private FileSystem cluster1FS = serverFS.get(0);
-    private FileSystem cluster2FS = serverFS.get(1);
-    private FileSystem cluster3FS = serverFS.get(2);
-    private OozieClient cluster2OC = serverOC.get(1);
-    private OozieClient cluster3OC = serverOC.get(2);
-    private String baseTestDir = cleanAndGetTestDir();
-    private String sourcePath = baseTestDir + "/source";
-    private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN;
-    private String targetPath = baseTestDir + "/target";
-    private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
-    private static final Logger LOGGER = Logger.getLogger(FeedReplicationTest.class);
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws JAXBException, IOException {
-        Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
-        bundles[0] = new Bundle(bundle, cluster1);
-        bundles[1] = new Bundle(bundle, cluster2);
-        bundles[2] = new Bundle(bundle, cluster3);
-
-        bundles[0].generateUniqueBundle(this);
-        bundles[1].generateUniqueBundle(this);
-        bundles[2].generateUniqueBundle(this);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() throws IOException {
-        removeTestClassEntities();
-        cleanTestsDirs();
-    }
-
-    /**
-     * Test demonstrates replication of stored data from one source cluster to one target cluster.
-     * It checks the lifecycle of replication workflow instance including its creation. When
-     * replication ends test checks if data was replicated correctly.
-     * Also checks for presence of _SUCCESS file in target directory.
-     */
-    @Test(dataProvider = "dataFlagProvider")
-    public void replicate1Source1Target(boolean dataFlag)
-        throws Exception {
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
-        //configure feed
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.setFilePath(feedDataLocation);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set cluster1 as source
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        //set cluster2 as target
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
-        feed.withProperty("job.counter", "true");
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
-        //upload necessary data
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
-        String timePattern = fmt.print(date);
-        String sourceLocation = sourcePath + "/" + timePattern + "/";
-        String targetLocation = targetPath + "/" + timePattern + "/";
-        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
-        Path toSource = new Path(sourceLocation);
-        Path toTarget = new Path(targetLocation);
-        if (dataFlag) {
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
-        }
-
-        //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
-
-        //replication should start, wait while it ends
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        //check if data has been replicated correctly
-        List<Path> cluster1ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
-        List<Path> cluster2ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
-
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
-        //_SUCCESS does not exist in source
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, ""), false);
-
-        //_SUCCESS should exist in target
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true);
-
-        AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
-            cluster2FS, "feed", "Success logs are not present");
-
-        ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
-        AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag);
-    }
-
-    /**
-     * Test demonstrates replication of stored data from one source cluster to two target clusters.
-     * It checks the lifecycle of replication workflow instances including their creation on both
-     * targets. When replication ends test checks if data was replicated correctly.
-     * Also checks for presence of _SUCCESS file in target directory.
-     */
-    @Test(dataProvider = "dataFlagProvider")
-    public void replicate1Source2Targets(boolean dataFlag) throws Exception {
-        Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
-        //configure feed
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.setFilePath(feedDataLocation);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set cluster1 as source
-        feed.addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.SOURCE)
-                        .build());
-        //set cluster2 as target
-        feed.addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build());
-        //set cluster3 as target
-        feed.addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build());
-        feed.withProperty("job.counter", "true");
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
-        //upload necessary data
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
-        String timePattern = fmt.print(date);
-        String sourceLocation = sourcePath + "/" + timePattern + "/";
-        String targetLocation = targetPath + "/" + timePattern + "/";
-        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
-        Path toSource = new Path(sourceLocation);
-        Path toTarget = new Path(targetLocation);
-
-        if (dataFlag) {
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
-        }
-
-        //check if all coordinators exist
-        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-        InstanceUtil.waitTillInstancesAreCreated(cluster3OC, feed.toString(), 0);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "REPLICATION"), 1);
-        //replication on cluster 2 should start, wait till it ends
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        //replication on cluster 3 should start, wait till it ends
-        InstanceUtil.waitTillInstanceReachState(cluster3OC, feed.getName(), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        //check if data has been replicated correctly
-        List<Path> cluster1ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
-        List<Path> cluster2ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
-        List<Path> cluster3ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
-
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData);
-
-        //_SUCCESS does not exist in source
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, ""), false);
-
-        //_SUCCESS should exist in target
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true);
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster3FS, toTarget, ""), true);
-
-        AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
-            cluster2FS, "feed", "Success logs are not present");
-
-        ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
-        AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag);
-    }
-
-    /**
-     * Test demonstrates how replication depends on availability flag. Scenario includes one
-     * source and one target cluster. When feed is submitted and scheduled and data is available,
-     * feed still waits for availability flag (file which name is defined as availability flag in
-     * feed definition). As soon as mentioned file is got uploaded in data directory,
-     * replication starts and when it ends test checks if data was replicated correctly.
-     * Also checks for presence of availability flag in target directory.
-     */
-    @Test(dataProvider = "dataFlagProvider")
-    public void availabilityFlagTest(boolean dataFlag) throws Exception {
-        //replicate1Source1Target scenario + set availability flag but don't upload required file
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
-        //configure feed
-        String availabilityFlagName = "availabilityFlag.txt";
-        String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
-        FeedMerlin feedElement = bundles[0].getFeedElement(feedName);
-        feedElement.setAvailabilityFlag(availabilityFlagName);
-        bundles[0].writeFeedElement(feedElement, feedName);
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.setFilePath(feedDataLocation);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set cluster1 as source
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        //set cluster2 as target
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
-        feed.withProperty("job.counter", "true");
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
-        //upload necessary data
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
-        String timePattern = fmt.print(date);
-        String sourceLocation = sourcePath + "/" + timePattern + "/";
-        String targetLocation = targetPath + "/" + timePattern + "/";
-        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
-        Path toSource = new Path(sourceLocation);
-        Path toTarget = new Path(targetLocation);
-        if (dataFlag) {
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
-        }
-
-        //check while instance is got created
-        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-
-        //check if coordinator exists
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 1);
-
-        //replication should not start even after time
-        TimeUtil.sleepSeconds(60);
-        InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName,
-                "?start=" + startTime + "&end=" + endTime);
-        InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
-        LOGGER.info("Replication didn't start.");
-
-        //create availability flag on source
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.RESOURCES, availabilityFlagName));
-
-        //check if instance become running
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-            CoordinatorAction.Status.RUNNING, EntityType.FEED);
-
-        //wait till instance succeed
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        //check if data was replicated correctly
-        List<Path> cluster1ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
-        LOGGER.info("Data on source cluster: " + cluster1ReplicatedData);
-        List<Path> cluster2ReplicatedData = HadoopUtil
-                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
-        LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
-        //availabilityFlag exists in source
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, availabilityFlagName), true);
-
-        //availabilityFlag should exist in target
-        Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName), true);
-
-        AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
-            cluster2FS, "feed", "Success logs are not present");
-
-        ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
-        AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag);
-    }
-
-    /**
-     * Test for https://issues.apache.org/jira/browse/FALCON-668.
-     * Check that new DistCp options are allowed.
-     */
-    @Test
-    public void testNewDistCpOptions() throws Exception {
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-        //configure feed
-        String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
-        FeedMerlin feedElement = bundles[0].getFeedElement(feedName);
-        bundles[0].writeFeedElement(feedElement, feedName);
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.setFilePath(feedDataLocation);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set cluster1 as source
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        //set cluster2 as target
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
-        feed.withProperty("job.counter", "true");
-
-        //add custom properties to feed
-        HashMap<String, String> propMap = new HashMap<>();
-        propMap.put("overwrite", "true");
-        propMap.put("ignoreErrors", "false");
-        propMap.put("skipChecksum", "false");
-        propMap.put("removeDeletedFiles", "true");
-        propMap.put("preserveBlockSize", "true");
-        propMap.put("preserveReplicationNumber", "true");
-        propMap.put("preservePermission", "true");
-        for (Map.Entry<String, String> entry : propMap.entrySet()) {
-            feed.withProperty(entry.getKey(), entry.getValue());
-        }
-        //add custom property which shouldn't be passed to workflow
-        HashMap<String, String> unsupportedPropMap = new HashMap<>();
-        unsupportedPropMap.put("myCustomProperty", "true");
-        feed.withProperty("myCustomProperty", "true");
-
-        //upload necessary data to source
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
-        String timePattern = fmt.print(date);
-        String sourceLocation = sourcePath + "/" + timePattern + "/";
-        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT,  "dataFile1.txt"));
-
-        //copy 2 files to target to check if they will be deleted because of removeDeletedFiles property
-        String targetLocation = targetPath + "/" + timePattern + "/";
-        cluster2FS.copyFromLocalFile(new Path(OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile3.txt")),
-            new Path(targetLocation + "dataFile3.txt"));
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
-        //check while instance is got created
-        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-
-        //check if coordinator exists and replication starts
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-            CoordinatorAction.Status.RUNNING, EntityType.FEED);
-
-        //check that properties were passed to workflow definition
-        String bundleId = OozieUtil.getLatestBundleID(cluster2OC, feedName, EntityType.FEED);
-        String coordId = OozieUtil.getReplicationCoordID(bundleId, cluster2.getFeedHelper()).get(0);
-        CoordinatorAction coordinatorAction = cluster2OC.getCoordJobInfo(coordId).getActions().get(0);
-        String wfDefinition = cluster2OC.getJobDefinition(coordinatorAction.getExternalId());
-        LOGGER.info(String.format("Definition of coordinator job action %s : \n %s \n",
-            coordinatorAction.getExternalId(), Util.prettyPrintXml(wfDefinition)));
-        Assert.assertTrue(OozieUtil.propsArePresentInWorkflow(wfDefinition, "replication", propMap),
-            "New distCp supported properties should be passed to replication args list.");
-        Assert.assertFalse(OozieUtil.propsArePresentInWorkflow(wfDefinition, "replication", unsupportedPropMap),
-            "Unsupported properties shouldn't be passed to replication args list.");
-
-        //check that replication succeeds
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        List<Path> finalFiles = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(targetPath));
-        Assert.assertEquals(finalFiles.size(), 2, "Only replicated files should be present on target "
-            + "because of 'removeDeletedFiles' distCp property.");
-
-        ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
-        AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, true);
-    }
-
-    /**
-     * Test demonstrates failure pf replication of stored data from one source cluster to one target cluster.
-     * When replication job fails test checks if failed logs are present in staging directory or not.
-     */
-    @Test
-    public void replicate1Source1TargetFail()
-        throws Exception {
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
-        //configure feed
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.setFilePath(feedDataLocation);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set cluster1 as source
-        feed.addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.SOURCE)
-                        .build());
-        //set cluster2 as target
-        feed.addFeedCluster(
-                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                        .withRetention("days(1000000)", ActionType.DELETE)
-                        .withValidity(startTime, endTime)
-                        .withClusterType(ClusterType.TARGET)
-                        .withDataLocation(targetDataLocation)
-                        .build());
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
-        //upload necessary data
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
-        String timePattern = fmt.print(date);
-        String sourceLocation = sourcePath + "/" + timePattern + "/";
-        String targetLocation = targetPath + "/" + timePattern + "/";
-        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
-        Path toSource = new Path(sourceLocation);
-        Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
-
-        //check if coordinator exists
-        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
-
-        //check if instance become running
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-                CoordinatorAction.Status.RUNNING, EntityType.FEED);
-
-        HadoopUtil.deleteDirIfExists(sourceLocation, cluster1FS);
-
-        //check if instance became killed
-        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-                CoordinatorAction.Status.KILLED, EntityType.FEED);
-
-        AssertUtil.assertLogMoverPath(false, Util.readEntityName(feed.toString()),
-                cluster2FS, "feed", "Success logs are not present");
-    }
-
-    /* Flag value denotes whether to add data for replication or not.
-     * flag=true : add data for replication.
-     * flag=false : let empty directories be replicated.
-     */
-    @DataProvider(name = "dataFlagProvider")
-    private Object[][] dataFlagProvider() {
-        return new Object[][] {
-            new Object[] {true, },
-            new Object[] {false, },
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
deleted file mode 100644
index ec117d7..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed resume tests.
- */
-@Test(groups = "embedded")
-public class FeedResumeTest extends BaseTestClass {
-
-    private final AbstractEntityHelper feedHelper = prism.getFeedHelper();
-    private String feed;
-    private ColoHelper cluster = servers.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0].generateUniqueBundle(this);
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].submitClusters(prism);
-        feed = bundles[0].getInputFeedFromBundle();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Launches feed, suspends it and then resumes and checks if it got running.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void resumeSuspendedFeed() throws Exception {
-        AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed));
-        AssertUtil.assertSucceeded(feedHelper.suspend(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
-        AssertUtil.assertSucceeded(feedHelper.resume(feed));
-        ServiceResponse response = feedHelper.getStatus(feed);
-        String colo = feedHelper.getColo();
-        Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-    }
-
-
-    /**
-     * Tries to resume feed that wasn't submitted and scheduled. Attempt should fail.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void resumeNonExistentFeed() throws Exception {
-        AssertUtil.assertFailed(feedHelper.resume(feed));
-    }
-
-    /**
-     * Tries to resume deleted feed. Attempt should fail.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void resumeDeletedFeed() throws Exception {
-        AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed));
-        AssertUtil.assertSucceeded(feedHelper.delete(feed));
-        AssertUtil.assertFailed(feedHelper.resume(feed));
-    }
-
-    /**
-     * Tries to resume scheduled feed which wasn't suspended. Feed status shouldn't change.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void resumeScheduledFeed() throws Exception {
-        AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-        AssertUtil.assertSucceeded(feedHelper.resume(feed));
-        ServiceResponse response = feedHelper.getStatus(feed);
-        String colo = feedHelper.getColo();
-        Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java
deleted file mode 100644
index 28ddbd7..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed SLA tests.
- */
-@Test(groups = "embedded")
-public class FeedSLATest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private String baseTestDir = cleanAndGetTestDir();
-    private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
-    private static final Logger LOGGER = Logger.getLogger(FeedSLATest.class);
-
-    private FeedMerlin feedMerlin;
-    private String startTime;
-    private String endTime;
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        Bundle bundle = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundle, cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setInputFeedDataPath(feedInputPath);
-
-        startTime = TimeUtil.getTimeWrtSystemTime(0);
-        endTime = TimeUtil.addMinsToTime(startTime, 120);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-        ServiceResponse response =
-                prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
-        AssertUtil.assertSucceeded(response);
-
-        feedMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle());
-        feedMerlin.setFrequency(new Frequency("1", Frequency.TimeUnit.hours));
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Submit feed with correctly adjusted sla. Response should reflect success.
-     *
-     */
-
-    @Test
-    public void submitValidFeedSLA() throws Exception {
-
-        feedMerlin.clearFeedClusters();
-        feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .build());
-
-        //set slaLow and slaHigh
-        feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours));
-
-        final ServiceResponse serviceResponse =
-                prism.getFeedHelper().submitEntity(feedMerlin.toString());
-        AssertUtil.assertSucceeded(serviceResponse);
-    }
-
-    /**
-     * Submit feed with slaHigh greater than  feed retention. Response should reflect failure.
-     *
-     */
-
-    @Test
-    public void submitFeedWithSLAHigherThanRetention() throws Exception {
-
-        feedMerlin.clearFeedClusters();
-        feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention((new Frequency("2", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .build());
-
-        //set slaLow and slaHigh
-        feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours));
-
-        final ServiceResponse serviceResponse =
-                prism.getFeedHelper().submitEntity(feedMerlin.toString());
-        String message = "Feed's retention limit: "
-                + feedMerlin.getClusters().getClusters().get(0).getRetention().getLimit()
-                + " of referenced cluster " + bundles[0].getClusterNames().get(0)
-                + " should be more than feed's late arrival cut-off period: "
-                + feedMerlin.getSla().getSlaHigh().getTimeUnit()
-                + "(" + feedMerlin.getSla().getSlaHigh().getFrequency() + ")"
-                + " for feed: " + bundles[0].getInputFeedNameFromBundle();
-        validate(serviceResponse, message);
-    }
-
-
-    /**
-     * Submit feed with slaHigh less than  slaLow. Response should reflect failure.
-     *
-     */
-    @Test
-    public void submitFeedWithSLAHighLowerthanSLALow() throws Exception {
-
-        feedMerlin.clearFeedClusters();
-        feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention((new Frequency("6", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .build());
-
-        //set slaLow and slaHigh
-        feedMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours), new Frequency("2", Frequency.TimeUnit.hours));
-
-        final ServiceResponse serviceResponse =
-                prism.getFeedHelper().submitEntity(feedMerlin.toString());
-        String message = "slaLow of Feed: " + feedMerlin.getSla().getSlaLow().getTimeUnit() + "("
-                + feedMerlin.getSla().getSlaLow().getFrequency() + ")is greater than slaHigh: "
-                + feedMerlin.getSla().getSlaHigh().getTimeUnit() + "(" + feedMerlin.getSla().getSlaHigh().getFrequency()
-                + ") for cluster: " + bundles[0].getClusterNames().get(0);
-        validate(serviceResponse, message);
-    }
-
-    /**
-     * Submit feed with slaHigh and slaLow greater than feed retention. Response should reflect failure.
-     *
-     */
-    @Test
-    public void submitFeedWithSLAHighSLALowHigherThanRetention() throws Exception {
-
-        feedMerlin.clearFeedClusters();
-        feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention((new Frequency("4", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .build());
-
-        //set slaLow and slaHigh
-        feedMerlin.setSla(new Frequency("5", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours));
-
-        final ServiceResponse serviceResponse =
-                prism.getFeedHelper().submitEntity(feedMerlin.toString());
-        String message = "Feed's retention limit: "
-                + feedMerlin.getClusters().getClusters().get(0).getRetention().getLimit()
-                + " of referenced cluster " + bundles[0].getClusterNames().get(0)
-                + " should be more than feed's late arrival cut-off period: "
-                + feedMerlin.getSla().getSlaHigh().getTimeUnit() +"(" + feedMerlin.getSla().getSlaHigh().getFrequency()
-                + ")" + " for feed: " + bundles[0].getInputFeedNameFromBundle();
-        validate(serviceResponse, message);
-    }
-
-    /**
-     * Submit feed with slaHigh and slaLow having equal value. Response should reflect success.
-     *
-     */
-    @Test
-    public void submitFeedWithSameSLAHighSLALow() throws Exception {
-
-        feedMerlin.clearFeedClusters();
-        feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
-                Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention((new Frequency("7", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .build());
-
-        //set slaLow and slaHigh
-        feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("3", Frequency.TimeUnit.hours));
-
-        final ServiceResponse serviceResponse =
-                prism.getFeedHelper().submitEntity(feedMerlin.toString());
-        AssertUtil.assertSucceeded(serviceResponse);
-    }
-
-    private void validate(ServiceResponse response, String message) throws Exception {
-        AssertUtil.assertFailed(response);
-        LOGGER.info("Expected message is : " + message);
-        Assert.assertTrue(response.getMessage().contains(message),
-                "Correct response was not present in feed schedule. Feed response is : "
-                        + response.getMessage());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
deleted file mode 100644
index 79b722a..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed schedule tests.
- */
-@Test(groups = "embedded")
-public class FeedScheduleTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String feed;
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        Bundle.submitCluster(bundles[0]);
-        feed = bundles[0].getInputFeedFromBundle();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Tries to schedule already scheduled feed. Request should be considered as correct.
-     * Feed status shouldn't change.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void scheduleAlreadyScheduledFeed() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().schedule(feed);
-        AssertUtil.assertSucceeded(response);
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-
-        //now try re-scheduling again
-        response = prism.getFeedHelper().schedule(feed);
-        AssertUtil.assertSucceeded(response);
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-    }
-
-    /**
-     * Schedule correct feed. Feed should got running.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void scheduleValidFeed() throws Exception {
-        //submit feed
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
-        AssertUtil.assertSucceeded(response);
-
-        //now schedule the thing
-        response = prism.getFeedHelper().schedule(feed);
-        AssertUtil.assertSucceeded(response);
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-    }
-
-    /**
-     * Tries to schedule already scheduled and suspended feed. Suspended status shouldn't change.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void scheduleSuspendedFeed() throws Exception {
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-
-        //now suspend
-        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
-        //now schedule this!
-        AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
-    }
-
-    /**
-     * Schedules and deletes feed. Tries to schedule it. Request should fail.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void scheduleKilledFeed() throws Exception {
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-
-        //now suspend
-        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
-        //now schedule this!
-        AssertUtil.assertFailed(prism.getFeedHelper().schedule(feed));
-    }
-
-    /**
-     * Tries to schedule feed which wasn't submitted. Request should fail.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void scheduleNonExistentFeed() throws Exception {
-        AssertUtil.assertFailed(prism.getFeedHelper().schedule(feed));
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
deleted file mode 100644
index d5e8696..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed status tests. Checks getStatus functionality.
- */
-@Test(groups = "embedded")
-public class FeedStatusTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String feed;
-    private static final Logger LOGGER = Logger.getLogger(FeedStatusTest.class);
-
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0].generateUniqueBundle(this);
-        bundles[0] = new Bundle(bundles[0], cluster);
-
-        //submit the cluster
-        ServiceResponse response =
-            prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
-        AssertUtil.assertSucceeded(response);
-        feed = bundles[0].getInputFeedFromBundle();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    /**
-     * Schedules feed. Queries a feed status and checks the response
-     * correctness and a feed status correspondence.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void getStatusForScheduledFeed() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feed));
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().getStatus(feed);
-
-        AssertUtil.assertSucceeded(response);
-
-        String colo = prism.getFeedHelper().getColo();
-        Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-    }
-
-    /**
-     * Schedules and suspends feed. Queries a feed status and checks the response
-     * correctness and a feed status correspondence.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void getStatusForSuspendedFeed() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().suspend(feed);
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().getStatus(feed);
-
-        AssertUtil.assertSucceeded(response);
-        String colo = prism.getFeedHelper().getColo();
-        Assert.assertTrue(response.getMessage().contains(colo + "/SUSPENDED"));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
-    }
-
-    /**
-     * Submits feed. Queries a feed status and checks the response
-     * correctness and a feed status correspondence.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void getStatusForSubmittedFeed() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
-
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().getStatus(feed);
-
-        AssertUtil.assertSucceeded(response);
-        String colo = prism.getFeedHelper().getColo();
-        Assert.assertTrue(response.getMessage().contains(colo + "/SUBMITTED"));
-        AssertUtil.checkNotStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-    }
-
-    /**
-     * Removes feed. Queries a feed status. Checks that the response correctness.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void getStatusForDeletedFeed() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().delete(feed);
-        AssertUtil.assertSucceeded(response);
-
-        response = prism.getFeedHelper().getStatus(feed);
-        AssertUtil.assertFailed(response);
-
-        Assert.assertTrue(
-            response.getMessage().contains(Util.readEntityName(feed) + " (FEED) not found"));
-        AssertUtil.checkNotStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
-    }
-
-    /**
-     * Queries a status of feed which wasn't submitted and checks the response.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void getStatusForNonExistentFeed() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().getStatus(feed);
-        AssertUtil.assertFailed(response);
-        Assert.assertTrue(
-            response.getMessage().contains(Util.readEntityName(feed) + " (FEED) not found"));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
deleted file mode 100644
index f7bf0f8..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-/**
- * Feed submit and schedule tests.
- */
-@Test(groups = "embedded")
-public class FeedSubmitAndScheduleTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String feed;
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        feed = bundles[0].getDataSets().get(0);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        //remove entities which belong to both default and different user
-        removeTestClassEntities(null, MerlinConstants.DIFFERENT_USER_NAME);
-    }
-
-    @Test(groups = {"singleCluster"})
-    public void snsNewFeed() throws Exception {
-        submitFirstClusterScheduleFirstFeed();
-    }
-
-    /**
-     * Submits and schedules feed with a cluster it depends on.
-     *
-     * @throws JAXBException
-     * @throws IOException
-     * @throws URISyntaxException
-     * @throws AuthenticationException
-     */
-    private void submitFirstClusterScheduleFirstFeed()
-        throws JAXBException, IOException, URISyntaxException, AuthenticationException,
-        InterruptedException {
-        AssertUtil.assertSucceeded(prism.getClusterHelper()
-            .submitEntity(bundles[0].getClusters().get(0)));
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-        AssertUtil.assertSucceeded(response);
-    }
-
-    /**
-     * Submits and schedules a feed and then tries to do the same on it. Checks that status
-     * hasn't changed and response is successful.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void snsExistingFeed() throws Exception {
-        submitFirstClusterScheduleFirstFeed();
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-
-        //get created bundle id
-        String bundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED);
-
-        //try to submit and schedule the same process again
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-        AssertUtil.assertSucceeded(response);
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-
-        //check that new bundle wasn't created
-        OozieUtil.verifyNewBundleCreation(clusterOC, bundleId, null, feed, false, false);
-    }
-
-    /**
-     * Try to submit and schedule feed without submitting cluster it depends on.
-     * Request should fail.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void snsFeedWithoutCluster() throws Exception {
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-        AssertUtil.assertFailed(response);
-    }
-
-    /**
-     * Submits and schedules feed. Removes it. Submitted and schedules removed feed.
-     * Checks response and status of feed.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void snsDeletedFeed() throws Exception {
-        submitFirstClusterScheduleFirstFeed();
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-        AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.KILLED);
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-        AssertUtil.assertSucceeded(response);
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-    }
-
-    /**
-     * Suspends feed, submit and schedules it. Checks that response is successful,
-     * feed status hasn't changed.
-     *
-     * @throws Exception
-     */
-    @Test(groups = {"singleCluster"})
-    public void snsSuspendedFeed() throws Exception {
-        submitFirstClusterScheduleFirstFeed();
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-        AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed));
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-        AssertUtil.assertSucceeded(response);
-        AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
-    }
-
-    /**
-     * Test for https://issues.apache.org/jira/browse/FALCON-1647.
-     * Create cluster entity as user1. Submit and schedule feed entity feed1 in this cluster as user1.
-     * Now try to submit and schedule a feed entity feed2 in this cluster as user2.
-     */
-    @Test
-    public void snsDiffFeedDiffUserSameCluster()
-        throws URISyntaxException, AuthenticationException, InterruptedException, IOException, JAXBException {
-        bundles[0].submitClusters(prism);
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-        FeedMerlin feedMerlin = FeedMerlin.fromString(feed);
-        feedMerlin.setName(feedMerlin.getName() + "-2");
-        feedMerlin.setACL(MerlinConstants.DIFFERENT_USER_NAME, MerlinConstants.DIFFERENT_USER_GROUP, "*");
-        ServiceResponse response = prism.getFeedHelper().submitAndSchedule(
-            feedMerlin.toString(), MerlinConstants.DIFFERENT_USER_NAME, null);
-        AssertUtil.assertSucceeded(response);
-    }
-}