You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:23 UTC
[24/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
new file mode 100644
index 0000000..11d3fb0
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.Entities;
+
+import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.Retention;
+import org.apache.falcon.entity.v0.feed.RetentionType;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import javax.xml.bind.JAXBException;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FeedMerlin extends Feed {
+
+ private static final Logger logger = Logger.getLogger(FeedMerlin.class);
+
+ public FeedMerlin(String feedData) {
+ this((Feed) fromString(EntityType.FEED, feedData));
+ }
+
+ public FeedMerlin(final Feed feed) {
+ try {
+ PropertyUtils.copyProperties(this, feed);
+ } catch (IllegalAccessException e) {
+ Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ } catch (InvocationTargetException e) {
+ Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ } catch (NoSuchMethodException e) {
+ Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ }
+ }
+
+ public static List<FeedMerlin> fromString(List<String> feedStrings) {
+ List <FeedMerlin> feeds = new ArrayList<FeedMerlin>();
+ for (String feedString : feedStrings) {
+ feeds.add(new FeedMerlin(feedString));
+ }
+ return feeds;
+ }
+
+ /**
+ * Method sets a number of clusters to feed definition
+ *
+ * @param newClusters list of definitions of clusters which are to be set to feed
+ * @param location location of data on every cluster
+ * @param startTime start of feed validity on every cluster
+ * @param endTime end of feed validity on every cluster
+ */
+ public void setFeedClusters(List<String> newClusters, String location, String startTime,
+ String endTime) {
+ Clusters cs = new Clusters();
+ setFrequency(new Frequency("" + 5, Frequency.TimeUnit.minutes));
+
+ for (String newCluster : newClusters) {
+ Cluster c = new Cluster();
+ c.setName(new ClusterMerlin(newCluster).getName());
+ Location l = new Location();
+ l.setType(LocationType.DATA);
+ l.setPath(location + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ Locations ls = new Locations();
+ ls.getLocations().add(l);
+ c.setLocations(ls);
+ Validity v = new Validity();
+ startTime = TimeUtil.addMinsToTime(startTime, -180);
+ endTime = TimeUtil.addMinsToTime(endTime, 180);
+ v.setStart(TimeUtil.oozieDateToDate(startTime).toDate());
+ v.setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
+ c.setValidity(v);
+ Retention r = new Retention();
+ r.setAction(ActionType.DELETE);
+ Frequency f1 = new Frequency("" + 20, Frequency.TimeUnit.hours);
+ r.setLimit(f1);
+ r.setType(RetentionType.INSTANCE);
+ c.setRetention(r);
+ cs.getClusters().add(c);
+ }
+ setClusters(cs);
+ }
+
+ public void setRetentionValue(String retentionValue) {
+ for (org.apache.falcon.entity.v0.feed.Cluster cluster : getClusters().getClusters()) {
+ cluster.getRetention().setLimit(new Frequency(retentionValue));
+ }
+ }
+
+ public void setTableValue(String dBName, String tableName, String pathValue) {
+ getTable().setUri("catalog:" + dBName + ":" + tableName + "#" + pathValue);
+ }
+
+ @Override
+ public String toString() {
+ try {
+ StringWriter sw = new StringWriter();
+ EntityType.FEED.getMarshaller().marshal(this, sw);
+ return sw.toString();
+ } catch (JAXBException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setLocation(LocationType locationType, String feedInputPath) {
+ for (Location location : getLocations().getLocations()) {
+ if(location.getType() == locationType) {
+ location.setPath(feedInputPath);
+ }
+ }
+ }
+
+ public void addProperty(String someProp, String someVal) {
+ Property property = new Property();
+ property.setName(someProp);
+ property.setValue(someVal);
+ this.getProperties().getProperties().add(property);
+ }
+
+ /**
+ * Sets unique names for the feed
+ * @return mapping of old name to new name
+ */
+ public Map<? extends String, ? extends String> setUniqueName() {
+ final String oldName = getName();
+ final String newName = oldName + Util.getUniqueString();
+ setName(newName);
+ final HashMap<String, String> nameMap = new HashMap<String, String>(1);
+ nameMap.put(oldName, newName);
+ return nameMap;
+ }
+
+ public void renameClusters(Map<String, String> clusterNameMap) {
+ for (Cluster cluster : getClusters().getClusters()) {
+ final String oldName = cluster.getName();
+ final String newName = clusterNameMap.get(oldName);
+ if(!StringUtils.isEmpty(newName)) {
+ cluster.setName(newName);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
new file mode 100644
index 0000000..b9db07e
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.Entities;
+
+import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Clusters;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Properties;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.hadoop.fs.FileSystem;
+import org.testng.Assert;
+
+import javax.xml.bind.JAXBException;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ProcessMerlin extends Process {
+ public ProcessMerlin(String processData) {
+ this((Process) fromString(EntityType.PROCESS, processData));
+ }
+
+ public ProcessMerlin(final Process process) {
+ try {
+ PropertyUtils.copyProperties(this, process);
+ } catch (IllegalAccessException e) {
+ Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ } catch (InvocationTargetException e) {
+ Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ } catch (NoSuchMethodException e) {
+ Assert.fail("Can't create ClusterMerlin: " + ExceptionUtils.getStackTrace(e));
+ }
+ }
+
+ /**
+ * Method sets a number of clusters to process definition
+ *
+ * @param newClusters list of definitions of clusters which are to be set to process
+ * (clusters on which process should run)
+ * @param startTime start of process validity on every cluster
+ * @param endTime end of process validity on every cluster
+ */
+ public void setProcessClusters(List<String> newClusters, String startTime, String endTime) {
+ Clusters cs = new Clusters();
+ for (String newCluster : newClusters) {
+ Cluster c = new Cluster();
+ c.setName(new ClusterMerlin(newCluster).getName());
+ org.apache.falcon.entity.v0.process.Validity v =
+ new org.apache.falcon.entity.v0.process.Validity();
+ v.setStart(TimeUtil.oozieDateToDate(startTime).toDate());
+ v.setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
+ c.setValidity(v);
+ cs.getClusters().add(c);
+ }
+ setClusters(cs);
+ }
+
+ public Bundle setFeedsToGenerateData(FileSystem fs, Bundle b) {
+ Date start = getClusters().getClusters().get(0).getValidity().getStart();
+ Format formatter = new SimpleDateFormat("yyyy'-'MM'-'dd'T'HH':'mm'Z'");
+ String startDate = formatter.format(start);
+ Date end = getClusters().getClusters().get(0).getValidity().getEnd();
+ String endDate = formatter.format(end);
+
+ Map<String, FeedMerlin> inpFeeds = getInputFeeds(b);
+ for (FeedMerlin feedElement : inpFeeds.values()) {
+ feedElement.getClusters().getClusters().get(0).getValidity()
+ .setStart(TimeUtil.oozieDateToDate(startDate).toDate());
+ feedElement.getClusters().getClusters().get(0).getValidity()
+ .setEnd(TimeUtil.oozieDateToDate(endDate).toDate());
+ b.writeFeedElement(feedElement, feedElement.getName());
+ }
+ return b;
+ }
+
+ public Map<String, FeedMerlin> getInputFeeds(Bundle b) {
+ Map<String, FeedMerlin> inpFeeds = new HashMap<String, FeedMerlin>();
+ for (Input input : getInputs().getInputs()) {
+ for (String feed : b.getDataSets()) {
+ if (Util.readEntityName(feed).equalsIgnoreCase(input.getFeed())) {
+ FeedMerlin feedO = new FeedMerlin(feed);
+ inpFeeds.put(Util.readEntityName(feed), feedO);
+ break;
+ }
+ }
+ }
+ return inpFeeds;
+ }
+
+ public final void setProperty(String name, String value) {
+ Property p = new Property();
+ p.setName(name);
+ p.setValue(value);
+ if (null == getProperties() || null == getProperties()
+ .getProperties() || getProperties().getProperties().size()
+ <= 0) {
+ Properties props = new Properties();
+ props.getProperties().add(p);
+ setProperties(props);
+ } else {
+ getProperties().getProperties().add(p);
+ }
+ }
+
+ @Override
+ public String toString() {
+ try {
+ StringWriter sw = new StringWriter();
+ EntityType.PROCESS.getMarshaller().marshal(this, sw);
+ return sw.toString();
+ } catch (JAXBException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void renameClusters(Map<String, String> clusterNameMap) {
+ for (Cluster cluster : getClusters().getClusters()) {
+ final String oldName = cluster.getName();
+ final String newName = clusterNameMap.get(oldName);
+ if (!StringUtils.isEmpty(newName)) {
+ cluster.setName(newName);
+ }
+ }
+ }
+
+ public void renameFeeds(Map<String, String> feedNameMap) {
+ for(Input input : getInputs().getInputs()) {
+ final String oldName = input.getFeed();
+ final String newName = feedNameMap.get(oldName);
+ if (!StringUtils.isEmpty(newName)) {
+ input.setFeed(newName);
+ }
+ }
+ for(Output output : getOutputs().getOutputs()) {
+ final String oldName = output.getFeed();
+ final String newName = feedNameMap.get(oldName);
+ if (!StringUtils.isEmpty(newName)) {
+ output.setFeed(newName);
+ }
+ }
+ }
+
+ /**
+ * Sets unique names for the process
+ * @return mapping of old name to new name
+ */
+ public Map<? extends String, ? extends String> setUniqueName() {
+ final String oldName = getName();
+ final String newName = oldName + Util.getUniqueString();
+ setName(newName);
+ final HashMap<String, String> nameMap = new HashMap<String, String>(1);
+ nameMap.put(oldName, newName);
+ return nameMap;
+ }
+
+ /**
+ * Method sets optional/compulsory inputs and outputs of process according to list of feed
+ * definitions and matching numeric parameters. Optional inputs are set first and then
+ * compulsory ones.
+ *
+ * @param newDataSets list of feed definitions
+ * @param numberOfInputs number of desired inputs
+ * @param numberOfOptionalInput how many inputs should be optional
+ * @param numberOfOutputs number of outputs
+ */
+ public void setProcessFeeds(List<String> newDataSets,
+ int numberOfInputs, int numberOfOptionalInput,
+ int numberOfOutputs) {
+ int numberOfOptionalSet = 0;
+ boolean isFirst = true;
+
+ Inputs is = new Inputs();
+ for (int i = 0; i < numberOfInputs; i++) {
+ Input in = new Input();
+ in.setEnd("now(0,0)");
+ in.setStart("now(0,-20)");
+ if (numberOfOptionalSet < numberOfOptionalInput) {
+ in.setOptional(true);
+ in.setName("inputData" + i);
+ numberOfOptionalSet++;
+ } else {
+ in.setOptional(false);
+ if (isFirst) {
+ in.setName("inputData");
+ isFirst = false;
+ } else
+ in.setName("inputData" + i);
+ }
+ in.setFeed(new FeedMerlin(newDataSets.get(i)).getName());
+ is.getInputs().add(in);
+ }
+
+ setInputs(is);
+ if (numberOfInputs == 0) {
+ setInputs(null);
+ }
+
+ Outputs os = new Outputs();
+ for (int i = 0; i < numberOfOutputs; i++) {
+ Output op = new Output();
+ op.setFeed(new FeedMerlin(newDataSets.get(numberOfInputs - i)).getName());
+ op.setName("outputData");
+ op.setInstance("now(0,0)");
+ os.getOutputs().add(op);
+ }
+ setOutputs(os);
+ setLateProcess(null);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
new file mode 100644
index 0000000..84ecb9c
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java
@@ -0,0 +1,1044 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.bundle;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.LateProcess;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Retry;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bundle abstraction.
+ */
+public class Bundle {
+
+ private static final String PRISM_PREFIX = "prism";
+ private static ColoHelper prismHelper = new ColoHelper(PRISM_PREFIX);
+ private static final Logger LOGGER = Logger.getLogger(Bundle.class);
+
+ private List<String> clusters;
+ private List<String> dataSets;
+ private String processData;
+
+ public void submitFeed() throws Exception {
+ submitClusters(prismHelper);
+
+ AssertUtil.assertSucceeded(
+ prismHelper.getFeedHelper().submitEntity(URLS.SUBMIT_URL, dataSets.get(0)));
+ }
+
+ public void submitAndScheduleFeed() throws Exception {
+ submitClusters(prismHelper);
+
+ AssertUtil.assertSucceeded(prismHelper.getFeedHelper().submitAndSchedule(
+ URLS.SUBMIT_AND_SCHEDULE_URL, dataSets.get(0)));
+ }
+
+ public void submitAndScheduleFeedUsingColoHelper(ColoHelper coloHelper) throws Exception {
+ submitFeed();
+
+ AssertUtil.assertSucceeded(
+ coloHelper.getFeedHelper().schedule(Util.URLS.SCHEDULE_URL, dataSets.get(0)));
+ }
+
+ public void submitAndScheduleAllFeeds()
+ throws JAXBException, IOException, URISyntaxException, AuthenticationException {
+ submitClusters(prismHelper);
+
+ for (String feed : dataSets) {
+ AssertUtil.assertSucceeded(
+ prismHelper.getFeedHelper().submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed));
+ }
+ }
+
+ public ServiceResponse submitProcess(boolean shouldSucceed) throws JAXBException,
+ IOException, URISyntaxException, AuthenticationException {
+ submitAndScheduleAllFeeds();
+ ServiceResponse r = prismHelper.getProcessHelper().submitEntity(URLS.SUBMIT_URL,
+ processData);
+ if (shouldSucceed) {
+ AssertUtil.assertSucceeded(r);
+ }
+ return r;
+ }
+
+ public void submitFeedsScheduleProcess() throws Exception {
+ submitClusters(prismHelper);
+
+ submitFeeds(prismHelper);
+
+ AssertUtil.assertSucceeded(prismHelper.getProcessHelper().submitAndSchedule(
+ URLS.SUBMIT_AND_SCHEDULE_URL, processData));
+ }
+
+
+ public void submitAndScheduleProcess() throws Exception {
+ submitAndScheduleAllFeeds();
+
+ AssertUtil.assertSucceeded(prismHelper.getProcessHelper().submitAndSchedule(
+ URLS.SUBMIT_AND_SCHEDULE_URL, processData));
+ }
+
+ public void submitAndScheduleProcessUsingColoHelper(ColoHelper coloHelper) throws Exception {
+ submitProcess(true);
+
+ AssertUtil.assertSucceeded(
+ coloHelper.getProcessHelper().schedule(URLS.SCHEDULE_URL, processData));
+ }
+
+ public List<String> getClusters() {
+ return clusters;
+ }
+
+ public Bundle(String clusterData, List<String> dataSets, String processData) {
+ this.dataSets = dataSets;
+ this.processData = processData;
+ this.clusters = new ArrayList<String>();
+ this.clusters.add(clusterData);
+ }
+
+ public Bundle(Bundle bundle, String prefix) {
+ this.dataSets = new ArrayList<String>(bundle.getDataSets());
+ this.processData = bundle.getProcessData();
+ this.clusters = new ArrayList<String>();
+ for (String cluster : bundle.getClusters()) {
+ this.clusters.add(Util.getEnvClusterXML(cluster, prefix));
+ }
+ }
+
+ public Bundle(Bundle bundle, ColoHelper helper) {
+ this(bundle, helper.getPrefix());
+ }
+
+ public void setClusterData(List<String> pClusters) {
+ this.clusters = new ArrayList<String>(pClusters);
+ }
+ /**
+ * Unwraps cluster element to string and writes it to bundle.
+ *
+ * @param c Cluster object to be unwrapped and set into bundle
+ */
+ public void writeClusterElement(org.apache.falcon.entity.v0.cluster.Cluster c) {
+ final List<String> clusters = new ArrayList<String>();
+ clusters.add(c.toString());
+ setClusterData(clusters);
+ }
+
+ /**
+ * Wraps bundle cluster in a Cluster object.
+ *
+ * @return cluster definition in a form of Cluster object
+ */
+ public org.apache.falcon.entity.v0.cluster.Cluster getClusterElement() {
+ return (org.apache.falcon.entity.v0.cluster.Cluster)
+ Entity.fromString(EntityType.CLUSTER, getClusters().get(0));
+ }
+
+
+ public List<String> getClusterNames() {
+ List<String> clusterNames = new ArrayList<String>();
+ for (String cluster : clusters) {
+ final org.apache.falcon.entity.v0.cluster.Cluster clusterObject =
+ Util.getClusterObject(cluster);
+ clusterNames.add(clusterObject.getName());
+ }
+ return clusterNames;
+ }
+
+ public List<String> getDataSets() {
+ return dataSets;
+ }
+
+ public void setDataSets(List<String> dataSets) {
+ this.dataSets = dataSets;
+ }
+
+ public String getProcessData() {
+ return processData;
+ }
+
+ public void setProcessData(String processData) {
+ this.processData = processData;
+ }
+
+ /**
+ * Generates unique entities within a bundle changing their names and names of dependant items
+ * to unique.
+ */
+ public void generateUniqueBundle() {
+ /* creating new names */
+ List<ClusterMerlin> clusterMerlinList = ClusterMerlin.fromString(clusters);
+ Map<String, String> clusterNameMap = new HashMap<String, String>();
+ for (ClusterMerlin clusterMerlin : clusterMerlinList) {
+ clusterNameMap.putAll(clusterMerlin.setUniqueName());
+ }
+
+ List<FeedMerlin> feedMerlinList = FeedMerlin.fromString(dataSets);
+ Map<String, String> feedNameMap = new HashMap<String, String>();
+ for (FeedMerlin feedMerlin : feedMerlinList) {
+ feedNameMap.putAll(feedMerlin.setUniqueName());
+ }
+
+ /* setting new names in feeds and process */
+ for (FeedMerlin feedMerlin : feedMerlinList) {
+ feedMerlin.renameClusters(clusterNameMap);
+ }
+
+ /* setting variables */
+ clusters.clear();
+ for (ClusterMerlin clusterMerlin : clusterMerlinList) {
+ clusters.add(clusterMerlin.toString());
+ }
+ dataSets.clear();
+ for (FeedMerlin feedMerlin : feedMerlinList) {
+ dataSets.add(feedMerlin.toString());
+ }
+
+ if (StringUtils.isNotEmpty(processData)) {
+ ProcessMerlin processMerlin = new ProcessMerlin(processData);
+ processMerlin.setUniqueName();
+ processMerlin.renameClusters(clusterNameMap);
+ processMerlin.renameFeeds(feedNameMap);
+ processData = processMerlin.toString();
+ }
+ }
+
+ public ServiceResponse submitBundle(ColoHelper helper)
+ throws JAXBException, IOException, URISyntaxException, AuthenticationException {
+
+ submitClusters(helper);
+
+ //lets submit all data first
+ submitFeeds(helper);
+
+ return helper.getProcessHelper().submitEntity(URLS.SUBMIT_URL, getProcessData());
+ }
+
+ /**
+ * Submit all the entities and schedule the process.
+ *
+ * @param helper helper of prism host
+ * @return message from schedule response
+ * @throws IOException
+ * @throws JAXBException
+ * @throws URISyntaxException
+ * @throws AuthenticationException
+ */
+ public String submitFeedsScheduleProcess(ColoHelper helper)
+ throws IOException, JAXBException, URISyntaxException,
+ AuthenticationException {
+ ServiceResponse submitResponse = submitBundle(helper);
+ if (submitResponse.getCode() == 400) {
+ return submitResponse.getMessage();
+ }
+
+ //lets schedule the damn thing now :)
+ ServiceResponse scheduleResult =
+ helper.getProcessHelper().schedule(URLS.SCHEDULE_URL, getProcessData());
+ AssertUtil.assertSucceeded(scheduleResult);
+ TimeUtil.sleepSeconds(7);
+ return scheduleResult.getMessage();
+ }
+
+ /**
+ * Sets the only process input.
+ *
+ * @param startEl its start in terms of EL expression
+ * @param endEl its end in terms of EL expression
+ */
+ public void setProcessInput(String startEl, String endEl) {
+ Process process = getProcessObject();
+ Inputs inputs = new Inputs();
+ Input input = new Input();
+ input.setFeed(Util.readEntityName(getInputFeedFromBundle()));
+ input.setStart(startEl);
+ input.setEnd(endEl);
+ input.setName("inputData");
+ inputs.getInputs().add(input);
+ process.setInputs(inputs);
+ this.setProcessData(process.toString());
+ }
+
+ public void setInvalidData() {
+ int index = 0;
+ Feed dataElement = (Feed) Entity.fromString(EntityType.FEED, dataSets.get(0));
+ if (!dataElement.getName().contains("raaw-logs16")) {
+ dataElement = (Feed) Entity.fromString(EntityType.FEED, dataSets.get(1));
+ index = 1;
+ }
+
+
+ String oldLocation = dataElement.getLocations().getLocations().get(0).getPath();
+ LOGGER.info("oldlocation: " + oldLocation);
+ dataElement.getLocations().getLocations().get(0).setPath(
+ oldLocation.substring(0, oldLocation.indexOf('$')) + "invalid/"
+ +
+ oldLocation.substring(oldLocation.indexOf('$')));
+ LOGGER.info("new location: " + dataElement.getLocations().getLocations().get(0).getPath());
+ dataSets.set(index, dataElement.toString());
+ }
+
+
+ public void setFeedValidity(String feedStart, String feedEnd, String feedName) {
+ Feed feedElement = getFeedElement(feedName);
+ feedElement.getClusters().getClusters().get(0).getValidity()
+ .setStart(TimeUtil.oozieDateToDate(feedStart).toDate());
+ feedElement.getClusters().getClusters().get(0).getValidity()
+ .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate());
+ writeFeedElement(feedElement, feedName);
+ }
+
+ public int getInitialDatasetFrequency() {
+ Feed dataElement = (Feed) Entity.fromString(EntityType.FEED, dataSets.get(0));
+ if (!dataElement.getName().contains("raaw-logs16")) {
+ dataElement = (Feed) Entity.fromString(EntityType.FEED, dataSets.get(1));
+ }
+ if (dataElement.getFrequency().getTimeUnit() == TimeUnit.hours) {
+ return (Integer.parseInt(dataElement.getFrequency().getFrequency())) * 60;
+ } else {
+ return (Integer.parseInt(dataElement.getFrequency().getFrequency()));
+ }
+ }
+
+ public Date getStartInstanceProcess(Calendar time) {
+ Process processElement = getProcessObject();
+ LOGGER.info("start instance: " + processElement.getInputs().getInputs().get(0).getStart());
+ return TimeUtil.getMinutes(processElement.getInputs().getInputs().get(0).getStart(), time);
+ }
+
+ public Date getEndInstanceProcess(Calendar time) {
+ Process processElement = getProcessObject();
+ LOGGER.info("end instance: " + processElement.getInputs().getInputs().get(0).getEnd());
+ LOGGER.info("timezone in getendinstance: " + time.getTimeZone().toString());
+ LOGGER.info("time in getendinstance: " + time.getTime());
+ return TimeUtil.getMinutes(processElement.getInputs().getInputs().get(0).getEnd(), time);
+ }
+
+ public void setDatasetInstances(String startInstance, String endInstance) {
+ Process processElement = getProcessObject();
+ processElement.getInputs().getInputs().get(0).setStart(startInstance);
+ processElement.getInputs().getInputs().get(0).setEnd(endInstance);
+ setProcessData(processElement.toString());
+ }
+
+ public void setProcessPeriodicity(int frequency, TimeUnit periodicity) {
+ Process processElement = getProcessObject();
+ Frequency frq = new Frequency("" + frequency, periodicity);
+ processElement.setFrequency(frq);
+ setProcessData(processElement.toString());
+ }
+
+ public void setProcessInputStartEnd(String start, String end) {
+ Process processElement = getProcessObject();
+ for (Input input : processElement.getInputs().getInputs()) {
+ input.setStart(start);
+ input.setEnd(end);
+ }
+ setProcessData(processElement.toString());
+ }
+
+ public void setOutputFeedPeriodicity(int frequency, TimeUnit periodicity) {
+ Process processElement = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ String outputDataset = null;
+ int datasetIndex;
+ for (datasetIndex = 0; datasetIndex < dataSets.size(); datasetIndex++) {
+ outputDataset = dataSets.get(datasetIndex);
+ if (outputDataset.contains(processElement.getOutputs().getOutputs().get(0).getFeed())) {
+ break;
+ }
+ }
+
+ Feed feedElement = (Feed) Entity.fromString(EntityType.FEED, outputDataset);
+
+ feedElement.setFrequency(new Frequency("" + frequency, periodicity));
+ dataSets.set(datasetIndex, feedElement.toString());
+ LOGGER.info("modified o/p dataSet is: " + dataSets.get(datasetIndex));
+ }
+
+ public int getProcessConcurrency() {
+ return getProcessObject().getParallel();
+ }
+
+ public void setOutputFeedLocationData(String path) {
+ Process processElement = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ String outputDataset = null;
+ int datasetIndex;
+ for (datasetIndex = 0; datasetIndex < dataSets.size(); datasetIndex++) {
+ outputDataset = dataSets.get(datasetIndex);
+ if (outputDataset.contains(processElement.getOutputs().getOutputs().get(0).getFeed())) {
+ break;
+ }
+ }
+
+ Feed feedElement = (Feed) Entity.fromString(EntityType.FEED, outputDataset);
+ Location l = new Location();
+ l.setPath(path);
+ l.setType(LocationType.DATA);
+ feedElement.getLocations().getLocations().set(0, l);
+ dataSets.set(datasetIndex, feedElement.toString());
+ LOGGER.info("modified location path dataSet is: " + dataSets.get(datasetIndex));
+ }
+
+ public void setProcessConcurrency(int concurrency) {
+ Process processElement = getProcessObject();
+ processElement.setParallel((concurrency));
+ setProcessData(processElement.toString());
+ }
+
+ public void setProcessWorkflow(String wfPath) {
+ setProcessWorkflow(wfPath, null);
+ }
+
+ public void setProcessWorkflow(String wfPath, EngineType engineType) {
+ setProcessWorkflow(wfPath, null, engineType);
+ }
+
+ public void setProcessWorkflow(String wfPath, String libPath, EngineType engineType) {
+ Process processElement = getProcessObject();
+ Workflow w = processElement.getWorkflow();
+ if (engineType != null) {
+ w.setEngine(engineType);
+ }
+ if (libPath != null) {
+ w.setLib(libPath);
+ }
+ w.setPath(wfPath);
+ processElement.setWorkflow(w);
+ setProcessData(processElement.toString());
+ }
+
+ public Process getProcessObject() {
+ return (Process) Entity.fromString(EntityType.PROCESS, getProcessData());
+ }
+
+ public Feed getFeedElement(String feedName) {
+ return (Feed) Entity.fromString(EntityType.FEED, getFeed(feedName));
+ }
+
+
+ public String getFeed(String feedName) {
+ for (String feed : getDataSets()) {
+ if (Util.readEntityName(feed).contains(feedName)) {
+ return feed;
+ }
+ }
+
+ return null;
+ }
+
+
+ public void writeFeedElement(Feed feedElement, String feedName) {
+ writeFeedElement(feedElement.toString(), feedName);
+ }
+
+
+ public void writeFeedElement(String feedString, String feedName) {
+ dataSets.set(dataSets.indexOf(getFeed(feedName)), feedString);
+ }
+
+
+ public void setInputFeedPeriodicity(int frequency, TimeUnit periodicity) {
+ String feedName = getInputFeedNameFromBundle();
+ Feed feedElement = getFeedElement(feedName);
+ Frequency frq = new Frequency("" + frequency, periodicity);
+ feedElement.setFrequency(frq);
+ writeFeedElement(feedElement, feedName);
+
+ }
+
+ public void setInputFeedValidity(String startInstance, String endInstance) {
+ String feedName = getInputFeedNameFromBundle();
+ this.setFeedValidity(startInstance, endInstance, feedName);
+ }
+
+ public void setOutputFeedValidity(String startInstance, String endInstance) {
+ String feedName = getOutputFeedNameFromBundle();
+ this.setFeedValidity(startInstance, endInstance, feedName);
+ }
+
+ public void setInputFeedDataPath(String path) {
+ String feedName = getInputFeedNameFromBundle();
+ Feed feedElement = getFeedElement(feedName);
+ final List<Location> locations = feedElement.getLocations().getLocations();
+ for (Location location : locations) {
+ if(location.getType() == LocationType.DATA) {
+ locations.get(0).setPath(path);
+ }
+ }
+ writeFeedElement(feedElement, feedName);
+ }
+
+ public String getFeedDataPathPrefix() {
+ Feed feedElement =
+ getFeedElement(getInputFeedNameFromBundle());
+ return Util.getPathPrefix(feedElement.getLocations().getLocations().get(0)
+ .getPath());
+ }
+
+ public void setProcessValidity(DateTime startDate, DateTime endDate) {
+
+ DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm");
+
+ String start = formatter.print(startDate).replace("/", "T") + "Z";
+ String end = formatter.print(endDate).replace("/", "T") + "Z";
+
+ Process processElement = (Process) Entity.fromString(EntityType.PROCESS, processData);
+
+ for (Cluster cluster : processElement.getClusters().getClusters()) {
+
+ org.apache.falcon.entity.v0.process.Validity validity =
+ new org.apache.falcon.entity.v0.process.Validity();
+ validity.setStart(TimeUtil.oozieDateToDate(start).toDate());
+ validity.setEnd(TimeUtil.oozieDateToDate(end).toDate());
+ cluster.setValidity(validity);
+
+ }
+
+ processData = processElement.toString();
+ }
+
+ public void setProcessValidity(String startDate, String endDate) {
+ Process processElement = (Process) Entity.fromString(EntityType.PROCESS, processData);
+
+ for (Cluster cluster : processElement.getClusters().getClusters()) {
+ org.apache.falcon.entity.v0.process.Validity validity =
+ new org.apache.falcon.entity.v0.process.Validity();
+ validity.setStart(TimeUtil.oozieDateToDate(startDate).toDate());
+ validity.setEnd(TimeUtil.oozieDateToDate(endDate).toDate());
+ cluster.setValidity(validity);
+
+ }
+
+ processData = processElement.toString();
+ }
+
+ public void setProcessLatePolicy(LateProcess lateProcess) {
+ Process processElement = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ processElement.setLateProcess(lateProcess);
+ processData = processElement.toString();
+ }
+
+
+ public void verifyDependencyListing(ColoHelper coloHelper) {
+ //display dependencies of process:
+ String dependencies = coloHelper.getProcessHelper().getDependencies(
+ Util.readEntityName(getProcessData()));
+
+ //verify presence
+ for (String cluster : clusters) {
+ Assert.assertTrue(dependencies.contains("(cluster) " + Util.readEntityName(cluster)));
+ }
+ for (String feed : getDataSets()) {
+ Assert.assertTrue(dependencies.contains("(feed) " + Util.readEntityName(feed)));
+ for (String cluster : clusters) {
+ Assert.assertTrue(coloHelper.getFeedHelper().getDependencies(
+ Util.readEntityName(feed))
+ .contains("(cluster) " + Util.readEntityName(cluster)));
+ }
+ Assert.assertFalse(coloHelper.getFeedHelper().getDependencies(
+ Util.readEntityName(feed))
+ .contains("(process)" + Util.readEntityName(getProcessData())));
+ }
+
+
+ }
+
+ public void addProcessInput(String feed, String feedName) {
+ Process processElement = getProcessObject();
+ Input in1 = processElement.getInputs().getInputs().get(0);
+ Input in2 = new Input();
+ in2.setEnd(in1.getEnd());
+ in2.setFeed(feed);
+ in2.setName(feedName);
+ in2.setPartition(in1.getPartition());
+ in2.setStart(in1.getStart());
+ processElement.getInputs().getInputs().add(in2);
+ setProcessData(processElement.toString());
+ }
+
+ public void setProcessName(String newName) {
+ Process processElement = getProcessObject();
+ processElement.setName(newName);
+ setProcessData(processElement.toString());
+
+ }
+
+ public void setRetry(Retry retry) {
+ LOGGER.info("old process: " + Util.prettyPrintXml(processData));
+ Process processObject = getProcessObject();
+ processObject.setRetry(retry);
+ processData = processObject.toString();
+ LOGGER.info("updated process: " + Util.prettyPrintXml(processData));
+ }
+
+ public void setInputFeedAvailabilityFlag(String flag) {
+ String feedName = getInputFeedNameFromBundle();
+ Feed feedElement = getFeedElement(feedName);
+ feedElement.setAvailabilityFlag(flag);
+ writeFeedElement(feedElement, feedName);
+ }
+
+ public void setCLusterColo(String colo) {
+ org.apache.falcon.entity.v0.cluster.Cluster c =
+ getClusterElement();
+ c.setColo(colo);
+ writeClusterElement(c);
+
+ }
+
+ public void setClusterInterface(Interfacetype interfacetype, String value) {
+ org.apache.falcon.entity.v0.cluster.Cluster c =
+ getClusterElement();
+ final Interfaces interfaces = c.getInterfaces();
+ final List<Interface> interfaceList = interfaces.getInterfaces();
+ for (final Interface anInterface : interfaceList) {
+ if (anInterface.getType() == interfacetype) {
+ anInterface.setEndpoint(value);
+ }
+ }
+ writeClusterElement(c);
+ }
+
+ public void setInputFeedTableUri(String tableUri) {
+ final String feedStr = getInputFeedFromBundle();
+ Feed feed = (Feed) Entity.fromString(EntityType.FEED, feedStr);
+ final CatalogTable catalogTable = new CatalogTable();
+ catalogTable.setUri(tableUri);
+ feed.setTable(catalogTable);
+ writeFeedElement(feed, feed.getName());
+ }
+
+ public void setOutputFeedTableUri(String tableUri) {
+ final String feedStr = getOutputFeedFromBundle();
+ Feed feed = (Feed) Entity.fromString(EntityType.FEED, feedStr);
+ final CatalogTable catalogTable = new CatalogTable();
+ catalogTable.setUri(tableUri);
+ feed.setTable(catalogTable);
+ writeFeedElement(feed, feed.getName());
+ }
+
+ public void setCLusterWorkingPath(String clusterData, String path) {
+
+ org.apache.falcon.entity.v0.cluster.Cluster c =
+ (org.apache.falcon.entity.v0.cluster.Cluster)
+ Entity.fromString(EntityType.CLUSTER, clusterData);
+
+ for (int i = 0; i < c.getLocations().getLocations().size(); i++) {
+ if (c.getLocations().getLocations().get(i).getName().contains("working")) {
+ c.getLocations().getLocations().get(i).setPath(path);
+ }
+ }
+
+ //this.setClusterData(clusterData)
+ writeClusterElement(c);
+ }
+
+
+ public void submitClusters(ColoHelper helper)
+ throws JAXBException, IOException, URISyntaxException, AuthenticationException {
+ submitClusters(helper, null);
+ }
+
+ public void submitClusters(ColoHelper helper, String user)
+ throws JAXBException, IOException, URISyntaxException, AuthenticationException {
+ for (String cluster : this.clusters) {
+ AssertUtil.assertSucceeded(
+ helper.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster, user));
+ }
+ }
+
+ public void submitFeeds(ColoHelper helper)
+ throws JAXBException, IOException, URISyntaxException, AuthenticationException {
+ for (String feed : this.dataSets) {
+ AssertUtil.assertSucceeded(
+ helper.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed));
+ }
+ }
+
+ public void addClusterToBundle(String clusterData, ClusterType type,
+ String startTime, String endTime) {
+ clusterData = setNewClusterName(clusterData);
+
+ this.clusters.add(clusterData);
+ //now to add clusters to feeds
+ for (int i = 0; i < dataSets.size(); i++) {
+ FeedMerlin feedObject = new FeedMerlin(dataSets.get(i));
+ org.apache.falcon.entity.v0.feed.Cluster cluster =
+ new org.apache.falcon.entity.v0.feed.Cluster();
+ cluster.setName(Util.getClusterObject(clusterData).getName());
+ cluster.setValidity(feedObject.getClusters().getClusters().get(0).getValidity());
+ cluster.setType(type);
+ cluster.setRetention(feedObject.getClusters().getClusters().get(0).getRetention());
+ feedObject.getClusters().getClusters().add(cluster);
+
+ dataSets.remove(i);
+ dataSets.add(i, feedObject.toString());
+
+ }
+
+ //now to add cluster to process
+ ProcessMerlin processObject = new ProcessMerlin(processData);
+ Cluster cluster = new Cluster();
+ cluster.setName(Util.getClusterObject(clusterData).getName());
+ org.apache.falcon.entity.v0.process.Validity v =
+ processObject.getClusters().getClusters().get(0).getValidity();
+ if (StringUtils.isNotEmpty(startTime)) {
+ v.setStart(TimeUtil.oozieDateToDate(startTime).toDate());
+ }
+ if (StringUtils.isNotEmpty(endTime)) {
+ v.setEnd(TimeUtil.oozieDateToDate(endTime).toDate());
+ }
+ cluster.setValidity(v);
+ processObject.getClusters().getClusters().add(cluster);
+ this.processData = processObject.toString();
+
+ }
+
+ private String setNewClusterName(String clusterData) {
+ ClusterMerlin clusterObj = new ClusterMerlin(clusterData);
+ clusterObj.setName(clusterObj.getName() + this.clusters.size() + 1);
+ return clusterObj.toString();
+ }
+
+ public void deleteBundle(ColoHelper helper) {
+
+ try {
+ helper.getProcessHelper().delete(URLS.DELETE_URL, getProcessData());
+ } catch (Exception e) {
+ e.getStackTrace();
+ }
+
+ for (String dataset : getDataSets()) {
+ try {
+ helper.getFeedHelper().delete(URLS.DELETE_URL, dataset);
+ } catch (Exception e) {
+ e.getStackTrace();
+ }
+ }
+
+ for (String cluster : this.getClusters()) {
+ try {
+ helper.getClusterHelper().delete(URLS.DELETE_URL, cluster);
+ } catch (Exception e) {
+ e.getStackTrace();
+ }
+ }
+
+
+ }
+
+ public String getProcessName() {
+
+ return Util.getProcessName(this.getProcessData());
+ }
+
+ public void setProcessLibPath(String libPath) {
+ Process processElement = getProcessObject();
+ Workflow wf = processElement.getWorkflow();
+ wf.setLib(libPath);
+ processElement.setWorkflow(wf);
+ setProcessData(processElement.toString());
+
+ }
+
+ public void setProcessTimeOut(int magnitude, TimeUnit unit) {
+ Process processElement = getProcessObject();
+ Frequency frq = new Frequency("" + magnitude, unit);
+ processElement.setTimeout(frq);
+ setProcessData(processElement.toString());
+ }
+
+ public static void submitCluster(Bundle... bundles)
+ throws IOException, URISyntaxException, AuthenticationException {
+
+ for (Bundle bundle : bundles) {
+ ServiceResponse r =
+ prismHelper.getClusterHelper()
+ .submitEntity(URLS.SUBMIT_URL, bundle.getClusters().get(0));
+ Assert.assertTrue(r.getMessage().contains("SUCCEEDED"), r.getMessage());
+ }
+
+
+ }
+
+ /**
+ * Generates unique entities definitions: clusters, feeds and process, populating them with
+ * desired values of different properties.
+ *
+ * @param numberOfClusters number of clusters on which feeds and process should run
+ * @param numberOfInputs number of desired inputs in process definition
+ * @param numberOfOptionalInput how many inputs should be optional
+ * @param inputBasePaths base data path for inputs
+ * @param numberOfOutputs number of outputs
+ * @param startTime start of feeds and process validity on every cluster
+ * @param endTime end of feeds and process validity on every cluster
+ */
+ public void generateRequiredBundle(int numberOfClusters, int numberOfInputs,
+ int numberOfOptionalInput,
+ String inputBasePaths, int numberOfOutputs, String startTime,
+ String endTime) {
+ //generate and set clusters
+ ClusterMerlin c = new ClusterMerlin(getClusters().get(0));
+ c.setUniqueName();
+ List<String> newClusters = new ArrayList<String>();
+ final String clusterName = c.getName();
+ for (int i = 0; i < numberOfClusters; i++) {
+ c.setName(clusterName + i);
+ newClusters.add(i, c.toString());
+ }
+ setClusterData(newClusters);
+
+ //generate and set newDataSets
+ List<String> newDataSets = new ArrayList<String>();
+ for (int i = 0; i < numberOfInputs; i++) {
+ final FeedMerlin feed = new FeedMerlin(getDataSets().get(0));
+ feed.setUniqueName();
+ feed.setFeedClusters(newClusters, inputBasePaths + "/input" + i, startTime, endTime);
+ newDataSets.add(feed.toString());
+ }
+ for (int i = 0; i < numberOfOutputs; i++) {
+ final FeedMerlin feed = new FeedMerlin(getDataSets().get(0));
+ feed.setUniqueName();
+ feed.setFeedClusters(newClusters, inputBasePaths + "/output" + i, startTime, endTime);
+ newDataSets.add(feed.toString());
+ }
+ setDataSets(newDataSets);
+
+ //add clusters and feed to process
+ ProcessMerlin processMerlin = new ProcessMerlin(getProcessData());
+ processMerlin.setUniqueName();
+ processMerlin.setProcessClusters(newClusters, startTime, endTime);
+ processMerlin.setProcessFeeds(newDataSets, numberOfInputs,
+ numberOfOptionalInput, numberOfOutputs);
+ setProcessData(processMerlin.toString());
+ }
+
+ public void submitAndScheduleBundle(ColoHelper helper, boolean checkSuccess)
+ throws IOException, JAXBException, URISyntaxException, AuthenticationException {
+
+ for (int i = 0; i < getClusters().size(); i++) {
+ ServiceResponse r = helper.getClusterHelper()
+ .submitEntity(URLS.SUBMIT_URL, getClusters().get(i));
+ if (checkSuccess) {
+ AssertUtil.assertSucceeded(r);
+ }
+ }
+ for (int i = 0; i < getDataSets().size(); i++) {
+ ServiceResponse r = helper.getFeedHelper().submitAndSchedule(
+ URLS.SUBMIT_AND_SCHEDULE_URL, getDataSets().get(i));
+ if (checkSuccess) {
+ AssertUtil.assertSucceeded(r);
+ }
+ }
+ ServiceResponse r = helper.getProcessHelper().submitAndSchedule(
+ URLS.SUBMIT_AND_SCHEDULE_URL, getProcessData());
+ if (checkSuccess) {
+ AssertUtil.assertSucceeded(r);
+ }
+ }
+
+ /**
+ * Changes names of process inputs.
+ *
+ * @param names desired names of inputs
+ */
+ public void setProcessInputNames(String... names) {
+ Process p = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ for (int i = 0; i < names.length; i++) {
+ p.getInputs().getInputs().get(i).setName(names[i]);
+ }
+ processData = p.toString();
+ }
+
+ /**
+ * Adds optional property to process definition.
+ *
+ * @param properties desired properties to be added
+ */
+ public void addProcessProperty(Property... properties) {
+ Process p = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ for (Property property : properties) {
+ p.getProperties().getProperties().add(property);
+ }
+ processData = p.toString();
+ }
+
+ /**
+ * Sets partition for each input, according to number of supplied partitions.
+ *
+ * @param partition partitions to be set
+ */
+ public void setProcessInputPartition(String... partition) {
+ Process p = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ for (int i = 0; i < partition.length; i++) {
+ p.getInputs().getInputs().get(i).setPartition(partition[i]);
+ }
+ processData = p.toString();
+ }
+
+ /**
+ * Sets name(s) of the process output(s).
+ *
+ * @param names new names of the outputs
+ */
+ public void setProcessOutputNames(String... names) {
+ Process p = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ Outputs outputs = p.getOutputs();
+ Assert.assertEquals(outputs.getOutputs().size(), names.length,
+ "Number of output names is not equal to number of outputs in process");
+ for (int i = 0; i < names.length; i++) {
+ outputs.getOutputs().get(i).setName(names[i]);
+ }
+ p.setOutputs(outputs);
+ processData = p.toString();
+ }
+
+ public void addInputFeedToBundle(String feedRefName, String feed, int templateInputIdx) {
+ this.getDataSets().add(feed);
+ String feedName = Util.readEntityName(feed);
+ String vProcessData = getProcessData();
+
+ Process processObject = (Process) Entity.fromString(EntityType.PROCESS, vProcessData);
+ final List<Input> processInputs = processObject.getInputs().getInputs();
+ Input templateInput = processInputs.get(templateInputIdx);
+ Input newInput = new Input();
+ newInput.setFeed(feedName);
+ newInput.setName(feedRefName);
+ newInput.setOptional(templateInput.isOptional());
+ newInput.setStart(templateInput.getStart());
+ newInput.setEnd(templateInput.getEnd());
+ newInput.setPartition(templateInput.getPartition());
+ processInputs.add(newInput);
+ setProcessData(processObject.toString());
+ }
+
+ public void addOutputFeedToBundle(String feedRefName, String feed, int templateOutputIdx) {
+ this.getDataSets().add(feed);
+ String feedName = Util.readEntityName(feed);
+ Process processObject = getProcessObject();
+ final List<Output> processOutputs = processObject.getOutputs().getOutputs();
+ Output templateOutput = processOutputs.get(templateOutputIdx);
+ Output newOutput = new Output();
+ newOutput.setFeed(feedName);
+ newOutput.setName(feedRefName);
+ newOutput.setInstance(templateOutput.getInstance());
+ processOutputs.add(newOutput);
+ setProcessData(processObject.toString());
+ }
+
+ public void setProcessProperty(String property, String value) {
+ ProcessMerlin process = new ProcessMerlin(this.getProcessData());
+ process.setProperty(property, value);
+ this.setProcessData(process.toString());
+
+ }
+
+ public String getDatasetPath() {
+ Feed dataElement = (Feed) Entity.fromString(EntityType.FEED, getDataSets().get(0));
+ if (!dataElement.getName().contains("raaw-logs16")) {
+ dataElement = (Feed) Entity.fromString(EntityType.FEED, getDataSets().get(1));
+ }
+ return dataElement.getLocations().getLocations().get(0).getPath();
+ }
+
+ public String getInputFeedFromBundle() {
+ String processData = getProcessData();
+ Process processObject = (Process) Entity.fromString(EntityType.PROCESS, processData);
+ for (Input input : processObject.getInputs().getInputs()) {
+ for (String feed : getDataSets()) {
+ if (Util.readEntityName(feed).equalsIgnoreCase(input.getFeed())) {
+ return feed;
+ }
+ }
+ }
+ return null;
+ }
+
+ public String getOutputFeedFromBundle() {
+ String processData = getProcessData();
+ Process processObject = (Process) Entity.fromString(EntityType.PROCESS, processData);
+
+ for (Output output : processObject.getOutputs().getOutputs()) {
+ for (String feed : getDataSets()) {
+ if (Util.readEntityName(feed).equalsIgnoreCase(output.getFeed())) {
+ return feed;
+ }
+ }
+ }
+ return null;
+ }
+
+ public String getOutputFeedNameFromBundle() {
+ String feedData = getOutputFeedFromBundle();
+ Feed feedObject = (Feed) Entity.fromString(EntityType.FEED, feedData);
+ return feedObject.getName();
+ }
+
+ public String getInputFeedNameFromBundle() {
+ String feedData = getInputFeedFromBundle();
+ Feed feedObject = (Feed) Entity.fromString(EntityType.FEED, feedData);
+ return feedObject.getName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/FeedType.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/FeedType.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/FeedType.java
new file mode 100644
index 0000000..67a19b0
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/FeedType.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.enumsAndConstants;
+
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Enum to represent different feed periodicity.
+ */
+public enum FeedType {
+ MINUTELY("minutely", "yyyy/MM/dd/HH/mm", "${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+ "year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR};minute=${MINUTE}") {
+ public DateTime addTime(DateTime dateTime, int amount) {
+ return dateTime.plusMinutes(amount);
+ }
+ },
+ HOURLY("hourly", "yyyy/MM/dd/HH", "${YEAR}/${MONTH}/${DAY}/${HOUR}",
+ "year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}") {
+ @Override
+ public DateTime addTime(DateTime dateTime, int amount) {
+ return dateTime.plusHours(amount);
+ }
+ },
+ DAILY("daily", "yyyy/MM/dd", "${YEAR}/${MONTH}/${DAY}",
+ "year=${YEAR};month=${MONTH};day=${DAY}") {
+ @Override
+ public DateTime addTime(DateTime dateTime, int amount) {
+ return dateTime.plusDays(amount);
+ }
+ },
+ MONTHLY("monthly", "yyyy/MM", "${YEAR}/${MONTH}",
+ "year=${YEAR};month=${MONTH}") {
+ @Override
+ public DateTime addTime(DateTime dateTime, int amount) {
+ return dateTime.plusMonths(amount);
+ }
+ },
+ YEARLY("yearly", "yyyy", "${YEAR}",
+ "year=${YEAR}") {
+ @Override
+ public DateTime addTime(DateTime dateTime, int amount) {
+ return dateTime.plusYears(amount);
+ }
+ };
+
+ private final String value;
+ private final String pathValue;
+ private final String hcatPathValue;
+ private final DateTimeFormatter formatter;
+
+ private FeedType(String value, String format, String pathValue, String hcatPathValue) {
+ this.value = value;
+ formatter = DateTimeFormat.forPattern(format);
+ this.pathValue = pathValue;
+ this.hcatPathValue = hcatPathValue;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public String getPathValue() {
+ return pathValue;
+ }
+
+ public String getHcatPathValue() {
+ return hcatPathValue;
+ }
+
+ public DateTimeFormatter getFormatter() {
+ return formatter;
+ }
+
+ public int getDirDepth() {
+ return StringUtils.countMatches(pathValue, "/");
+ }
+
+ public abstract DateTime addTime(DateTime dateTime, int amount);
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/MerlinConstants.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/MerlinConstants.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/MerlinConstants.java
new file mode 100644
index 0000000..3c6779f
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/MerlinConstants.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.enumsAndConstants;
+
+import org.apache.falcon.regression.core.util.Config;
+import org.apache.falcon.request.RequestKeys;
+import org.apache.hadoop.conf.Configuration;
+import org.testng.Assert;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+
+/**
+ * Class for test contants.
+ */
+public final class MerlinConstants {
+ private MerlinConstants() {
+ }
+
+ private static final Logger LOGGER = Logger.getLogger(MerlinConstants.class);
+
+ public static final boolean IS_SECURE =
+ "kerberos".equals(new Configuration().get("hadoop.security.authentication", "simple"));
+ public static final String CURRENT_USER_NAME = System.getProperty("user.name");
+ private static final String CURRENT_USER_KEYTAB_STR =
+ "current_user_keytab";
+ private static final String USER_2_NAME_STR = "user2_name";
+ private static final String USER_2_KEYTAB_STR = "user2_keytab";
+ public static final String USER2_NAME;
+ private static HashMap<String, String> keyTabMap;
+ public static final String ACL_OWNER = Config.getProperty("ACL.OWNER", RequestKeys.CURRENT_USER);
+ public static final String ACL_GROUP = Config.getProperty("ACL.GROUP", "default");
+
+ /* initialize keyTabMap */
+ static {
+ final String currentUserKeytab = Config.getProperty(CURRENT_USER_KEYTAB_STR);
+ final String user2Name = Config.getProperty(USER_2_NAME_STR);
+ final String user2Keytab = Config.getProperty(USER_2_KEYTAB_STR);
+ LOGGER.info("CURRENT_USER_NAME: " + CURRENT_USER_NAME);
+ LOGGER.info("currentUserKeytab: " + currentUserKeytab);
+ LOGGER.info("user2Name: " + user2Name);
+ LOGGER.info("user2Keytab: " + user2Keytab);
+ USER2_NAME = user2Name;
+ keyTabMap = new HashMap<String, String>();
+ keyTabMap.put(CURRENT_USER_NAME, currentUserKeytab);
+ keyTabMap.put(user2Name, user2Keytab);
+ }
+
+ public static String getKeytabForUser(String user) {
+ Assert.assertTrue(keyTabMap.containsKey(user), "Unknown user: " + user);
+ return keyTabMap.get(user);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/RetentionUnit.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/RetentionUnit.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/RetentionUnit.java
new file mode 100644
index 0000000..d1ece18
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/RetentionUnit.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.enumsAndConstants;
+
+import org.joda.time.DateTime;
+
+public enum RetentionUnit {
+ MINUTES("minutes") {
+ @Override
+ public DateTime minusTime(DateTime dateTime, int amount) {
+ return dateTime.minusMinutes(amount);
+ }
+ }, HOURS("hours") {
+ @Override
+ public DateTime minusTime(DateTime dateTime, int amount) {
+ return dateTime.minusHours(amount);
+ }
+ }, DAYS("days") {
+ @Override
+ public DateTime minusTime(DateTime dateTime, int amount) {
+ return dateTime.minusDays(amount);
+ }
+ }, MONTHS("months") {
+ @Override
+ public DateTime minusTime(DateTime dateTime, int amount) {
+ return dateTime.minusMonths(amount);
+ }
+ }, YEARS("years") {
+ @Override
+ public DateTime minusTime(DateTime dateTime, int amount) {
+ return dateTime.minusYears(amount);
+ }
+ };
+
+ private String value;
+
+ private RetentionUnit(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public abstract DateTime minusTime(DateTime dateTime, int amount);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
new file mode 100644
index 0000000..129ea62
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ClusterEntityHelperImpl.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.helpers;
+
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.InstancesSummaryResult;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class ClusterEntityHelperImpl extends IEntityManagerHelper {
+
+
+ private static final String INVALID_ERR = "Not Valid for Cluster Entity";
+ private static final Logger logger = Logger.getLogger(ClusterEntityHelperImpl.class);
+
+ public ClusterEntityHelperImpl(String prefix) {
+ super(prefix);
+ }
+
+ public String getEntityType() {
+ return "cluster";
+ }
+
+ public String getEntityName(String entity) {
+ return Util.readEntityName(entity);
+ }
+
+ public ServiceResponse getStatus(URLS url, String data, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ public ServiceResponse resume(URLS url, String data, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ public ServiceResponse schedule(URLS url, String data, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ public ServiceResponse submitAndSchedule(URLS url, String data, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ public ServiceResponse suspend(URLS url, String data, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public InstancesResult getRunningInstance(
+ URLS processRunningInstance, String name, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public InstancesResult getProcessInstanceStatus(
+ String readEntityName, String params, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+
+ public InstancesResult getProcessInstanceSuspend(
+ String readEntityName, String params, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public ServiceResponse update(String oldEntity, String newEntity, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public ServiceResponse update(String oldEntity, String newEntity, String updateTime,
+ String user) throws IOException {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public InstancesResult getProcessInstanceKill(String readEntityName,
+ String string, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public InstancesResult getProcessInstanceRerun(
+ String readEntityName, String string, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public InstancesResult getProcessInstanceResume(
+ String readEntityName, String string, String user) {
+ throw new UnsupportedOperationException(INVALID_ERR);
+ }
+
+ @Override
+ public InstancesSummaryResult getInstanceSummary(String readEntityName,
+ String string
+ ) throws
+ IOException, URISyntaxException {
+ logger.info(INVALID_ERR);
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ColoHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ColoHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ColoHelper.java
new file mode 100644
index 0000000..05b4f73
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/ColoHelper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.helpers;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.interfaces.EntityHelperFactory;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.log4j.Logger;
+
+public class ColoHelper {
+ private static final Logger logger = Logger.getLogger(ColoHelper.class);
+ private final IEntityManagerHelper clusterHelper;
+ private final IEntityManagerHelper processHelper;
+ private final IEntityManagerHelper feedHelper;
+ private String prefix;
+
+ public ColoHelper(String prefix) {
+ this.prefix = prefix;
+ clusterHelper = EntityHelperFactory.getEntityHelper(EntityType.CLUSTER, prefix);
+ processHelper = EntityHelperFactory.getEntityHelper(EntityType.PROCESS, prefix);
+ feedHelper = EntityHelperFactory.getEntityHelper(EntityType.FEED, prefix);
+ }
+
+ public IEntityManagerHelper getClusterHelper() {
+ return clusterHelper;
+ }
+
+ public IEntityManagerHelper getFeedHelper() {
+ return feedHelper;
+ }
+
+ public IEntityManagerHelper getProcessHelper() {
+ return processHelper;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
new file mode 100644
index 0000000..7800c50
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/DataEntityHelperImpl.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.helpers;
+
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.log4j.Logger;
+
+public class DataEntityHelperImpl extends IEntityManagerHelper {
+
+ private static final Logger logger = Logger.getLogger(DataEntityHelperImpl.class);
+
+ public DataEntityHelperImpl(String prefix) {
+ super(prefix);
+ }
+
+ public String getEntityType() {
+ return "feed";
+ }
+
+ public String getEntityName(String entity) {
+ return Util.readEntityName(entity);
+ }
+
+}
+