You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:17 UTC
[12/41] git commit: FALCON-646 Refactoring, documentation stuff. Contributed by Paul Isaychuk
FALCON-646 Refactoring, documentation stuff. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9774414a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9774414a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9774414a
Branch: refs/heads/FALCON-585
Commit: 9774414a6e654edf6071d70d630bcbb0a960b420
Parents: 23eed9f
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Fri Aug 29 13:12:19 2014 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Fri Aug 29 13:12:19 2014 +0300
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 6 +-
.../falcon/regression/core/util/Util.java | 82 ++++++-----
.../regression/FeedSubmitAndScheduleTest.java | 32 ++---
.../regression/prism/FeedRetentionTest.java | 39 ++---
.../regression/prism/OptionalInputTest.java | 143 ++++++-------------
5 files changed, 123 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index ad6ef79..f5cfb93 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,8 +9,10 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
-
- FALCON-572 HadoopUtil cleanup in falcon-regression (Ruslan Ostafiychuk via Samarth Gupta)
+
+ FALCON-646 Refactoring, documentation stuff (Paul Isaychuk via Ruslan Ostafiychuk)
+
+ FALCON-572 HadoopUtil cleanup in falcon-regression (Ruslan Ostafiychuk via Samarth Gupta)
FALCON-632 Refactoring, documentation stuff (Paul Isaychuk via Samarth Gupta)
FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
index 3fc9388..6485784 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
@@ -119,12 +119,10 @@ public final class Util {
}
public static APIResult parseResponse(ServiceResponse response) throws JAXBException {
-
if (!isXML(response.getMessage())) {
return new APIResult(APIResult.Status.FAILED, response.getMessage(), "somerandomstring",
response.getCode());
}
-
JAXBContext jc = JAXBContext.newInstance(APIResult.class);
Unmarshaller u = jc.createUnmarshaller();
APIResult temp;
@@ -143,7 +141,6 @@ public final class Util {
temp.setStatus(APIResult.Status.FAILED);
}
}
-
return temp;
}
@@ -170,30 +167,25 @@ public final class Util {
}
public static String getUniqueString() {
-
return "-" + UUID.randomUUID().toString().split("-")[0];
}
public static List<String> getHadoopDataFromDir(FileSystem fs, String feed, String dir)
throws IOException {
List<String> finalResult = new ArrayList<String>();
-
String feedPath = getFeedPath(feed);
int depth = feedPath.split(dir)[1].split("/").length - 1;
List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
new Path(dir), depth);
-
for (Path result : results) {
int pathDepth = result.toString().split(dir)[1].split("/").length - 1;
if (pathDepth == depth) {
finalResult.add(result.toString().split(dir)[1]);
}
}
-
return finalResult;
}
-
public static String setFeedProperty(String feed, String propertyName, String propertyValue) {
FeedMerlin feedObject = new FeedMerlin(feed);
boolean found = false;
@@ -205,20 +197,15 @@ public final class Util {
break;
}
}
-
if (!found) {
Property property = new Property();
property.setName(propertyName);
property.setValue(propertyValue);
feedObject.getProperties().getProperties().add(property);
}
-
-
return feedObject.toString();
-
}
-
public static String getFeedPath(String feed) {
FeedMerlin feedObject = new FeedMerlin(feed);
for (Location location : feedObject.getLocations().getLocations()) {
@@ -236,7 +223,6 @@ public final class Util {
return feedObject.toString();
}
-
public static String setFeedPathValue(String feed, String pathValue) {
FeedMerlin feedObject = new FeedMerlin(feed);
for (Location location : feedObject.getLocations().getLocations()) {
@@ -247,11 +233,9 @@ public final class Util {
return feedObject.toString();
}
-
public static String findFolderBetweenGivenTimeStamps(DateTime startTime, DateTime endTime,
List<String> folderList) {
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm");
-
for (String folder : folderList) {
if (folder.compareTo(formatter.print(startTime)) >= 0
&&
@@ -285,8 +269,7 @@ public final class Util {
.getQaHost(), coloHelper.getProcessHelper().getUsername(),
coloHelper.getProcessHelper().getPassword(),
"cat /var/log/ivory/application.* | grep \"" + workflowId + "\" | grep "
- +
- "\"Received\" | awk '{print $2}'",
+ + "\"Received\" | awk '{print $2}'",
coloHelper.getProcessHelper().getUsername(),
coloHelper.getProcessHelper().getIdentityFile()
);
@@ -313,7 +296,6 @@ public final class Util {
for (String line : raw) {
finalList.add(line.split(",")[0]);
}
-
return finalList;
}
@@ -327,7 +309,6 @@ public final class Util {
public static void startService(IEntityManagerHelper helper)
throws IOException, JSchException, AuthenticationException, URISyntaxException {
-
ExecUtil.runRemoteScriptAsSudo(helper.getQaHost(), helper.getUsername(),
helper.getPassword(), helper.getServiceStartCmd(), helper.getServiceUser(),
helper.getIdentityFile());
@@ -372,17 +353,13 @@ public final class Util {
}
}
-
public static String getEnvClusterXML(String cluster, String prefix) {
-
- ClusterMerlin clusterObject =
- getClusterObject(cluster);
+ ClusterMerlin clusterObject = getClusterObject(cluster);
if ((null == prefix) || prefix.isEmpty()) {
prefix = "";
} else {
prefix = prefix + ".";
}
-
String hcatEndpoint = Config.getProperty(prefix + "hcat_endpoint");
//now read and set relevant values
@@ -401,7 +378,6 @@ public final class Util {
iface.setEndpoint(hcatEndpoint);
}
}
-
//set colo name:
clusterObject.setColo(Config.getProperty(prefix + "colo"));
// properties in the cluster needed when secure mode is on
@@ -454,6 +430,13 @@ public final class Util {
return null;
}
+ /**
+ * Compares two definitions
+ * @param server1 server where 1st definition is stored
+ * @param server2 server where 2nd definition is stored
+ * @param entity entity which is under analysis
+ * @return are definitions identical
+ */
public static boolean isDefinitionSame(ColoHelper server1, ColoHelper server2,
String entity)
throws URISyntaxException, IOException, AuthenticationException, JAXBException,
@@ -463,10 +446,9 @@ public final class Util {
}
/**
- * emuns used for instance api.
+ * enums used for instance api.
*/
public enum URLS {
-
LIST_URL("/api/entities/list"),
SUBMIT_URL("/api/entities/submit"),
GET_ENTITY_DEFINITION("/api/entities/definition"),
@@ -497,17 +479,27 @@ public final class Util {
}
}
-
+ /**
+ * @param pathString whole path
+ * @return path to basic data folder
+ */
public static String getPathPrefix(String pathString) {
return pathString.substring(0, pathString.indexOf('$'));
}
+ /**
+ * @param path whole path
+ * @return file name which is retrieved from a path
+ */
public static String getFileNameFromPath(String path) {
-
return path.substring(path.lastIndexOf('/') + 1, path.length());
}
-
+ /**
+ * Defines request type according to request url
+ * @param url request url
+ * @return request type
+ */
public static String getMethodType(String url) {
List<String> postList = new ArrayList<String>();
postList.add("/entities/validate");
@@ -531,10 +523,14 @@ public final class Util {
return "delete";
}
}
-
return "get";
}
+ /**
+ * Prints xml in readable form
+ * @param xmlString xmlString
+ * @return formatted xmlString
+ */
public static String prettyPrintXml(final String xmlString) {
if (xmlString == null) {
return null;
@@ -554,19 +550,27 @@ public final class Util {
} catch (TransformerException e) {
return xmlString;
}
-
}
+ /**
+ * Converts json string to readable form
+ * @param jsonString json string
+ * @return formatted string
+ */
public static String prettyPrintJson(final String jsonString) {
if (jsonString == null) {
return null;
}
Gson gson = new GsonBuilder().setPrettyPrinting().create();
JsonElement json = new JsonParser().parse(jsonString);
-
return gson.toJson(json);
}
+ /**
+ * Prints xml or json in pretty and readable format
+ * @param str xml or json string
+ * @return converted xml or json
+ */
public static String prettyPrintXmlOrJson(final String str) {
if (str == null) {
return null;
@@ -583,6 +587,13 @@ public final class Util {
return str;
}
+ /**
+ * Tries to get entity definition.
+ * @param cluster cluster where definition is stored
+ * @param entity entity for which definition is required
+ * @param shouldReturn should the definition be successfully retrieved or not
+ * @return entity definition
+ */
public static String getEntityDefinition(ColoHelper cluster,
String entity,
boolean shouldReturn) throws
@@ -597,10 +608,8 @@ public final class Util {
} else {
helper = cluster.getClusterHelper();
}
-
ServiceResponse response = helper.getEntityDefinition(URLS
.GET_ENTITY_DEFINITION, entity);
-
if (shouldReturn) {
AssertUtil.assertSucceeded(response);
} else {
@@ -608,7 +617,6 @@ public final class Util {
}
String result = response.getMessage();
Assert.assertNotNull(result);
-
return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
index ab13dd4..38cf080 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java
@@ -54,6 +54,7 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
private OozieClient clusterOC = serverOC.get(0);
private String aggregateWorkflowDir = baseHDFSDir + "/FeedSubmitAndScheduleTest/aggregator";
private static final Logger LOGGER = Logger.getLogger(FeedSubmitAndScheduleTest.class);
+ private String feed;
@BeforeMethod(alwaysRun = true)
public void uploadWorkflow() throws Exception {
@@ -67,6 +68,7 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ feed = bundles[0].getDataSets().get(0);
}
@AfterMethod(alwaysRun = true)
@@ -91,8 +93,8 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
throws JAXBException, IOException, URISyntaxException, AuthenticationException {
Assert.assertEquals(Util.parseResponse(prism.getClusterHelper()
.submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0))).getStatusCode(), 200);
- ServiceResponse response = prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+ ServiceResponse response = prism.getFeedHelper().submitAndSchedule(URLS
+ .SUBMIT_AND_SCHEDULE_URL, feed);
AssertUtil.assertSucceeded(response);
}
@@ -109,17 +111,16 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
//get created bundle id
String bundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)),
- EntityType.FEED);
+ .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
//try to submit and schedule the same process again
ServiceResponse response = prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
AssertUtil.assertSucceeded(response);
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
//check that new bundle wasn't created
- OozieUtil.verifyNewBundleCreation(cluster, bundleId, null, bundles[0].getDataSets().get(0), false, false);
+ OozieUtil.verifyNewBundleCreation(cluster, bundleId, null, feed, false, false);
}
/**
@@ -131,7 +132,7 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void snsFeedWithoutCluster() throws Exception {
ServiceResponse response = prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
AssertUtil.assertFailed(response);
}
@@ -145,14 +146,11 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
public void snsDeletedFeed() throws Exception {
submitFirstClusterScheduleFirstFeed();
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
- Assert.assertEquals(
- Util.parseResponse(prism.getFeedHelper()
- .delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)))
- .getStatusCode(), 200);
+ Assert.assertEquals(Util.parseResponse(prism.getFeedHelper().delete(URLS.DELETE_URL,
+ feed)).getStatusCode(), 200);
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.KILLED);
-
ServiceResponse response = prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
AssertUtil.assertSucceeded(response);
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
}
@@ -168,14 +166,10 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass {
submitFirstClusterScheduleFirstFeed();
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
Assert.assertEquals(Util.parseResponse(
- prism.getFeedHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)))
- .getStatusCode(),
- 200);
+ prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed)).getStatusCode(), 200);
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
ServiceResponse response = prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0));
-
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed);
AssertUtil.assertSucceeded(response);
AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
index e0571b5..f20877f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedRetentionTest.java
@@ -98,6 +98,10 @@ public class FeedRetentionTest extends BaseTestClass {
String inputData = inputPath + "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
String outputPathTemplate = baseHDFSDir +
"/testOutput/op%d/ivoryRetention0%d/%s/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ String cluster1Def = bundles[0].getClusters().get(0);
+ String cluster2Def = bundles[1].getClusters().get(0);
+ String cluster1Name = Util.readEntityName(cluster1Def);
+ String cluster2Name = Util.readEntityName(cluster2Def);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(10), 1);
@@ -106,11 +110,10 @@ public class FeedRetentionTest extends BaseTestClass {
HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.RESOURCES + "thriftRRMar0602.gz",
inputPath, dataDates);
- prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
- prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster1Def);
+ prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster2Def);
String feedOutput01 = bundles[0].getFeed("FETL-RequestRC");
-
feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
@@ -119,8 +122,7 @@ public class FeedRetentionTest extends BaseTestClass {
feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
- Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}",
+ cluster1Name, ClusterType.SOURCE, "${cluster.colo}",
String.format(outputPathTemplate, 1, 1, "data"),
String.format(outputPathTemplate, 1, 1, "stats"),
String.format(outputPathTemplate, 1, 1, "meta"),
@@ -129,8 +131,7 @@ public class FeedRetentionTest extends BaseTestClass {
feedOutput01 = InstanceUtil.setFeedCluster(feedOutput01,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
- Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}",
+ cluster2Name, ClusterType.SOURCE,"${cluster.colo}",
String.format(outputPathTemplate, 1, 2, "data"),
String.format(outputPathTemplate, 1, 2, "stats"),
String.format(outputPathTemplate, 1, 2, "meta"),
@@ -149,8 +150,7 @@ public class FeedRetentionTest extends BaseTestClass {
feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
- Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}",
+ cluster1Name, ClusterType.SOURCE, "${cluster.colo}",
String.format(outputPathTemplate, 2, 1, "data"),
String.format(outputPathTemplate, 2, 1, "stats"),
String.format(outputPathTemplate, 2, 1, "meta"),
@@ -159,8 +159,7 @@ public class FeedRetentionTest extends BaseTestClass {
feedOutput02 = InstanceUtil.setFeedCluster(feedOutput02,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
- Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}",
+ cluster2Name, ClusterType.SOURCE, "${cluster.colo}",
String.format(outputPathTemplate, 2, 2, "data"),
String.format(outputPathTemplate, 2, 2, "stats"),
String.format(outputPathTemplate, 2, 2, "meta"),
@@ -171,8 +170,7 @@ public class FeedRetentionTest extends BaseTestClass {
prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feedOutput02));
String feedInput = bundles[0].getFeed("FETL2-RRLog");
- feedInput = InstanceUtil
- .setFeedCluster(feedInput,
+ feedInput = InstanceUtil.setFeedCluster(feedInput,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-01-01T00:00Z"),
XmlUtil.createRtention("days(10000)", ActionType.DELETE), null,
ClusterType.SOURCE, null);
@@ -180,14 +178,12 @@ public class FeedRetentionTest extends BaseTestClass {
feedInput = InstanceUtil.setFeedCluster(feedInput,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:10Z"),
XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
- Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}", inputData);
+ cluster1Name, ClusterType.SOURCE, "${cluster.colo}", inputData);
feedInput = InstanceUtil.setFeedCluster(feedInput,
XmlUtil.createValidity("2010-10-01T12:00Z", "2099-10-01T12:25Z"),
XmlUtil.createRtention("minutes(5)", ActionType.DELETE),
- Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}", inputData);
+ cluster2Name, ClusterType.SOURCE, "${cluster.colo}", inputData);
AssertUtil.assertSucceeded(
prism.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feedInput));
@@ -196,22 +192,17 @@ public class FeedRetentionTest extends BaseTestClass {
process = InstanceUtil.setProcessCluster(process, null,
XmlUtil.createProcessValidity("2012-10-01T12:00Z", "2012-10-01T12:10Z"));
- process = InstanceUtil.setProcessCluster(process,
- Util.readEntityName(bundles[0].getClusters().get(0)),
+ process = InstanceUtil.setProcessCluster(process, cluster1Name,
XmlUtil.createProcessValidity(TimeUtil.getTimeWrtSystemTime(-2),
TimeUtil.getTimeWrtSystemTime(5)));
- process = InstanceUtil.setProcessCluster(process,
- Util.readEntityName(bundles[1].getClusters().get(0)),
+ process = InstanceUtil.setProcessCluster(process, cluster2Name,
XmlUtil.createProcessValidity(TimeUtil.getTimeWrtSystemTime(-2),
TimeUtil.getTimeWrtSystemTime(5)));
logger.info("process: " + Util.prettyPrintXml(process));
-
AssertUtil.assertSucceeded(
prism.getProcessHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process));
-
AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOutput01));
AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feedOutput02));
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9774414a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
index 4b8fe0a..01c00a9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java
@@ -81,28 +81,24 @@ public class OptionalInputTest extends BaseTestClass {
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_1optional_1compulsary() throws Exception {
- bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, "2010-01-02T01:00Z",
- "2010-01-02T01:12Z");
+ bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1,
+ "2010-01-02T01:00Z", "2010-01-02T01:12Z");
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, false);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
"2010-01-02T01:10Z", 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
@@ -115,37 +111,30 @@ public class OptionalInputTest extends BaseTestClass {
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_1optional_2compulsary() throws Exception {
- bundles[0].generateRequiredBundle(1, 3, 1, inputPath, 1, "2010-01-02T01:00Z",
- "2010-01-02T01:12Z");
-
+ bundles[0].generateRequiredBundle(1, 3, 1, inputPath, 1,
+ "2010-01-02T01:00Z", "2010-01-02T01:12Z");
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
+ String processName = Util.readEntityName(bundles[0].getProcessData());
logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
-
bundles[0].submitAndScheduleBundle(prism, false);
logger.info("instanceShouldStillBeInWaitingState");
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
- List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
- "2010-01-02T01:10Z", 5);
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+ "2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input2/", dataDates);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
@@ -160,32 +149,24 @@ public class OptionalInputTest extends BaseTestClass {
public void optionalTest_2optional_1compulsary() throws Exception {
bundles[0].generateRequiredBundle(1, 3, 2, inputPath, 1, "2010-01-02T01:00Z",
"2010-01-02T01:12Z");
-
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
+ String processName = Util.readEntityName(bundles[0].getProcessData());
logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
bundles[0].submitAndScheduleBundle(prism, false);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
-
- List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z",
- "2010-01-02T01:10Z", 5);
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
+ "2010-01-02T00:50Z", "2010-01-02T01:10Z", 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input2/", dataDates);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
@@ -201,16 +182,15 @@ public class OptionalInputTest extends BaseTestClass {
String startTime = TimeUtil.getTimeWrtSystemTime(-4);
String endTime = TimeUtil.getTimeWrtSystemTime(10);
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
-
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ logger.info(Util.prettyPrintXml(process));
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), endTime, 5);
@@ -220,12 +200,8 @@ public class OptionalInputTest extends BaseTestClass {
for (String date : dataDates) {
HadoopUtil.recreateDir(clusterFS, inputPath + "/input0/" + date);
}
-
bundles[0].submitFeedsScheduleProcess(prism);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
@@ -237,24 +213,19 @@ public class OptionalInputTest extends BaseTestClass {
*/
@Test(enabled = true, groups = {"singleCluster"})
public void optionalTest_allInputOptional() throws Exception {
- bundles[0].generateRequiredBundle(1, 2, 2, inputPath, 1, "2010-01-02T01:00Z",
- "2010-01-02T01:12Z");
-
+ bundles[0].generateRequiredBundle(1, 2, 2, inputPath, 1,
+ "2010-01-02T01:00Z", "2010-01-02T01:12Z");
bundles[0].setProcessInputNames("inputData");
-
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
- logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, false);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process),
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
@@ -271,55 +242,43 @@ public class OptionalInputTest extends BaseTestClass {
String startTime = TimeUtil.getTimeWrtSystemTime(-4);
String endTime = TimeUtil.getTimeWrtSystemTime(30);
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
-
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(2);
- logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ String processName = Util.getProcessName(process);
+ logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, true);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), endTime, 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- final ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ final ProcessMerlin processMerlin = new ProcessMerlin(process);
processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1);
bundles[0].setProcessData(processMerlin.toString());
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
- logger.info("modified process:" + Util.prettyPrintXml(bundles[0].getProcessData()));
+ process = bundles[0].getProcessData();
+ logger.info("modified process:" + Util.prettyPrintXml(process));
- prism.getProcessHelper().update(bundles[0].getProcessData(), bundles[0].getProcessData());
+ prism.getProcessHelper().update(process, process);
//from now on ... it should wait of input0 also
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
-
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input0/", dataDates);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
}
@@ -336,52 +295,42 @@ public class OptionalInputTest extends BaseTestClass {
String startTime = TimeUtil.getTimeWrtSystemTime(-4);
String endTime = TimeUtil.getTimeWrtSystemTime(30);
bundles[0].generateRequiredBundle(1, 2, 1, inputPath, 1, startTime, endTime);
-
for (int i = 0; i < bundles[0].getClusters().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getClusters().get(i)));
-
for (int i = 0; i < bundles[0].getDataSets().size(); i++)
logger.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i)));
bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)");
bundles[0].setProcessConcurrency(4);
- logger.info(Util.prettyPrintXml(bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ String processName = Util.getProcessName(process);
+ logger.info(Util.prettyPrintXml(process));
bundles[0].submitAndScheduleBundle(prism, true);
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
inputPath + "/input1/", dataDates);
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- final ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData());
+ final ProcessMerlin processMerlin = new ProcessMerlin(process);
processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1);
bundles[0].setProcessData(processMerlin.toString());
+ process = bundles[0].getProcessData();
//delete all input data
HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS);
-
bundles[0].setProcessInputNames("inputData0", "inputData");
+ logger.info("modified process:" + Util.prettyPrintXml(process));
- logger.info("modified process:" + Util.prettyPrintXml(bundles[0].getProcessData()));
+ prism.getProcessHelper().update(process, process);
- prism.getProcessHelper().update(bundles[0].getProcessData(), bundles[0].getProcessData());
-
- logger.info("modified process:" + Util.prettyPrintXml(bundles[0].getProcessData()));
//from now on ... it should wait of input0 also
-
- InstanceUtil
- .waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundles[0].getProcessData()),
+ InstanceUtil.waitTillInstanceReachState(oozieClient, processName,
2, CoordinatorAction.Status.KILLED, EntityType.PROCESS);
}
}