You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:25:53 UTC
[07/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
deleted file mode 100644
index a0922cb..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.TestNGException;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.text.DecimalFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-
-/**
- * EL Validations tests.
- */
-@Test(groups = "embedded")
-public class ELValidationsTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private static final Logger LOGGER = Logger.getLogger(ELValidationsTest.class);
- private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
-
-
- @Test(groups = {"0.1", "0.2"})
- public void startInstBeforeFeedStartToday02() throws Exception {
- String response =
- testWith("2009-02-02T20:00Z", "2011-12-31T00:00Z", "2009-02-02T20:00Z",
- "2011-12-31T00:00Z", "now(-40,0)", "currentYear(20,30,24,20)", false);
- validate(response);
- }
-
- @Test(groups = {"singleCluster"})
- public void startInstAfterFeedEnd() throws Exception {
- String response = testWith(null, null, null, null,
- "currentYear(10,0,22,0)", "now(4,20)", false);
- validate(response);
- }
-
- @Test(groups = {"singleCluster"})
- public void bothInstReverse() throws Exception {
- String response = testWith(null, null, null, null,
- "now(0,0)", "now(-100,0)", false);
- validate(response);
- }
-
- @Test(groups = {"singleCluster"}, dataProvider = "EL-DP")
- public void expressionLanguageTest(String startInstance, String endInstance) throws Exception {
- testWith(null, null, null, null, startInstance, endInstance, true);
- }
-
- @DataProvider(name = "EL-DP")
- public Object[][] getELData() {
- return new Object[][]{
- {"now(-3,0)", "now(4,20)"},
- {"yesterday(22,0)", "now(4,20)"},
- {"currentMonth(0,22,0)", "now(4,20)"},
- {"lastMonth(30,22,0)", "now(4,20)"},
- {"currentYear(0,0,22,0)", "currentYear(1,1,22,0)"},
- {"currentMonth(0,22,0)", "currentMonth(1,22,20)"},
- {"lastMonth(30,22,0)", "lastMonth(60,2,40)"},
- {"lastYear(12,0,22,0)", "lastYear(13,1,22,0)"},
- };
- }
-
- private void validate(String response) {
- if ((response.contains("End instance ") || response.contains("Start instance"))
- && (response.contains("for feed") || response.contains("of feed"))
- && (response.contains("is before the start of feed")
- || response.contains("is after the end of feed"))) {
- return;
- }
- if (response.contains("End instance")
- && response.contains("is before the start instance")) {
- return;
- }
- Assert.fail("Response is not valid");
- }
-
- private String testWith(String feedStart,
- String feedEnd, String processStart,
- String processEnd,
- String startInstance, String endInstance, boolean isMatch)
- throws IOException, JAXBException, ParseException, URISyntaxException {
- HadoopUtil.uploadDir(cluster.getClusterHelper().getHadoopFS(),
- aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- Bundle bundle = BundleUtil.readELBundle();
- bundle = new Bundle(bundle, cluster.getPrefix());
- bundle.generateUniqueBundle(this);
- bundle.setProcessWorkflow(aggregateWorkflowDir);
- if (feedStart != null && feedEnd != null) {
- bundle.setFeedValidity(feedStart, feedEnd, bundle.getInputFeedNameFromBundle());
- }
- if (processStart != null && processEnd != null) {
- bundle.setProcessValidity(processStart, processEnd);
- }
- try {
- bundle.setInvalidData();
- bundle.setDatasetInstances(startInstance, endInstance);
- String submitResponse = bundle.submitFeedsScheduleProcess(prism).getMessage();
- LOGGER.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData()));
- TimeUtil.sleepSeconds(45);
- if (isMatch) {
- getAndMatchDependencies(serverOC.get(0), bundle);
- }
- return submitResponse;
- } catch (Exception e) {
- e.printStackTrace();
- throw new TestNGException(e);
- } finally {
- LOGGER.info("deleting entity:");
- bundle.deleteBundle(prism);
- }
- }
-
- private void getAndMatchDependencies(OozieClient oozieClient, Bundle bundle) {
- try {
- List<String> bundles = null;
- for (int i = 0; i < 10; ++i) {
- bundles = OozieUtil.getBundles(oozieClient, bundle.getProcessName(), EntityType.PROCESS);
- if (bundles.size() > 0) {
- break;
- }
- TimeUtil.sleepSeconds(30);
- }
- Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
- String coordID = bundles.get(0);
- LOGGER.info("coord id: " + coordID);
- List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
- for (int i = 0; i < 10 && missingDependencies == null; ++i) {
- TimeUtil.sleepSeconds(30);
- missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
- }
- Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
- for (String dependency : missingDependencies) {
- LOGGER.info("dependency from job: " + dependency);
- }
- Date jobNominalTime = OozieUtil.getNominalTime(oozieClient, coordID);
- Calendar time = Calendar.getInstance();
- time.setTime(jobNominalTime);
- LOGGER.info("nominalTime:" + jobNominalTime);
- SimpleDateFormat df = new SimpleDateFormat("dd MMM yyyy HH:mm:ss");
- LOGGER.info(
- "nominalTime in GMT string: " + df.format(jobNominalTime.getTime()) + " GMT");
- TimeZone z = time.getTimeZone();
- int offset = z.getRawOffset();
- int offsetHrs = offset / 1000 / 60 / 60;
- int offsetMins = offset / 1000 / 60 % 60;
-
- LOGGER.info("offset: " + offsetHrs);
- LOGGER.info("offset: " + offsetMins);
-
- time.add(Calendar.HOUR_OF_DAY, (-offsetHrs));
- time.add(Calendar.MINUTE, (-offsetMins));
-
- LOGGER.info("GMT Time: " + time.getTime());
-
- int frequency = bundle.getInitialDatasetFrequency();
- List<String> qaDependencyList =
- getQADepedencyList(time, bundle.getStartInstanceProcess(time),
- bundle.getEndInstanceProcess(time),
- frequency, bundle);
- for (String qaDependency : qaDependencyList) {
- LOGGER.info("qa qaDependencyList: " + qaDependency);
- }
-
- Assert.assertTrue(matchDependencies(missingDependencies, qaDependencyList));
- } catch (Exception e) {
- e.printStackTrace();
- throw new TestNGException(e);
- }
- }
-
- private boolean matchDependencies(List<String> fromJob, List<String> qaList) {
- if (fromJob.size() != qaList.size()) {
- return false;
- }
- Collections.sort(fromJob);
- Collections.sort(qaList);
- for (int index = 0; index < fromJob.size(); index++) {
- if (!fromJob.get(index).contains(qaList.get(index))) {
- return false;
- }
- }
- return true;
- }
-
- private List<String> getQADepedencyList(Calendar nominalTime, Date startRef,
- Date endRef, int frequency, Bundle bundle) {
- LOGGER.info("start ref:" + startRef);
- LOGGER.info("end ref:" + endRef);
- Calendar initialTime = Calendar.getInstance();
- initialTime.setTime(startRef);
- Calendar finalTime = Calendar.getInstance();
-
- finalTime.setTime(endRef);
- String path = bundle.getDatasetPath();
-
- TimeZone tz = TimeZone.getTimeZone("GMT");
- nominalTime.setTimeZone(tz);
- LOGGER.info("nominalTime: " + initialTime.getTime());
- LOGGER.info("finalTime: " + finalTime.getTime());
- List<String> returnList = new ArrayList<>();
- while (initialTime.getTime().before(finalTime.getTime())) {
- LOGGER.info("initialTime: " + initialTime.getTime());
- returnList.add(getPath(path, initialTime));
- initialTime.add(Calendar.MINUTE, frequency);
- }
- returnList.add(getPath(path, initialTime));
- Collections.reverse(returnList);
- return returnList;
- }
-
- private String getPath(String path, Calendar time) {
- if (path.contains("${YEAR}")) {
- path = path.replaceAll("\\$\\{YEAR\\}", Integer.toString(time.get(Calendar.YEAR)));
- }
- if (path.contains("${MONTH}")) {
- path = path.replaceAll("\\$\\{MONTH\\}", intToString(time.get(Calendar.MONTH) + 1, 2));
- }
- if (path.contains("${DAY}")) {
- path = path.replaceAll("\\$\\{DAY\\}", intToString(time.get(Calendar.DAY_OF_MONTH), 2));
- }
- if (path.contains("${HOUR}")) {
- path = path.replaceAll("\\$\\{HOUR\\}", intToString(time.get(Calendar.HOUR_OF_DAY), 2));
- }
- if (path.contains("${MINUTE}")) {
- path = path.replaceAll("\\$\\{MINUTE\\}", intToString(time.get(Calendar.MINUTE), 2));
- }
- return path;
- }
-
- private String intToString(int num, int digits) {
- assert digits > 0 : "Invalid number of digits";
-
- // create variable length array of zeros
- char[] zeros = new char[digits];
- Arrays.fill(zeros, '0');
-
- // format number as String
- DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
- return df.format(num);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
deleted file mode 100644
index c49c381..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Embedded pig script test.
- */
-@Test(groups = "embedded")
-public class EmbeddedPigScriptTest extends BaseTestClass {
-
- private ColoHelper cluster = servers.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private String pigTestDir = cleanAndGetTestDir();
- private String pigScriptDir = pigTestDir + "/pig";
- private String pigScriptLocation = pigScriptDir + "/id.pig";
- private String inputPath = pigTestDir + "/input" + MINUTE_DATE_PATTERN;
- private static final Logger LOGGER = Logger.getLogger(EmbeddedPigScriptTest.class);
- private static final double TIMEOUT = 15;
- private String processName;
- private String process;
-
- @BeforeClass(alwaysRun = true)
- public void createTestData() throws Exception {
- LOGGER.info("in @BeforeClass");
-
- //copy pig script
- HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.concat(OSUtil.RESOURCES, "pig"));
- Bundle bundle = BundleUtil.readELBundle();
- bundle.generateUniqueBundle(this);
- bundle = new Bundle(bundle, cluster);
- String startDate = "2010-01-02T00:40Z";
- String endDate = "2010-01-02T01:10Z";
- bundle.setInputFeedDataPath(inputPath);
- List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
- bundle.getFeedDataPathPrefix(), dataDates);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws Exception {
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setInputFeedDataPath(inputPath);
- bundles[0].setOutputFeedLocationData(pigTestDir + "/output-data" + MINUTE_DATE_PATTERN);
- bundles[0].setProcessWorkflow(pigScriptLocation);
- bundles[0].setProcessInputNames("INPUT");
- bundles[0].setProcessOutputNames("OUTPUT");
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:10Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
-
- final ProcessMerlin processElement = bundles[0].getProcessObject();
- processElement.clearProperties().withProperty("queueName", "default");
- processElement.getWorkflow().setEngine(EngineType.PIG);
- bundles[0].setProcessData(processElement.toString());
- bundles[0].submitFeedsScheduleProcess(prism);
- process = bundles[0].getProcessData();
- processName = Util.readEntityName(process);
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- @Test(groups = {"singleCluster"}, timeOut = 600000)
- public void getResumedProcessInstance() throws Exception {
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
- prism.getProcessHelper().suspend(process);
- TimeUtil.sleepSeconds(TIMEOUT);
- ServiceResponse status = prism.getProcessHelper().getStatus(process);
- Assert.assertTrue(status.getMessage().contains("SUSPENDED"), "Process not suspended.");
- prism.getProcessHelper().resume(process);
- TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- }
-
- @Test(groups = {"singleCluster"}, timeOut = 600000)
- public void getSuspendedProcessInstance() throws Exception {
- prism.getProcessHelper().suspend(process);
- TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
- InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateSuccessWOInstances(r);
- }
-
- @Test(groups = {"singleCluster"}, timeOut = 600000)
- public void getRunningProcessInstance() throws Exception {
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
- TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- }
-
- @Test(groups = {"singleCluster"}, timeOut = 600000)
- public void getKilledProcessInstance() throws Exception {
- prism.getProcessHelper().delete(process);
- TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND);
- }
-
- @Test(groups = {"singleCluster"}, timeOut = 6000000)
- public void getSucceededProcessInstance() throws Exception {
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
- int counter = OSUtil.IS_WINDOWS ? 100 : 50;
- OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
- r = prism.getProcessHelper().getRunningInstance(processName);
- InstanceUtil.validateSuccessWOInstances(r);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
deleted file mode 100644
index 728b797..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.MatrixUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Tests for operations with external file systems.
- */
-@Test(groups = "embedded")
-public class ExternalFSTest extends BaseTestClass{
-
- public static final String WASB_END_POINT =
- "wasb://" + MerlinConstants.WASB_CONTAINER + "@" + MerlinConstants.WASB_ACCOUNT;
- private ColoHelper cluster = servers.get(0);
- private FileSystem clusterFS = serverFS.get(0);
- private OozieClient clusterOC = serverOC.get(0);
- private FileSystem wasbFS;
- private Bundle externalBundle;
-
- private String baseTestDir = cleanAndGetTestDir();
- private String sourcePath = baseTestDir + "/source";
- private String baseWasbDir = "/falcon-regression/" + UUID.randomUUID().toString().split("-")[0];
- private String testWasbTargetDir = baseWasbDir + '/'
- + UUID.randomUUID().toString().split("-")[0] + '/';
-
- private static final Logger LOGGER = Logger.getLogger(ExternalFSTest.class);
-
- @BeforeClass
- public void setUpClass() throws IOException {
- HadoopUtil.recreateDir(clusterFS, baseTestDir);
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", WASB_END_POINT);
- conf.set("fs.azure.account.key." + MerlinConstants.WASB_ACCOUNT,
- MerlinConstants.WASB_SECRET);
- conf.setBoolean("fs.hdfs.impl.disable.cache", false);
- wasbFS = FileSystem.get(conf);
- LOGGER.info("creating base wasb dir" + baseWasbDir);
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setUp() throws JAXBException, IOException {
- Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
- bundles[0] = new Bundle(bundle, cluster);
- externalBundle = new Bundle(bundle, cluster);
-
- bundles[0].generateUniqueBundle(this);
- externalBundle.generateUniqueBundle(this);
-
- LOGGER.info("checking wasb credentials with location: " + testWasbTargetDir);
- wasbFS.create(new Path(testWasbTargetDir));
- wasbFS.delete(new Path(testWasbTargetDir), true);
- }
-
- @AfterMethod
- public void tearDown() throws IOException {
- removeTestClassEntities();
- wasbFS.delete(new Path(testWasbTargetDir), true);
- }
-
- @AfterClass(alwaysRun = true)
- public void tearDownClass() throws IOException {
- wasbFS.delete(new Path(baseWasbDir), true);
- }
-
-
- @Test(dataProvider = "getInvalidTargets")
- public void invalidCredentialsExtFS(String endpoint) throws Exception {
- bundles[0].setClusterInterface(Interfacetype.READONLY, endpoint);
- bundles[0].setClusterInterface(Interfacetype.WRITE, endpoint);
-
- AssertUtil.assertFailed(prism.getClusterHelper()
- .submitEntity(bundles[0].getClusterElement().toString()));
-
- }
-
- @Test(dataProvider = "getData")
- public void replicateToExternalFS(final FileSystem externalFS,
- final String separator, final boolean withData) throws Exception {
- final String endpoint = externalFS.getUri().toString();
- Bundle.submitCluster(bundles[0], externalBundle);
- String startTime = TimeUtil.getTimeWrtSystemTime(0);
- String endTime = TimeUtil.addMinsToTime(startTime, 5);
- LOGGER.info("Time range between : " + startTime + " and " + endTime);
- String datePattern = StringUtils .join(
- new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, separator);
-
- //configure feed
- FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- String targetDataLocation = endpoint + testWasbTargetDir + datePattern;
- feed.setFilePath(sourcePath + '/' + datePattern);
- //erase all clusters from feed definition
- feed.clearFeedClusters();
- //set local cluster as source
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.SOURCE)
- .build());
- //set externalFS cluster as target
- feed.addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(Util.readEntityName(externalBundle.getClusters().get(0)))
- .withRetention("days(1000000)", ActionType.DELETE)
- .withValidity(startTime, endTime)
- .withClusterType(ClusterType.TARGET)
- .withDataLocation(targetDataLocation)
- .build());
-
- //submit and schedule feed
- LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
- datePattern = StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH", "mm"}, separator);
- //upload necessary data
- DateTime date = new DateTime(startTime, DateTimeZone.UTC);
- DateTimeFormatter fmt = DateTimeFormat.forPattern(datePattern);
- String timePattern = fmt.print(date);
- HadoopUtil.recreateDir(clusterFS, sourcePath + '/' + timePattern);
- if (withData) {
- HadoopUtil.copyDataToFolder(clusterFS, sourcePath + '/' + timePattern, OSUtil.SINGLE_FILE);
- }
-
- Path srcPath = new Path(sourcePath + '/' + timePattern);
- Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
-
- //check if coordinator exists
- TimeUtil.sleepSeconds(10);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, feed.toString(), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(clusterOC, feed.getName(), "REPLICATION"), 1);
-
- //replication should start, wait while it ends
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
- CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
- //check if data has been replicated correctly
- List<Path> cluster1ReplicatedData =
- HadoopUtil.getAllFilesRecursivelyHDFS(clusterFS, srcPath);
- List<Path> cluster2ReplicatedData =
- HadoopUtil.getAllFilesRecursivelyHDFS(externalFS, dstPath);
- AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
- final ContentSummary srcSummary = clusterFS.getContentSummary(srcPath);
- final ContentSummary dstSummary = externalFS.getContentSummary(dstPath);
- Assert.assertEquals(dstSummary.getLength(), srcSummary.getLength());
- }
-
-
-
- @DataProvider
- public Object[][] getData() {
- //"-" for single directory, "/" - for dir with subdirs };
- return MatrixUtil.crossProduct(new FileSystem[]{wasbFS},
- new String[]{"/", "-"},
- new Boolean[]{true, false});
- }
-
- @DataProvider
- public Object[][] getInvalidTargets() {
- return new Object[][]{{"wasb://invalid@invalid.blob.core.windows.net/"}};
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
deleted file mode 100644
index feb0cc1..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.XmlUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Feed cluster update tests.
- */
-@Test(groups = "distributed")
-public class FeedClusterUpdateTest extends BaseTestClass {
-
- private String baseTestDir = cleanAndGetTestDir();
- private String aggregateWorkflowDir = baseTestDir + "/aggregator";
- private ColoHelper cluster1 = servers.get(0);
- private ColoHelper cluster2 = servers.get(1);
- private ColoHelper cluster3 = servers.get(2);
- private OozieClient cluster1OC = serverOC.get(0);
- private OozieClient cluster2OC = serverOC.get(1);
- private OozieClient cluster3OC = serverOC.get(2);
- private FileSystem cluster2FS = serverFS.get(1);
- private FileSystem cluster3FS = serverFS.get(2);
- private String feed;
- private String feedName;
- private String startTime;
- private String feedOriginalSubmit;
- private String feedUpdated;
- private String cluster1Name;
- private String cluster2Name;
- private String cluster3Name;
- private static final Logger LOGGER = Logger.getLogger(FeedClusterUpdateTest.class);
-
-
- @BeforeClass(alwaysRun = true)
- public void createTestData() throws Exception {
- uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- Bundle bundle = BundleUtil.readELBundle();
- for (int i = 0; i < 3; i++) {
- bundles[i] = new Bundle(bundle, servers.get(i));
- bundles[i].generateUniqueBundle(this);
- bundles[i].setProcessWorkflow(aggregateWorkflowDir);
- }
- try {
- String postFix = "/US/" + servers.get(1).getClusterHelper().getColoName();
- HadoopUtil.deleteDirIfExists(baseTestDir, cluster2FS);
- HadoopUtil.lateDataReplenish(cluster2FS, 80, 1, baseTestDir, postFix);
- postFix = "/UK/" + servers.get(2).getClusterHelper().getColoName();
- HadoopUtil.deleteDirIfExists(baseTestDir, cluster3FS);
- HadoopUtil.lateDataReplenish(cluster3FS, 80, 1, baseTestDir, postFix);
- } finally {
- removeTestClassEntities();
- }
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- Bundle bundle = BundleUtil.readELBundle();
- for (int i = 0; i < 3; i++) {
- bundles[i] = new Bundle(bundle, servers.get(i));
- bundles[i].generateUniqueBundle(this);
- bundles[i].setProcessWorkflow(aggregateWorkflowDir);
- }
- BundleUtil.submitAllClusters(prism, bundles[0], bundles[1], bundles[2]);
- feed = bundles[0].getDataSets().get(0);
- feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
- startTime = TimeUtil.getTimeWrtSystemTime(-50);
- feedName = Util.readEntityName(feed);
- cluster1Name = Util.readEntityName(bundles[0].getClusters().get(0));
- cluster2Name = Util.readEntityName(bundles[1].getClusters().get(0));
- cluster3Name = Util.readEntityName(bundles[2].getClusters().get(0));
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- removeTestClassEntities();
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void addSourceCluster() throws Exception {
- //add one source and one target , schedule only on source
- feedOriginalSubmit = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- prism.getFeedHelper().submitAndSchedule(feedUpdated);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void addTargetCluster() throws Exception {
- //add one source and one target , schedule only on source
- feedOriginalSubmit = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("US/${cluster.colo}")
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void add2SourceCluster() throws Exception {
- //add one source , schedule only on source
- feedOriginalSubmit = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("US/${cluster.colo}")
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void add2TargetCluster() throws Exception {
- //add one source and one target , schedule only on source
- feedOriginalSubmit = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
-
- LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void add1Source1TargetCluster() throws Exception {
- //add one source and one target , schedule only on source
- feedOriginalSubmit = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = cluster2.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("US/${cluster.colo}")
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated));
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- prism.getFeedHelper().submitAndSchedule(feedUpdated);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void deleteSourceCluster() throws Exception {
- //add one source and one target , schedule only on source
- feedOriginalSubmit = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("US/${cluster.colo}")
- .build())
- .toString();
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = prism.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .build())
- .toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
-
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- response = cluster3.getFeedHelper().getEntityDefinition(feedUpdated);
- AssertUtil.assertFailed(response);
-
- prism.getFeedHelper().submitAndSchedule(feedUpdated);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 3);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 2);
- }
-
- @Test(enabled = true, groups = {"multiCluster"})
- public void deleteTargetCluster() throws Exception {
- /*
- this test creates a multiCluster feed. Cluster1 is the target cluster
- and cluster3 and Cluster2 are the source cluster.
-
- feed is submitted through prism so submitted to both target and
- source. Feed is scheduled through prism, so only on Cluster3 and
- Cluster2 retention coord should exists. Cluster1 one which
- is target both retention and replication coord should exists. there
- will be 2 replication coord, one each for each source cluster.
-
- then we update feed by deleting cluster1 and cluster2 from the feed
- xml and send update request.
-
- Once update is over. definition should go missing from cluster1 and
- cluster2 and prism and cluster3 should have new def
-
- there should be a new retention coord on cluster3 and old number of
- coord on cluster1 and cluster2
- */
-
- //add two source and one target
- feedOriginalSubmit = FeedMerlin.fromString(feed).clearFeedClusters().toString();
-
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit)
- .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("US/${cluster.colo}")
- .build())
- .toString();
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster1Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 20),
- TimeUtil.addMinsToTime(startTime, 85))
- .withClusterType(ClusterType.TARGET)
- .build())
- .toString();
- feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit));
- ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit);
- TimeUtil.sleepSeconds(10);
- AssertUtil.assertSucceeded(response);
-
- //schedule on source
- response = prism.getFeedHelper().schedule(feedOriginalSubmit);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
-
- //prepare updated Feed
- feedUpdated = FeedMerlin.fromString(feed).clearFeedClusters().toString();
- feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster(
- new FeedMerlin.FeedClusterBuilder(cluster3Name)
- .withRetention("hours(10)", ActionType.DELETE)
- .withValidity(TimeUtil.addMinsToTime(startTime, 40),
- TimeUtil.addMinsToTime(startTime, 110))
- .withClusterType(ClusterType.SOURCE)
- .withPartition("UK/${cluster.colo}")
- .build())
- .toString();
-
- LOGGER.info("Feed: " + Util.prettyPrintXml(feedUpdated));
- response = prism.getFeedHelper().update(feedUpdated, feedUpdated);
- TimeUtil.sleepSeconds(20);
- AssertUtil.assertSucceeded(response);
-
- //verify xmls definitions
- response = cluster1.getFeedHelper().getEntityDefinition(feedUpdated);
- AssertUtil.assertFailed(response);
- response = cluster2.getFeedHelper().getEntityDefinition(feedUpdated);
- AssertUtil.assertFailed(response);
- response = cluster3.getFeedHelper().getEntityDefinition(feedUpdated);
- Assert.assertTrue(XmlUtil.isIdentical(feedUpdated, response.getMessage()));
- response = prism.getFeedHelper().getEntityDefinition(feedUpdated);
- Assert.assertTrue(XmlUtil.isIdentical(feedUpdated, response.getMessage()));
-
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2);
- Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1);
- }
-
- /*
- @Test(enabled = false)
- public void delete2SourceCluster() {
-
- }
-
- @Test(enabled = false)
- public void delete2TargetCluster() {
-
- }
-
- @Test(enabled = false)
- public void delete1Source1TargetCluster() {
-
- }
- */
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
deleted file mode 100644
index ecb5798..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.FeedInstanceResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Tests for the feed instance listing API; see https://issues.apache.org/jira/browse/FALCON-761.
- */
-@Test(groups = "embedded", timeOut = 900000)
-public class FeedInstanceListingTest extends BaseTestClass{
- private String baseTestDir = cleanAndGetTestDir(); // every other path in this class is built under this directory
- private String aggregateWorkflowDir = baseTestDir + "/aggregator"; // handed to setProcessWorkflow() in setup()
- private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN; // input feed data path
- private String feedOutputPath = baseTestDir + "/output-data" + MINUTE_DATE_PATTERN; // output feed data location
- private String processName; // assigned in setup() from bundles[0].getProcessName()
-
- private ColoHelper cluster = servers.get(0); // the tests use the first entry of servers, serverFS and serverOC
- private FileSystem clusterFS = serverFS.get(0);
- private OozieClient clusterOC = serverOC.get(0);
-
- private static final Logger LOGGER = Logger.getLogger(FeedInstanceListingTest.class);
-
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception { // rebuilds bundles[0] from the EL bundle before each test method
- bundles[0] = BundleUtil.readELBundle();
- bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].generateUniqueBundle(this);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
- bundles[0].setInputFeedDataPath(feedInputPath);
- bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes); // input feed, output feed and process all get a 5-minute period
- bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
- bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
- processName = bundles[0].getProcessName();
- HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); // presumably uploads the Oozie resources into aggregateWorkflowDir - TODO confirm
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() throws IOException{ // cleanup after each test method; alwaysRun keeps it running after failures
- cleanTestsDirs();
- removeTestClassEntities();
- }
-
- /**
- * Test when all data is available for all instances: all 5 listed instances must be AVAILABLE.
- */
- @Test
- public void testFeedListingWhenAllAvailable() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
- EntityType.PROCESS, processName, 0);
- List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size()-1); // last entry of the returned list
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance); // presumably places SINGLE_FILE under each of those paths - TODO confirm
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- FeedInstanceResult r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 0, 0, 0, 5); // expected: total=5, missing=0, empty=0, partial=0, available=5
- }
-
- /**
- * Test when only empty directories exist for all instances: all 5 listed instances must be EMPTY.
- */
- @Test
- public void testFeedListingWhenAllEmpty() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); // return value unused here; no data file is put into the paths
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- FeedInstanceResult r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 0, 5, 0, 0); // expected: total=5, missing=0, empty=5, partial=0, available=0
- }
-
- /**
- * Test when no data is present for any instance: all 5 listed instances must be MISSING.
- */
- @Test
- public void testFeedListingWhenAllMissing() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); // no dependency directories or data are created in this test
- FeedInstanceResult r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 5, 0, 0, 0); // expected: total=5, missing=5, empty=0, partial=0, available=0
- }
-
- /**
- * Initially no availability flag is set for the feed, and data is created, so the instance status is available.
- * Then the availability flag is set and the feed is updated. The instance status should change to partial.
- */
- @Test
- public void testFeedListingAfterFeedAvailabilityFlagUpdate() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
- EntityType.PROCESS, processName, 0);
- List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size()-1); // last entry of the returned list
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- FeedInstanceResult r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 0, 0, 0, 5); // before the update: total=5, all 5 available
- String inputFeed = bundles[0].getInputFeedFromBundle(); // feed definition captured before the flag is set
- bundles[0].setInputFeedAvailabilityFlag("_SUCCESS");
- ServiceResponse serviceResponse = prism.getFeedHelper().update(inputFeed, bundles[0].getInputFeedFromBundle()); // old definition first, changed definition second
- AssertUtil.assertSucceeded(serviceResponse);
- //Since we have not created availability flag on HDFS, the feed instance status should be partial
- r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 0, 0, 5, 0); // after the update: total=5, all 5 partial
- }
-
- /**
- * Data is created for the feed, so instance status is available.
- * Then, change the data path and update the feed. The instance status should change to missing.
- */
- @Test
- public void testFeedListingAfterFeedDataPathUpdate() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
- EntityType.PROCESS, processName, 0);
- List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size()-1); // last entry of the returned list
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
- InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
- CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
- FeedInstanceResult r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 0, 0, 0, 5); // before the update: total=5, all 5 available
- String inputFeed = bundles[0].getInputFeedFromBundle(); // feed definition captured before the path change
- bundles[0].setInputFeedDataPath(baseTestDir + "/inputNew" + MINUTE_DATE_PATTERN);
- ServiceResponse serviceResponse = prism.getFeedHelper().update(inputFeed, bundles[0].getInputFeedFromBundle()); // old definition first, changed definition second
- AssertUtil.assertSucceeded(serviceResponse);
- //Since we have not created directories for new path, the feed instance status should be missing
- r = prism.getFeedHelper()
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 5, 0, 0, 0); // after the update: total=5, all 5 missing
- }
-
- /**
- * Submit the feeds on prism and request the instance listing from the server (cluster) helper; the request should succeed.
- */
- @Test
- public void testFeedListingFeedSubmitOnPrismRequestOnServer() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessConcurrency(1);
- bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
- FeedInstanceResult r = cluster.getFeedHelper() // queried through cluster, not prism, unlike the other tests
- .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
- validateResponse(r, 5, 5, 0, 0, 0); // no data was created: total=5, all 5 missing
- }
-
- /**
- * Checks that the actual number of instances in each status equals the expected number of
- * instances with that status, and that no instance field read here is null.
- *
- * @param instancesResult feed instance listing response from the API which should contain
- * information about instances <p/>
- * All parameters below reflect number of expected instances with some
- * kind of status.
- * @param totalCount total number of instances.
- * @param missingCount number of instances with status MISSING.
- * @param emptyCount number of instances with status EMPTY.
- * @param partialCount number of instances with status PARTIAL.
- * @param availableCount number of instances with status AVAILABLE.
- */
- private void validateResponse(FeedInstanceResult instancesResult, int totalCount,
- int missingCount, int emptyCount, int partialCount, int availableCount) {
- FeedInstanceResult.Instance[] instances = instancesResult.getInstances();
- LOGGER.info("instances: " + Arrays.toString(instances)); // logged before the null check; Arrays.toString accepts null
- Assert.assertNotNull(instances, "instances should be not null");
- Assert.assertEquals(instances.length, totalCount, "Total Instances");
- List<String> statuses = new ArrayList<>(); // one status string per instance, counted below
- for (FeedInstanceResult.Instance instance : instances) {
- Assert.assertNotNull(instance.getCluster());
- Assert.assertNotNull(instance.getInstance());
- Assert.assertNotNull(instance.getStatus());
- Assert.assertNotNull(instance.getUri());
- Assert.assertNotNull(instance.getCreationTime());
- Assert.assertNotNull(instance.getSize());
- final String status = instance.getStatus();
- LOGGER.info("status: "+ status + ", instance: " + instance.getInstance());
- statuses.add(status);
- }
-
- Assert.assertEquals(Collections.frequency(statuses, "MISSING"), // occurrences of each status string vs. expected count
- missingCount, "Missing Instances");
- Assert.assertEquals(Collections.frequency(statuses, "EMPTY"),
- emptyCount, "Empty Instances");
- Assert.assertEquals(Collections.frequency(statuses, "PARTIAL"),
- partialCount, "Partial Instances");
- Assert.assertEquals(Collections.frequency(statuses, "AVAILABLE"),
- availableCount, "Available Instances");
- }
-}