You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:30 UTC

[25/41] git commit: FALCON-643 Tests with zero-output/input scenario amended to match test case. Contributed by Paul Isaychuk

FALCON-643 Tests with zero-output/input scenario amended to match test case. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d9c115e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d9c115e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d9c115e0

Branch: refs/heads/FALCON-585
Commit: d9c115e0e9b55b8a61bdce1c43de88a4e1cc6d0d
Parents: 14b9add
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Thu Sep 4 12:56:47 2014 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Thu Sep 4 12:57:35 2014 +0300

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |  3 +
 .../falcon/regression/NoOutputProcessTest.java  | 18 +---
 .../regression/prism/PrismFeedUpdateTest.java   | 88 ++++++++++----------
 3 files changed, 53 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d9c115e0/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index b60c23c..0b4714a 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,6 +9,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-643 Tests with zero-output/input scenario amended to match test case (Paul Isaychuk via
+   Ruslan Ostafiychuk)
+
    FALCON-660 7 test classes refactored and few of them documented (Paul Isaychuk via
    Ruslan Ostafiychuk)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d9c115e0/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
index 25456a2..2c30f83 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.regression;
 
+import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -62,26 +63,20 @@ public class NoOutputProcessTest extends BaseTestClass {
 
     @BeforeClass(alwaysRun = true)
     public void createTestData() throws Exception {
-
         logger.info("in @BeforeClass");
         HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
         Bundle b = BundleUtil.readELBundle();
         b.generateUniqueBundle();
         b = new Bundle(b, cluster);
-
         String startDate = "2010-01-03T00:00Z";
         String endDate = "2010-01-03T03:00Z";
-
         b.setInputFeedDataPath(inputPath);
         String prefix = b.getFeedDataPathPrefix();
         HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
         List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
         HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
     }
 
-
     @BeforeMethod(alwaysRun = true)
     public void testName(Method method) throws Exception {
         logger.info("test name: " + method.getName());
@@ -92,6 +87,9 @@ public class NoOutputProcessTest extends BaseTestClass {
         bundles[0].setInputFeedDataPath(inputPath);
         bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
         bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+        process.setOutputs(null);
+        bundles[0].setProcessData(process.toString());
         bundles[0].submitFeedsScheduleProcess(prism);
     }
 
@@ -110,16 +108,12 @@ public class NoOutputProcessTest extends BaseTestClass {
         //wait for all the instances to complete
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-
         Assert.assertEquals(messageConsumer.getReceivedMessages().size(), 3,
             " Message for all the 3 instance not found");
-
         messageConsumer.interrupt();
-
         Util.printMessageData(messageConsumer);
     }
 
-
     @Test(enabled = true, groups = {"singleCluster"})
     public void rm() throws Exception {
         JmsMessageConsumer consumerEntityMsg =
@@ -127,22 +121,18 @@ public class NoOutputProcessTest extends BaseTestClass {
         JmsMessageConsumer consumerProcessMsg =
             new JmsMessageConsumer("FALCON." + bundles[0].getProcessName(),
                 cluster.getClusterHelper().getActiveMQ());
-
         consumerEntityMsg.start();
         consumerProcessMsg.start();
 
         //wait for all the instances to complete
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-
         Assert.assertEquals(consumerEntityMsg.getReceivedMessages().size(), 3,
             " Message for all the 3 instance not found");
         Assert.assertEquals(consumerProcessMsg.getReceivedMessages().size(), 3,
             " Message for all the 3 instance not found");
-
         consumerEntityMsg.interrupt();
         consumerProcessMsg.interrupt();
-
         Util.printMessageData(consumerEntityMsg);
         Util.printMessageData(consumerProcessMsg);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d9c115e0/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
index 89f3686..c0e1617 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -37,6 +37,7 @@ import org.apache.falcon.regression.core.util.XmlUtil;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.testng.Assert;
@@ -59,7 +60,7 @@ public class PrismFeedUpdateTest extends BaseTestClass {
     ColoHelper cluster1 = servers.get(0);
     ColoHelper cluster2 = servers.get(1);
     FileSystem server1FS = serverFS.get(0);
-    OozieClient OC1 = serverOC.get(0);
+    OozieClient cluster1OC = serverOC.get(0);
     String baseTestDir = baseHDFSDir + "/PrismFeedUpdateTest";
     String aggregateWorkflowDir = baseTestDir + "/aggregator";
     public final String cluster1colo = cluster1.getClusterHelper().getColoName();
@@ -97,23 +98,26 @@ public class PrismFeedUpdateTest extends BaseTestClass {
     public void updateFeedQueue_dependentMultipleProcess_oneProcessZeroInput() throws Exception {
         //cluster1colo and cluster2colo are source. feed01 on cluster1colo target cluster2colo,
         // feed02 on cluster2colo target cluster1colo
+        String cluster1Def = bundles[0].getClusters().get(0);
+        String cluster2Def = bundles[1].getClusters().get(0);
 
-        //get 3 unique bundles
         //set cluster colos
         bundles[0].setCLusterColo(cluster1colo);
-        logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-
+        logger.info("cluster bundles[0]: " + Util.prettyPrintXml(cluster1Def));
         bundles[1].setCLusterColo(cluster2colo);
-        logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+        logger.info("cluster bundles[1]: " + Util.prettyPrintXml(cluster2Def));
 
-        //submit 3 clusters
+        //submit 2 clusters
+        AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL,
+            cluster1Def));
+        AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL,
+            cluster2Def));
 
         //get 2 unique feeds
         String feed01 = bundles[0].getInputFeedFromBundle();
         String outputFeed = bundles[0].getOutputFeedFromBundle();
 
-        //set source and target for the 2 feeds
-
+        /* set source and target for the 2 feeds */
         //set clusters to null;
         feed01 = InstanceUtil
             .setFeedCluster(feed01,
@@ -126,68 +130,63 @@ public class PrismFeedUpdateTest extends BaseTestClass {
                 XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
                 ClusterType.SOURCE, null);
 
-
         //set new feed input data
         feed01 = Util.setFeedPathValue(feed01,
             baseTestDir + "/feed01/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
 
-
         //generate data in both the colos cluster1colo and cluster2colo
         String prefix = InstanceUtil.getFeedPrefix(feed01);
+        String startTime = TimeUtil.getTimeWrtSystemTime(-40);
+        System.out.println("Start time = " + startTime);
         HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS);
-        HadoopUtil.lateDataReplenish(server1FS, 70, 1, prefix, null);
-
-        String startTime = TimeUtil.getTimeWrtSystemTime(-50);
+        HadoopUtil.lateDataReplenish(server1FS, 80, 20, prefix, null);
 
         //set clusters for feed01
         feed01 = InstanceUtil
             .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
                 XmlUtil.createRtention("hours(10)", ActionType.DELETE),
-                Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
-                null);
+                Util.readEntityName(cluster1Def), ClusterType.SOURCE, null);
         feed01 = InstanceUtil
             .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
                 XmlUtil.createRtention("hours(10)", ActionType.DELETE),
-                Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
-                null);
+                Util.readEntityName(cluster2Def), ClusterType.TARGET, null);
 
         //set clusters for output feed
         outputFeed = InstanceUtil.setFeedCluster(outputFeed,
             XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
             XmlUtil.createRtention("hours(10)", ActionType.DELETE),
-            Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null);
+            Util.readEntityName(cluster1Def), ClusterType.SOURCE, null);
         outputFeed = InstanceUtil.setFeedCluster(outputFeed,
             XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
             XmlUtil.createRtention("hours(10)", ActionType.DELETE),
-            Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null);
-
+            Util.readEntityName(cluster2Def), ClusterType.TARGET, null);
 
         //submit and schedule feeds
         logger.info("feed01: " + Util.prettyPrintXml(feed01));
         logger.info("outputFeed: " + Util.prettyPrintXml(outputFeed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(Util.URLS
+            .SUBMIT_AND_SCHEDULE_URL, feed01));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(Util.URLS
+            .SUBMIT_AND_SCHEDULE_URL, outputFeed));
 
-        //create 2 process with 2 clusters
-
+        /* create 2 process with 2 clusters */
         //get first process
         String process01 = bundles[0].getProcessData();
 
         //add clusters to process
-
         String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
         String processEndTime = TimeUtil.getTimeWrtSystemTime(70);
-
-
         process01 = InstanceUtil
             .setProcessCluster(process01, null,
                 XmlUtil.createProcessValidity(startTime, "2099-01-01T00:00Z"));
         process01 = InstanceUtil
-            .setProcessCluster(process01, Util.readEntityName(bundles[0].getClusters().get(0)),
+            .setProcessCluster(process01, Util.readEntityName(cluster1Def),
                 XmlUtil.createProcessValidity(processStartTime, processEndTime));
         process01 = InstanceUtil
-            .setProcessCluster(process01, Util.readEntityName(bundles[1].getClusters().get(0)),
+            .setProcessCluster(process01, Util.readEntityName(cluster2Def),
                 XmlUtil.createProcessValidity(processStartTime, processEndTime));
 
-        //get 2nd process :
+        //get 2nd process
         String process02 = process01;
         process02 = InstanceUtil
             .setProcessName(process02, "zeroInputProcess" + new Random().nextInt());
@@ -197,24 +196,27 @@ public class PrismFeedUpdateTest extends BaseTestClass {
         processMerlin.setProcessFeeds(feed, 0, 0, 1);
         process02 = processMerlin.toString();
 
-
         //submit and schedule both process
         logger.info("process: " + Util.prettyPrintXml(process01));
         logger.info("process: " + Util.prettyPrintXml(process02));
-
-
+        AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(Util.URLS
+            .SUBMIT_AND_SCHEDULE_URL, process01));
+        AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(Util.URLS
+            .SUBMIT_AND_SCHEDULE_URL, process02));
         logger.info("Wait till process goes into running ");
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process01), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1);
+        InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process02), 1,
+            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1);
 
         //change feed location path
         outputFeed = Util.setFeedProperty(outputFeed, "queueName", "myQueue");
-
         logger.info("updated feed: " + Util.prettyPrintXml(outputFeed));
 
         //update feed first time
-        prism.getFeedHelper().update(outputFeed, outputFeed);
+        AssertUtil.assertSucceeded(prism.getFeedHelper().update(outputFeed, outputFeed));
     }
 
-
     /**
      * schedules a feed and dependent process. Process start and end are in past
      * Test for bug https://issues.apache.org/jira/browse/FALCON-500
@@ -222,24 +224,25 @@ public class PrismFeedUpdateTest extends BaseTestClass {
     @Test
     public void dependentProcessSucceeded()
         throws Exception {
-        bundles[0].setProcessValidity("2014-06-01T04:00Z","2014-06-01T04:02Z");
+        bundles[0].setProcessValidity("2014-06-01T04:00Z", "2014-06-01T04:02Z");
         bundles[0].submitAndScheduleAllFeeds();
         bundles[0].submitAndScheduleProcess();
 
         InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
+            bundles[0].getProcessName(),
             0, 0);
         InstanceUtil.waitForBundleToReachState(cluster1, bundles[0].getProcessName(),
             Job.Status.SUCCEEDED, 20);
 
         FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        feed.addProperty("someProp","someVal");
+        feed.addProperty("someProp", "someVal");
         AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
         //check for new feed bundle creation
         Assert.assertEquals(OozieUtil.getNumberOfBundle(prism, EntityType.FEED,
-            feed.getName()),2);
+            feed.getName()), 2);
         Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
-            bundles[0].getProcessName()),1);
+            bundles[0].getProcessName()), 1);
     }
 
     /**
@@ -256,7 +259,8 @@ public class PrismFeedUpdateTest extends BaseTestClass {
         bundles[0].submitAndScheduleProcess();
 
         InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+        OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
+            bundles[0].getProcessName(),
             0, 0);
 
         FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
@@ -264,9 +268,9 @@ public class PrismFeedUpdateTest extends BaseTestClass {
         AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
         //check for new feed bundle creation
         Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.FEED,
-            feed.getName()),2);
+            feed.getName()), 2);
         Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
-            bundles[0].getProcessName()),2);
+            bundles[0].getProcessName()), 2);
     }
 
     @AfterClass(alwaysRun = true)