You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:46 UTC
[41/41] git commit: Merge remote-tracking branch master into FALCON-585.
Merge remote-tracking branch master into FALCON-585.
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/0d3e97d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/0d3e97d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/0d3e97d7
Branch: refs/heads/FALCON-585
Commit: 0d3e97d784a9b33367ca67af99935f3d8aa3c4d0
Parents: 1f9a27f 5766b74
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Tue Sep 9 20:12:46 2014 -0700
Committer: Raghav Kumar Gautam <ra...@apache.org>
Committed: Wed Sep 10 14:00:25 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 42 ++
.../java/org/apache/falcon/cli/FalconCLI.java | 89 +++-
.../org/apache/falcon/client/FalconClient.java | 177 +++++--
.../org/apache/falcon/resource/EntityList.java | 2 +-
.../falcon/resource/EntitySummaryResult.java | 220 +++++++++
.../org/apache/falcon/entity/EntityUtil.java | 68 +++
.../InstanceRelationshipGraphBuilder.java | 107 ++++-
.../falcon/metadata/MetadataMappingService.java | 8 +-
.../metadata/RelationshipGraphBuilder.java | 13 +-
.../falcon/metadata/RelationshipLabel.java | 8 +-
.../apache/falcon/retention/EvictionHelper.java | 88 ++++
.../falcon/workflow/WorkflowExecutionArgs.java | 8 +-
.../workflow/WorkflowExecutionContext.java | 31 +-
.../WorkflowJobEndNotificationService.java | 5 +
.../apache/falcon/entity/EntityUtilTest.java | 33 ++
.../metadata/MetadataMappingServiceTest.java | 421 +++++++++++++----
.../workflow/WorkflowExecutionContextTest.java | 4 +-
.../WorkflowJobEndNotificationServiceTest.java | 4 +-
docs/src/site/twiki/FalconCLI.twiki | 64 ++-
docs/src/site/twiki/restapi/EntityList.twiki | 38 +-
docs/src/site/twiki/restapi/EntitySummary.twiki | 73 +++
docs/src/site/twiki/restapi/InstanceList.twiki | 29 +-
docs/src/site/twiki/restapi/InstanceLogs.twiki | 32 +-
.../site/twiki/restapi/InstanceRunning.twiki | 26 +-
.../src/site/twiki/restapi/InstanceStatus.twiki | 30 +-
.../site/twiki/restapi/InstanceSummary.twiki | 17 +-
docs/src/site/twiki/restapi/ResourceList.twiki | 13 +-
falcon-regression/CHANGES.txt | 55 ++-
.../falcon/regression/core/bundle/Bundle.java | 54 +--
.../core/enumsAndConstants/MerlinConstants.java | 4 +
.../core/helpers/ClusterEntityHelperImpl.java | 14 +-
.../core/interfaces/IEntityManagerHelper.java | 130 +++--
.../regression/core/supportClasses/Brother.java | 16 +-
.../falcon/regression/core/util/BundleUtil.java | 9 +-
.../regression/core/util/CleanupUtil.java | 11 +-
.../falcon/regression/core/util/HadoopUtil.java | 66 +--
.../regression/core/util/KerberosHelper.java | 3 +-
.../falcon/regression/core/util/Util.java | 85 ++--
.../regression/testHelper/BaseTestClass.java | 9 +
.../falcon/regression/AuthorizationTest.java | 127 ++---
.../regression/ELExp_FutureAndLatestTest.java | 20 +-
.../falcon/regression/ELValidationsTest.java | 5 +
.../regression/EmbeddedPigScriptTest.java | 83 ++--
.../regression/FeedClusterUpdateTest.java | 83 ++--
.../regression/FeedInstanceStatusTest.java | 188 +++-----
.../falcon/regression/FeedReplicationTest.java | 23 +-
.../falcon/regression/FeedResumeTest.java | 36 +-
.../falcon/regression/FeedScheduleTest.java | 41 +-
.../falcon/regression/FeedStatusTest.java | 41 +-
.../regression/FeedSubmitAndScheduleTest.java | 57 +--
.../falcon/regression/FeedSubmitTest.java | 35 +-
.../falcon/regression/FeedSuspendTest.java | 42 +-
.../falcon/regression/InstanceParamTest.java | 52 +-
.../falcon/regression/InstanceSummaryTest.java | 182 +++----
.../apache/falcon/regression/NewRetryTest.java | 103 ++--
.../falcon/regression/NoOutputProcessTest.java | 28 +-
.../ProcessInstanceColoMixedTest.java | 21 +-
.../regression/ProcessInstanceKillsTest.java | 118 ++---
.../regression/ProcessInstanceRerunTest.java | 159 +++----
.../regression/ProcessInstanceResumeTest.java | 225 +++------
.../regression/ProcessInstanceRunningTest.java | 70 +--
.../regression/ProcessInstanceStatusTest.java | 160 +++----
.../regression/ProcessInstanceSuspendTest.java | 129 ++---
.../falcon/regression/ProcessLibPathTest.java | 46 +-
.../regression/hcat/HCatFeedOperationsTest.java | 269 +++++++++++
.../falcon/regression/hcat/HCatProcessTest.java | 7 +
.../regression/hcat/HCatReplicationTest.java | 16 +-
.../regression/hcat/HCatRetentionTest.java | 13 +-
.../lineage/LineageApiProcessInstanceTest.java | 11 +-
.../regression/lineage/LineageApiTest.java | 25 +-
.../regression/prism/EntityDryRunTest.java | 45 +-
.../prism/FeedDelayParallelTimeoutTest.java | 13 +-
.../regression/prism/FeedRetentionTest.java | 219 ---------
.../prism/NewPrismProcessUpdateTest.java | 121 ++---
.../regression/prism/OptionalInputTest.java | 150 +++---
.../prism/PrismClusterDeleteTest.java | 34 +-
.../prism/PrismConcurrentRequestTest.java | 32 +-
.../regression/prism/PrismFeedDeleteTest.java | 133 ++----
.../prism/PrismFeedLateReplicationTest.java | 19 +-
.../PrismFeedReplicationPartitionExpTest.java | 79 ++--
.../prism/PrismFeedReplicationUpdateTest.java | 33 +-
.../regression/prism/PrismFeedResumeTest.java | 142 +++---
.../regression/prism/PrismFeedScheduleTest.java | 10 +-
.../regression/prism/PrismFeedSnSTest.java | 288 ++++++------
.../regression/prism/PrismFeedSuspendTest.java | 113 ++---
.../regression/prism/PrismFeedUpdateTest.java | 94 ++--
.../prism/PrismProcessDeleteTest.java | 101 ++--
.../prism/PrismProcessResumeTest.java | 166 +++----
.../prism/PrismProcessScheduleTest.java | 167 +++----
.../regression/prism/PrismProcessSnSTest.java | 164 +++----
.../prism/PrismProcessSuspendTest.java | 104 ++--
.../regression/prism/PrismSubmitTest.java | 114 ++---
.../prism/ProcessPartitionExpVariableTest.java | 7 +-
.../prism/RescheduleKilledProcessTest.java | 39 +-
.../RescheduleProcessInFinalStatesTest.java | 31 +-
.../falcon/regression/prism/RetentionTest.java | 87 +++-
.../prism/UpdateAtSpecificTimeTest.java | 469 +++++++------------
.../falcon/regression/ui/LineageGraphTest.java | 8 +-
.../falcon/regression/ui/ProcessUITest.java | 36 +-
.../FETL-BillingRC.xml | 14 +-
.../test/resources/impressionRC/cluster-0.1.xml | 43 --
.../impressionRC/feed-FETL-ImpressionRC.xml | 45 --
.../impressionRC/feed-FETL-RequestRC.xml | 46 --
.../resources/impressionRC/feed-FETL2-RRLog.xml | 33 --
.../process-FETL-ImpressionRC-Conversion.xml | 45 --
falcon-regression/pom.xml | 4 -
.../falcon/messaging/JMSMessageConsumer.java | 36 +-
.../falcon/messaging/JMSMessageProducer.java | 20 +-
.../falcon/messaging/FeedProducerTest.java | 12 +-
.../messaging/JMSMessageConsumerTest.java | 4 +-
.../messaging/JMSMessageProducerTest.java | 41 +-
.../falcon/messaging/ProcessProducerTest.java | 12 +-
.../org/apache/falcon/aspect/GenericAlert.java | 7 +
.../feed/FeedReplicationCoordinatorBuilder.java | 15 +-
.../feed/FeedRetentionCoordinatorBuilder.java | 4 +-
.../ProcessExecutionCoordinatorBuilder.java | 26 +-
.../falcon/workflow/FalconPostProcessing.java | 7 +-
.../src/main/resources/action/post-process.xml | 2 +-
oozie/src/main/resources/action/pre-process.xml | 4 +-
.../feed/OozieFeedWorkflowBuilderTest.java | 9 +-
.../falcon/oozie/process/AbstractTestBase.java | 13 +-
.../OozieProcessWorkflowBuilderTest.java | 16 +-
.../workflow/FalconPostProcessingTest.java | 56 ++-
pom.xml | 6 +
.../falcon/resource/AbstractEntityManager.java | 180 ++++---
.../resource/AbstractInstanceManager.java | 234 ++++-----
.../AbstractSchedulableEntityManager.java | 107 ++++-
.../resource/proxy/InstanceManagerProxy.java | 22 +-
.../proxy/SchedulableEntityManagerProxy.java | 55 ++-
.../falcon/resource/EntityManagerTest.java | 66 ++-
.../metadata/LineageMetadataResourceTest.java | 4 +-
.../apache/falcon/latedata/LateDataHandler.java | 10 +-
.../falcon/rerun/handler/LateRerunConsumer.java | 22 +-
.../rerun/handler/TestLateRerunHandler.java | 13 +
.../apache/falcon/retention/FeedEvictor.java | 30 +-
.../falcon/cluster/util/EmbeddedCluster.java | 24 +-
.../apache/falcon/resource/InstanceManager.java | 21 +-
.../resource/SchedulableEntityManager.java | 30 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 107 ++++-
.../apache/falcon/late/LateDataHandlerIT.java | 4 +-
.../falcon/resource/EntityManagerJerseyIT.java | 18 +-
.../org/apache/falcon/resource/TestContext.java | 5 +-
142 files changed, 4842 insertions(+), 4169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --cc falcon-regression/CHANGES.txt
index 2e2b447,b74a4ee..2f5de05
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@@ -5,25 -5,44 +5,62 @@@ Trunk (Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G
+ via Samarth Gupta)
IMPROVEMENTS
-
- FALCON-629 Add javadoc for method stitchTwoHdfsPaths (Raghav Kumar Gautam
- via Samarth Gupta)
-
+ FALCON-585 Remove requirement to have write access to / (Raghav Kumar Gautam)
++ FALCON-680: Tests using LocalDC, impressionRC, updateBundle should stop using root dir
++ (Raghav Kumar Gautam)
++
++ FALCON-679: Tests using ELbundle should stop writing to root dir (Raghav Kumar Gautam)
++
+ FALCON-641 Tests using LateDataBundles should stop using root dir (Raghav
+ Kumar Gautam via Arpit Gupta)
++
+ FALCON-633 RetryTests and Retentions tests should stop using root dir
+ (Raghav Kumar Gautam via Samarth Gupta)
++
++ FALCON-629 Add javadoc for method stitchTwoHdfsPaths (Raghav Kumar Gautam
++ via Samarth Gupta)
++
+ FALCON-606 hcat tests should stop using root dir (Raghav Kumar Gautam
+ via Arpit Gupta)
+
- FALCON-619 ELExp_FutureAndLatestTest stabilization (Paul Isaychuk via
- Arpit Gupta)
+ FALCON-675 Request URLS moved from parameters into methods in falcon-regression (Ruslan
+ Ostafiychuk)
+
+ FALCON-656 add test in falcon regression's Authorization test where non-feed owner updates
+ a feed with a dependent process(Karishma via Samarth Gupta)
+
+ FALCON-674 General code factored out for ProcessInstance* tests (Paul Isaychuk via Ruslan
+ Ostafiychuk)
+
+ FALCON-657 String datePattern moved to BaseTestClass (Ruslan Ostafiychuk)
- FALCON-610 Refactoring and documentation updates (Paul Isaychuk via
- Arpit Gupta)
+ FALCON-643 Tests with zero-output/input scenario amended to match test case (Paul Isaychuk via
+ Ruslan Ostafiychuk)
+
+ FALCON-660 7 test classes refactored and few of them documented (Paul Isaychuk via
+ Ruslan Ostafiychuk)
+
+ FALCON-653 Add falcon regression test for zero input process(Karishma via Samarth Gupta)
+ FALCON-655 Skip workflow upload if process won't be submitted (Ruslan Ostafiychuk)
+
+ FALCON-587 Don't delete input data in @AfterClass in falcon-regression tests if
+ clean_test_dir=false (Ruslan Ostafiychuk)
+
+ FALCON-646 Refactoring, documentation stuff (Paul Isaychuk via Ruslan Ostafiychuk)
+
+ FALCON-572 HadoopUtil cleanup in falcon-regression (Ruslan Ostafiychuk via Samarth Gupta)
+ FALCON-632 Refactoring, documentation stuff (Paul Isaychuk via Samarth Gupta)
+
+ FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored
+ (Paul Isaychuk via Samarth Gupta)
+
+ FALCON-619 ELExp_FutureAndLatestTest stabilization (Paul Isaychuk via Arpit Gupta)
-
++
+ FALCON-610 Refactoring and documentation updates (Paul Isaychuk via Arpit Gupta)
FALCON-581 Refactor code for cross product and make it a method
(Raghav Kumar Gautam via Arpit Gupta)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index ff5a6a8,6d8b032..1536d2c
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@@ -44,10 -44,8 +44,9 @@@ import org.apache.falcon.regression.Ent
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
- import org.apache.falcon.regression.core.util.Util.URLS;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index 166fb3d,922c030..62838f8
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@@ -44,56 -44,38 +44,52 @@@ public final class BundleUtil
}
private static final Logger LOGGER = Logger.getLogger(BundleUtil.class);
- public static Bundle readLateDataBundle() throws IOException {
- return readBundleFromFolder("LateDataBundles");
+ public static Bundle readLateDataBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("LateDataBundles", appPath, testName);
}
- public static Bundle readRetryBundle() throws IOException {
- return readBundleFromFolder("RetryTests");
+ public static Bundle readRetryBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("RetryTests", appPath, testName);
}
- public static Bundle readRetentionBundle() throws IOException {
- return readBundleFromFolder("RetentionBundles");
+ public static Bundle readRetentionBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("RetentionBundles", appPath, testName);
}
- public static Bundle readELBundle() throws IOException {
- return readBundleFromFolder("ELbundle");
+ public static Bundle readELBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("ELbundle", appPath, testName);
}
- public static Bundle readHCatBundle() throws IOException {
- return readBundleFromFolder("hcat");
+ public static Bundle readHCatBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("hcat", appPath, testName);
}
- public static Bundle readHCat2Bundle() throws IOException {
- return readBundleFromFolder("hcat_2");
+ public static Bundle readHCat2Bundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("hcat_2", appPath, testName);
}
- public static Bundle readLocalDCBundle() throws IOException {
- return readBundleFromFolder("LocalDC_feedReplicaltion_BillingRC");
+ public static Bundle readLocalDCBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("LocalDC_feedReplicaltion_BillingRC", appPath, testName);
}
- public static Bundle readImpressionRCBundle(String appPath, String testName) throws IOException {
- return generateBundleFromTemplate("impressionRC", appPath, testName);
- }
-
- public static Bundle readUpdateBundle() throws IOException {
- return readBundleFromFolder("updateBundle");
+ public static Bundle readUpdateBundle(String appPath, String testName) throws IOException {
+ return generateBundleFromTemplate("updateBundle", appPath, testName);
}
+ /** Generate a bundle from the template stored in a directory
+ * @param templatePath name of directory where template is stored
+ * @param appPath application path where staging/temp/working directory will live
+ * @param testName name of test
+ * @return the customized bundle
+ * @throws IOException
+ */
+ private static Bundle generateBundleFromTemplate(String templatePath, String appPath,
+ String testName) throws IOException {
+ final Bundle bundle = readBundleFromFolder(templatePath);
+ bundle.updateClusterLocations(HadoopUtil.stitchHdfsPath(appPath, testName));
+ return bundle;
+ }
-
++
private static Bundle readBundleFromFolder(final String folderPath) throws IOException {
LOGGER.info("Loading xmls from directory: " + folderPath);
File directory = null;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
index c348348,af5c01a..735a732
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
@@@ -394,60 -368,4 +368,32 @@@ public final class HadoopUtil
copyDataToFolders(fs, folderPrefix, folderPaths,
OSUtil.NORMAL_INPUT + "_SUCCESS", OSUtil.NORMAL_INPUT + "log_01.txt");
}
+
+ /**
- * Removes general folder on file system. Creates folders according to passed folders list
- * and fills them with data if required.
- *
- * @param fileSystem destination file system
- * @param prefix prefix of path where data should be placed
- * @param folderList list of folders to be created and filled with data if required
- * @param uploadData if folders should be filled with data
- * @throws IOException
- */
- public static void replenishData(FileSystem fileSystem, String prefix, List<String> folderList,
- boolean uploadData) throws IOException {
- //purge data first
- deleteDirIfExists(prefix, fileSystem);
-
- folderList.add(SOMETHING_RANDOM);
-
- for (final String folder : folderList) {
- final String pathString = prefix + folder;
- LOGGER.info(fileSystem.getUri() + pathString);
- fileSystem.mkdirs(new Path(pathString));
- if (uploadData) {
- fileSystem.copyFromLocalFile(new Path(OSUtil.RESOURCES + "log_01.txt"),
- new Path(pathString));
- }
- }
- }
-
- /**
+ * Stitch two or more hadoop paths. For eg: stitchHdfsPath("/tmp/", "/test") = "/tmp/test",
+ * stitchHdfsPath("/tmp/", "test") = "/tmp/test", stitchHdfsPath("/tmp", "test") = "/tmp/test"
+ * @param path1 the first path to be stitched
+ * @param path2 the second path to be stitched
+ * @param pathParts other paths to be stitched
+ * @return final stitched path
+ */
+ public static String stitchHdfsPath(String path1, String path2, String... pathParts) {
+ String retValue = stitchTwoHdfsPaths(path1, path2);
+ for (String pathPart : pathParts) {
+ retValue = stitchTwoHdfsPaths(retValue, pathPart);
+ }
+ return retValue;
+ }
+
+ /**
+ * Stitch two hadoop paths. For eg: stitchTwoHdfsPaths("/tmp/", "/test") = "/tmp/test",
+ * stitchTwoHdfsPaths("/tmp/", "test") = "/tmp/test",
+ * stitchTwoHdfsPaths("/tmp", "test") = "/tmp/test"
+ * @param path1 the first path to be stitched
+ * @param path2 the second path to be stitched
+ * @return final stitched path
+ */
+ private static String stitchTwoHdfsPaths(String path1, String path2) {
+ return path1.replaceAll("/*$", "") + "/" + path2.replaceAll("^/*", "");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/testHelper/BaseTestClass.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/main/java/org/apache/falcon/regression/testHelper/BaseTestClass.java
index d892332,c16c8f4..bc78699
--- a/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/testHelper/BaseTestClass.java
+++ b/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/testHelper/BaseTestClass.java
@@@ -45,16 -45,14 +45,17 @@@ public class BaseTestClass
}
}
- public ColoHelper prism;
- public List<ColoHelper> servers;
- public List<FileSystem> serverFS;
- public List<OozieClient> serverOC;
- public String baseHDFSDir = "/tmp/falcon-regression";
+ public final ColoHelper prism;
+ public final List<ColoHelper> servers;
+ public final List<FileSystem> serverFS;
+ public final List<OozieClient> serverOC;
+ public final String baseHDFSDir = Config.getProperty("falcon.regression.test.dir",
+ "/tmp/falcon-regression");
+ public final String baseAppHDFSDir = Config.getProperty("falcon.regression.app.dir",
+ "/tmp/falcon-regression-app");
public static final String PRISM_PREFIX = "prism";
protected Bundle[] bundles;
+ public static final String MINUTE_DATE_PATTERN = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
public BaseTestClass() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
index d19daed,25e2dfe..24e94ea
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExp_FutureAndLatestTest.java
@@@ -50,10 -51,7 +51,9 @@@ public class ELExp_FutureAndLatestTest
ColoHelper cluster = servers.get(0);
FileSystem clusterFS = serverFS.get(0);
OozieClient clusterOC = serverOC.get(0);
- private String prefix;
private String baseTestDir = baseHDFSDir + "/ELExp_FutureAndLatest";
+ private String testInputDir = baseTestDir + "/input";
+ private String testOutputDir = baseTestDir + "/output";
private String aggregateWorkflowDir = baseTestDir + "/aggregator";
private static final Logger logger = Logger.getLogger(ELExp_FutureAndLatestTest.class);
@@@ -69,10 -67,8 +69,8 @@@
String startDate = TimeUtil.getTimeWrtSystemTime(-20);
String endDate = TimeUtil.getTimeWrtSystemTime(70);
- b.setInputFeedDataPath(testInputDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
- b.setInputFeedDataPath(baseTestDir + "/ELExp_latest/testData" + MINUTE_DATE_PATTERN);
++ b.setInputFeedDataPath(testInputDir + MINUTE_DATE_PATTERN);
b.setProcessWorkflow(aggregateWorkflowDir);
- prefix = b.getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 1);
@@@ -82,13 -79,11 +81,11 @@@
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
logger.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
- bundles[0].setInputFeedDataPath(
- testInputDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
- bundles[0].setOutputFeedLocationData(
- testOutputDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
- bundles[0].setInputFeedDataPath(baseTestDir + "/ELExp_latest/testData" +
- MINUTE_DATE_PATTERN);
++ bundles[0].setInputFeedDataPath(testInputDir + MINUTE_DATE_PATTERN);
++ bundles[0].setOutputFeedLocationData(testOutputDir + MINUTE_DATE_PATTERN);
bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setInputFeedValidity("2010-04-01T00:00Z", "2015-04-01T00:00Z");
String processStart = TimeUtil.getTimeWrtSystemTime(-3);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
index 6a9edd3,99ffe37..85fed2f
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@@ -72,26 -73,19 +73,19 @@@ public class EmbeddedPigScriptTest exte
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
logger.info("in @BeforeClass");
+
//copy pig script
HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.RESOURCES + "pig");
-
- Bundle bundle = BundleUtil.readELBundle();
+ Bundle bundle = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundle.generateUniqueBundle();
bundle = new Bundle(bundle, cluster);
-
String startDate = "2010-01-02T00:40Z";
String endDate = "2010-01-02T01:10Z";
-
bundle.setInputFeedDataPath(inputPath);
- prefix = bundle.getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
- List<String> dataDates =
- TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
+ bundle.getFeedDataPathPrefix(), dataDates);
}
@BeforeMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedClusterUpdateTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
index 738d0d1,bb629b8..f7a706d
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java
@@@ -55,10 -54,9 +54,9 @@@ public class FeedResumeTest extends Bas
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0].generateUniqueBundle();
bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].submitClusters(prism);
feed = bundles[0].getInputFeedFromBundle();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
index 7ae5da4,8500d84..8f0ee4e
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java
@@@ -51,14 -50,9 +50,9 @@@ public class FeedScheduleTest extends B
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
Bundle.submitCluster(bundles[0]);
feed = bundles[0].getInputFeedFromBundle();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
index 75def82,6278513..4ee229f
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java
@@@ -58,10 -54,9 +54,9 @@@ public class FeedStatusTest extends Bas
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0].generateUniqueBundle();
bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
//submit the cluster
ServiceResponse response =
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
index a658102,db82a8c..2c246de
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
@@@ -63,10 -57,10 +57,10 @@@ public class FeedSubmitAndScheduleTest
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ feed = bundles[0].getDataSets().get(0);
}
@AfterMethod(alwaysRun = true)
@@@ -179,7 -160,9 +160,9 @@@
AssertUtil.assertSucceeded(response);
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
}
- }
-
-
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws IOException {
+ cleanTestDirs();
+ }
-}
++}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitTest.java
index a8795db,8d96741..3bd71ab
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitTest.java
@@@ -48,14 -47,9 +47,9 @@@ public class FeedSubmitTest extends Bas
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0].generateUniqueBundle();
bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
//submit the cluster
ServiceResponse response =
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSuspendTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSuspendTest.java
index d324b64,9676a30..adab5a6
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSuspendTest.java
@@@ -51,14 -50,9 +50,9 @@@ public class FeedSuspendTest extends Ba
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0].generateUniqueBundle();
bundles[0] = new Bundle(bundles[0], cluster);
- bundles[0].setProcessWorkflow(aggregateWorkflowDir);
//submit the cluster
ServiceResponse response =
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
index 2e62360,db600ff..6c873c3
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
@@@ -57,19 -60,8 +60,9 @@@ import java.util.List
@Test(groups = "embedded")
public class InstanceSummaryTest extends BaseTestClass {
- //1. process : test summary single cluster few instance some future some past
- //2. process : test multiple cluster, full past on one cluster,
- // full future on one cluster, half future / past on third one
-
- // 3. feed : same as test 1 for feed
- // 4. feed : same as test 2 for feed
-
-
String baseTestHDFSDir = baseHDFSDir + "/InstanceSummaryTest";
- String feedInputPath = baseTestHDFSDir +
- "/testInputData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
- String feedOutputPath = baseTestHDFSDir +
- "/testOutputData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ String feedInputPath = baseTestHDFSDir + "/testInputData" + MINUTE_DATE_PATTERN;
++ String feedOutputPath = baseTestHDFSDir + "/testOutputData" + MINUTE_DATE_PATTERN;
String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
String startTime;
String endTime;
@@@ -290,23 -255,23 +257,22 @@@
feedInputPath, 1);*/
//submit and schedule feed
- prism.getFeedHelper().submitAndSchedule(Util.URLS
- .SUBMIT_AND_SCHEDULE_URL, feed);
+ prism.getFeedHelper().submitAndSchedule(feed);
InstancesSummaryResult r = prism.getFeedHelper()
- .getInstanceSummary(Util.readEntityName(feed),
- "?start=" + startTime);
-
- r = prism.getFeedHelper()
- .getInstanceSummary(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime,
- -20));
+ .getInstanceSummary(Util.readEntityName(feed), "?start=" + startTime);
+ r = prism.getFeedHelper().getInstanceSummary(Util.readEntityName(feed),
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime, -20));
}
-
@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
- processBundle.deleteBundle(prism);
- removeBundles();
+ removeBundles(processBundle);
}
+
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws IOException {
+ cleanTestDirs();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
index 8bcc797,d4f31e9..27d4fdf
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
@@@ -74,11 -75,8 +75,10 @@@ public class NewRetryTest extends BaseT
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
final private String baseTestDir = baseHDFSDir + "/NewRetryTest";
final private String aggregateWorkflowDir = baseTestDir + "/aggregator";
- final private String lateDir = baseTestDir + "/lateDataTest/testFolders";
- final private String latePath = lateDir + MINUTE_DATE_PATTERN;
+ final private String lateInputDir = baseTestDir + "/lateDataTest/inputFolders/";
- final private String lateInputPath = lateInputDir + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
++ final private String lateInputPath = lateInputDir + MINUTE_DATE_PATTERN;
+ final private String lateOutputDir = baseTestDir + "/lateDataTest/outputFolders/";
- final private String lateOutputPath = lateOutputDir
- + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
++ final private String lateOutputPath = lateOutputDir + MINUTE_DATE_PATTERN;
private DateTime startDate;
private DateTime endDate;
@@@ -182,11 -178,11 +182,11 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
//now wait till the process is over
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -240,11 -236,11 +240,11 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -293,10 -289,10 +293,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
@@@ -349,10 -345,10 +349,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -397,10 -393,10 +397,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -449,10 -444,10 +448,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -503,10 -498,10 +502,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -555,10 -550,10 +554,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
@@@ -594,10 -589,10 +593,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
@@@ -648,10 -643,10 +647,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()),
@@@ -703,10 -698,10 +702,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
List<DateTime> dates = null;
@@@ -800,13 -795,13 +799,13 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
List<String> initialData =
Util.getHadoopDataFromDir(clusterFS, bundles[0].getInputFeedFromBundle(),
- lateDir);
+ lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
List<DateTime> dates = null;
@@@ -878,10 -873,10 +877,10 @@@
AssertUtil.assertFailed(response);
} else {
AssertUtil.assertSucceeded(response);
- HadoopUtil.deleteDirIfExists(lateDir, clusterFS);
- HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateDir);
+ HadoopUtil.deleteDirIfExists(lateInputDir, clusterFS);
+ HadoopUtil.lateDataReplenish(clusterFS, 20, 0, lateInputDir);
AssertUtil.assertSucceeded(
- prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ prism.getProcessHelper().schedule(bundles[0].getProcessData()));
//now wait till the process is over
String bundleId = OozieUtil.getBundles(clusterOC,
Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
index 86faf3b,638768d..cf91d83
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
@@@ -54,24 -57,19 +57,20 @@@ public class NoOutputProcessTest extend
FileSystem clusterFS = serverFS.get(0);
OozieClient clusterOC = serverOC.get(0);
String testDir = baseHDFSDir + "/NoOutputProcessTest";
- String inputPath = testDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
- String outputPath = testDir + "/output/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ String inputPath = testDir + "/input" + MINUTE_DATE_PATTERN;
++ String outputPath = testDir + "/output/" + MINUTE_DATE_PATTERN;
String aggregateWorkflowDir = testDir + "/aggregator";
private static final Logger logger = Logger.getLogger(NoOutputProcessTest.class);
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
logger.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
- Bundle b = BundleUtil.readELBundle();
+ Bundle b = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
b.generateUniqueBundle();
b = new Bundle(b, cluster);
-
String startDate = "2010-01-03T00:00Z";
String endDate = "2010-01-03T03:00Z";
-
b.setInputFeedDataPath(inputPath);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
@@@ -89,9 -85,11 +86,12 @@@
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setInputFeedDataPath(inputPath);
+ bundles[0].setOutputFeedLocationData(outputPath);
bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ process.setOutputs(null);
+ bundles[0].setProcessData(process.toString());
bundles[0].submitFeedsScheduleProcess(prism);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index 03fdd94,7b938ec..68b308e
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@@ -66,27 -66,21 +66,21 @@@ public class ProcessInstanceKillsTest e
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
- Bundle b = BundleUtil.readELBundle();
+ Bundle b = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
b.generateUniqueBundle();
b = new Bundle(b, cluster);
-
String startDate = "2010-01-01T23:20Z";
String endDate = "2010-01-02T01:21Z";
-
b.setInputFeedDataPath(feedInputPath);
- String prefix = b.getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
+ b.getFeedDataPathPrefix(), dataDates);
}
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
-
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index 422c6b3,7a7e735..8934eb7
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@@ -67,22 -66,17 +66,17 @@@ public class ProcessInstanceRunningTes
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
- Bundle bundle = BundleUtil.readELBundle();
+ Bundle bundle = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundle.generateUniqueBundle();
bundle = new Bundle(bundle, cluster);
-
String startDate = "2010-01-02T00:40Z";
String endDate = "2010-01-02T01:11Z";
-
bundle.setInputFeedDataPath(feedInputPath);
- String prefix = bundle.getFeedDataPathPrefix();
- HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
- HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
+ bundle.getFeedDataPathPrefix(), dataDates);
}
-
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index 4f06501,12a3907..df959b5
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@@ -75,21 -76,15 +76,16 @@@ public class ProcessInstanceStatusTest
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
-
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- Bundle bundle = BundleUtil.readELBundle();
+
+ Bundle bundle = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundle.generateUniqueBundle();
bundle = new Bundle(bundle, cluster);
-
String startDate = "2010-01-01T23:40Z";
String endDate = "2010-01-02T02:40Z";
-
bundle.setInputFeedDataPath(feedInputPath);
String prefix = bundle.getFeedDataPathPrefix();
-
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index b64fca4,c2d8c9b..743990e
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@@ -55,35 -59,29 +59,29 @@@ public class ProcessLibPathTest extend
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
logger.info("in @BeforeClass");
+
//common lib for both test cases
HadoopUtil.uploadDir(clusterFS, testLibDir, OSUtil.RESOURCES_OOZIE + "lib");
-
- Bundle b = BundleUtil.readELBundle();
+ Bundle b = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
b.generateUniqueBundle();
b = new Bundle(b, cluster);
-
String startDate = "2010-01-01T22:00Z";
String endDate = "2010-01-02T03:00Z";
-
- b.setInputFeedDataPath(testDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ b.setInputFeedDataPath(testDir + "/input" + MINUTE_DATE_PATTERN);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
@BeforeMethod(alwaysRun = true)
public void testName(Method method) throws Exception {
logger.info("test name: " + method.getName());
- bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
- bundles[0].setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ bundles[0].setInputFeedDataPath(baseHDFSDir + MINUTE_DATE_PATTERN);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index 0000000,8b5c01c..5598744
mode 000000,100644..100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@@ -1,0 -1,269 +1,269 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.falcon.regression.hcat;
+
+ import org.apache.falcon.entity.v0.EntityType;
+ import org.apache.falcon.entity.v0.Frequency;
+ import org.apache.falcon.entity.v0.cluster.Interfacetype;
+ import org.apache.falcon.entity.v0.feed.ActionType;
+ import org.apache.falcon.entity.v0.feed.ClusterType;
+ import org.apache.falcon.regression.Entities.FeedMerlin;
+ import org.apache.falcon.regression.core.bundle.Bundle;
+ import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+ import org.apache.falcon.regression.core.helpers.ColoHelper;
+ import org.apache.falcon.regression.core.response.ServiceResponse;
+ import org.apache.falcon.regression.core.util.AssertUtil;
+ import org.apache.falcon.regression.core.util.BundleUtil;
+ import org.apache.falcon.regression.core.util.HCatUtil;
+ import org.apache.falcon.regression.core.util.OSUtil;
+ import org.apache.falcon.regression.core.util.Util;
+ import org.apache.falcon.regression.core.util.InstanceUtil;
+ import org.apache.falcon.regression.core.util.XmlUtil;
+ import org.apache.falcon.regression.testHelper.BaseTestClass;
+ import org.apache.hive.hcatalog.api.HCatClient;
+ import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
+ import org.apache.hive.hcatalog.common.HCatException;
+ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+ import org.apache.log4j.Logger;
+ import org.apache.oozie.client.Job;
+ import org.apache.oozie.client.OozieClient;
+ import org.testng.Assert;
+ import org.testng.annotations.BeforeClass;
+ import org.testng.annotations.BeforeMethod;
+ import org.testng.annotations.AfterMethod;
+ import org.testng.annotations.AfterClass;
+ import org.testng.annotations.Test;
+
+ import java.io.IOException;
+ import java.lang.reflect.Method;
+ import java.util.ArrayList;
+ import java.util.List;
+
+ public class HCatFeedOperationsTest extends BaseTestClass {
+
+ ColoHelper cluster = servers.get(0);
+ OozieClient clusterOC = serverOC.get(0);
+ HCatClient clusterHC = cluster.getClusterHelper().getHCatClient();
+
+ ColoHelper cluster2 = servers.get(1);
+ OozieClient cluster2OC = serverOC.get(1);
+ HCatClient cluster2HC = cluster2.getClusterHelper().getHCatClient();
+
+ private String dbName = "default";
+ private String tableName = "hcatFeedOperationsTest";
+ private String randomTblName = "randomTable_HcatFeedOperationsTest";
+ private String feed;
+ private String aggregateWorkflowDir = baseHDFSDir + "/HCatFeedOperationsTest/aggregator";
+ private static final Logger LOGGER = Logger.getLogger(HCatFeedOperationsTest.class);
+
+ public void uploadWorkflow() throws Exception {
+ uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ //create an empty table for feed operations
+ ArrayList<HCatFieldSchema> partitions = new ArrayList<HCatFieldSchema>();
+ partitions.add(HCatUtil.getStringSchema("year", "yearPartition"));
+ createEmptyTable(clusterHC, dbName, tableName, partitions);
+
+ //A random table to test submission of replication feed when table doesn't exist on target
+ createEmptyTable(clusterHC, dbName, randomTblName, partitions);
+
+ //create empty table on target cluster
+ createEmptyTable(cluster2HC, dbName, tableName, new ArrayList<HCatFieldSchema>());
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
- Bundle bundle = BundleUtil.readHCatBundle();
++ Bundle bundle = BundleUtil.readHCatBundle(baseAppHDFSDir, this.getClass().getSimpleName());
+ bundles[0] = new Bundle(bundle, cluster.getPrefix());
+ bundles[0].generateUniqueBundle();
+ bundles[0].setClusterInterface(Interfacetype.REGISTRY, cluster.getClusterHelper().getHCatEndpoint());
+
+
+ bundles[1] = new Bundle(bundle, cluster2.getPrefix());
+ bundles[1].generateUniqueBundle();
+ bundles[1].setClusterInterface(Interfacetype.REGISTRY, cluster2.getClusterHelper().getHCatEndpoint());
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws HCatException {
+ removeBundles();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws IOException {
+ clusterHC.dropTable(dbName, tableName, true);
+ clusterHC.dropTable(dbName, randomTblName, true);
+ cluster2HC.dropTable(dbName, tableName, true);
+ cleanTestDirs();
+ }
+
+ /**
+ * Submit Hcat feed when Hcat table mentioned in table uri does not exist. Response should reflect failure.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void submitFeedWhenTableDoesNotExist() throws Exception {
+ Bundle.submitCluster(bundles[1]);
+ feed = bundles[1].getInputFeedFromBundle();
+ FeedMerlin feedObj = new FeedMerlin(feed);
+ feedObj.setTableValue(dbName, randomTblName, FeedType.YEARLY.getHcatPathValue());
+ ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
+ AssertUtil.assertFailed(response);
+ }
+
+ /**
+ * Submit Hcat feed when Hcat table mentioned in table uri exists. Delete that feed, and re-submit.
+ * All responses should reflect success.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void submitFeedPostDeletionWhenTableExists() throws Exception {
+ Bundle.submitCluster(bundles[0]);
+ feed = bundles[0].getInputFeedFromBundle();
+ FeedMerlin feedObj = new FeedMerlin(feed);
+ feedObj.setTableValue(dbName, tableName, FeedType.YEARLY.getHcatPathValue());
+ ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
+ AssertUtil.assertSucceeded(response);
+
+ response = prism.getFeedHelper().delete(feedObj.toString());
+ AssertUtil.assertSucceeded(response);
+
+ response = prism.getFeedHelper().submitEntity(feedObj.toString());
+ AssertUtil.assertSucceeded(response);
+ }
+
+ /**
+ * Submit Hcat Replication feed when Hcat table mentioned in table uri does not exist on target. The response is
+ * Partial, with submit/schedule succeeding on the source.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void submitAndScheduleReplicationFeedWhenTableDoesNotExistOnTarget() throws Exception {
+ Bundle.submitCluster(bundles[0], bundles[1]);
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2099-01-01T00:00Z";
+ String tableUri = "catalog:" + dbName + ":" + randomTblName + "#year=${YEAR}";
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ bundles[0].setInputFeedTableUri(tableUri);
+
+ feed = bundles[0].getDataSets().get(0);
+ // set cluster 2 as the target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+
+ AssertUtil.assertPartial(prism.getFeedHelper().submitAndSchedule(feed));
+ }
+
+ /**
+ * Submit Hcat Replication feed when Hcat table mentioned in table uri exists on both source and target. The response is
+ * Succeeded, and a replication coordinator should appear on target oozie. The test however does not ensure that
+ * replication goes through.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception {
+ Bundle.submitCluster(bundles[0], bundles[1]);
+ final String startDate = "2010-01-01T20:00Z";
+ final String endDate = "2099-01-01T00:00Z";
+ String tableUri = "catalog:" + dbName + ":" + tableName + "#year=${YEAR}";
+ bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
+ bundles[0].setInputFeedValidity(startDate, endDate);
+ bundles[0].setInputFeedTableUri(tableUri);
+
+ feed = bundles[0].getDataSets().get(0);
+ // set cluster 2 as the target.
+ feed = InstanceUtil.setFeedClusterWithTable(feed,
+ XmlUtil.createValidity(startDate, endDate),
+ XmlUtil.createRtention("months(9000)", ActionType.DELETE),
+ Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
+ tableUri);
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+ Assert.assertEquals(InstanceUtil
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+ "REPLICATION"), 1);
+ //This test doesn't wait for replication to succeed.
+ }
+
+ /**
+ * Submit Hcat Replication feed. Suspend the feed, and check that feed was suspended on
+ * both clusters. Now resume feed, and check that status is running on both clusters.
+ * The test however does not ensure that replication goes through.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void suspendAndResumeReplicationFeed() throws Exception {
+
+ submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed));
+
+ //check that feed suspended on both clusters
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.SUSPENDED);
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper().resume(feed));
+
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.RUNNING);
+ }
+
+ /**
+ * Submit Hcat Replication feed. Delete the feed, and check that feed was deleted on
+ * both clusters. The test however does not ensure that replication goes through.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void deleteReplicationFeed() throws Exception {
+ submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
+
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed));
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED);
+ AssertUtil.checkStatus(cluster2OC, EntityType.FEED, feed, Job.Status.KILLED);
+ }
+
+
+ public static void createEmptyTable(HCatClient cli, String dbName, String tabName, List<HCatFieldSchema> partitionCols) throws HCatException{
+
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(HCatUtil.getStringSchema("id", "id comment"));
+ HCatCreateTableDesc tableDesc = HCatCreateTableDesc
+ .create(dbName, tabName, cols)
+ .partCols(partitionCols)
+ .fileFormat("textfile")
+ .ifNotExists(true)
+ .isTableExternal(true)
+ .build();
+ cli.createTable(tableDesc);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
index 25a8789,0000000..e69de29
mode 100644,000000..100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismClusterDeleteTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismConcurrentRequestTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSuspendTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
index 233c50f,378cbbd..71af1e2
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java
@@@ -80,19 -79,17 +79,18 @@@ public class UpdateAtSpecificTimeTest e
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws IOException {
logger.info("test name: " + method.getName());
- Bundle bundle = BundleUtil.readLocalDCBundle();
+ Bundle bundle =
+ BundleUtil.readLocalDCBundle(baseAppHDFSDir, this.getClass().getSimpleName());
- bundles[0] = new Bundle(bundle, cluster_1);
- bundles[1] = new Bundle(bundle, cluster_2);
- bundles[2] = new Bundle(bundle, cluster_3);
+ bundles[0] = new Bundle(bundle, cluster1);
+ bundles[1] = new Bundle(bundle, cluster2);
+ bundles[2] = new Bundle(bundle, cluster3);
bundles[0].generateUniqueBundle();
bundles[1].generateUniqueBundle();
bundles[2].generateUniqueBundle();
- processBundle = BundleUtil.readELBundle();
+ processBundle = BundleUtil.readELBundle(baseAppHDFSDir, this.getClass().getSimpleName());
- processBundle = new Bundle(processBundle, cluster_1);
+ processBundle = new Bundle(processBundle, cluster1);
processBundle.generateUniqueBundle();
processBundle.setProcessWorkflow(aggregateWorkflowDir);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
index b725b14,d7ac724..b014dad
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/LineageGraphTest.java
@@@ -62,9 -62,7 +62,8 @@@ public class LineageGraphTest extends B
private String baseTestDir = baseHDFSDir + "/LineageGraphTest";
private String aggregateWorkflowDir = baseTestDir + "/aggregator";
private static final Logger logger = Logger.getLogger(LineageGraphTest.class);
- String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
- String feedInputPath = baseTestDir + "/input" + datePattern;
- String feedOutputPath = baseTestDir + "/output" + datePattern;
- String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
++ String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;;
++ String feedOutputPath = baseTestDir + "/output" + MINUTE_DATE_PATTERN;;
private FileSystem clusterFS = serverFS.get(0);
private OozieClient clusterOC = serverOC.get(0);
private String processName = null;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d3e97d7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
----------------------------------------------------------------------
diff --cc falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
index 6c9c27f,76173b7..6f673c9
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
@@@ -95,8 -95,7 +95,8 @@@ public class ProcessUITest extends Base
bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes);
bundles[0].setProcessConcurrency(5);
bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes);
- bundles[0].setInputFeedDataPath(feedInputPath + datePattern);
- bundles[0].setOutputFeedLocationData(feedOutputPath + datePattern);
+ bundles[0].setInputFeedDataPath(feedInputPath + MINUTE_DATE_PATTERN);
++ bundles[0].setOutputFeedLocationData(feedOutputPath + MINUTE_DATE_PATTERN);
Process process = bundles[0].getProcessObject();
Inputs inputs = new Inputs();
Input input = new Input();