You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/03/25 15:44:38 UTC
[2/2] falcon git commit: FALCON-1112 Migrate methods related to
*Merlin.java classes from Util.java to their respective *Merlin.java.
Contributed by Paul Isaychuk
FALCON-1112 Migrate methods related to *Merlin.java classes from Util.java to their respective
*Merlin.java. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5b5113d1
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5b5113d1
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5b5113d1
Branch: refs/heads/master
Commit: 5b5113d19d891872fa48b8c524347a49f3f1cdf5
Parents: d718ad7
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Wed Mar 25 16:42:50 2015 +0200
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Wed Mar 25 16:42:59 2015 +0200
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../regression/Entities/ClusterMerlin.java | 17 +-
.../falcon/regression/Entities/FeedMerlin.java | 77 ++++++-
.../regression/Entities/ProcessMerlin.java | 14 +-
.../falcon/regression/core/bundle/Bundle.java | 16 +-
.../helpers/entity/ProcessEntityHelper.java | 4 +-
.../falcon/regression/core/util/BundleUtil.java | 89 +++++++-
.../falcon/regression/core/util/Util.java | 207 +------------------
.../falcon/regression/AuthorizationTest.java | 73 +++----
.../regression/ELExpCurrentAndLastWeekTest.java | 3 +-
.../falcon/regression/ELValidationsTest.java | 3 +-
.../regression/EmbeddedPigScriptTest.java | 3 +-
.../apache/falcon/regression/NewRetryTest.java | 32 +--
.../ProcessInstanceColoMixedTest.java | 74 ++++---
.../regression/ProcessInstanceSuspendTest.java | 5 +-
.../falcon/regression/ProcessLateRerunTest.java | 28 +--
.../falcon/regression/hcat/HCatProcessTest.java | 12 +-
.../lineage/LineageApiProcessInstanceTest.java | 4 +-
.../regression/prism/EntityDryRunTest.java | 18 +-
.../prism/FeedDelayParallelTimeoutTest.java | 11 +-
.../prism/NewPrismProcessUpdateTest.java | 93 ++++-----
.../regression/prism/OptionalInputTest.java | 45 ++--
.../prism/PrismFeedReplicationUpdateTest.java | 80 ++++---
.../regression/prism/PrismFeedUpdateTest.java | 81 ++++----
.../regression/prism/PrismSubmitTest.java | 11 +-
.../prism/ProcessPartitionExpVariableTest.java | 3 +-
.../prism/RescheduleKilledProcessTest.java | 18 +-
.../prism/UpdateAtSpecificTimeTest.java | 12 +-
28 files changed, 465 insertions(+), 571 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 2024016..91f1bec 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,9 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-1112 Migrate methods related to *Merlin.java classes from Util.java to their respective
+ *Merlin.java (Paul Isaychuk via Ruslan Ostafiychuk)
+
FALCON-1113 Clean up data files in merlin resource directory. Create better names for them
(Paul Isaychuk via Ruslan Ostafiychuk)
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
index 22d8ca4..22ec5da 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java
@@ -27,10 +27,7 @@ import org.testng.Assert;
import javax.xml.bind.JAXBException;
import java.io.StringWriter;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/** Class for representing a cluster xml. */
@@ -41,21 +38,9 @@ public class ClusterMerlin extends Cluster {
clusterData);
try {
PropertyUtils.copyProperties(this, cluster);
- } catch (IllegalAccessException e) {
+ } catch (ReflectiveOperationException e) {
Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- } catch (InvocationTargetException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- } catch (NoSuchMethodException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- }
- }
-
- public static List<ClusterMerlin> fromString(List<String> clusterStrings) {
- List<ClusterMerlin> clusters = new ArrayList<ClusterMerlin>();
- for (String clusterString : clusterStrings) {
- clusters.add(new ClusterMerlin(clusterString));
}
- return clusters;
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
index a458de9..76d34d8 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
@@ -41,7 +41,6 @@ import org.testng.Assert;
import javax.xml.bind.JAXBException;
import java.io.StringWriter;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -58,12 +57,8 @@ public class FeedMerlin extends Feed {
try {
PropertyUtils.copyProperties(this, feed);
this.setACL(feed.getACL());
- } catch (IllegalAccessException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- } catch (InvocationTargetException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- } catch (NoSuchMethodException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ } catch (ReflectiveOperationException e) {
+ Assert.fail("Can't create FeedMerlin: " + ExceptionUtils.getStackTrace(e));
}
}
@@ -79,6 +74,74 @@ public class FeedMerlin extends Feed {
return new FeedMerlin(feedString);
}
+ /**
+ * Sets custom feed property.
+ * @param propertyName custom property name
+ * @param propertyValue custom property value
+ */
+ public FeedMerlin setFeedProperty(String propertyName, String propertyValue) {
+ boolean found = false;
+ for (Property prop : this.getProperties().getProperties()) {
+ //check if it is present
+ if (prop.getName().equalsIgnoreCase(propertyName)) {
+ prop.setValue(propertyValue);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ Property property = new Property();
+ property.setName(propertyName);
+ property.setValue(propertyValue);
+ this.getProperties().getProperties().add(property);
+ }
+ return this;
+ }
+
+ /**
+ * @return feed data path
+ */
+ public String getFeedPath() {
+ for (Location location : this.getLocations().getLocations()) {
+ if (location.getType() == LocationType.DATA) {
+ return location.getPath();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Sets cut-off period.
+ * @param frequency cut-off period
+ */
+ public FeedMerlin insertLateFeedValue(Frequency frequency) {
+ this.getLateArrival().setCutOff(frequency);
+ return this;
+ }
+
+ /**
+ * Sets data location for a feed.
+ * @param pathValue new path
+ */
+ public FeedMerlin setFeedPathValue(String pathValue) {
+ for (Location location : this.getLocations().getLocations()) {
+ if (location.getType() == LocationType.DATA) {
+ location.setPath(pathValue);
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Sets name for a cluster by given order number.
+ * @param clusterName new cluster name
+ * @param clusterIndex index of cluster which should be updated
+ */
+ public FeedMerlin setClusterNameInFeed(String clusterName, int clusterIndex) {
+ this.getClusters().getClusters().get(clusterIndex).setName(clusterName);
+ return this;
+ }
+
/** clear clusters of this feed. */
public FeedMerlin clearFeedClusters() {
getClusters().getClusters().clear();
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index f869844..01fdd04 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -40,7 +40,6 @@ import org.testng.Assert;
import javax.xml.bind.JAXBException;
import java.io.StringWriter;
-import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,20 +52,11 @@ public class ProcessMerlin extends Process {
public ProcessMerlin(final Process process) {
try {
PropertyUtils.copyProperties(this, process);
- } catch (IllegalAccessException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- } catch (InvocationTargetException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
- } catch (NoSuchMethodException e) {
- Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ } catch (ReflectiveOperationException e) {
+ Assert.fail("Can't create ProcessMerlin: " + ExceptionUtils.getStackTrace(e));
}
}
- public static ProcessMerlin fromString(String processString) {
- return new ProcessMerlin(processString);
- }
-
-
public ProcessMerlin clearProcessCluster() {
getClusters().getClusters().clear();
return this;
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
index 876ffa4..b0fa0a5 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -45,6 +45,7 @@ import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -157,7 +158,7 @@ public class Bundle {
this.processData = bundle.getProcessData();
this.clusters = new ArrayList<String>();
for (String cluster : bundle.getClusters()) {
- this.clusters.add(Util.getEnvClusterXML(cluster, prefix));
+ this.clusters.add(BundleUtil.getEnvClusterXML(cluster, prefix).toString());
}
}
@@ -192,9 +193,7 @@ public class Bundle {
public List<String> getClusterNames() {
List<String> clusterNames = new ArrayList<String>();
for (String cluster : clusters) {
- final org.apache.falcon.entity.v0.cluster.Cluster clusterObject =
- Util.getClusterObject(cluster);
- clusterNames.add(clusterObject.getName());
+ clusterNames.add(new ClusterMerlin(cluster).getName());
}
return clusterNames;
}
@@ -229,7 +228,7 @@ public class Bundle {
*/
public void generateUniqueBundle(String prefix) {
/* creating new names */
- List<ClusterMerlin> clusterMerlinList = ClusterMerlin.fromString(clusters);
+ List<ClusterMerlin> clusterMerlinList = BundleUtil.fromString(clusters);
Map<String, String> clusterNameMap = new HashMap<String, String>();
for (ClusterMerlin clusterMerlin : clusterMerlinList) {
clusterNameMap.putAll(clusterMerlin.setUniqueName(prefix));
@@ -728,7 +727,7 @@ public class Bundle {
FeedMerlin feedObject = new FeedMerlin(dataSets.get(i));
org.apache.falcon.entity.v0.feed.Cluster cluster =
new org.apache.falcon.entity.v0.feed.Cluster();
- cluster.setName(Util.getClusterObject(clusterData).getName());
+ cluster.setName(new ClusterMerlin(clusterData).getName());
cluster.setValidity(feedObject.getClusters().getClusters().get(0).getValidity());
cluster.setType(type);
cluster.setRetention(feedObject.getClusters().getClusters().get(0).getRetention());
@@ -742,7 +741,7 @@ public class Bundle {
//now to add cluster to process
ProcessMerlin processObject = new ProcessMerlin(processData);
Cluster cluster = new Cluster();
- cluster.setName(Util.getClusterObject(clusterData).getName());
+ cluster.setName(new ClusterMerlin(clusterData).getName());
org.apache.falcon.entity.v0.process.Validity v =
processObject.getClusters().getClusters().get(0).getValidity();
if (StringUtils.isNotEmpty(startTime)) {
@@ -791,8 +790,7 @@ public class Bundle {
}
public String getProcessName() {
-
- return Util.getProcessName(this.getProcessData());
+ return new ProcessMerlin(this.processData).getName();
}
public void setProcessLibPath(String libPath) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/ProcessEntityHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/ProcessEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/ProcessEntityHelper.java
index 57f2320..76ad638 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/ProcessEntityHelper.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/ProcessEntityHelper.java
@@ -18,7 +18,7 @@
package org.apache.falcon.regression.core.helpers.entity;
-import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
/** Helper class to work with process endpoints of a colo. */
public class ProcessEntityHelper extends AbstractEntityHelper {
@@ -32,7 +32,7 @@ public class ProcessEntityHelper extends AbstractEntityHelper {
}
public String getEntityName(String entity) {
- return Util.getProcessName(entity);
+ return new ProcessMerlin(entity).getName();
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
index c53d927..0b2c4e1 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -22,6 +22,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.regression.Entities.ClusterMerlin;
@@ -126,8 +128,8 @@ public final class BundleUtil {
final String protectionPropName = "hadoop.rpc.protection";
final String protectionPropValue = Config.getProperty(protectionPropName);
if (StringUtils.isNotEmpty(protectionPropValue)) {
- final Property property = Util.getFalconClusterPropertyObject(
- protectionPropName, protectionPropValue.trim());
+ final Property property = getFalconClusterPropertyObject(
+ protectionPropName, protectionPropValue.trim());
clusterMerlin.getProperties().getProperties().add(property);
}
clusterData = clusterMerlin.toString();
@@ -159,4 +161,87 @@ public final class BundleUtil {
}
}
+ /**
+ * Configures cluster definition according to provided properties.
+ * @param cluster cluster which should be configured
+ * @param prefix current cluster prefix
+ * @return modified cluster definition
+ */
+ public static ClusterMerlin getEnvClusterXML(String cluster, String prefix) {
+ ClusterMerlin clusterObject = new ClusterMerlin(cluster);
+ if ((null == prefix) || prefix.isEmpty()) {
+ prefix = "";
+ } else {
+ prefix = prefix + ".";
+ }
+ String hcatEndpoint = Config.getProperty(prefix + "hcat_endpoint");
+
+ //now read and set relevant values
+ for (Interface iface : clusterObject.getInterfaces().getInterfaces()) {
+ if (iface.getType() == Interfacetype.READONLY) {
+ iface.setEndpoint(Config.getProperty(prefix + "cluster_readonly"));
+ } else if (iface.getType() == Interfacetype.WRITE) {
+ iface.setEndpoint(Config.getProperty(prefix + "cluster_write"));
+ } else if (iface.getType() == Interfacetype.EXECUTE) {
+ iface.setEndpoint(Config.getProperty(prefix + "cluster_execute"));
+ } else if (iface.getType() == Interfacetype.WORKFLOW) {
+ iface.setEndpoint(Config.getProperty(prefix + "oozie_url"));
+ } else if (iface.getType() == Interfacetype.MESSAGING) {
+ iface.setEndpoint(Config.getProperty(prefix + "activemq_url"));
+ } else if (iface.getType() == Interfacetype.REGISTRY) {
+ iface.setEndpoint(hcatEndpoint);
+ }
+ }
+ //set colo name:
+ clusterObject.setColo(Config.getProperty(prefix + "colo"));
+ // properties in the cluster needed when secure mode is on
+ if (MerlinConstants.IS_SECURE) {
+ // get the properties object for the cluster
+ org.apache.falcon.entity.v0.cluster.Properties clusterProperties =
+ clusterObject.getProperties();
+ // add the namenode principal to the properties object
+ clusterProperties.getProperties().add(getFalconClusterPropertyObject(
+ "dfs.namenode.kerberos.principal",
+ Config.getProperty(prefix + "namenode.kerberos.principal", "none")));
+
+ // add the hive meta store principal to the properties object
+ clusterProperties.getProperties().add(getFalconClusterPropertyObject(
+ "hive.metastore.kerberos.principal",
+ Config.getProperty(prefix + "hive.metastore.kerberos.principal", "none")));
+
+ // Until oozie has better integration with secure hive we need to send the properites to
+ // falcon.
+ // hive.metastore.sasl.enabled = true
+ clusterProperties.getProperties()
+ .add(getFalconClusterPropertyObject("hive.metastore.sasl.enabled", "true"));
+ // Only set the metastore uri if its not empty or null.
+ if (null != hcatEndpoint && !hcatEndpoint.isEmpty()) {
+ //hive.metastore.uris
+ clusterProperties.getProperties()
+ .add(getFalconClusterPropertyObject("hive.metastore.uris", hcatEndpoint));
+ }
+ }
+ return clusterObject;
+ }
+
+ /**
+ * Forms property object based on parameters.
+ * @param name property name
+ * @param value property value
+ * @return property object
+ */
+ private static Property getFalconClusterPropertyObject(String name, String value) {
+ Property property = new Property();
+ property.setName(name);
+ property.setValue(value);
+ return property;
+ }
+
+ public static List<ClusterMerlin> fromString(List<String> clusterStrings) {
+ List<ClusterMerlin> clusters = new ArrayList<ClusterMerlin>();
+ for (String clusterString : clusterStrings) {
+ clusters.add(new ClusterMerlin(clusterString));
+ }
+ return clusters;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
index e313eee..a3d8e87 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
@@ -28,14 +28,6 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.Entities.ClusterMerlin;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
@@ -125,15 +117,6 @@ public final class Util {
}
/**
- * @param data process definition
- * @return process name
- */
- public static String getProcessName(String data) {
- ProcessMerlin processElement = new ProcessMerlin(data);
- return processElement.getName();
- }
-
- /**
* @param data string data
* @return is data should be considered as XMl
*/
@@ -207,7 +190,7 @@ public final class Util {
public static List<String> getHadoopDataFromDir(FileSystem fs, String feed, String dir)
throws IOException {
List<String> finalResult = new ArrayList<String>();
- String feedPath = getFeedPath(feed);
+ String feedPath = new FeedMerlin(feed).getFeedPath();
int depth = feedPath.split(dir)[1].split("/").length - 1;
List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs, new Path(dir), depth);
for (Path result : results) {
@@ -220,75 +203,6 @@ public final class Util {
}
/**
- * Sets custom feed property.
- * @param feed feed definition
- * @param propertyName custom property name
- * @param propertyValue custom property value
- * @return updated feed
- */
- public static String setFeedProperty(String feed, String propertyName, String propertyValue) {
- FeedMerlin feedObject = new FeedMerlin(feed);
- boolean found = false;
- for (Property prop : feedObject.getProperties().getProperties()) {
- //check if it is present
- if (prop.getName().equalsIgnoreCase(propertyName)) {
- prop.setValue(propertyValue);
- found = true;
- break;
- }
- }
- if (!found) {
- Property property = new Property();
- property.setName(propertyName);
- property.setValue(propertyValue);
- feedObject.getProperties().getProperties().add(property);
- }
- return feedObject.toString();
- }
-
- /**
- * @param feed feed definition
- * @return feed data path
- */
- public static String getFeedPath(String feed) {
- FeedMerlin feedObject = new FeedMerlin(feed);
- for (Location location : feedObject.getLocations().getLocations()) {
- if (location.getType() == LocationType.DATA) {
- return location.getPath();
- }
- }
- return null;
- }
-
- /**
- * Sets cut-off period.
- * @param feed feed definition
- * @param frequency cut-off period
- * @return updated feed
- */
- public static String insertLateFeedValue(String feed, Frequency frequency) {
- FeedMerlin feedObject = new FeedMerlin(feed);
- feedObject.getLateArrival().setCutOff(frequency);
- return feedObject.toString();
- }
-
- /**
- * Sets data location for a feed.
- * @param feed feed definition
- * @param pathValue new path
- * @return updated feed
- */
- public static String setFeedPathValue(String feed, String pathValue) {
- FeedMerlin feedObject = new FeedMerlin(feed);
- for (Location location : feedObject.getLocations().getLocations()) {
- if (location.getType() == LocationType.DATA) {
- location.setPath(pathValue);
- }
- }
- return feedObject.toString();
- }
-
- /**
* Finds first folder within a date range.
* @param startTime start date
* @param endTime end date
@@ -308,39 +222,6 @@ public final class Util {
return null;
}
- /**
- * @param feedString feed definition
- * @param newName new name
- * @return feed with updated name
- */
- public static String setFeedName(String feedString, String newName) {
- FeedMerlin feedObject = new FeedMerlin(feedString);
- feedObject.setName(newName);
- return feedObject.toString().trim();
- }
-
- /**
- * Sets name for a cluster by given order number.
- * @param feedString feed which contains a cluster
- * @param clusterName new cluster name
- * @param clusterIndex index of cluster which should be updated
- * @return feed with cluster name updated
- */
- public static String setClusterNameInFeed(String feedString, String clusterName,
- int clusterIndex) {
- FeedMerlin feedObject = new FeedMerlin(feedString);
- feedObject.getClusters().getClusters().get(clusterIndex).setName(clusterName);
- return feedObject.toString().trim();
- }
-
- /**
- * @param clusterXML cluster definition
- * @return cluster definition converted to object representation
- */
- public static ClusterMerlin getClusterObject(String clusterXML) {
- return new ClusterMerlin(clusterXML);
- }
-
public static List<String> getInstanceFinishTimes(ColoHelper coloHelper, String workflowId)
throws IOException, JSchException {
List<String> raw = ExecUtil.runRemoteScriptAsSudo(coloHelper.getProcessHelper()
@@ -436,14 +317,6 @@ public final class Util {
}
/**
- * @param processData process definition
- * @return process definition converted to object representation.
- */
- public static Process getProcessObject(String processData) {
- return new ProcessMerlin(processData);
- }
-
- /**
* Prints JMSConsumer messages content.
* @param messageConsumer the source JMSConsumer
* @throws JMSException
@@ -463,84 +336,6 @@ public final class Util {
}
/**
- * Configures cluster definition according to provided properties.
- * @param cluster cluster which should be configured
- * @param prefix current cluster prefix
- * @return modified cluster definition
- */
- public static String getEnvClusterXML(String cluster, String prefix) {
- ClusterMerlin clusterObject = getClusterObject(cluster);
- if ((null == prefix) || prefix.isEmpty()) {
- prefix = "";
- } else {
- prefix = prefix + ".";
- }
- String hcatEndpoint = Config.getProperty(prefix + "hcat_endpoint");
-
- //now read and set relevant values
- for (Interface iface : clusterObject.getInterfaces().getInterfaces()) {
- if (iface.getType() == Interfacetype.READONLY) {
- iface.setEndpoint(Config.getProperty(prefix + "cluster_readonly"));
- } else if (iface.getType() == Interfacetype.WRITE) {
- iface.setEndpoint(Config.getProperty(prefix + "cluster_write"));
- } else if (iface.getType() == Interfacetype.EXECUTE) {
- iface.setEndpoint(Config.getProperty(prefix + "cluster_execute"));
- } else if (iface.getType() == Interfacetype.WORKFLOW) {
- iface.setEndpoint(Config.getProperty(prefix + "oozie_url"));
- } else if (iface.getType() == Interfacetype.MESSAGING) {
- iface.setEndpoint(Config.getProperty(prefix + "activemq_url"));
- } else if (iface.getType() == Interfacetype.REGISTRY) {
- iface.setEndpoint(hcatEndpoint);
- }
- }
- //set colo name:
- clusterObject.setColo(Config.getProperty(prefix + "colo"));
- // properties in the cluster needed when secure mode is on
- if (MerlinConstants.IS_SECURE) {
- // get the properties object for the cluster
- org.apache.falcon.entity.v0.cluster.Properties clusterProperties =
- clusterObject.getProperties();
- // add the namenode principal to the properties object
- clusterProperties.getProperties().add(getFalconClusterPropertyObject(
- "dfs.namenode.kerberos.principal",
- Config.getProperty(prefix + "namenode.kerberos.principal", "none")));
-
- // add the hive meta store principal to the properties object
- clusterProperties.getProperties().add(getFalconClusterPropertyObject(
- "hive.metastore.kerberos.principal",
- Config.getProperty(prefix + "hive.metastore.kerberos.principal", "none")));
-
- // Until oozie has better integration with secure hive we need to send the properites to
- // falcon.
- // hive.metastore.sasl.enabled = true
- clusterProperties.getProperties()
- .add(getFalconClusterPropertyObject("hive.metastore.sasl.enabled", "true"));
- // Only set the metastore uri if its not empty or null.
- if (null != hcatEndpoint && !hcatEndpoint.isEmpty()) {
- //hive.metastore.uris
- clusterProperties.getProperties()
- .add(getFalconClusterPropertyObject("hive.metastore.uris", hcatEndpoint));
- }
- }
- return clusterObject.toString();
- }
-
- /**
- * Forms property object based on parameters.
- * @param name property name
- * @param value property value
- * @return property object
- */
- public static org.apache.falcon.entity.v0.cluster.Property
- getFalconClusterPropertyObject(String name, String value) {
- org.apache.falcon.entity.v0.cluster.Property property = new org
- .apache.falcon.entity.v0.cluster.Property();
- property.setName(name);
- property.setValue(value);
- return property;
- }
-
- /**
* Get entity type according to its definition.
* @param entity entity which is under analysis
* @return entity type
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
index 0b934dc..eaa69f0 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
@@ -19,6 +19,7 @@
package org.apache.falcon.regression;
import org.apache.commons.httpclient.HttpStatus;
+import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
@@ -501,19 +502,18 @@ public class AuthorizationTest extends BaseTestClass {
public void u1SubmitU2UpdateFeed()
throws URISyntaxException, IOException, AuthenticationException, JAXBException,
InterruptedException {
- String feed = bundles[0].getInputFeedFromBundle();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
//submit feed
bundles[0].submitClusters(prism);
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed));
- String definition = prism.getFeedHelper().getEntityDefinition(feed).getMessage();
- Assert.assertTrue(definition.contains(Util
- .readEntityName(feed)) && !definition.contains("(feed) not found"),
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString()));
+ String definition = prism.getFeedHelper().getEntityDefinition(feed.toString()).getMessage();
+ Assert.assertTrue(definition.contains(feed.getName()) && !definition.contains("(feed) not found"),
"Feed should be already submitted");
//update feed definition
- String newFeed = Util.setFeedPathValue(feed, baseTestDir + "/randomPath"
- + MINUTE_DATE_PATTERN);
+ FeedMerlin newFeed = new FeedMerlin(feed.toString());
+ newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//try to update feed by U2
- final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+ final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
TimeUtil.getTimeWrtSystemTime(0),
MerlinConstants.USER2_NAME);
AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
@@ -524,16 +524,16 @@ public class AuthorizationTest extends BaseTestClass {
// .org/jira/browse/FALCON-388
@Test(enabled = false)
public void u1ScheduleU2UpdateFeed() throws Exception {
- String feed = bundles[0].getInputFeedFromBundle();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
//submit and schedule feed
bundles[0].submitClusters(prism);
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING);
//update feed definition
- String newFeed = Util.setFeedPathValue(feed, baseTestDir + "/randomPath"
- + MINUTE_DATE_PATTERN);
+ FeedMerlin newFeed = new FeedMerlin(feed);
+ newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//try to update feed by U2
- final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+ final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
TimeUtil.getTimeWrtSystemTime(0),
MerlinConstants.USER2_NAME);
AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST,
@@ -588,15 +588,15 @@ public class AuthorizationTest extends BaseTestClass {
// .org/jira/browse/FALCON-388
@Test(enabled = false)
public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Exception {
- String feed = bundles[0].getInputFeedFromBundle();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
String process = bundles[0].getProcessData();
process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
//submit both feeds
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
//schedule input feed by U1
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING);
//by U2 schedule process dependant on scheduled feed by U1
ServiceResponse serviceResponse = prism.getProcessHelper()
@@ -611,24 +611,22 @@ public class AuthorizationTest extends BaseTestClass {
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
//get old feed details
- String oldFeedBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
- String oldFeedUser =
- getBundleUser(cluster, Util.readEntityName(feed), EntityType.FEED);
+ String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED);
+ String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
//update feed definition
- String newFeed = Util.setFeedPathValue(feed, baseTestDir + "/randomPath"
- + MINUTE_DATE_PATTERN);
+ FeedMerlin newFeed = new FeedMerlin(feed.toString());
+ newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//update feed by U1
- serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+ serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
TimeUtil.getTimeWrtSystemTime(0), MerlinConstants.CURRENT_USER_NAME);
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should be created by U1
- OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed, true, false);
+ OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false);
String newFeedUser =
- getBundleUser(cluster, Util.readEntityName(newFeed), EntityType.FEED);
+ getBundleUser(cluster, newFeed.getName(), EntityType.FEED);
Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same");
//new process bundle should be created by U2
@@ -642,15 +640,15 @@ public class AuthorizationTest extends BaseTestClass {
// .org/jira/browse/FALCON-388
@Test(enabled = false)
public void u1ScheduleFeedU2ScheduleDependantProcessU2UpdateFeed() throws Exception {
- String feed = bundles[0].getInputFeedFromBundle();
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
String process = bundles[0].getProcessData();
process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z");
//submit both feeds
bundles[0].submitClusters(prism);
bundles[0].submitFeeds(prism);
//schedule input feed by U1
- AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed));
- AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString()));
+ AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING);
//by U2 schedule process dependent on scheduled feed by U1
ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process,
@@ -659,8 +657,8 @@ public class AuthorizationTest extends BaseTestClass {
AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
//update feed definition
- String newFeed = Util.setFeedPathValue(feed, baseTestDir + "/randomPath"
- + MINUTE_DATE_PATTERN);
+ FeedMerlin newFeed = new FeedMerlin(feed.toString());
+ newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN);
//get old process details
String oldProcessBundleId = InstanceUtil
@@ -669,20 +667,17 @@ public class AuthorizationTest extends BaseTestClass {
getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
//get old feed details
- String oldFeedBundleId = InstanceUtil
- .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED);
- String oldFeedUser =
- getBundleUser(cluster, Util.readEntityName(feed), EntityType.FEED);
+ String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED);
+ String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED);
//update feed by U2
- serviceResponse = prism.getFeedHelper().update(feed, newFeed,
+ serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(),
TimeUtil.getTimeWrtSystemTime(0), MerlinConstants.USER2_NAME);
AssertUtil.assertSucceeded(serviceResponse);
//new feed bundle should be created by U2
- OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed, true, false);
- String newFeedUser =
- getBundleUser(cluster, Util.readEntityName(newFeed), EntityType.FEED);
+ OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false);
+ String newFeedUser = getBundleUser(cluster, newFeed.getName(), EntityType.FEED);
Assert.assertNotEquals(oldFeedUser, newFeedUser, "User should not be the same");
Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser);
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
index 7cce198..b7eb77f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java
@@ -18,6 +18,7 @@
package org.apache.falcon.regression;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -148,7 +149,7 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass {
private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException {
List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundle.getProcessData()), EntityType.PROCESS);
+ new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS);
String coordID = bundles.get(0);
List<String> missingDependencies =
OozieUtil.getMissingDependencies(prismHelper, coordID);
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
index 18efe9b..41e3002 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
@@ -19,6 +19,7 @@
package org.apache.falcon.regression;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.BundleUtil;
@@ -156,7 +157,7 @@ public class ELValidationsTest extends BaseTestClass {
List<String> bundles = null;
for (int i = 0; i < 10; ++i) {
bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundle.getProcessData()), EntityType.PROCESS);
+ new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS);
if (bundles.size() > 0) {
break;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
index bddf4ef..69be47a 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@ -163,8 +163,7 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
int counter = OSUtil.IS_WINDOWS ? 100 : 50;
- InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
- .getProcessData()), Job.Status.SUCCEEDED, counter);
+ InstanceUtil.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
r = prism.getProcessHelper().getRunningInstance(processName);
InstanceUtil.validateSuccessWOInstances(r);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
index a666796..13a9776 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java
@@ -19,6 +19,7 @@
package org.apache.falcon.regression;
+import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
@@ -94,11 +95,10 @@ public class NewRetryTest extends BaseTestClass {
endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2);
bundles[0].setProcessValidity(startDate, endDate);
- String feed =
- Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
- feed = Util.insertLateFeedValue(feed, new Frequency("minutes(8)"));
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ feed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)"));
bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
- bundles[0].getDataSets().add(feed);
+ bundles[0].getDataSets().add(feed.toString());
bundles[0].setOutputFeedLocationData(baseTestDir + "/output" + MINUTE_DATE_PATTERN);
bundles[0].submitClusters(prism);
}
@@ -678,11 +678,11 @@ public class NewRetryTest extends BaseTestClass {
@Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
public void testRetryInSuspendedAndResumeCaseWithLateData(Retry retry) throws Exception {
- String feed =
- Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
- feed = Util.insertLateFeedValue(feed, new Frequency("minutes(10)"));
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ feed.setFeedPathValue(latePath);
+ feed.insertLateFeedValue(new Frequency("minutes(10)"));
bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
- bundles[0].getDataSets().add(feed);
+ bundles[0].getDataSets().add(feed.toString());
bundles[0].setRetry(retry);
for (String data : bundles[0].getDataSets()) {
@@ -772,13 +772,13 @@ public class NewRetryTest extends BaseTestClass {
@Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
public void testRetryInLateDataCase(Retry retry) throws Exception {
- String feed =
- Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ feed.setFeedPathValue(latePath);
- feed = Util.insertLateFeedValue(feed, getFrequency(retry));
+ feed.insertLateFeedValue(getFrequency(retry));
bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
- bundles[0].getDataSets().add(feed);
+ bundles[0].getDataSets().add(feed.toString());
bundles[0].setRetry(retry);
@@ -852,11 +852,11 @@ public class NewRetryTest extends BaseTestClass {
@Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false)
public void testRetryInDeleteAfterPartialRetryCase(Retry retry) throws Exception {
- String feed =
- Util.setFeedPathValue(bundles[0].getInputFeedFromBundle(), latePath);
- feed = Util.insertLateFeedValue(feed, new Frequency("minutes(1)"));
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ feed.setFeedPathValue(latePath);
+ feed.insertLateFeedValue(new Frequency("minutes(1)"));
bundles[0].getDataSets().remove(bundles[0].getInputFeedFromBundle());
- bundles[0].getDataSets().add(feed);
+ bundles[0].getDataSets().add(feed.toString());
bundles[0].setRetry(retry);
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
index 2fc4779..48cb59b 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
@@ -101,109 +101,107 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass {
public void mixed01C1sC2sC1eC2e() throws Exception {
//ua1 and ua3 are source. ua2 target. feed01 on ua1 , feed02 on ua3
//get 2 unique feeds
- String feed01 = bundles[0].getInputFeedFromBundle();
- String feed02 = bundles[1].getInputFeedFromBundle();
- String outputFeed = bundles[0].getOutputFeedFromBundle();
+ FeedMerlin feed01 = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ FeedMerlin feed02 = new FeedMerlin(bundles[1].getInputFeedFromBundle());
+ FeedMerlin outputFeed = new FeedMerlin(bundles[0].getOutputFeedFromBundle());
//set source and target for the 2 feeds
//set clusters to null;
- feed01 = FeedMerlin.fromString(feed01).clearFeedClusters().toString();
- feed02 = FeedMerlin.fromString(feed02).clearFeedClusters().toString();
- outputFeed = FeedMerlin.fromString(outputFeed).clearFeedClusters().toString();
+ feed01.clearFeedClusters();
+ feed02.clearFeedClusters();
+ outputFeed.clearFeedClusters();
//set new feed input data
- feed01 = Util.setFeedPathValue(feed01, String.format(feedPath, 1));
- feed02 = Util.setFeedPathValue(feed02, String.format(feedPath, 2));
+ feed01.setFeedPathValue(String.format(feedPath, 1));
+ feed02.setFeedPathValue(String.format(feedPath, 2));
//generate data in both the colos ua1 and ua3
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(
TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1);
- String prefix = InstanceUtil.getFeedPrefix(feed01);
+ String prefix = InstanceUtil.getFeedPrefix(feed01.toString());
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS);
HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates);
- prefix = InstanceUtil.getFeedPrefix(feed02);
+ prefix = InstanceUtil.getFeedPrefix(feed02.toString());
HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS);
HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates);
String startTime = TimeUtil.getTimeWrtSystemTime(-70);
//set clusters for feed01
- feed01 = FeedMerlin.fromString(feed01).addFeedCluster(
+ feed01.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(10000)", ActionType.DELETE)
.withValidity(startTime, "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
- feed01 = FeedMerlin.fromString(feed01).addFeedCluster(
+ .build());
+ feed01.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(10000)", ActionType.DELETE)
.withValidity(startTime, "2099-01-01T00:00Z")
.withClusterType(ClusterType.TARGET)
- .build()).toString();
+ .build());
//set clusters for feed02
- feed02 = FeedMerlin.fromString(feed02).addFeedCluster(
+ feed02.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(10000)", ActionType.DELETE)
.withValidity(startTime, "2099-01-01T00:00Z")
.withClusterType(ClusterType.TARGET)
- .build()).toString();
- feed02 = FeedMerlin.fromString(feed02).addFeedCluster(
+ .build());
+ feed02.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(10000)", ActionType.DELETE)
.withValidity(startTime, "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
+ .build());
//set clusters for output feed
- outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster(
+ outputFeed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
.withRetention("days(10000)", ActionType.DELETE)
.withValidity(startTime, "2099-01-01T00:00Z")
.withClusterType(ClusterType.SOURCE)
- .build()).toString();
- outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster(
+ .build());
+ outputFeed.addFeedCluster(
new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
.withRetention("days(10000)", ActionType.DELETE)
.withValidity(startTime, "2099-01-01T00:00Z")
.withClusterType(ClusterType.TARGET)
- .build()).toString();
+ .build());
//submit and schedule feeds
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed02));
- AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed02.toString()));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed.toString()));
String processStartTime = TimeUtil.getTimeWrtSystemTime(-16);
// String processEndTime = InstanceUtil.getTimeWrtSystemTime(20);
- String process = bundles[0].getProcessData();
- process = ProcessMerlin.fromString(process).clearProcessCluster().toString();
- process = ProcessMerlin.fromString(process).addProcessCluster(
+ ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ process.clearProcessCluster();
+ process.addProcessCluster(
new ProcessMerlin.ProcessClusterBuilder(
Util.readEntityName(bundles[0].getClusters().get(0)))
.withValidity(processStartTime, TimeUtil.addMinsToTime(processStartTime, 35))
- .build())
- .toString();
- process = ProcessMerlin.fromString(process).addProcessCluster(
+ .build());
+ process.addProcessCluster(
new ProcessMerlin.ProcessClusterBuilder(
Util.readEntityName(bundles[1].getClusters().get(0)))
.withValidity(TimeUtil.addMinsToTime(processStartTime, 16),
TimeUtil.addMinsToTime(processStartTime, 45))
- .build())
- .toString();
- process = InstanceUtil.addProcessInputFeed(process, Util.readEntityName(feed02),
- Util.readEntityName(feed02));
+ .build());
+ process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), feed02.getName(),
+ feed02.getName()));
//submit and schedule process
- prism.getProcessHelper().submitAndSchedule(process);
+ prism.getProcessHelper().submitAndSchedule(process.toString());
LOGGER.info("Wait till process goes into running ");
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(process), 1,
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(0), process.getName(), 1,
Status.RUNNING, EntityType.PROCESS);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(1), Util.getProcessName(process), 1,
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1,
Status.RUNNING, EntityType.PROCESS);
final String processName = Util.readEntityName(bundles[0].getProcessData());
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index 588355a..26348bd 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -18,6 +18,7 @@
package org.apache.falcon.regression;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -114,8 +115,8 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.getProcessName(bundles[0]
- .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, new ProcessMerlin(bundles[0]
+ .getProcessData()).getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z");
AssertUtil.assertSucceeded(r);
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index dc032f1..b41cf05 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -101,12 +101,11 @@ public class ProcessLateRerunTest extends BaseTestClass {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
- InstanceUtil.waitTillInstanceReachState(cluster1OC,
- Util.getProcessName(bundles[0].getProcessData()), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
@@ -143,12 +142,11 @@ public class ProcessLateRerunTest extends BaseTestClass {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
- InstanceUtil.waitTillInstanceReachState(cluster1OC,
- Util.getProcessName(bundles[0].getProcessData()), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
@@ -189,12 +187,11 @@ public class ProcessLateRerunTest extends BaseTestClass {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
- InstanceUtil.waitTillInstanceReachState(cluster1OC,
- Util.getProcessName(bundles[0].getProcessData()), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
@@ -247,12 +244,11 @@ public class ProcessLateRerunTest extends BaseTestClass {
TimeUtil.sleepSeconds(60);
}
- InstanceUtil.waitTillInstanceReachState(cluster1OC,
- Util.getProcessName(bundles[0].getProcessData()), 1,
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundles[0].getProcessData()), EntityType.PROCESS);
+ bundles[0].getProcessName(), EntityType.PROCESS);
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0);
@@ -269,7 +265,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
List<String> bundles = null;
for (int i = 0; i < 10; ++i) {
bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(),
- Util.getProcessName(bundle.getProcessData()), EntityType.PROCESS);
+ bundle.getProcessName(), EntityType.PROCESS);
if (bundles.size() > 0) {
break;
}
@@ -292,8 +288,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
//create missing dependencies
LOGGER.info("Creating missing dependencies...");
- OozieUtil.createMissingDependencies(prismHelper, EntityType.PROCESS,
- Util.getProcessName(bundle.getProcessData()), 0, 0);
+ OozieUtil.createMissingDependencies(prismHelper, EntityType.PROCESS, bundle.getProcessName(), 0, 0);
//Adding data to empty folders depending on dataFlag
if (dataFlag) {
@@ -309,8 +304,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
//Process succeeding on empty folders
LOGGER.info("Waiting for process to succeed...");
- InstanceUtil.waitTillInstanceReachState(oozieClient,
- Util.getProcessName(bundle.getProcessData()), 1,
+ InstanceUtil.waitTillInstanceReachState(oozieClient, bundle.getProcessName(), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
TimeUtil.sleepSeconds(30);
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
index d23d666..202298e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java
@@ -516,12 +516,12 @@ public class HCatProcessTest extends BaseTestClass {
bundles[0].setInputFeedValidity(startDate, endDate);
//
- String nonHCatFeed = BundleUtil.readELBundle().getOutputFeedFromBundle();
+ FeedMerlin nonHCatFeed = new FeedMerlin(BundleUtil.readELBundle().getOutputFeedFromBundle());
final String outputFeedName = bundles[0].getOutputFeedNameFromBundle();
- nonHCatFeed = Util.setFeedName(nonHCatFeed, outputFeedName);
+ nonHCatFeed.setName(outputFeedName);
final List<String> clusterNames = bundles[0].getClusterNames();
Assert.assertEquals(clusterNames.size(), 1, "Expected only one cluster in the bundle.");
- nonHCatFeed = Util.setClusterNameInFeed(nonHCatFeed, clusterNames.get(0), 0);
+ nonHCatFeed.setClusterNameInFeed(clusterNames.get(0), 0);
bundles[0].writeFeedElement(nonHCatFeed, outputFeedName);
bundles[0].setOutputFeedLocationData(outputHDFSDir + "/"
+ StringUtils.join(new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator));
@@ -567,12 +567,12 @@ public class HCatProcessTest extends BaseTestClass {
.location(outputHDFSDir)
.build());
- String nonHCatFeed = BundleUtil.readELBundle().getInputFeedFromBundle();
+ FeedMerlin nonHCatFeed = new FeedMerlin(BundleUtil.readELBundle().getInputFeedFromBundle());
final String inputFeedName = bundles[0].getInputFeedNameFromBundle();
- nonHCatFeed = Util.setFeedName(nonHCatFeed, inputFeedName);
+ nonHCatFeed.setName(inputFeedName);
final List<String> clusterNames = bundles[0].getClusterNames();
Assert.assertEquals(clusterNames.size(), 1, "Expected only one cluster in the bundle.");
- nonHCatFeed = Util.setClusterNameInFeed(nonHCatFeed, clusterNames.get(0), 0);
+ nonHCatFeed.setClusterNameInFeed(clusterNames.get(0), 0);
bundles[0].writeFeedElement(nonHCatFeed, inputFeedName);
bundles[0].setInputFeedDataPath(inputHDFSDir + "/"
+ StringUtils.join(new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}"}, separator));
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
index 2ec2b20..64a7e2e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java
@@ -32,7 +32,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileSystem;
@@ -105,8 +104,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass {
outputFeedName = bundles[0].getOutputFeedNameFromBundle();
Job.Status status = null;
for (int i = 0; i < 20; i++) {
- status = InstanceUtil.getDefaultCoordinatorStatus(cluster,
- Util.getProcessName(bundles[0].getProcessData()), 0);
+ status = InstanceUtil.getDefaultCoordinatorStatus(cluster, bundles[0].getProcessName(), 0);
if (status == Job.Status.SUCCEEDED || status == Job.Status.KILLED) {
break;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
index e87579f..32d5e24 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
@@ -19,6 +19,7 @@
package org.apache.falcon.regression.prism;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
@@ -28,7 +29,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil;
import org.apache.falcon.regression.core.util.OSUtil;
import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
@@ -115,10 +115,10 @@ public class EntityDryRunTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testDryRunFailureScheduleFeed() throws Exception {
- String feed = bundles[0].getInputFeedFromBundle();
- feed = Util.setFeedProperty(feed, "EntityDryRunTestProp", "${coord:someEL(1)");
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ feed.setFeedProperty("EntityDryRunTestProp", "${coord:someEL(1)");
bundles[0].submitClusters(prism);
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
+ ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed.toString());
validate(response,
"Entity schedule failed for feed: " + bundles[0].getInputFeedNameFromBundle());
}
@@ -129,15 +129,15 @@ public class EntityDryRunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testDryRunFailureUpdateFeed() throws Exception {
bundles[0].submitClusters(prism);
- String feed = bundles[0].getInputFeedFromBundle();
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed);
+ FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+ ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed.toString());
AssertUtil.assertSucceeded(response);
- feed = Util.setFeedProperty(feed, "EntityDryRunTestProp", "${coord:someEL(1)");
- response = prism.getFeedHelper().update(feed, feed);
+ feed.setFeedProperty("EntityDryRunTestProp", "${coord:someEL(1)");
+ response = prism.getFeedHelper().update(feed.toString(), feed.toString());
validate(response, "The new entity (feed) " + bundles[0].getInputFeedNameFromBundle()
+ " can't be scheduled");
Assert.assertEquals(
- OozieUtil.getNumberOfBundle(clusterOC, EntityType.FEED, Util.readEntityName(feed)), 1,
+ OozieUtil.getNumberOfBundle(clusterOC, EntityType.FEED, feed.getName()), 1,
"more than one bundle found after failed update request");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
index a829bd1..8474401 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
@@ -73,12 +73,12 @@ public class FeedDelayParallelTimeoutTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(feedInputPath);
Bundle.submitCluster(bundles[0], bundles[1]);
- String feedOutput01 = bundles[0].getDataSets().get(0);
+ FeedMerlin feedOutput01 = new FeedMerlin(bundles[0].getDataSets().get(0));
org.apache.falcon.entity.v0.Frequency delay =
new org.apache.falcon.entity.v0.Frequency(
"hours(5)");
- feedOutput01 = FeedMerlin.fromString(feedOutput01).clearFeedClusters().toString();
+ feedOutput01.clearFeedClusters();
// uncomment below 2 line when falcon in sync with falcon
@@ -104,10 +104,9 @@ public class FeedDelayParallelTimeoutTest extends BaseTestClass {
// Util.readClusterName(bundles[0].getClusters().get(0)),ClusterType.TARGET,"",
// feedOutputPath);
- feedOutput01 = Util.setFeedProperty(feedOutput01, "timeout", "minutes(35)");
- feedOutput01 = Util.setFeedProperty(feedOutput01, "parallel", "3");
+ feedOutput01.setFeedProperty("timeout", "minutes(35)").setFeedProperty("parallel", "3");
- LOGGER.info("feedOutput01: " + Util.prettyPrintXml(feedOutput01));
- prism.getFeedHelper().submitAndSchedule(feedOutput01);
+ LOGGER.info("feedOutput01: " + Util.prettyPrintXml(feedOutput01.toString()));
+ prism.getFeedHelper().submitAndSchedule(feedOutput01.toString());
}
}