Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:52 UTC
[06/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
deleted file mode 100644
index 514fd10..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-
-/**
- * Feed instance status tests.
- */
-@Test(groups = "embedded")
-public class FeedInstanceStatusTest extends BaseTestClass {
-
- private String baseTestDir = cleanAndGetTestDir();
- private String feedInputPath = baseTestDir + MINUTE_DATE_PATTERN;
- private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-
- private ColoHelper cluster2 = servers.get(1);
- private ColoHelper cluster3 = servers.get(2);
- private FileSystem cluster2FS = serverFS.get(1);
- private FileSystem cluster3FS = serverFS.get(2);
- private static final Logger LOGGER = Logger.getLogger(FeedInstanceStatusTest.class);
-
- @BeforeClass(alwaysRun = true)
- public void uploadWorkflow() throws Exception {
- uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- Bundle bundle = BundleUtil.readELBundle();
- for (int i = 0; i < 3; i++) {
- bundles[i] = new Bundle(bundle, servers.get(i));
- bundles[i].generateUniqueBundle(this);
- bundles[i].setProcessWorkflow(aggregateWorkflowDir);
- }
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Goes through the whole feed replication workflow, checking instance status while
- * submitting the feed, scheduling it, and performing different combinations of actions
- * like -suspend, -resume, -kill, -rerun.
- */
- @Test(groups = {"multiCluster"})
- public void feedInstanceStatusRunning() throws Exception {
- bundles[0].setInputFeedDataPath(feedInputPath);
-
- AssertUtil.assertSucceeded(prism.getClusterHelper()
- .submitEntity(bundles[0].getClusters().get(0)));
-
- AssertUtil.assertSucceeded(prism.getClusterHelper()
- .submitEntity(bundles[1].getClusters().get(0)));
-
- AssertUtil.assertSucceeded(prism.getClusterHelper()
- .submitEntity(bundles[2].getClusters().get(0)));
-
- String feed = bundles[0].getDataSets().get(0);
- String feedName = Util.readEntityName(feed);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
- String startTime = TimeUtil.getTimeWrtSystemTime(-50);
- final String startPlus20Min = TimeUtil.addMinsToTime(startTime, 20);
- final String startPlus40Min = TimeUtil.addMinsToTime(startTime, 40);
- final String startPlus100Min = TimeUtil.addMinsToTime(startTime, 100);
-
- feed = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(
- Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("US/${cluster.colo}")
- .build())
- .toString();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startPlus20Min,
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feed = FeedMerlin.fromString(feed).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startPlus40Min,
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("feed: " + Util.prettyPrintXml(feed));
-
- //status before submit
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus100Min
- + "&end=" + TimeUtil.addMinsToTime(startTime, 120));
-
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed));
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + startPlus100Min);
-
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
-
- // both replication instances
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + startPlus100Min);
-
- // single instance at -30
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus20Min);
-
- //single at -10
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
- //single at -10, repeated
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
- //single at -10, repeated
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
- String postFix = "/US/" + cluster2.getClusterHelper().getColoName();
- String prefix = bundles[0].getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
- HadoopUtil.lateDataReplenish(cluster2FS, 80, 20, prefix, postFix);
-
- postFix = "/UK/" + cluster3.getClusterHelper().getColoName();
- prefix = bundles[0].getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS);
- HadoopUtil.lateDataReplenish(cluster3FS, 80, 20, prefix, postFix);
-
- // both replication instances
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + startPlus100Min);
-
- // single instance at -30
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus20Min);
-
- //single at -10
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
- //single at -10, repeated
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
- //single at -10, repeated
- prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min);
-
- LOGGER.info("Wait till feed goes into running ");
-
- //suspend instances -10
- prism.getFeedHelper().getProcessInstanceSuspend(feedName, "?start=" + startPlus40Min);
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-
- //re-suspend -10 and suspend -30 (source-specific)
- prism.getFeedHelper().getProcessInstanceSuspend(feedName,
- "?start=" + startPlus20Min + "&end=" + startPlus40Min);
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-
- //resume -10 and -30
- prism.getFeedHelper().getProcessInstanceResume(feedName,
- "?start=" + startPlus20Min + "&end=" + startPlus40Min);
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startPlus20Min + "&end=" + startPlus40Min);
-
- //get running instances
- prism.getFeedHelper().getRunningInstance(feedName);
-
- //rerun succeeded instance
- prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + startPlus20Min);
-
- //kill instance
- prism.getFeedHelper().getProcessInstanceKill(feedName,
- "?start=" + TimeUtil.addMinsToTime(startTime, 44));
- prism.getFeedHelper().getProcessInstanceKill(feedName, "?start=" + startTime);
-
- //end time should be less than the end of validity, i.e. startTime + 110
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
-
- //rerun killed instance
- prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
- prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
-
- //kill feed
- prism.getFeedHelper().delete(feed);
- InstancesResult responseInstance = prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
-
- LOGGER.info(responseInstance.getMessage());
- }
-}
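
The test above composes every instance-API call by hand-concatenating "?start=" and "&end=" parameters. A minimal sketch of a reusable helper for that pattern follows; the class and method names are hypothetical and not part of the Falcon regression framework.

    /** Builds the "?start=...&end=..." query window used by the Falcon instance APIs. */
    final class InstanceQuery {
        private InstanceQuery() {
        }

        /** start/end are ISO-8601 times, e.g. "2016-03-01T09:00Z"; pass end = null for an open-ended query. */
        static String window(String start, String end) {
            return "?start=" + start + (end == null ? "" : "&end=" + end);
        }
    }

    // Usage, mirroring the calls in feedInstanceStatusRunning():
    //   prism.getFeedHelper().getProcessInstanceStatus(feedName,
    //       InstanceQuery.window(startTime, startPlus100Min));
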
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
deleted file mode 100644
index 5bb5e6e..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.List;
-
-/**
- * This test submits and schedules a feed and then checks for replication.
- * On adding further late data, it checks whether the data has been replicated correctly within the given late cut-off time.
- * It assumes that the late frequency set on the server is 3 minutes, although the value can be changed as required.
- */
-@Test(groups = "embedded")
-public class FeedLateRerunTest extends BaseTestClass {
-
- private ColoHelper cluster1 = servers.get(0);
- private ColoHelper cluster2 = servers.get(1);
- private FileSystem cluster1FS = serverFS.get(0);
- private FileSystem cluster2FS = serverFS.get(1);
- private OozieClient cluster2OC = serverOC.get(1);
- private String baseTestDir = cleanAndGetTestDir();
- private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN;
- private String targetPath = baseTestDir + "/target";
- private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(FeedLateRerunTest.class);
- private String source = null;
- private String target = null;
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws JAXBException, IOException {
- Bundle bundle = BundleUtil.readFeedReplicationBundle();
- bundles[0] = new Bundle(bundle, cluster1);
- bundles[1] = new Bundle(bundle, cluster2);
- bundles[0].generateUniqueBundle(this);
- bundles[1].generateUniqueBundle(this);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- @Test(dataProvider = "dataFlagProvider")
- public void testLateRerun(boolean dataFlag)
- throws URISyntaxException, AuthenticationException, InterruptedException, IOException,
- OozieClientException, JAXBException {
- Bundle.submitCluster(bundles[0], bundles[1]);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 30);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
- //configure feed
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.setFilePath(feedDataLocation);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set cluster1 as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set cluster2 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
- String entityName = feed.getName();
-
- //submit and schedule feed
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
- //check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, entityName, "REPLICATION"), 1);
-
- //Finding bundleId of replicated instance on target
- String bundleId = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED);
-
- //Finding and creating missing dependencies
- List<String> missingDependencies = getAndCreateDependencies(
- cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId, dataFlag, entityName);
- int count = 1;
- for (String location : missingDependencies) {
- if (count == 1) {
- source = location;
- count++;
- }
- }
- source = splitPathFromIp(source, "8020");
- LOGGER.info("source : " + source);
- target = source.replace("source", "target");
- LOGGER.info("target : " + target);
- /* Sleep for some time (as defined in the server's runtime properties).
- Let the instance rerun; it should then succeed. */
- int sleepMins = 8;
- for (int i = 0; i < sleepMins; i++) {
- LOGGER.info("Waiting...");
- TimeUtil.sleepSeconds(60);
- }
- String bundleID = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED);
- OozieUtil.validateRetryAttempts(cluster2OC, bundleID, EntityType.FEED, 1);
-
- //check if data has been replicated correctly
- List<Path> cluster1ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
- List<Path> cluster2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
- AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
- }
-
- private String splitPathFromIp(String src, String port) {
- String reqSrc, tempSrc = "";
- if (src.contains(":")) {
- String[] tempPath = src.split(":");
- for (String aTempPath : tempPath) {
- if (aTempPath.startsWith(port)) {
- tempSrc = aTempPath;
- }
- }
- }
- if (tempSrc.isEmpty()) {
- reqSrc = src;
- } else {
- reqSrc = tempSrc.replace(port, "");
- }
- return reqSrc;
- }
-
- /* sourceFS - source colo file system, targetOC - target colo Oozie client */
- private List<String> getAndCreateDependencies(FileSystem sourceFS, String prefix, OozieClient targetOC,
- String bundleId, boolean dataFlag, String entityName) throws OozieClientException, IOException {
- List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
- for (int i = 0; i < 10 && missingDependencies == null; ++i) {
- TimeUtil.sleepSeconds(30);
- LOGGER.info("sleeping...");
- missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId);
- }
- Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
- //print missing dependencies
- for (String dependency : missingDependencies) {
- LOGGER.info("dependency from job: " + dependency);
- }
- // Creating missing dependencies
- HadoopUtil.createFolders(sourceFS, prefix, missingDependencies);
- //Adding data to empty folders depending on dataFlag
- if (dataFlag) {
- int tempCount = 1;
- for (String location : missingDependencies) {
- if (tempCount == 1) {
- LOGGER.info("Transferring data to : " + location);
- HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- tempCount++;
- }
- }
- }
- //replication should start, wait while it ends
- InstanceUtil.waitTillInstanceReachState(targetOC, entityName, 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
- // Adding data for late rerun
- int tempCounter = 1;
- for (String dependency : missingDependencies) {
- if (tempCounter == 1) {
- LOGGER.info("Transferring late data to : " + dependency);
- HadoopUtil.copyDataToFolder(sourceFS, dependency,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.properties"));
- }
- tempCounter++;
- }
- return missingDependencies;
- }
-
- @DataProvider(name = "dataFlagProvider")
- private Object[][] dataFlagProvider() {
- return new Object[][] {
- new Object[] {true, },
- new Object[] {false, },
- };
- }
-}
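
splitPathFromIp() above strips the namenode host and port by splitting on ':' and matching the port token. A minimal sketch of the same normalization via java.net.URI, assuming well-formed HDFS URIs; this helper is illustrative only and not part of the original test.

    import java.net.URI;

    /** Illustrative alternative to splitPathFromIp(): keep only the path component of an HDFS location. */
    final class HdfsPathUtil {
        private HdfsPathUtil() {
        }

        static String pathOnly(String location) {
            // e.g. URI.create("hdfs://namenode:8020/falcon/source/2016/03/01").getPath()
            // returns "/falcon/source/2016/03/01", dropping scheme, host and port.
            return URI.create(location).getPath();
        }
    }
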
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
deleted file mode 100644
index a936aa1..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ /dev/null
@@ -1,581 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.supportClasses.ExecResult;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Feed replication test.
- * Replicates empty directories as well as directories containing data.
- */
-@Test(groups = "embedded")
-public class FeedReplicationTest extends BaseTestClass {
-
- private ColoHelper cluster1 = servers.get(0);
- private ColoHelper cluster2 = servers.get(1);
- private ColoHelper cluster3 = servers.get(2);
- private FileSystem cluster1FS = serverFS.get(0);
- private FileSystem cluster2FS = serverFS.get(1);
- private FileSystem cluster3FS = serverFS.get(2);
- private OozieClient cluster2OC = serverOC.get(1);
- private OozieClient cluster3OC = serverOC.get(2);
- private String baseTestDir = cleanAndGetTestDir();
- private String sourcePath = baseTestDir + "/source";
- private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN;
- private String targetPath = baseTestDir + "/target";
- private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(FeedReplicationTest.class);
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws JAXBException, IOException {
- Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
- bundles[0] = new Bundle(bundle, cluster1);
- bundles[1] = new Bundle(bundle, cluster2);
- bundles[2] = new Bundle(bundle, cluster3);
-
- bundles[0].generateUniqueBundle(this);
- bundles[1].generateUniqueBundle(this);
- bundles[2].generateUniqueBundle(this);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() throws IOException {
- removeTestClassEntities();
- cleanTestsDirs();
- }
-
- /**
- * Test demonstrates replication of stored data from one source cluster to one target cluster.
- * It checks the lifecycle of the replication workflow instance, including its creation. When
- * replication ends, the test checks whether data was replicated correctly.
- * It also checks for the presence of the _SUCCESS file in the target directory.
- */
- @Test(dataProvider = "dataFlagProvider")
- public void replicate1Source1Target(boolean dataFlag)
- throws Exception {
- Bundle.submitCluster(bundles[0], bundles[1]);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 5);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
- //configure feed
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.setFilePath(feedDataLocation);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set cluster1 as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set cluster2 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
- feed.withProperty("job.counter", "true");
-
- //submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
- //upload necessary data
- DateTime date = new DateTime(startTime, DateTimeZone.UTC);
- DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
- String timePattern = fmt.print(date);
- String sourceLocation = sourcePath + "/" + timePattern + "/";
- String targetLocation = targetPath + "/" + timePattern + "/";
- HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
- Path toSource = new Path(sourceLocation);
- Path toTarget = new Path(targetLocation);
- if (dataFlag) {
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
- }
-
- //check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
-
- //replication should start, wait while it ends
- InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
- //check if data has been replicated correctly
- List<Path> cluster1ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
- List<Path> cluster2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
-
- AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
- //_SUCCESS does not exist in source
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, ""), false);
-
- //_SUCCESS should exist in target
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true);
-
- AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
- cluster2FS, "feed", "Success logs are not present");
-
- ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
- AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag);
- }
-
- /**
- * Test demonstrates replication of stored data from one source cluster to two target clusters.
- * It checks the lifecycle of the replication workflow instances, including their creation on both
- * targets. When replication ends, the test checks whether data was replicated correctly.
- * It also checks for the presence of the _SUCCESS file in the target directories.
- */
- @Test(dataProvider = "dataFlagProvider")
- public void replicate1Source2Targets(boolean dataFlag) throws Exception {
- Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 5);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
- //configure feed
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.setFilePath(feedDataLocation);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set cluster1 as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set cluster2 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
- //set cluster3 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
- feed.withProperty("job.counter", "true");
-
- //submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
- //upload necessary data
- DateTime date = new DateTime(startTime, DateTimeZone.UTC);
- DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
- String timePattern = fmt.print(date);
- String sourceLocation = sourcePath + "/" + timePattern + "/";
- String targetLocation = targetPath + "/" + timePattern + "/";
- HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
- Path toSource = new Path(sourceLocation);
- Path toTarget = new Path(targetLocation);
-
- if (dataFlag) {
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
- }
-
- //check if all coordinators exist
- InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
- InstanceUtil.waitTillInstancesAreCreated(cluster3OC, feed.toString(), 0);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "REPLICATION"), 1);
- //replication on cluster 2 should start, wait till it ends
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
- //replication on cluster 3 should start, wait till it ends
- InstanceUtil.waitTillInstanceReachState(cluster3OC, feed.getName(), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
- //check if data has been replicated correctly
- List<Path> cluster1ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
- List<Path> cluster2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
- List<Path> cluster3ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
-
- AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
- AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData);
-
- //_SUCCESS does not exist in source
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, ""), false);
-
- //_SUCCESS should exist in target
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true);
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster3FS, toTarget, ""), true);
-
- AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
- cluster2FS, "feed", "Success logs are not present");
-
- ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
- AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag);
- }
-
- /**
- * Test demonstrates how replication depends on the availability flag. The scenario includes one
- * source and one target cluster. When the feed is submitted and scheduled and data is available,
- * the feed still waits for the availability flag (a file whose name is defined as the availability
- * flag in the feed definition). As soon as that file is uploaded to the data directory,
- * replication starts, and when it ends the test checks whether data was replicated correctly.
- * It also checks for the presence of the availability flag in the target directory.
- */
- @Test(dataProvider = "dataFlagProvider")
- public void availabilityFlagTest(boolean dataFlag) throws Exception {
- //replicate1Source1Target scenario + set availability flag but don't upload required file
- Bundle.submitCluster(bundles[0], bundles[1]);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 5);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
- //configure feed
- String availabilityFlagName = "availabilityFlag.txt";
- String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
- FeedMerlin feedElement = bundles[0].getFeedElement(feedName);
- feedElement.setAvailabilityFlag(availabilityFlagName);
- bundles[0].writeFeedElement(feedElement, feedName);
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.setFilePath(feedDataLocation);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set cluster1 as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set cluster2 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
- feed.withProperty("job.counter", "true");
-
- //submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
- //upload necessary data
- DateTime date = new DateTime(startTime, DateTimeZone.UTC);
- DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
- String timePattern = fmt.print(date);
- String sourceLocation = sourcePath + "/" + timePattern + "/";
- String targetLocation = targetPath + "/" + timePattern + "/";
- HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
- Path toSource = new Path(sourceLocation);
- Path toTarget = new Path(targetLocation);
- if (dataFlag) {
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
- }
-
- //wait till the instance is created
- InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-
- //check if coordinator exists
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 1);
-
- //replication should not start even after some time
- TimeUtil.sleepSeconds(60);
- InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName,
- "?start=" + startTime + "&end=" + endTime);
- InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
- LOGGER.info("Replication didn't start.");
-
- //create availability flag on source
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.RESOURCES, availabilityFlagName));
-
- //check if the instance becomes running
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.RUNNING, EntityType.FEED);
-
- //wait till instance succeed
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
- //check if data was replicated correctly
- List<Path> cluster1ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
- LOGGER.info("Data on source cluster: " + cluster1ReplicatedData);
- List<Path> cluster2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
- LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
- AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
- //availabilityFlag exists in source
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, availabilityFlagName), true);
-
- //availabilityFlag should exist in target
- Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName), true);
-
- AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
- cluster2FS, "feed", "Success logs are not present");
-
- ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
- AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag);
- }
-
- /**
- * Test for https://issues.apache.org/jira/browse/FALCON-668.
- * Check that new DistCp options are allowed.
- */
- @Test
- public void testNewDistCpOptions() throws Exception {
- Bundle.submitCluster(bundles[0], bundles[1]);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 5);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- //configure feed
- String feedName = Util.readEntityName(bundles[0].getDataSets().get(0));
- FeedMerlin feedElement = bundles[0].getFeedElement(feedName);
- bundles[0].writeFeedElement(feedElement, feedName);
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.setFilePath(feedDataLocation);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set cluster1 as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set cluster2 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
- feed.withProperty("job.counter", "true");
-
- //add custom properties to feed
- HashMap<String, String> propMap = new HashMap<>();
- propMap.put("overwrite", "true");
- propMap.put("ignoreErrors", "false");
- propMap.put("skipChecksum", "false");
- propMap.put("removeDeletedFiles", "true");
- propMap.put("preserveBlockSize", "true");
- propMap.put("preserveReplicationNumber", "true");
- propMap.put("preservePermission", "true");
- for (Map.Entry<String, String> entry : propMap.entrySet()) {
- feed.withProperty(entry.getKey(), entry.getValue());
- }
- //add custom property which shouldn't be passed to workflow
- HashMap<String, String> unsupportedPropMap = new HashMap<>();
- unsupportedPropMap.put("myCustomProperty", "true");
- feed.withProperty("myCustomProperty", "true");
-
- //upload necessary data to source
- DateTime date = new DateTime(startTime, DateTimeZone.UTC);
- DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
- String timePattern = fmt.print(date);
- String sourceLocation = sourcePath + "/" + timePattern + "/";
- HadoopUtil.recreateDir(cluster1FS, sourceLocation);
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
-
- //copy a file to target to check if it will be deleted because of the removeDeletedFiles property
- String targetLocation = targetPath + "/" + timePattern + "/";
- cluster2FS.copyFromLocalFile(new Path(OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile3.txt")),
- new Path(targetLocation + "dataFile3.txt"));
-
- //submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
- //wait till the instance is created
- InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
-
- //check if coordinator exists and replication starts
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.RUNNING, EntityType.FEED);
-
- //check that properties were passed to workflow definition
- String bundleId = OozieUtil.getLatestBundleID(cluster2OC, feedName, EntityType.FEED);
- String coordId = OozieUtil.getReplicationCoordID(bundleId, cluster2.getFeedHelper()).get(0);
- CoordinatorAction coordinatorAction = cluster2OC.getCoordJobInfo(coordId).getActions().get(0);
- String wfDefinition = cluster2OC.getJobDefinition(coordinatorAction.getExternalId());
- LOGGER.info(String.format("Definition of coordinator job action %s : \n %s \n",
- coordinatorAction.getExternalId(), Util.prettyPrintXml(wfDefinition)));
- Assert.assertTrue(OozieUtil.propsArePresentInWorkflow(wfDefinition, "replication", propMap),
- "New distCp supported properties should be passed to replication args list.");
- Assert.assertFalse(OozieUtil.propsArePresentInWorkflow(wfDefinition, "replication", unsupportedPropMap),
- "Unsupported properties shouldn't be passed to replication args list.");
-
- //check that replication succeeds
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
- List<Path> finalFiles = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(targetPath));
- Assert.assertEquals(finalFiles.size(), 2, "Only replicated files should be present on target "
- + "because of 'removeDeletedFiles' distCp property.");
-
- ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName());
- AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, true);
- }
-
- /**
- * Test demonstrates failure of replication of stored data from one source cluster to one target cluster.
- * When the replication job fails, the test checks whether failed logs are present in the staging directory.
- */
- @Test
- public void replicate1Source1TargetFail()
- throws Exception {
- Bundle.submitCluster(bundles[0], bundles[1]);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 5);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
-
- //configure feed
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.setFilePath(feedDataLocation);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set cluster1 as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set cluster2 as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
-
- //submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-
- //upload necessary data
- DateTime date = new DateTime(startTime, DateTimeZone.UTC);
- DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
- String timePattern = fmt.print(date);
- String sourceLocation = sourcePath + "/" + timePattern + "/";
- String targetLocation = targetPath + "/" + timePattern + "/";
- HadoopUtil.recreateDir(cluster1FS, sourceLocation);
-
- Path toSource = new Path(sourceLocation);
- Path toTarget = new Path(targetLocation);
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml"));
- HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt"));
-
- //check if coordinator exists
- InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1);
-
- //check if the instance becomes running
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.RUNNING, EntityType.FEED);
-
- HadoopUtil.deleteDirIfExists(sourceLocation, cluster1FS);
-
- //check if the instance gets killed
- InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
- CoordinatorAction.Status.KILLED, EntityType.FEED);
-
- AssertUtil.assertLogMoverPath(false, Util.readEntityName(feed.toString()),
- cluster2FS, "feed", "Success logs are not present");
- }
-
- /* Flag value denotes whether to add data for replication or not.
- * flag=true : add data for replication.
- * flag=false : let empty directories be replicated.
- */
- @DataProvider(name = "dataFlagProvider")
- private Object[][] dataFlagProvider() {
- return new Object[][] {
- new Object[] {true, },
- new Object[] {false, },
- };
- }
-}
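
testNewDistCpOptions() above registers each DistCp property on the feed one by one. A minimal sketch that centralizes the same FALCON-668 property set; the helper class is hypothetical, while the keys and values are exactly those used in the test.

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** The DistCp-related feed properties exercised by FALCON-668 (values as used in the test). */
    final class DistCpProps {
        private DistCpProps() {
        }

        static Map<String, String> supported() {
            Map<String, String> props = new LinkedHashMap<>();
            props.put("overwrite", "true");
            props.put("ignoreErrors", "false");
            props.put("skipChecksum", "false");
            props.put("removeDeletedFiles", "true");
            props.put("preserveBlockSize", "true");
            props.put("preserveReplicationNumber", "true");
            props.put("preservePermission", "true");
            return props;
        }
    }

    // Usage with the FeedMerlin instance from the test:
    //   for (Map.Entry<String, String> e : DistCpProps.supported().entrySet()) {
    //       feed.withProperty(e.getKey(), e.getValue());
    //   }
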
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
deleted file mode 100644
index ec117d7..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed resume tests.
- */
-@Test(groups = "embedded")
-public class FeedResumeTest extends BaseTestClass {
-
- private final AbstractEntityHelper feedHelper = prism.getFeedHelper();
- private String feed;
- private ColoHelper cluster = servers.get(0);
- private OozieClient clusterOC = serverOC.get(0);
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0].generateUniqueBundle(this);
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].submitClusters(prism);
- feed = bundles[0].getInputFeedFromBundle();
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Launches a feed, suspends it, then resumes it and checks that it goes back to running.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void resumeSuspendedFeed() throws Exception {
- AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed));
- AssertUtil.assertSucceeded(feedHelper.suspend(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
- AssertUtil.assertSucceeded(feedHelper.resume(feed));
- ServiceResponse response = feedHelper.getStatus(feed);
- String colo = feedHelper.getColo();
- Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- }
-
-
- /**
- * Tries to resume a feed that wasn't submitted and scheduled. The attempt should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void resumeNonExistentFeed() throws Exception {
- AssertUtil.assertFailed(feedHelper.resume(feed));
- }
-
- /**
- * Tries to resume a deleted feed. The attempt should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void resumeDeletedFeed() throws Exception {
- AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed));
- AssertUtil.assertSucceeded(feedHelper.delete(feed));
- AssertUtil.assertFailed(feedHelper.resume(feed));
- }
-
- /**
- * Tries to resume a scheduled feed that wasn't suspended. The feed status shouldn't change.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void resumeScheduledFeed() throws Exception {
- AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- AssertUtil.assertSucceeded(feedHelper.resume(feed));
- ServiceResponse response = feedHelper.getStatus(feed);
- String colo = feedHelper.getColo();
- Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- }
-}
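
The resume tests above encode a small state machine: resume() moves a SUSPENDED job back to RUNNING and leaves an already-RUNNING job unchanged. A minimal sketch of that expectation, illustrative only and not part of the test framework.

    import org.apache.oozie.client.Job;

    /** Expected Oozie job status after a Falcon resume call (illustrative). */
    final class ResumeExpectation {
        private ResumeExpectation() {
        }

        static Job.Status afterResume(Job.Status current) {
            // resumeSuspendedFeed: SUSPENDED -> RUNNING; resumeScheduledFeed: RUNNING stays RUNNING.
            return current == Job.Status.SUSPENDED ? Job.Status.RUNNING : current;
        }
    }
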
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java
deleted file mode 100644
index 28ddbd7..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed SLA tests.
- */
-@Test(groups = "embedded")
-public class FeedSLATest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private String baseTestDir = cleanAndGetTestDir();
- private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(FeedSLATest.class);
-
- private FeedMerlin feedMerlin;
- private String startTime;
- private String endTime;
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- Bundle bundle = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundle, cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setInputFeedDataPath(feedInputPath);
-
- startTime = TimeUtil.getTimeWrtSystemTime(0);
- endTime = TimeUtil.addMinsToTime(startTime, 120);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- ServiceResponse response =
- prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
- AssertUtil.assertSucceeded(response);
-
- feedMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle());
- feedMerlin.setFrequency(new Frequency("1", Frequency.TimeUnit.hours));
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Submit a feed with a correctly adjusted SLA. The response should reflect success.
- *
- */
-
- @Test
- public void submitValidFeedSLA() throws Exception {
-
- feedMerlin.clearFeedClusters();
- feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
- Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .build());
-
- //set slaLow and slaHigh
- feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours));
-
- final ServiceResponse serviceResponse =
- prism.getFeedHelper().submitEntity(feedMerlin.toString());
- AssertUtil.assertSucceeded(serviceResponse);
- }
-
- /**
- * Submit a feed with slaHigh greater than the feed retention. The response should reflect failure.
- *
- */
-
- @Test
- public void submitFeedWithSLAHigherThanRetention() throws Exception {
-
- feedMerlin.clearFeedClusters();
- feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
- Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention((new Frequency("2", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
- .withValidity(startTime, endTime)
- .build());
-
- //set slaLow and slaHigh
- feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours));
-
- final ServiceResponse serviceResponse =
- prism.getFeedHelper().submitEntity(feedMerlin.toString());
- String message = "Feed's retention limit: "
- + feedMerlin.getClusters().getClusters().get(0).getRetention().getLimit()
- + " of referenced cluster " + bundles[0].getClusterNames().get(0)
- + " should be more than feed's late arrival cut-off period: "
- + feedMerlin.getSla().getSlaHigh().getTimeUnit()
- + "(" + feedMerlin.getSla().getSlaHigh().getFrequency() + ")"
- + " for feed: " + bundles[0].getInputFeedNameFromBundle();
- validate(serviceResponse, message);
- }
-
- /**
- * Submit feed with slaHigh less than slaLow. Response should reflect failure.
- *
- */
- @Test
- public void submitFeedWithSLAHighLowerthanSLALow() throws Exception {
-
- feedMerlin.clearFeedClusters();
- feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
- Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention((new Frequency("6", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
- .withValidity(startTime, endTime)
- .build());
-
- //set slaLow (4 hours) greater than slaHigh (2 hours), which is invalid
- feedMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours), new Frequency("2", Frequency.TimeUnit.hours));
-
- final ServiceResponse serviceResponse =
- prism.getFeedHelper().submitEntity(feedMerlin.toString());
- String message = "slaLow of Feed: " + feedMerlin.getSla().getSlaLow().getTimeUnit() + "("
- + feedMerlin.getSla().getSlaLow().getFrequency() + ")is greater than slaHigh: "
- + feedMerlin.getSla().getSlaHigh().getTimeUnit() + "(" + feedMerlin.getSla().getSlaHigh().getFrequency()
- + ") for cluster: " + bundles[0].getClusterNames().get(0);
- validate(serviceResponse, message);
- }
-
- /**
- * Submit feed with slaHigh and slaLow greater than feed retention. Response should reflect failure.
- *
- */
- @Test
- public void submitFeedWithSLAHighSLALowHigherThanRetention() throws Exception {
-
- feedMerlin.clearFeedClusters();
- feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
- Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention((new Frequency("4", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
- .withValidity(startTime, endTime)
- .build());
-
- //set slaLow (5 hours) and slaHigh (6 hours), both beyond the 4-hour retention
- feedMerlin.setSla(new Frequency("5", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours));
-
- final ServiceResponse serviceResponse =
- prism.getFeedHelper().submitEntity(feedMerlin.toString());
- String message = "Feed's retention limit: "
- + feedMerlin.getClusters().getClusters().get(0).getRetention().getLimit()
- + " of referenced cluster " + bundles[0].getClusterNames().get(0)
- + " should be more than feed's late arrival cut-off period: "
- + feedMerlin.getSla().getSlaHigh().getTimeUnit() +"(" + feedMerlin.getSla().getSlaHigh().getFrequency()
- + ")" + " for feed: " + bundles[0].getInputFeedNameFromBundle();
- validate(serviceResponse, message);
- }
-
- /**
- * Submit feed with slaHigh and slaLow having equal values. Response should reflect success.
- *
- */
- @Test
- public void submitFeedWithSameSLAHighSLALow() throws Exception {
-
- feedMerlin.clearFeedClusters();
- feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder(
- Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention((new Frequency("7", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE)
- .withValidity(startTime, endTime)
- .build());
-
- //set slaLow and slaHigh to the same value (3 hours)
- feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("3", Frequency.TimeUnit.hours));
-
- final ServiceResponse serviceResponse =
- prism.getFeedHelper().submitEntity(feedMerlin.toString());
- AssertUtil.assertSucceeded(serviceResponse);
- }
-
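- /**
- * Asserts that the request failed and that the expected message is present in the response.
- */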
- private void validate(ServiceResponse response, String message) throws Exception {
- AssertUtil.assertFailed(response);
- LOGGER.info("Expected message is : " + message);
- Assert.assertTrue(response.getMessage().contains(message),
- "Correct response was not present in feed schedule. Feed response is : "
- + response.getMessage());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
deleted file mode 100644
index 79b722a..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed schedule tests.
- */
-@Test(groups = "embedded")
-public class FeedScheduleTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private String feed;
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- Bundle.submitCluster(bundles[0]);
- feed = bundles[0].getInputFeedFromBundle();
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Tries to schedule an already scheduled feed. The request should be accepted.
- * The feed status shouldn't change.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void scheduleAlreadyScheduledFeed() throws Exception {
- ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
- AssertUtil.assertSucceeded(response);
-
- response = prism.getFeedHelper().schedule(feed);
- AssertUtil.assertSucceeded(response);
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
-
- //now try scheduling it again
- response = prism.getFeedHelper().schedule(feed);
- AssertUtil.assertSucceeded(response);
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- }
-
- /**
- * Schedules a correct feed. The feed should get to RUNNING state.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void scheduleValidFeed() throws Exception {
- //submit feed
- ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
- AssertUtil.assertSucceeded(response);
-
- //now schedule the feed
- response = prism.getFeedHelper().schedule(feed);
- AssertUtil.assertSucceeded(response);
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- }
-
- /**
- * Tries to schedule an already scheduled and suspended feed. The SUSPENDED status shouldn't change.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void scheduleSuspendedFeed() throws Exception {
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-
- //now suspend
- AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
- //now schedule this!
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
- }
-
- /**
- * Schedules and deletes a feed, then tries to schedule it again. The request should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void scheduleKilledFeed() throws Exception {
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
-
- //now delete
- AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
- //now schedule this!
- AssertUtil.assertFailed(prism.getFeedHelper().schedule(feed));
- }
-
- /**
- * Tries to schedule a feed which wasn't submitted. The request should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void scheduleNonExistentFeed() throws Exception {
- AssertUtil.assertFailed(prism.getFeedHelper().schedule(feed));
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
deleted file mode 100644
index d5e8696..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed status tests. Checks getStatus functionality.
- */
-@Test(groups = "embedded")
-public class FeedStatusTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private String feed;
- private static final Logger LOGGER = Logger.getLogger(FeedStatusTest.class);
-
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0].generateUniqueBundle(this);
- bundles[0] = new Bundle(bundles[0], cluster);
-
- //submit the cluster
- ServiceResponse response =
- prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
- AssertUtil.assertSucceeded(response);
- feed = bundles[0].getInputFeedFromBundle();
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- /**
- * Schedules a feed. Queries the feed status and checks that the response
- * is correct and that the feed status matches.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void getStatusForScheduledFeed() throws Exception {
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
- LOGGER.info("Feed: " + Util.prettyPrintXml(feed));
- AssertUtil.assertSucceeded(response);
-
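- //query the feed status; the message should contain the colo-qualified RUNNING state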
- response = prism.getFeedHelper().getStatus(feed);
-
- AssertUtil.assertSucceeded(response);
-
- String colo = prism.getFeedHelper().getColo();
- Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING"));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- }
-
- /**
- * Schedules and suspends a feed. Queries the feed status and checks that the response
- * is correct and that the feed status matches.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void getStatusForSuspendedFeed() throws Exception {
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
-
- AssertUtil.assertSucceeded(response);
-
- response = prism.getFeedHelper().suspend(feed);
- AssertUtil.assertSucceeded(response);
-
- response = prism.getFeedHelper().getStatus(feed);
-
- AssertUtil.assertSucceeded(response);
- String colo = prism.getFeedHelper().getColo();
- Assert.assertTrue(response.getMessage().contains(colo + "/SUSPENDED"));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
- }
-
- /**
- * Submits a feed. Queries the feed status and checks that the response
- * is correct and that the feed status matches.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void getStatusForSubmittedFeed() throws Exception {
- ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
-
- AssertUtil.assertSucceeded(response);
-
- response = prism.getFeedHelper().getStatus(feed);
-
- AssertUtil.assertSucceeded(response);
- String colo = prism.getFeedHelper().getColo();
- Assert.assertTrue(response.getMessage().contains(colo + "/SUBMITTED"));
- AssertUtil.checkNotStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
- }
-
- /**
- * Removes a feed. Queries the feed status and checks that the response is correct.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void getStatusForDeletedFeed() throws Exception {
- ServiceResponse response = prism.getFeedHelper().submitEntity(feed);
- AssertUtil.assertSucceeded(response);
-
- response = prism.getFeedHelper().delete(feed);
- AssertUtil.assertSucceeded(response);
-
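- //status of a deleted feed should come back as failed with a "(FEED) not found" message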
- response = prism.getFeedHelper().getStatus(feed);
- AssertUtil.assertFailed(response);
-
- Assert.assertTrue(
- response.getMessage().contains(Util.readEntityName(feed) + " (FEED) not found"));
- AssertUtil.checkNotStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
- }
-
- /**
- * Queries the status of a feed which wasn't submitted and checks the response.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void getStatusForNonExistentFeed() throws Exception {
- ServiceResponse response = prism.getFeedHelper().getStatus(feed);
- AssertUtil.assertFailed(response);
- Assert.assertTrue(
- response.getMessage().contains(Util.readEntityName(feed) + " (FEED) not found"));
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
deleted file mode 100644
index f7bf0f8..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-
-/**
- * Feed submit and schedule tests.
- */
-@Test(groups = "embedded")
-public class FeedSubmitAndScheduleTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private String feed;
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- feed = bundles[0].getDataSets().get(0);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- //remove entities which belong to both the default and the different user
- removeTestClassEntities(null, MerlinConstants.DIFFERENT_USER_NAME);
- }
-
- @Test(groups = {"singleCluster"})
- public void snsNewFeed() throws Exception {
- submitFirstClusterScheduleFirstFeed();
- }
-
- /**
- * Submits the cluster a feed depends on, then submits and schedules the feed.
- *
- * @throws JAXBException
- * @throws IOException
- * @throws URISyntaxException
- * @throws AuthenticationException
- */
- private void submitFirstClusterScheduleFirstFeed()
- throws JAXBException, IOException, URISyntaxException, AuthenticationException,
- InterruptedException {
- AssertUtil.assertSucceeded(prism.getClusterHelper()
- .submitEntity(bundles[0].getClusters().get(0)));
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
- AssertUtil.assertSucceeded(response);
- }
-
- /**
- * Submits and schedules a feed and then tries to do the same again. Checks that the status
- * hasn't changed and the response is successful.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void snsExistingFeed() throws Exception {
- submitFirstClusterScheduleFirstFeed();
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-
- //get created bundle id
- String bundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED);
-
- //try to submit and schedule the same feed again
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
- AssertUtil.assertSucceeded(response);
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-
- //check that new bundle wasn't created
- OozieUtil.verifyNewBundleCreation(clusterOC, bundleId, null, feed, false, false);
- }
-
- /**
- * Tries to submit and schedule a feed without submitting the cluster it depends on.
- * The request should fail.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void snsFeedWithoutCluster() throws Exception {
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
- AssertUtil.assertFailed(response);
- }
-
- /**
- * Submits and schedules a feed, then removes it. Submits and schedules the removed feed again.
- * Checks the response and the feed status.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void snsDeletedFeed() throws Exception {
- submitFirstClusterScheduleFirstFeed();
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.KILLED);
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
- AssertUtil.assertSucceeded(response);
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
- }
-
- /**
- * Submits, schedules and suspends a feed, then submits and schedules it again.
- * Checks that the response is successful and the feed status hasn't changed.
- *
- * @throws Exception
- */
- @Test(groups = {"singleCluster"})
- public void snsSuspendedFeed() throws Exception {
- submitFirstClusterScheduleFirstFeed();
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
- AssertUtil.assertSucceeded(response);
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
- }
-
- /**
- * Test for https://issues.apache.org/jira/browse/FALCON-1647.
- * Create cluster entity as user1. Submit and schedule feed entity feed1 in this cluster as user1.
- * Now try to submit and schedule a feed entity feed2 in this cluster as user2.
- */
- @Test
- public void snsDiffFeedDiffUserSameCluster()
- throws URISyntaxException, AuthenticationException, InterruptedException, IOException, JAXBException {
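- //as the default user: submit the cluster, then submit and schedule the first feed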
- bundles[0].submitClusters(prism);
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
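- //as a different user: clone the feed under a new name and ACL owner, then submit and schedule it on the same cluster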
- FeedMerlin feedMerlin = FeedMerlin.fromString(feed);
- feedMerlin.setName(feedMerlin.getName() + "-2");
- feedMerlin.setACL(MerlinConstants.DIFFERENT_USER_NAME, MerlinConstants.DIFFERENT_USER_GROUP, "*");
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(
- feedMerlin.toString(), MerlinConstants.DIFFERENT_USER_NAME, null);
- AssertUtil.assertSucceeded(response);
- }
-}