Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:13 UTC
[14/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
new file mode 100644
index 0000000..e77d534
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Process instance suspend tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceSuspendTest extends BaseTestClass {
+
+ private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceSuspendTest";
+ private String feedInputPath = baseTestHDFSDir +
+ "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputPath =
+ baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private static final Logger LOGGER = Logger.getLogger(ProcessInstanceSuspendTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+ Bundle bundle = BundleUtil.readELBundle();
+ bundle = new Bundle(bundle, cluster);
+ String startDate = "2010-01-01T23:40Z";
+ String endDate = "2010-01-02T01:40Z";
+ bundle.setInputFeedDataPath(feedInputPath);
+ String prefix = bundle.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(feedInputPath);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Schedule process. Try to suspend instances with start/end parameters which are
+ * wider than the process validity range. The attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendLargeRange() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
+ InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
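+ // the suspend and status requests below use a window wider than the process validity;
+ // the status call is expected to be rejected with HTTP 400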
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+ InstanceUtil.validateSuccessWithStatusCode(result, 400);
+ }
+
+ /**
+ * Schedule single-instance process. Wait till the instance succeeds. Try to suspend
+ * the succeeded instance. The action should succeed because it is idempotent.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendSucceeded() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
+ .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
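+ // suspending an instance that has already succeeded should still report success (idempotent operation)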
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, 0);
+ }
+
+ /**
+ * Schedule process. Check that all instances are running. Suspend them. Check that all of
+ * them are suspended. A valid time range is used in every action.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendAll() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
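+ // all 5 instances are running; suspend them over the same valid window and expect 5 suspended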
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0);
+ }
+
+ /**
+ * Schedule process and try to perform -suspend action without date range parameters.
+ * Attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendWoParams() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(2);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
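+ // a suspend request without start/end parameters should be rejected as an unparseable date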
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()), null);
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+ }
+
+ /**
+ * Schedule process with 3 running and 2 waiting instances expected. Suspend the ones which
+ * are running. Check that 3 are now suspended and 2 are still waiting.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendStartAndEnd() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+ InstanceUtil.validateResponse(result, 5, 3, 0, 2, 0);
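+ // suspend only the window covering the 3 running instances; the 2 waiting ones should stay waiting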
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+ InstanceUtil.validateResponse(result, 5, 0, 3, 2, 0);
+ }
+
+ /**
+ * Try to suspend instances of a non-existent process. Action should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendNonExistent() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult r =
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend("invalidName", "?start=2010-01-02T01:20Z");
+ Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+ "Unexpected status code when suspending a non-existent process.");
+ }
+
+ /**
+ * Schedule process. Perform -suspend action using only the -start parameter, which points to
+ * the start time of the process. Check that only 1 instance is suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendOnlyStart() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
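+ // suspend using only -start pointed at the process start time; only the first instance should get suspended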
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED);
+ prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
+ }
+
+ /**
+ * Schedule process with a number of instances running. Perform -suspend action using only the
+ * -start parameter, with a value that points to the expected time of the last instance.
+ * Check that only the last instance is suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceSuspendSuspendLast() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
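+ // suspend using only -start pointed at the last instance time (01:20); only that instance should get suspended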
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:20Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateResponse(result, 5, 4, 1, 0, 0);
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void deleteData() throws Exception {
+ LOGGER.info("in @AfterClass");
+ Bundle bundle = BundleUtil.readELBundle();
+ bundle = new Bundle(bundle, cluster);
+ bundle.setInputFeedDataPath(feedInputPath);
+ String prefix = bundle.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
new file mode 100644
index 0000000..ef6860b
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job.Status;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Tests with process lib folder detached from workflow.xml.
+ */
+@Test(groups = "embedded")
+public class ProcessLibPathTest extends BaseTestClass {
+
+ ColoHelper cluster = servers.get(0);
+ FileSystem clusterFS = serverFS.get(0);
+ String testLibDir = baseHDFSDir + "/ProcessLibPath/TestLib";
+ private static final Logger logger = Logger.getLogger(ProcessLibPathTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+
+ logger.info("in @BeforeClass");
+ //common lib for both test cases
+ HadoopUtil.uploadDir(clusterFS, testLibDir, OSUtil.RESOURCES_OOZIE + "lib");
+
+ Bundle b = BundleUtil.readELBundle();
+ b.generateUniqueBundle();
+ b = new Bundle(b, cluster);
+
+ String startDate = "2010-01-01T22:00Z";
+ String endDate = "2010-01-02T03:00Z";
+
+ b.setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+
+ @BeforeMethod(alwaysRun = true)
+ public void testName(Method method) throws Exception {
+ logger.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(
+ baseHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].setProcessLibPath(testLibDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Test which tests a process with no lib folder in the workflow location.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void setDifferentLibPathWithNoLibFolderInWorkflowLocation() throws Exception {
+ String workflowDir = testLibDir + "/aggregatorLib1/";
+ HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
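+ // remove the lib folder from the workflow location; jars should be picked up from the process lib path instead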
+ HadoopUtil.deleteDirIfExists(workflowDir + "/lib", clusterFS);
+ logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ }
+
+ /**
+ * Test which tests a process with a wrong jar in the lib folder of the workflow location.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void setDifferentLibPathWithWrongJarInWorkflowLib() throws Exception {
+ String workflowDir = testLibDir + "/aggregatorLib2/";
+ HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
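+ // plant a wrong jar in the workflow lib folder; the correct jars should come from the process lib path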
+ HadoopUtil.recreateDir(clusterFS, workflowDir + "/lib");
+ HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib",
+ OSUtil.RESOURCES + "ivory-oozie-lib-0.1.jar");
+ logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil
+ .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java
new file mode 100644
index 0000000..64febac
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TestngListener.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.testHelper.BaseUITestClass;
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.openqa.selenium.OutputType;
+import org.openqa.selenium.TakesScreenshot;
+import org.testng.ITestContext;
+import org.testng.ITestListener;
+import org.testng.ITestResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestngListener implements ITestListener {
+ private static final Logger logger = Logger.getLogger(TestngListener.class);
+
+ @Override
+ public void onTestStart(ITestResult result) {
+ logLine();
+ logger.info(
+ String.format("Testing going to start for: %s.%s %s", result.getTestClass().getName(),
+ result.getName(), Arrays.toString(result.getParameters())));
+ NDC.push(result.getName());
+ }
+
+ private void logLine() {
+ logger.info(
+ "-----------------------------------------------------------------------------------------------");
+ }
+
+ private void logEndOfTest(ITestResult result, String outcome) {
+ logger.info(
+ String.format("Testing going to end for: %s.%s(%s) %s", result.getTestClass().getName(),
+ result.getName(), Arrays.toString(result.getParameters()), outcome));
+ NDC.pop();
+ logLine();
+ }
+
+ @Override
+ public void onTestSuccess(ITestResult result) {
+ logEndOfTest(result, "SUCCESS");
+ }
+
+ @Override
+ public void onTestFailure(ITestResult result) {
+ logEndOfTest(result, "FAILED");
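+ // for UI tests, capture a screenshot of the failure and save it under target/surefire-reports/screenshots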
+ if (BaseUITestClass.getDRIVER() != null) {
+ byte[] scrFile = ((TakesScreenshot)BaseUITestClass.getDRIVER()).getScreenshotAs
+ (OutputType.BYTES);
+ try {
+ String filename = OSUtil.getPath("target", "surefire-reports", "screenshots", String.format("%s.%s.png",
+ result.getTestClass().getRealClass().getSimpleName(), result.getName()));
+ FileUtils.writeByteArrayToFile(new File(filename), scrFile);
+ } catch (IOException e) {
+ logger.info("Saving screenshot FAILED: " + e.getCause());
+ }
+ }
+
+ logger.info(ExceptionUtils.getStackTrace(result.getThrowable()));
+ logLine();
+ }
+
+ @Override
+ public void onTestSkipped(ITestResult result) {
+ logEndOfTest(result, "SKIPPED");
+ }
+
+ @Override
+ public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
+ logEndOfTest(result, "TestFailedButWithinSuccessPercentage");
+ }
+
+ @Override
+ public void onStart(ITestContext context) {
+ }
+
+ @Override
+ public void onFinish(ITestContext context) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
new file mode 100644
index 0000000..32062ba
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
@@ -0,0 +1,649 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "embedded")
+public class HCatProcessTest extends BaseTestClass {
+ private static final Logger logger = Logger.getLogger(HCatProcessTest.class);
+ ColoHelper cluster = servers.get(0);
+ FileSystem clusterFS = serverFS.get(0);
+ OozieClient clusterOC = serverOC.get(0);
+ HCatClient clusterHC;
+
+ final String testDir = "/HCatProcessTest";
+ final String baseTestHDFSDir = baseHDFSDir + testDir;
+ String hiveScriptDir = baseTestHDFSDir + "/hive";
+ String hiveScriptFile = hiveScriptDir + "/script.hql";
+ String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ String hiveScriptFileNonHCatInput = hiveScriptDir + "/script_non_hcat_input.hql";
+ String hiveScriptFileNonHCatOutput = hiveScriptDir + "/script_non_hcat_output.hql";
+ String hiveScriptTwoHCatInputOneHCatOutput =
+ hiveScriptDir + "/script_two_hcat_input_one_hcat_output.hql";
+ String hiveScriptOneHCatInputTwoHCatOutput =
+ hiveScriptDir + "/script_one_hcat_input_two_hcat_output.hql";
+ String hiveScriptTwoHCatInputTwoHCatOutput =
+ hiveScriptDir + "/script_two_hcat_input_two_hcat_output.hql";
+ final String inputHDFSDir = baseTestHDFSDir + "/input";
+ final String inputHDFSDir2 = baseTestHDFSDir + "/input2";
+ final String outputHDFSDir = baseTestHDFSDir + "/output";
+ final String outputHDFSDir2 = baseTestHDFSDir + "/output2";
+
+ final String dbName = "default";
+ final String inputTableName = "hcatprocesstest_input_table";
+ final String inputTableName2 = "hcatprocesstest_input_table2";
+ final String outputTableName = "hcatprocesstest_output_table";
+ final String outputTableName2 = "hcatprocesstest_output_table2";
+ public static final String col1Name = "id";
+ public static final String col2Name = "value";
+ public static final String partitionColumn = "dt";
+
+ private static final String hcatDir = OSUtil.getPath("src", "test", "resources", "hcat");
+ private static final String localHCatData = OSUtil.getPath(hcatDir, "data");
+ private static final String hiveScript = OSUtil.getPath(hcatDir, "hivescript");
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ clusterHC = cluster.getClusterHelper().getHCatClient();
+ bundles[0] = BundleUtil.readHCatBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(hiveScriptFile, EngineType.HIVE);
+ bundles[0].setClusterInterface(Interfacetype.REGISTRY,
+ cluster.getClusterHelper().getHCatEndpoint());
+
+ HadoopUtil.deleteDirIfExists(baseTestHDFSDir, clusterFS);
+ HadoopUtil.uploadDir(clusterFS, hiveScriptDir, hiveScript);
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ HadoopUtil.recreateDir(clusterFS, outputHDFSDir);
+ HadoopUtil.recreateDir(clusterFS, outputHDFSDir2);
+ clusterHC.dropTable(dbName, inputTableName, true);
+ clusterHC.dropTable(dbName, inputTableName2, true);
+ clusterHC.dropTable(dbName, outputTableName, true);
+ clusterHC.dropTable(dbName, outputTableName2, true);
+ }
+
+ @DataProvider
+ public String[][] generateSeparators() {
+ //disabling till FALCON-372 is fixed
+ //return new String[][] {{"-"}, {"/"}};
+ return new String[][]{{"-"},};
+ }
+
+ @Test(dataProvider = "generateSeparators")
+ public void OneHCatInputOneHCatOutput(String separator) throws Exception {
+ /* upload data and create partition */
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2010-01-02T04:00Z";
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+ final List<String> dataset = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir)
+ .build());
+
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+
+ final String tableUriPartitionFragment = StringUtils.join(
+ new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
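+ // table feeds address HCatalog data as catalog:<db>:<table>#<partition-key>=<value>;
+ // the partition fragment is templated per instance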
+ String inputTableUri =
+ "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+ bundles[0].setInputFeedTableUri(inputTableUri);
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ String outputTableUri =
+ "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+ bundles[0].setOutputFeedTableUri(outputTableUri);
+ bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setOutputFeedValidity(startDate, endDate);
+
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].submitFeedsScheduleProcess();
+
+ InstanceUtil.waitTillInstanceReachState(
+ clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+ EntityType.PROCESS);
+
+ AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+ outputHDFSDir + "/dt=" + dataDates.get(0), clusterFS);
+ }
+
+ @Test(dataProvider = "generateSeparators")
+ public void TwoHCatInputOneHCatOutput(String separator) throws Exception {
+ /* upload data and create partition */
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2010-01-02T04:00Z";
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+ final List<String> dataset = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+ final List<String> dataset2 = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir2, dataDates);
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName2, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir2)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir)
+ .build());
+
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+ addPartitionsToTable(dataDates, dataset2, "dt", dbName, inputTableName2);
+
+ final String tableUriPartitionFragment = StringUtils.join(
+ new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String inputTableUri =
+ "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+ String inputTableUri2 =
+ "catalog:" + dbName + ":" + inputTableName2 + tableUriPartitionFragment;
+ bundles[0].setInputFeedTableUri(inputTableUri);
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ final String inputFeed1 = bundles[0].getInputFeedFromBundle();
+ final String inputFeed2Name = "second-" + Util.readEntityName(inputFeed1);
+
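+ // clone the first input feed, rename it and point it at the second input table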
+ FeedMerlin feedObj = new FeedMerlin(inputFeed1);
+ feedObj.setName(inputFeed2Name);
+ feedObj.getTable().setUri(inputTableUri2);
+
+ String inputFeed2 = feedObj.toString();
+ bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+
+ String outputTableUri =
+ "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+ bundles[0].setOutputFeedTableUri(outputTableUri);
+ bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setOutputFeedValidity(startDate, endDate);
+
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].setProcessWorkflow(hiveScriptTwoHCatInputOneHCatOutput, EngineType.HIVE);
+ bundles[0].submitFeedsScheduleProcess();
+
+ InstanceUtil.waitTillInstanceReachState(
+ clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+ EntityType.PROCESS);
+
+ final ContentSummary inputContentSummary =
+ clusterFS.getContentSummary(new Path(inputHDFSDir + "/" + dataDates.get(0)));
+ final ContentSummary inputContentSummary2 =
+ clusterFS.getContentSummary(new Path(inputHDFSDir2 + "/" + dataDates.get(0)));
+ final ContentSummary outputContentSummary =
+ clusterFS.getContentSummary(new Path(outputHDFSDir + "/dt=" + dataDates.get(0)));
+ logger.info("inputContentSummary = " + inputContentSummary.toString(false));
+ logger.info("inputContentSummary2 = " + inputContentSummary2.toString(false));
+ logger.info("outputContentSummary = " + outputContentSummary.toString(false));
+ Assert.assertEquals(inputContentSummary.getLength() + inputContentSummary2.getLength(),
+ outputContentSummary.getLength(),
+ "Unexpected size of the output.");
+ }
+
+ @Test(dataProvider = "generateSeparators")
+ public void OneHCatInputTwoHCatOutput(String separator) throws Exception {
+ /* upload data and create partition */
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2010-01-02T04:00Z";
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+ final List<String> dataset = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName2, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir2)
+ .build());
+
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+
+ final String tableUriPartitionFragment = StringUtils.join(
+ new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String inputTableUri =
+ "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+ bundles[0].setInputFeedTableUri(inputTableUri);
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ String outputTableUri =
+ "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+ String outputTableUri2 =
+ "catalog:" + dbName + ":" + outputTableName2 + tableUriPartitionFragment;
+ bundles[0].setOutputFeedTableUri(outputTableUri);
+ bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setOutputFeedValidity(startDate, endDate);
+ final String outputFeed1 = bundles[0].getOutputFeedFromBundle();
+ final String outputFeed2Name = "second-" + Util.readEntityName(outputFeed1);
+ FeedMerlin feedObj = new FeedMerlin(outputFeed1);
+ feedObj.setName(outputFeed2Name);
+ feedObj.getTable().setUri(outputTableUri2);
+ bundles[0].addOutputFeedToBundle("outputData2", feedObj.toString(), 0);
+
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].setProcessWorkflow(hiveScriptOneHCatInputTwoHCatOutput, EngineType.HIVE);
+ bundles[0].submitFeedsScheduleProcess();
+
+ InstanceUtil.waitTillInstanceReachState(
+ clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+ EntityType.PROCESS);
+
+ AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+ outputHDFSDir + "/dt=" + dataDates.get(0), clusterFS);
+ AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+ outputHDFSDir2 + "/dt=" + dataDates.get(0), clusterFS);
+ }
+
+
+ @Test(dataProvider = "generateSeparators")
+ public void TwoHCatInputTwoHCatOutput(String separator) throws Exception {
+ /* upload data and create partition */
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2010-01-02T04:00Z";
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+ final List<String> dataset = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+ final List<String> dataset2 = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir2, dataDates);
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName2, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir2)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir)
+ .build());
+
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName2, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir2)
+ .build());
+
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+ addPartitionsToTable(dataDates, dataset2, "dt", dbName, inputTableName2);
+
+ final String tableUriPartitionFragment = StringUtils.join(
+ new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String inputTableUri =
+ "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+ String inputTableUri2 =
+ "catalog:" + dbName + ":" + inputTableName2 + tableUriPartitionFragment;
+ bundles[0].setInputFeedTableUri(inputTableUri);
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ final String inputFeed1 = bundles[0].getInputFeedFromBundle();
+ final String inputFeed2Name = "second-" + Util.readEntityName(inputFeed1);
+ FeedMerlin feedObj = new FeedMerlin(inputFeed1);
+ feedObj.setName(inputFeed2Name);
+ feedObj.getTable().setUri(inputTableUri2);
+ String inputFeed2 = feedObj.toString();
+ bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0);
+
+ String outputTableUri =
+ "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+ String outputTableUri2 =
+ "catalog:" + dbName + ":" + outputTableName2 + tableUriPartitionFragment;
+ bundles[0].setOutputFeedTableUri(outputTableUri);
+ bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setOutputFeedValidity(startDate, endDate);
+ final String outputFeed1 = bundles[0].getOutputFeedFromBundle();
+ final String outputFeed2Name = "second-" + Util.readEntityName(outputFeed1);
+ FeedMerlin feedObj2 = new FeedMerlin(outputFeed1);
+ feedObj2.setName(outputFeed2Name);
+ feedObj2.getTable().setUri(outputTableUri2);
+ String outputFeed2 = feedObj2.toString();
+ bundles[0].addOutputFeedToBundle("outputData2", outputFeed2, 0);
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].setProcessWorkflow(hiveScriptTwoHCatInputTwoHCatOutput, EngineType.HIVE);
+ bundles[0].submitFeedsScheduleProcess();
+
+ InstanceUtil.waitTillInstanceReachState(
+ clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+ EntityType.PROCESS);
+
+ final ContentSummary inputContentSummary =
+ clusterFS.getContentSummary(new Path(inputHDFSDir + "/" + dataDates.get(0)));
+ final ContentSummary inputContentSummary2 =
+ clusterFS.getContentSummary(new Path(inputHDFSDir2 + "/" + dataDates.get(0)));
+ final ContentSummary outputContentSummary =
+ clusterFS.getContentSummary(new Path(outputHDFSDir + "/dt=" + dataDates.get(0)));
+ final ContentSummary outputContentSummary2 =
+ clusterFS.getContentSummary(new Path(outputHDFSDir2 + "/dt=" + dataDates.get(0)));
+ logger.info("inputContentSummary = " + inputContentSummary.toString(false));
+ logger.info("inputContentSummary2 = " + inputContentSummary2.toString(false));
+ logger.info("outputContentSummary = " + outputContentSummary.toString(false));
+ logger.info("outputContentSummary2 = " + outputContentSummary2.toString(false));
+ Assert.assertEquals(inputContentSummary.getLength() + inputContentSummary2.getLength(),
+ outputContentSummary.getLength(),
+ "Unexpected size of the output.");
+ Assert.assertEquals(inputContentSummary.getLength() + inputContentSummary2.getLength(),
+ outputContentSummary2.getLength(),
+ "Unexpected size of the output.");
+ }
+
+
+ @Test(dataProvider = "generateSeparators")
+ public void OneHCatInputOneNonHCatOutput(String separator) throws Exception {
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2010-01-02T04:00Z";
+ /* upload data and create partition */
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+ final List<String> dataset = HadoopUtil
+ .flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, inputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(inputHDFSDir)
+ .build());
+
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, inputTableName);
+
+ final String tableUriPartitionFragment = StringUtils.join(
+ new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String inputTableUri =
+ "catalog:" + dbName + ":" + inputTableName + tableUriPartitionFragment;
+ bundles[0].setInputFeedTableUri(inputTableUri);
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+
+ // replace the bundle's HCat output feed with a plain HDFS (non-HCat) output feed
+ String nonHCatFeed = BundleUtil.readELBundle().getOutputFeedFromBundle();
+ final String outputFeedName = bundles[0].getOutputFeedNameFromBundle();
+ nonHCatFeed = Util.setFeedName(nonHCatFeed, outputFeedName);
+ final List<String> clusterNames = bundles[0].getClusterNames();
+ Assert.assertEquals(clusterNames.size(), 1, "Expected only one cluster in the bundle.");
+ nonHCatFeed = Util.setClusterNameInFeed(nonHCatFeed, clusterNames.get(0), 0);
+ bundles[0].writeFeedElement(nonHCatFeed, outputFeedName);
+ bundles[0].setOutputFeedLocationData(outputHDFSDir + "/" +
+ StringUtils.join(new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator));
+ bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setOutputFeedValidity(startDate, endDate);
+
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+
+ bundles[0].setProcessWorkflow(hiveScriptFileNonHCatOutput, EngineType.HIVE);
+ bundles[0].submitFeedsScheduleProcess();
+
+ InstanceUtil.waitTillInstanceReachState(
+ clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+ EntityType.PROCESS);
+
+ AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+ outputHDFSDir + "/" + dataDates.get(0), clusterFS);
+ }
+
+ @Test(dataProvider = "generateSeparators")
+ public void OneNonCatInputOneHCatOutput(String separator) throws Exception {
+ /* upload data and create partition */
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2010-01-02T04:00Z";
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ List<String> dataDates = getDatesList(startDate, endDate, datePattern, 60);
+
+ final List<String> dataset = HadoopUtil.
+ flattenAndPutDataInFolder(clusterFS, localHCatData, inputHDFSDir, dataDates);
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ clusterHC.createTable(HCatCreateTableDesc
+ .create(dbName, outputTableName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(outputHDFSDir)
+ .build());
+
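+ // swap the bundle's HCat input feed for a plain HDFS (non-HCat) feed over the uploaded data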
+ String nonHCatFeed = BundleUtil.readELBundle().getInputFeedFromBundle();
+ final String inputFeedName = bundles[0].getInputFeedNameFromBundle();
+ nonHCatFeed = Util.setFeedName(nonHCatFeed, inputFeedName);
+ final List<String> clusterNames = bundles[0].getClusterNames();
+ Assert.assertEquals(clusterNames.size(), 1, "Expected only one cluster in the bundle.");
+ nonHCatFeed = Util.setClusterNameInFeed(nonHCatFeed, clusterNames.get(0), 0);
+ bundles[0].writeFeedElement(nonHCatFeed, inputFeedName);
+ bundles[0].setInputFeedDataPath(inputHDFSDir + "/" +
+ StringUtils.join(new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator));
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+
+ final String tableUriPartitionFragment = StringUtils.join(
+ new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String outputTableUri =
+ "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment;
+ bundles[0].setOutputFeedTableUri(outputTableUri);
+ bundles[0].setOutputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setOutputFeedValidity(startDate, endDate);
+
+ bundles[0].setProcessWorkflow(hiveScriptFileNonHCatInput, EngineType.HIVE);
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].submitFeedsScheduleProcess();
+
+ InstanceUtil.waitTillInstanceReachState(
+ clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED,
+ EntityType.PROCESS);
+
+ AssertUtil.checkContentSize(inputHDFSDir + "/" + dataDates.get(0),
+ outputHDFSDir + "/dt=" + dataDates.get(0), clusterFS);
+ }
+
+ private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations,
+ String partitionCol,
+ String dbName, String tableName) throws HCatException {
+ Assert.assertEquals(partitions.size(), partitionLocations.size(),
+ "Number of locations is not same as number of partitions.");
+ final List<HCatAddPartitionDesc> partitionDesc = new ArrayList<HCatAddPartitionDesc>();
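+ // register one partition per date, each pointing at the matching HDFS location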
+ for (int i = 0; i < partitions.size(); ++i) {
+ final String partition = partitions.get(i);
+ final Map<String, String> onePartition = new HashMap<String, String>();
+ onePartition.put(partitionCol, partition);
+ final String partitionLoc = partitionLocations.get(i);
+ partitionDesc.add(
+ HCatAddPartitionDesc.create(dbName, tableName, partitionLoc, onePartition).build());
+ }
+ clusterHC.addPartitions(partitionDesc);
+ }
+
+ public static List<String> getDatesList(String startDate, String endDate, String datePattern,
+ int skipMinutes) {
+ DateTime startDateJoda = new DateTime(TimeUtil.oozieDateToDate(startDate));
+ DateTime endDateJoda = new DateTime(TimeUtil.oozieDateToDate(endDate));
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ logger.info("generating data between " + formatter.print(startDateJoda) + " and " +
+ formatter.print(endDateJoda));
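+ // step from startDate towards endDate in skipMinutes increments, collecting formatted timestamps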
+ List<String> dates = new ArrayList<String>();
+ dates.add(formatter.print(startDateJoda));
+ while (!startDateJoda.isAfter(endDateJoda)) {
+ startDateJoda = startDateJoda.plusMinutes(skipMinutes);
+ dates.add(formatter.print(startDateJoda));
+ }
+ return dates;
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
new file mode 100644
index 0000000..860e680
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hcat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HCatUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.XmlUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.joda.time.format.DateTimeFormat;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
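+/**
+ * Tests for replication of HCatalog table based feeds across clusters.
+ */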
+@Test(groups = "embedded")
+public class HCatReplicationTest extends BaseTestClass {
+
+ private static final Logger logger = Logger.getLogger(HCatReplicationTest.class);
+ ColoHelper cluster = servers.get(0);
+ FileSystem clusterFS = serverFS.get(0);
+ HCatClient clusterHC;
+
+ ColoHelper cluster2 = servers.get(1);
+ FileSystem cluster2FS = serverFS.get(1);
+ OozieClient cluster2OC = serverOC.get(1);
+ HCatClient cluster2HC;
+
+ ColoHelper cluster3 = servers.get(2);
+ FileSystem cluster3FS = serverFS.get(2);
+ OozieClient cluster3OC = serverOC.get(2);
+ HCatClient cluster3HC;
+
+ final String baseTestHDFSDir = baseHDFSDir + "/HCatReplicationTest";
+
+ final String dbName = "default";
+ private static final String localHCatData = OSUtil.getPath(OSUtil.RESOURCES, "hcat", "data");
+
+ @BeforeClass(alwaysRun = true)
+ public void beforeClass() throws IOException {
+ clusterHC = cluster.getClusterHelper().getHCatClient();
+ cluster2HC = cluster2.getClusterHelper().getHCatClient();
+ cluster3HC = cluster3.getClusterHelper().getHCatClient();
+ // create the base dir on all clusters.
+ HadoopUtil.recreateDir(serverFS, baseTestHDFSDir);
+ }
+
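+ // Create one bundle per cluster from the shared HCat bundle and point each bundle's
+ // registry interface at its cluster's HCat endpoint.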
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ Bundle bundle = BundleUtil.readHCatBundle();
+ bundles[0] = new Bundle(bundle, cluster.getPrefix());
+ bundles[0].generateUniqueBundle();
+ bundles[0].setClusterInterface(Interfacetype.REGISTRY,
+ cluster.getClusterHelper().getHCatEndpoint());
+
+ bundles[1] = new Bundle(bundle, cluster2.getPrefix());
+ bundles[1].generateUniqueBundle();
+ bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper()
+ .getHCatEndpoint());
+
+ bundles[2] = new Bundle(bundle, cluster3.getPrefix());
+ bundles[2].generateUniqueBundle();
+ bundles[2].setClusterInterface(Interfacetype.REGISTRY, cluster3.getClusterHelper()
+ .getHCatEndpoint());
+
+ }
+
+ @DataProvider
+ public String[][] generateSeparators() {
+ //disabling till FALCON-372 is fixed
+ //return new String[][] {{"-"}, {"/"}};
+ return new String[][]{{"-"},};
+ }
+
+ // Make sure the oozie changes mentioned in FALCON-389 are done on the clusters, otherwise the
+ // test will fail.
+ // With hive 0.13, HIVE-6848 and HIVE-6868 need to be resolved for this test to work. The oozie
+ // share libs also need hive jars that include those fixes, and the maven dependency used to
+ // run the tests has to have an hcat build with them as well.
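+ // This test creates a partitioned external table on the source cluster and an empty table on
+ // the target cluster, then checks that the replication feed copies both partitions to the
+ // target.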
+ @Test(dataProvider = "generateSeparators")
+ public void oneSourceOneTarget(String separator) throws Exception {
+ String tcName = "HCatReplication_oneSourceOneTarget";
+ if (separator.equals("-")) {
+ tcName += "_hyphen";
+ } else {
+ tcName += "_slash";
+ }
+ String tblName = tcName;
+ String testHdfsDir = baseTestHDFSDir + "/" + tcName;
+ HadoopUtil.recreateDir(serverFS, testHdfsDir);
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2099-01-01T00:00Z";
+ final String dataEndDate = "2010-01-01T21:00Z";
+ final String tableUriPartitionFragment = StringUtils
+ .join(new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String tableUri = "catalog:" + dbName + ":" + tblName + tableUriPartitionFragment;
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ // keep dataEndDate close to startDate so that only 2 partitions are generated.
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, dataEndDate, 60,
+ DateTimeFormat.forPattern(datePattern));
+
+ final List<String> dataset = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
+ localHCatData, testHdfsDir, dataDates);
+ final String col1Name = "id";
+ final String col2Name = "value";
+ final String partitionColumn = "dt";
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ // create table on cluster 1 and add data to it.
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ createTable(clusterHC, dbName, tblName, cols, partitionCols, testHdfsDir);
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, tblName, clusterHC);
+
+ // create table on target cluster.
+ createTable(cluster2HC, dbName, tblName, cols, partitionCols, testHdfsDir);
+
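+ // submit the source and target cluster entities.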
+ Bundle.submitCluster(bundles[0], bundles[1]);
+
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ bundles[0].setInputFeedTableUri(tableUri);
+
+ String feed = bundles[0].getDataSets().get(0);
+ // set cluster 2 as the target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+
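+ // submit and schedule the feed; a REPLICATION coordinator should come up on the target.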
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+ feed)
+ );
+ TimeUtil.sleepSeconds(15);
+ //check if all coordinators exist
+ Assert.assertEquals(InstanceUtil
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ "REPLICATION"), 1);
+
+ // replication should start; wait till it ends.
+ // we check for 2 instances so that both partitions are copied over.
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+ //check if data was replicated correctly
+ List<Path> cluster1ReplicatedData = HadoopUtil
+ .getAllFilesRecursivelyHDFS(clusterFS, new Path(testHdfsDir));
+ logger.info("Data on source cluster: " + cluster1ReplicatedData);
+ List<Path> cluster2ReplicatedData = HadoopUtil
+ .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir));
+ logger.info("Data on target cluster: " + cluster2ReplicatedData);
+ AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+
+ }
+
+ // Make sure the oozie changes mentioned in FALCON-389 are done on the clusters, otherwise the
+ // test will fail.
+ // With hive 0.13, HIVE-6848 and HIVE-6868 need to be resolved for this test to work. The oozie
+ // share libs also need hive jars that include those fixes, and the maven dependency used to
+ // run the tests has to have an hcat build with them as well.
+ // This test can fail randomly because of https://issues.apache.org/jira/browse/FALCON-401.
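+ // Similar to oneSourceOneTarget, but the feed is replicated to two target clusters and the
+ // copied data is verified on both of them.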
+ @Test(dataProvider = "generateSeparators")
+ public void oneSourceTwoTarget(String separator) throws Exception {
+ String tcName = "HCatReplication_oneSourceTwoTarget";
+ if (separator.equals("-")) {
+ tcName += "_hyphen";
+ } else {
+ tcName += "_slash";
+ }
+ String tblName = tcName;
+ String testHdfsDir = baseTestHDFSDir + "/" + tcName;
+ HadoopUtil.recreateDir(serverFS, testHdfsDir);
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2099-01-01T00:00Z";
+ final String dataEndDate = "2010-01-01T21:00Z";
+ final String tableUriPartitionFragment = StringUtils
+ .join(new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator);
+ String tableUri = "catalog:" + dbName + ":" + tblName + tableUriPartitionFragment;
+ final String datePattern =
+ StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH"}, separator);
+ // keep dataEndDate close to startDate so that only 2 partitions are generated.
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, dataEndDate, 60,
+ DateTimeFormat.forPattern(datePattern));
+
+ final List<String> dataset = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
+ localHCatData, testHdfsDir, dataDates);
+ final String col1Name = "id";
+ final String col2Name = "value";
+ final String partitionColumn = "dt";
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema(col1Name, col1Name + " comment"));
+ cols.add(HCatUtil.getStringSchema(col2Name, col2Name + " comment"));
+ ArrayList<HCatFieldSchema> partitionCols = new ArrayList<HCatFieldSchema>();
+
+ // create table on cluster 1 and add data to it.
+ partitionCols.add(HCatUtil.getStringSchema(partitionColumn, partitionColumn + " partition"));
+ createTable(clusterHC, dbName, tblName, cols, partitionCols, testHdfsDir);
+ addPartitionsToTable(dataDates, dataset, "dt", dbName, tblName, clusterHC);
+
+ // create table on target cluster.
+ createTable(cluster2HC, dbName, tblName, cols, partitionCols, testHdfsDir);
+ createTable(cluster3HC, dbName, tblName, cols, partitionCols, testHdfsDir);
+
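+ // submit the source and both target cluster entities.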
+ Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
+
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ bundles[0].setInputFeedTableUri(tableUri);
+
+ String feed = bundles[0].getDataSets().get(0);
+ // set cluster 2 as the first target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+ // set cluster 3 as the second target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+
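+ // submit and schedule the feed; REPLICATION coordinators should come up on both targets.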
+ AssertUtil.assertSucceeded(
+ prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL,
+ feed)
+ );
+ TimeUtil.sleepSeconds(15);
+ //check if all coordinators exist
+ Assert.assertEquals(InstanceUtil
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ "REPLICATION"), 1);
+
+ //check if all coordinators exist
+ Assert.assertEquals(InstanceUtil
+ .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+ "REPLICATION"), 1);
+
+ // replication should start; wait till it ends on the first target.
+ // we check for 2 instances so that both partitions are copied over.
+ InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+ // replication should start; wait till it ends on the second target.
+ // we check for 2 instances so that both partitions are copied over.
+ InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 2,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+ //check if data was replicated correctly
+ List<Path> srcData = HadoopUtil
+ .getAllFilesRecursivelyHDFS(clusterFS, new Path(testHdfsDir));
+ logger.info("Data on source cluster: " + srcData);
+ List<Path> cluster2TargetData = HadoopUtil
+ .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir));
+ logger.info("Data on target cluster: " + cluster2TargetData);
+ AssertUtil.checkForListSizes(srcData, cluster2TargetData);
+ List<Path> cluster3TargetData = HadoopUtil
+ .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testHdfsDir));
+ logger.info("Data on target cluster: " + cluster3TargetData);
+ AssertUtil.checkForListSizes(srcData, cluster3TargetData);
+ }
+
+ //TODO: More tests need to be added, such as:
+ // Tests to make sure new partitions that are added are replicated
+ // Tests to make sure partitions that do not match the pattern are not copied
+
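+ // Registers one partition per entry in 'partitions' on the given table, using the matching
+ // entry in 'partitionLocations' as that partition's location.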
+ private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations,
+ String partitionCol,
+ String dbName, String tableName, HCatClient hc) throws
+ HCatException {
+ Assert.assertEquals(partitions.size(), partitionLocations.size(),
+ "Number of locations is not the same as the number of partitions.");
+ final List<HCatAddPartitionDesc> partitionDesc = new ArrayList<HCatAddPartitionDesc>();
+ for (int i = 0; i < partitions.size(); ++i) {
+ final String partition = partitions.get(i);
+ final Map<String, String> onePartition = new HashMap<String, String>();
+ onePartition.put(partitionCol, partition);
+ final String partitionLoc = partitionLocations.get(i);
+ partitionDesc
+ .add(HCatAddPartitionDesc.create(dbName, tableName, partitionLoc, onePartition)
+ .build());
+ }
+ logger.info("adding partitions: " + partitionDesc);
+ hc.addPartitions(partitionDesc);
+ }
+
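+ // Drops the table if it already exists and recreates it as an external, partitioned table
+ // located at the given HDFS directory.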
+ private static void createTable(HCatClient hcatClient, String dbName, String tblName,
+ List<HCatFieldSchema> cols, List<HCatFieldSchema> partitionCols,
+ String hdfsDir) throws HCatException {
+ hcatClient.dropTable(dbName, tblName, true);
+ hcatClient.createTable(HCatCreateTableDesc
+ .create(dbName, tblName, cols)
+ .partCols(partitionCols)
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .location(hdfsDir)
+ .build());
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+}