You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:53 UTC

[07/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
deleted file mode 100644
index a0922cb..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.TestNGException;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.text.DecimalFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-
-/**
- * EL Validations tests.
- */
-@Test(groups = "embedded")
-public class ELValidationsTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private static final Logger LOGGER = Logger.getLogger(ELValidationsTest.class);
-    private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
-
-
-    @Test(groups = {"0.1", "0.2"})
-    public void startInstBeforeFeedStartToday02() throws Exception {
-        String response =
-            testWith("2009-02-02T20:00Z", "2011-12-31T00:00Z", "2009-02-02T20:00Z",
-                "2011-12-31T00:00Z", "now(-40,0)", "currentYear(20,30,24,20)", false);
-        validate(response);
-    }
-
-    @Test(groups = {"singleCluster"})
-    public void startInstAfterFeedEnd() throws Exception {
-        String response = testWith(null, null, null, null,
-            "currentYear(10,0,22,0)", "now(4,20)", false);
-        validate(response);
-    }
-
-    @Test(groups = {"singleCluster"})
-    public void bothInstReverse() throws Exception {
-        String response = testWith(null, null, null, null,
-            "now(0,0)", "now(-100,0)", false);
-        validate(response);
-    }
-
-    @Test(groups = {"singleCluster"}, dataProvider = "EL-DP")
-    public void expressionLanguageTest(String startInstance, String endInstance) throws Exception {
-        testWith(null, null, null, null, startInstance, endInstance, true);
-    }
-
-    @DataProvider(name = "EL-DP")
-    public Object[][] getELData() {
-        return new Object[][]{
-            {"now(-3,0)", "now(4,20)"},
-            {"yesterday(22,0)", "now(4,20)"},
-            {"currentMonth(0,22,0)", "now(4,20)"},
-            {"lastMonth(30,22,0)", "now(4,20)"},
-            {"currentYear(0,0,22,0)", "currentYear(1,1,22,0)"},
-            {"currentMonth(0,22,0)", "currentMonth(1,22,20)"},
-            {"lastMonth(30,22,0)", "lastMonth(60,2,40)"},
-            {"lastYear(12,0,22,0)", "lastYear(13,1,22,0)"},
-        };
-    }
-
-    private void validate(String response) {
-        if ((response.contains("End instance ") || response.contains("Start instance"))
-            && (response.contains("for feed") || response.contains("of feed"))
-            && (response.contains("is before the start of feed")
-            || response.contains("is after the end of feed"))) {
-            return;
-        }
-        if (response.contains("End instance")
-            && response.contains("is before the start instance")) {
-            return;
-        }
-        Assert.fail("Response is not valid");
-    }
-
-    private String testWith(String feedStart,
-                            String feedEnd, String processStart,
-                            String processEnd,
-                            String startInstance, String endInstance, boolean isMatch)
-        throws IOException, JAXBException, ParseException, URISyntaxException {
-        HadoopUtil.uploadDir(cluster.getClusterHelper().getHadoopFS(),
-            aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        Bundle bundle = BundleUtil.readELBundle();
-        bundle = new Bundle(bundle, cluster.getPrefix());
-        bundle.generateUniqueBundle(this);
-        bundle.setProcessWorkflow(aggregateWorkflowDir);
-        if (feedStart != null && feedEnd != null) {
-            bundle.setFeedValidity(feedStart, feedEnd, bundle.getInputFeedNameFromBundle());
-        }
-        if (processStart != null && processEnd != null) {
-            bundle.setProcessValidity(processStart, processEnd);
-        }
-        try {
-            bundle.setInvalidData();
-            bundle.setDatasetInstances(startInstance, endInstance);
-            String submitResponse = bundle.submitFeedsScheduleProcess(prism).getMessage();
-            LOGGER.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData()));
-            TimeUtil.sleepSeconds(45);
-            if (isMatch) {
-                getAndMatchDependencies(serverOC.get(0), bundle);
-            }
-            return submitResponse;
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new TestNGException(e);
-        } finally {
-            LOGGER.info("deleting entity:");
-            bundle.deleteBundle(prism);
-        }
-    }
-
-    private void getAndMatchDependencies(OozieClient oozieClient, Bundle bundle) {
-        try {
-            List<String> bundles = null;
-            for (int i = 0; i < 10; ++i) {
-                bundles = OozieUtil.getBundles(oozieClient, bundle.getProcessName(), EntityType.PROCESS);
-                if (bundles.size() > 0) {
-                    break;
-                }
-                TimeUtil.sleepSeconds(30);
-            }
-            Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
-            String coordID = bundles.get(0);
-            LOGGER.info("coord id: " + coordID);
-            List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
-            for (int i = 0; i < 10 && missingDependencies == null; ++i) {
-                TimeUtil.sleepSeconds(30);
-                missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
-            }
-            Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
-            for (String dependency : missingDependencies) {
-                LOGGER.info("dependency from job: " + dependency);
-            }
-            Date jobNominalTime = OozieUtil.getNominalTime(oozieClient, coordID);
-            Calendar time = Calendar.getInstance();
-            time.setTime(jobNominalTime);
-            LOGGER.info("nominalTime:" + jobNominalTime);
-            SimpleDateFormat df = new SimpleDateFormat("dd MMM yyyy HH:mm:ss");
-            LOGGER.info(
-                "nominalTime in GMT string: " + df.format(jobNominalTime.getTime()) + " GMT");
-            TimeZone z = time.getTimeZone();
-            int offset = z.getRawOffset();
-            int offsetHrs = offset / 1000 / 60 / 60;
-            int offsetMins = offset / 1000 / 60 % 60;
-
-            LOGGER.info("offset: " + offsetHrs);
-            LOGGER.info("offset: " + offsetMins);
-
-            time.add(Calendar.HOUR_OF_DAY, (-offsetHrs));
-            time.add(Calendar.MINUTE, (-offsetMins));
-
-            LOGGER.info("GMT Time: " + time.getTime());
-
-            int frequency = bundle.getInitialDatasetFrequency();
-            List<String> qaDependencyList =
-                getQADepedencyList(time, bundle.getStartInstanceProcess(time),
-                    bundle.getEndInstanceProcess(time),
-                    frequency, bundle);
-            for (String qaDependency : qaDependencyList) {
-                LOGGER.info("qa qaDependencyList: " + qaDependency);
-            }
-
-            Assert.assertTrue(matchDependencies(missingDependencies, qaDependencyList));
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new TestNGException(e);
-        }
-    }
-
-    private boolean matchDependencies(List<String> fromJob, List<String> qaList) {
-        if (fromJob.size() != qaList.size()) {
-            return false;
-        }
-        Collections.sort(fromJob);
-        Collections.sort(qaList);
-        for (int index = 0; index < fromJob.size(); index++) {
-            if (!fromJob.get(index).contains(qaList.get(index))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private List<String> getQADepedencyList(Calendar nominalTime, Date startRef,
-                                            Date endRef, int frequency, Bundle bundle) {
-        LOGGER.info("start ref:" + startRef);
-        LOGGER.info("end ref:" + endRef);
-        Calendar initialTime = Calendar.getInstance();
-        initialTime.setTime(startRef);
-        Calendar finalTime = Calendar.getInstance();
-
-        finalTime.setTime(endRef);
-        String path = bundle.getDatasetPath();
-
-        TimeZone tz = TimeZone.getTimeZone("GMT");
-        nominalTime.setTimeZone(tz);
-        LOGGER.info("nominalTime: " + initialTime.getTime());
-        LOGGER.info("finalTime: " + finalTime.getTime());
-        List<String> returnList = new ArrayList<>();
-        while (initialTime.getTime().before(finalTime.getTime())) {
-            LOGGER.info("initialTime: " + initialTime.getTime());
-            returnList.add(getPath(path, initialTime));
-            initialTime.add(Calendar.MINUTE, frequency);
-        }
-        returnList.add(getPath(path, initialTime));
-        Collections.reverse(returnList);
-        return returnList;
-    }
-
-    private String getPath(String path, Calendar time) {
-        if (path.contains("${YEAR}")) {
-            path = path.replaceAll("\\$\\{YEAR\\}", Integer.toString(time.get(Calendar.YEAR)));
-        }
-        if (path.contains("${MONTH}")) {
-            path = path.replaceAll("\\$\\{MONTH\\}", intToString(time.get(Calendar.MONTH) + 1, 2));
-        }
-        if (path.contains("${DAY}")) {
-            path = path.replaceAll("\\$\\{DAY\\}", intToString(time.get(Calendar.DAY_OF_MONTH), 2));
-        }
-        if (path.contains("${HOUR}")) {
-            path = path.replaceAll("\\$\\{HOUR\\}", intToString(time.get(Calendar.HOUR_OF_DAY), 2));
-        }
-        if (path.contains("${MINUTE}")) {
-            path = path.replaceAll("\\$\\{MINUTE\\}", intToString(time.get(Calendar.MINUTE), 2));
-        }
-        return path;
-    }
-
-    private String intToString(int num, int digits) {
-        assert digits > 0 : "Invalid number of digits";
-
-        // create variable length array of zeros
-        char[] zeros = new char[digits];
-        Arrays.fill(zeros, '0');
-
-        // format number as String
-        DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
-        return df.format(num);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
deleted file mode 100644
index c49c381..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Embedded pig script test.
- */
-@Test(groups = "embedded")
-public class EmbeddedPigScriptTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String pigTestDir = cleanAndGetTestDir();
-    private String pigScriptDir = pigTestDir + "/pig";
-    private String pigScriptLocation = pigScriptDir + "/id.pig";
-    private String inputPath = pigTestDir + "/input" + MINUTE_DATE_PATTERN;
-    private static final Logger LOGGER = Logger.getLogger(EmbeddedPigScriptTest.class);
-    private static final double TIMEOUT = 15;
-    private String processName;
-    private String process;
-
-    @BeforeClass(alwaysRun = true)
-    public void createTestData() throws Exception {
-        LOGGER.info("in @BeforeClass");
-
-        //copy pig script
-        HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.concat(OSUtil.RESOURCES, "pig"));
-        Bundle bundle = BundleUtil.readELBundle();
-        bundle.generateUniqueBundle(this);
-        bundle = new Bundle(bundle, cluster);
-        String startDate = "2010-01-02T00:40Z";
-        String endDate = "2010-01-02T01:10Z";
-        bundle.setInputFeedDataPath(inputPath);
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
-            bundle.getFeedDataPathPrefix(), dataDates);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setInputFeedDataPath(inputPath);
-        bundles[0].setOutputFeedLocationData(pigTestDir + "/output-data" + MINUTE_DATE_PATTERN);
-        bundles[0].setProcessWorkflow(pigScriptLocation);
-        bundles[0].setProcessInputNames("INPUT");
-        bundles[0].setProcessOutputNames("OUTPUT");
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:10Z");
-        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
-
-        final ProcessMerlin processElement = bundles[0].getProcessObject();
-        processElement.clearProperties().withProperty("queueName", "default");
-        processElement.getWorkflow().setEngine(EngineType.PIG);
-        bundles[0].setProcessData(processElement.toString());
-        bundles[0].submitFeedsScheduleProcess(prism);
-        process = bundles[0].getProcessData();
-        processName = Util.readEntityName(process);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getResumedProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        prism.getProcessHelper().suspend(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        ServiceResponse status = prism.getProcessHelper().getStatus(process);
-        Assert.assertTrue(status.getMessage().contains("SUSPENDED"), "Process not suspended.");
-        prism.getProcessHelper().resume(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getSuspendedProcessInstance() throws Exception {
-        prism.getProcessHelper().suspend(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccessWOInstances(r);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getRunningProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getKilledProcessInstance() throws Exception {
-        prism.getProcessHelper().delete(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 6000000)
-    public void getSucceededProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-        int counter = OSUtil.IS_WINDOWS ? 100 : 50;
-        OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
-        r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccessWOInstances(r);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
deleted file mode 100644
index 728b797..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.MatrixUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Tests for operations with external file systems.
- */
-@Test(groups = "embedded")
-public class ExternalFSTest extends BaseTestClass{
-
-    public static final String WASB_END_POINT =
-            "wasb://" + MerlinConstants.WASB_CONTAINER + "@" + MerlinConstants.WASB_ACCOUNT;
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private FileSystem wasbFS;
-    private Bundle externalBundle;
-
-    private String baseTestDir = cleanAndGetTestDir();
-    private String sourcePath = baseTestDir + "/source";
-    private String baseWasbDir = "/falcon-regression/" + UUID.randomUUID().toString().split("-")[0];
-    private String testWasbTargetDir = baseWasbDir + '/'
-        + UUID.randomUUID().toString().split("-")[0] + '/';
-
-    private static final Logger LOGGER = Logger.getLogger(ExternalFSTest.class);
-
-    @BeforeClass
-    public void setUpClass() throws IOException {
-        HadoopUtil.recreateDir(clusterFS, baseTestDir);
-        Configuration conf = new Configuration();
-        conf.set("fs.defaultFS", WASB_END_POINT);
-        conf.set("fs.azure.account.key." + MerlinConstants.WASB_ACCOUNT,
-                MerlinConstants.WASB_SECRET);
-        conf.setBoolean("fs.hdfs.impl.disable.cache", false);
-        wasbFS = FileSystem.get(conf);
-        LOGGER.info("creating base wasb dir" + baseWasbDir);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws JAXBException, IOException {
-        Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
-        bundles[0] = new Bundle(bundle, cluster);
-        externalBundle = new Bundle(bundle, cluster);
-
-        bundles[0].generateUniqueBundle(this);
-        externalBundle.generateUniqueBundle(this);
-
-        LOGGER.info("checking wasb credentials with location: " + testWasbTargetDir);
-        wasbFS.create(new Path(testWasbTargetDir));
-        wasbFS.delete(new Path(testWasbTargetDir), true);
-    }
-
-    @AfterMethod
-    public void tearDown() throws IOException {
-        removeTestClassEntities();
-        wasbFS.delete(new Path(testWasbTargetDir), true);
-    }
-
-    @AfterClass(alwaysRun = true)
-    public void tearDownClass() throws IOException {
-        wasbFS.delete(new Path(baseWasbDir), true);
-    }
-
-
-    @Test(dataProvider = "getInvalidTargets")
-    public void invalidCredentialsExtFS(String endpoint) throws Exception {
-        bundles[0].setClusterInterface(Interfacetype.READONLY, endpoint);
-        bundles[0].setClusterInterface(Interfacetype.WRITE, endpoint);
-
-        AssertUtil.assertFailed(prism.getClusterHelper()
-            .submitEntity(bundles[0].getClusterElement().toString()));
-
-    }
-
-    @Test(dataProvider = "getData")
-    public void replicateToExternalFS(final FileSystem externalFS,
-        final String separator, final boolean withData) throws Exception {
-        final String endpoint = externalFS.getUri().toString();
-        Bundle.submitCluster(bundles[0], externalBundle);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-        String datePattern = StringUtils .join(
-            new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, separator);
-
-        //configure feed
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        String targetDataLocation = endpoint + testWasbTargetDir + datePattern;
-        feed.setFilePath(sourcePath + '/' + datePattern);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set local cluster as source
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        //set externalFS cluster as target
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(externalBundle.getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-        datePattern = StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH", "mm"}, separator);
-        //upload necessary data
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern(datePattern);
-        String timePattern = fmt.print(date);
-        HadoopUtil.recreateDir(clusterFS, sourcePath + '/' + timePattern);
-        if (withData) {
-            HadoopUtil.copyDataToFolder(clusterFS, sourcePath + '/' + timePattern, OSUtil.SINGLE_FILE);
-        }
-
-        Path srcPath = new Path(sourcePath + '/' + timePattern);
-        Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
-
-        //check if coordinator exists
-        TimeUtil.sleepSeconds(10);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, feed.toString(), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(clusterOC, feed.getName(), "REPLICATION"), 1);
-
-        //replication should start, wait while it ends
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        //check if data has been replicated correctly
-        List<Path> cluster1ReplicatedData =
-                HadoopUtil.getAllFilesRecursivelyHDFS(clusterFS, srcPath);
-        List<Path> cluster2ReplicatedData =
-                HadoopUtil.getAllFilesRecursivelyHDFS(externalFS, dstPath);
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-        final ContentSummary srcSummary = clusterFS.getContentSummary(srcPath);
-        final ContentSummary dstSummary = externalFS.getContentSummary(dstPath);
-        Assert.assertEquals(dstSummary.getLength(), srcSummary.getLength());
-    }
-
-
-
-    @DataProvider
-    public Object[][] getData() {
-        //"-" for single directory, "/" - for dir with subdirs };
-        return MatrixUtil.crossProduct(new FileSystem[]{wasbFS},
-            new String[]{"/", "-"},
-            new Boolean[]{true, false});
-    }
-
-    @DataProvider
-    public Object[][] getInvalidTargets() {
-        return new Object[][]{{"wasb://invalid@invalid.blob.core.windows.net/"}};
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
deleted file mode 100644
index feb0cc1..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.XmlUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed cluster update tests.
- */
-@Test(groups = "distributed")
-public class FeedClusterUpdateTest extends BaseTestClass {
-
-    private String baseTestDir = cleanAndGetTestDir();
-    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
-    private ColoHelper cluster3 = servers.get(2);
-    private OozieClient cluster1OC = serverOC.get(0);
-    private OozieClient cluster2OC = serverOC.get(1);
-    private OozieClient cluster3OC = serverOC.get(2);
-    private FileSystem cluster2FS = serverFS.get(1);
-    private FileSystem cluster3FS = serverFS.get(2);
-    private String feed;
-    private String feedName;
-    private String startTime;
-    private String feedOriginalSubmit;
-    private String feedUpdated;
-    private String cluster1Name;
-    private String cluster2Name;
-    private String cluster3Name;
-    private static final Logger LOGGER = Logger.getLogger(FeedClusterUpdateTest.class);
-
-
-    @BeforeClass(alwaysRun = true)
-    public void createTestData() throws Exception {
-        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        Bundle bundle = BundleUtil.readELBundle();
-        for (int i = 0; i < 3; i++) {
-            bundles[i] = new Bundle(bundle, servers.get(i));
-            bundles[i].generateUniqueBundle(this);
-            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
-        }
-        try {
-            String postFix = "/US/" + servers.get(1).getClusterHelper().getColoName();
-            HadoopUtil.deleteDirIfExists(baseTestDir, cluster2FS);
-            HadoopUtil.lateDataReplenish(cluster2FS, 80, 1, baseTestDir, postFix);
-            postFix = "/UK/" + servers.get(2).getClusterHelper().getColoName();
-            HadoopUtil.deleteDirIfExists(baseTestDir, cluster3FS);
-            HadoopUtil.lateDataReplenish(cluster3FS, 80, 1, baseTestDir, postFix);
-        } finally {
-            removeTestClassEntities();
-        }
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        Bundle bundle = BundleUtil.readELBundle();
-        for (int i = 0; i < 3; i++) {
-            bundles[i] = new Bundle(bundle, servers.get(i));
-            bundles[i].generateUniqueBundle(this);
-            bundles[i].setProcessWorkflow(aggregateWorkflowDir);
-        }
-        BundleUtil.submitAllClusters(prism, bundles[0], bundles[1], bundles[2]);
-        feed = bundles[0].getDataSets().get(0);
-        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
-        startTime = TimeUtil.getTimeWrtSystemTime(-50);
-        feedName = Util.readEntityName(feed);
-        cluster1Name = Util.readEntityName(bundles[0].getClusters().get(0));
-        cluster2Name = Util.readEntityName(bundles[1].getClusters().get(0));
-        cluster3Name = Util.readEntityName(bundles[2].getClusters().get(0));
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void addSourceCluster() throws Exception {
-        //add one source and one target , schedule only on source
-        feedOriginalSubmit = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        prism.getFeedHelper().submitAndSchedule(feedUpdated);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void addTargetCluster() throws Exception {
-        //add one source and one target , schedule only on source
-        feedOriginalSubmit = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("US/${cluster.colo}")
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void add2SourceCluster() throws Exception {
-        //add one source , schedule only on source
-        feedOriginalSubmit = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("US/${cluster.colo}")
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void add2TargetCluster() throws Exception {
-        //add one source and one target , schedule only on source
-        feedOriginalSubmit = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-
-        LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void add1Source1TargetCluster() throws Exception {
-        //add one source and one target , schedule only on source
-        feedOriginalSubmit = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("US/${cluster.colo}")
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void deleteSourceCluster() throws Exception {
-        //add one source and one target , schedule only on source
-        feedOriginalSubmit = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("US/${cluster.colo}")
-                .build())
-            .toString();
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = prism.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .build())
-            .toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        response = cluster3.getFeedHelper().getEntityDefinition(feedUpdated);
-        AssertUtil.assertFailed(response);
-
-        prism.getFeedHelper().submitAndSchedule(feedUpdated);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 3);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 2);
-    }
-
-    @Test(enabled = true, groups = {"multiCluster"})
-    public void deleteTargetCluster() throws Exception {
-        /*
-        this test creates a multiCluster feed. Cluster1 is the target cluster
-         and cluster3 and Cluster2 are the source cluster.
-
-        feed is submitted through prism so submitted to both target and
-        source. Feed is scheduled through prism, so only on Cluster3 and
-        Cluster2 retention coord should exists. Cluster1 one which
-         is target both retention and replication coord should exists. there
-         will be 2 replication coord, one each for each source cluster.
-
-        then we update feed by deleting cluster1 and cluster2 from the feed
-        xml and send update request.
-
-        Once update is over. definition should go missing from cluster1 and
-        cluster2 and prism and cluster3 should have new def
-
-        there should be a new retention coord on cluster3 and old number of
-        coord on cluster1 and cluster2
-         */
-
-        //add two source and one target
-        feedOriginalSubmit = FeedMerlin.fromString(feed).clearFeedClusters().toString();
-
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit)
-            .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("US/${cluster.colo}")
-                .build())
-            .toString();
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster1Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 20),
-                    TimeUtil.addMinsToTime(startTime, 85))
-                .withClusterType(ClusterType.TARGET)
-                .build())
-            .toString();
-        feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
-        ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(10);
-        AssertUtil.assertSucceeded(response);
-
-        //schedule on source
-        response = prism.getFeedHelper().schedule(feedOriginalSubmit);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-
-        //prepare updated Feed
-        feedUpdated = FeedMerlin.fromString(feed).clearFeedClusters().toString();
-        feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(cluster3Name)
-                .withRetention("hours(10)", ActionType.DELETE)
-                .withValidity(TimeUtil.addMinsToTime(startTime, 40),
-                    TimeUtil.addMinsToTime(startTime, 110))
-                .withClusterType(ClusterType.SOURCE)
-                .withPartition("UK/${cluster.colo}")
-                .build())
-            .toString();
-
-        LOGGER.info("Feed: " + Util.prettyPrintXml(feedUpdated));
-        response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
-        TimeUtil.sleepSeconds(20);
-        AssertUtil.assertSucceeded(response);
-
-        //verify xmls definitions
-        response = cluster1.getFeedHelper().getEntityDefinition(feedUpdated);
-        AssertUtil.assertFailed(response);
-        response = cluster2.getFeedHelper().getEntityDefinition(feedUpdated);
-        AssertUtil.assertFailed(response);
-        response = cluster3.getFeedHelper().getEntityDefinition(feedUpdated);
-        Assert.assertTrue(XmlUtil.isIdentical(feedUpdated, response.getMessage()));
-        response = prism.getFeedHelper().getEntityDefinition(feedUpdated);
-        Assert.assertTrue(XmlUtil.isIdentical(feedUpdated, response.getMessage()));
-
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-    }
-
-    /*
-    @Test(enabled = false)
-    public void delete2SourceCluster() {
-
-    }
-
-    @Test(enabled = false)
-    public void delete2TargetCluster() {
-
-    }
-
-    @Test(enabled = false)
-    public void delete1Source1TargetCluster() {
-
-    }
-    */
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
deleted file mode 100644
index ecb5798..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.FeedInstanceResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Test for https://issues.apache.org/jira/browse/FALCON-761.
- */
-@Test(groups = "embedded", timeOut = 900000)
-public class FeedInstanceListingTest extends BaseTestClass{
-    private String baseTestDir = cleanAndGetTestDir();
-    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
-    private String feedOutputPath = baseTestDir + "/output-data" + MINUTE_DATE_PATTERN;
-    private String processName;
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-
-    private static final Logger LOGGER = Logger.getLogger(FeedInstanceListingTest.class);
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-        bundles[0].setInputFeedDataPath(feedInputPath);
-        bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
-        bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
-        bundles[0].setOutputFeedLocationData(feedOutputPath);
-        bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
-        processName = bundles[0].getProcessName();
-        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() throws IOException{
-        cleanTestsDirs();
-        removeTestClassEntities();
-    }
-
-    /**
-     * Test when all data is available for all instances.
-     */
-    @Test
-    public void testFeedListingWhenAllAvailable() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
-                EntityType.PROCESS, processName, 0);
-        List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size()-1);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-                CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 0, 5);
-    }
-
-   /**
-    *Test when only empty directories exist for all instances.
-    */
-    @Test
-    public void testFeedListingWhenAllEmpty() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-                CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 5, 0, 0);
-    }
-
-   /**
-    * Test when no data is present for any instance.
-    */
-    @Test
-    public void testFeedListingWhenAllMissing() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        FeedInstanceResult r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 5, 0, 0, 0);
-    }
-
-   /**
-    * Initially no availability flag is set for the feed. And data is created, so instance status is available.
-    * Then, set the availability flag and update the feed. The instance status should change to partial.
-    */
-    @Test
-    public void testFeedListingAfterFeedAvailabilityFlagUpdate() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
-                EntityType.PROCESS, processName, 0);
-        List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size()-1);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-                CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 0, 5);
-        String inputFeed = bundles[0].getInputFeedFromBundle();
-        bundles[0].setInputFeedAvailabilityFlag("_SUCCESS");
-        ServiceResponse serviceResponse = prism.getFeedHelper().update(inputFeed, bundles[0].getInputFeedFromBundle());
-        AssertUtil.assertSucceeded(serviceResponse);
-        //Since we have not created availability flag on HDFS, the feed instance status should be partial
-        r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 5, 0);
-    }
-
-   /**
-    * Data is created for the feed, so instance status is available.
-    * Then, change the data path and update the feed. The instance status should change to partial.
-    */
-    @Test
-    public void testFeedListingAfterFeedDataPathUpdate() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
-                EntityType.PROCESS, processName, 0);
-        List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size()-1);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-                CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 0, 5);
-        String inputFeed = bundles[0].getInputFeedFromBundle();
-        bundles[0].setInputFeedDataPath(baseTestDir + "/inputNew" + MINUTE_DATE_PATTERN);
-        ServiceResponse serviceResponse = prism.getFeedHelper().update(inputFeed, bundles[0].getInputFeedFromBundle());
-        AssertUtil.assertSucceeded(serviceResponse);
-        //Since we have not created directories for new path, the feed instance status should be missing
-        r = prism.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 5, 0, 0, 0);
-    }
-
-   /**
-    * Submit the feeds on prism, and request for instance status on server. Request should succeed.
-    */
-    @Test
-    public void testFeedListingFeedSubmitOnPrismRequestOnServer() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        FeedInstanceResult r = cluster.getFeedHelper()
-                .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                        "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 5, 0, 0, 0);
-    }
-
-    /**
-     * Checks that actual number of instances with different statuses are equal to expected number
-     * of instances with matching statuses.
-     *
-     * @param instancesResult kind of response from API which should contain information about
-     *                        instances <p/>
-     *                        All parameters below reflect number of expected instances with some
-     *                        kind of status.
-     * @param totalCount      total number of instances.
-     * @param missingCount    number of running instances.
-     * @param emptyCount  number of suspended instance.
-     * @param partialCount    number of waiting instance.
-     * @param availableCount     number of killed instance.
-     */
-    private void validateResponse(FeedInstanceResult instancesResult, int totalCount,
-                                 int missingCount, int emptyCount, int partialCount, int availableCount) {
-        FeedInstanceResult.Instance[] instances = instancesResult.getInstances();
-        LOGGER.info("instances: " + Arrays.toString(instances));
-        Assert.assertNotNull(instances, "instances should be not null");
-        Assert.assertEquals(instances.length, totalCount, "Total Instances");
-        List<String> statuses = new ArrayList<>();
-        for (FeedInstanceResult.Instance instance : instances) {
-            Assert.assertNotNull(instance.getCluster());
-            Assert.assertNotNull(instance.getInstance());
-            Assert.assertNotNull(instance.getStatus());
-            Assert.assertNotNull(instance.getUri());
-            Assert.assertNotNull(instance.getCreationTime());
-            Assert.assertNotNull(instance.getSize());
-            final String status = instance.getStatus();
-            LOGGER.info("status: "+ status + ", instance: " + instance.getInstance());
-            statuses.add(status);
-        }
-
-        Assert.assertEquals(Collections.frequency(statuses, "MISSING"),
-                missingCount, "Missing Instances");
-        Assert.assertEquals(Collections.frequency(statuses, "EMPTY"),
-                emptyCount, "Empty Instances");
-        Assert.assertEquals(Collections.frequency(statuses, "PARTIAL"),
-                partialCount, "Partial Instances");
-        Assert.assertEquals(Collections.frequency(statuses, "AVAILABLE"),
-                availableCount, "Available Instances");
-    }
-}