You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:21 UTC

[22/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java
new file mode 100644
index 0000000..a35cd2d
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.response.lineage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class VerticesResult extends GraphResult {
+    List<Vertex> results;
+
+    public List<Vertex> getResults() {
+        return results;
+    }
+
+    @Override
+    public String toString() {
+        return "AllVertices{" +
+            "totalSize=" + totalSize +
+            ", results=" + results +
+            '}';
+    }
+
+    public List<Vertex> filterByType(Vertex.VERTEX_TYPE vertex_type) {
+        return filterVerticesByType(vertex_type, results);
+    }
+
+    public List<Vertex> filterVerticesByType(Vertex.VERTEX_TYPE vertex_type,
+                                             List<Vertex> vertexList) {
+        List<Vertex> result = new ArrayList<Vertex>();
+        for (Vertex vertex : vertexList) {
+            if(vertex.getType() == vertex_type) {
+                result.add(vertex);
+            }
+        }
+        return result;
+    }
+
+    public List<Vertex> filterByName(String name) {
+        return filterVerticesByName(name, results);
+    }
+
+    public List<Vertex> filterVerticesByName(String name, List<Vertex> vertexList) {
+        List<Vertex> result = new ArrayList<Vertex>();
+        for (Vertex vertex : vertexList) {
+            if(vertex.getName().equals(name)) {
+                result.add(vertex);
+            }
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java
new file mode 100755
index 0000000..77a5cc6
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.supportClasses;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.testng.TestNGException;
+import org.apache.log4j.Logger;
+
+public class Brother extends Thread {
+    String operation;
+    String data;
+    URLS url;
+    ServiceResponse output;
+    private static final Logger logger = Logger.getLogger(Brother.class);
+
+    public ServiceResponse getOutput() {
+        return output;
+    }
+
+    IEntityManagerHelper entityManagerHelper;
+
+    public Brother(String threadName, String operation, EntityType entityType, ThreadGroup tGroup,
+                   Bundle b, ColoHelper p, URLS url) {
+        super(tGroup, threadName);
+        this.operation = operation;
+        switch (entityType) {
+            case PROCESS:
+                this.data = b.getProcessData();
+                this.entityManagerHelper = p.getProcessHelper();
+                break;
+            case CLUSTER:
+                this.entityManagerHelper = p.getClusterHelper();
+                this.data = b.getClusters().get(0);
+                break;
+            case FEED:
+                this.entityManagerHelper = p.getFeedHelper();
+                this.data = b.getDataSets().get(0);
+                break;
+            default:
+                logger.error("Unexpected entityType=" + entityType);
+        }
+        this.url = url;
+        this.output = new ServiceResponse();
+    }
+
+    public void run() {
+        try {
+            sleep(50L);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new TestNGException(e.getMessage());
+        }
+        logger.info("Brother " + this.getName() + " will be executing " + operation);
+        try {
+            switch (url) {
+                case SUBMIT_URL:
+                    output = entityManagerHelper.submitEntity(url, data);
+                    break;
+                case GET_ENTITY_DEFINITION:
+                    output = entityManagerHelper.getEntityDefinition(url, data);
+                    break;
+                case DELETE_URL:
+                    output = entityManagerHelper.delete(url, data);
+                    break;
+                case SUSPEND_URL:
+                    output = entityManagerHelper.suspend(url, data);
+                    break;
+                case SCHEDULE_URL:
+                    output = entityManagerHelper.schedule(url, data);
+                    break;
+                case RESUME_URL:
+                    output = entityManagerHelper.resume(url, data);
+                    break;
+                case SUBMIT_AND_SCHEDULE_URL:
+                    output = entityManagerHelper.submitAndSchedule(url, data);
+                    break;
+                case STATUS_URL:
+                    output = entityManagerHelper.getStatus(url, data);
+                    break;
+                default:
+                    logger.error("Unexpected url: " + url);
+                    break;
+            }
+            logger.info("Brother " + getName() + "'s response to the " + operation + " is: " +
+                output);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java
new file mode 100644
index 0000000..0ebcb53
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.supportClasses;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HadoopFileEditor {
+    private static final Logger logger = Logger.getLogger(HadoopFileEditor.class);
+    FileSystem fs;
+    List<String> paths;
+    List<String> files;
+
+    public HadoopFileEditor(FileSystem fs) {
+        this.fs = fs;
+        paths = new ArrayList<String>();
+        files = new ArrayList<String>();
+    }
+
+    /*
+    method to edit a file present on HDFS. Path is the location on HDFS,
+    2nd param is the first instance of string after u want ur tesxt to be
+    inserted, 3rd param is the text u want to insert
+     */
+    public void edit(String path, String putAfterString, String toBeInserted) throws IOException {
+        paths.add(path);
+        String currentFile = Util.getFileNameFromPath(path);
+        files.add(currentFile);
+        FileUtils.deleteQuietly(new File(currentFile));
+        FileUtils.deleteQuietly(new File("." + currentFile + ".crc"));
+        FileUtils.deleteQuietly(new File(currentFile + ".bck"));
+        FileUtils.deleteQuietly(new File("tmp"));
+
+        Path file = new Path(path);
+        //check if currentFile exists or not
+        if (fs.exists(file)) {
+            fs.copyToLocalFile(file, new Path(currentFile));
+            FileUtils.copyFile(new File(currentFile), new File(currentFile + ".bck"));
+            BufferedWriter bufwriter = new BufferedWriter(new FileWriter
+                ("tmp"));
+            BufferedReader br = new BufferedReader(new FileReader(currentFile));
+            String line;
+            boolean isInserted = false;
+            while ((line = br.readLine()) != null) {
+                bufwriter.write(line);
+                bufwriter.write('\n');
+                if (line.contains(putAfterString) && !isInserted) {
+                    bufwriter.write(toBeInserted);
+                    isInserted = true;
+                }
+            }
+            br.close();
+            bufwriter.close();
+            FileUtils.deleteQuietly(new File(currentFile));
+            FileUtils.copyFile(new File("tmp"), new File(currentFile));
+            FileUtils.deleteQuietly(new File("tmp"));
+
+            fs.delete(file, false);
+            File crcFile = new File("." + currentFile + ".crc");
+            if (crcFile.exists())
+                crcFile.delete();
+            fs.copyFromLocalFile(new Path(currentFile), file);
+        } else {
+            logger.info("Nothing to do, " + currentFile + " does not exists");
+        }
+    }
+
+    /*
+    puts back the original file to HDFS that was editied by edit function
+     */
+    public void restore() throws IOException {
+        for (int i = 0; i < paths.size(); i++) {
+            fs.delete(new Path(paths.get(i)), false);
+            FileUtils.deleteQuietly(new File(files.get(i)));
+            FileUtils.copyFile(new File(files.get(i) + ".bck"),
+                new File(files.get(i)));
+            fs.copyFromLocalFile(new Path(files.get(i)), new Path(paths.get(i)));
+            FileUtils.deleteQuietly(new File(files.get(i)));
+            FileUtils.deleteQuietly(new File(files.get(i) + ".bck"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java
new file mode 100644
index 0000000..d7e952b
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.supportClasses;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JmsMessageConsumer extends Thread {
+    /*URL of the JMS server
+    brokerURL = "tcp://host:61616?daemon=true";
+    ActiveMQConnection.DEFAULT_BROKER_URL;
+    Name of the queue we will receive messages from
+    String subject = "IVORY.TOPIC";*/
+
+    private static final Logger logger = Logger.getLogger(JmsMessageConsumer.class);
+    private static final int MAX_MESSAGE_COUNT = 1000;
+
+    final String brokerURL;
+    final String topicName;
+    final List<MapMessage> receivedMessages;
+
+    public List<MapMessage> getReceivedMessages() {
+        return receivedMessages;
+    }
+
+    public JmsMessageConsumer(String topicName, String brokerURL) {
+        super(topicName);
+        this.topicName = topicName;
+        this.brokerURL = brokerURL;
+        receivedMessages = new ArrayList<MapMessage>();
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Getting JMS connection from the server
+            Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
+            connection.start();
+
+            // Creating session for sending messages
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(topicName);
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            try {
+                logger.info("Starting to receive messages.");
+                int count = 0;
+                for (; count < MAX_MESSAGE_COUNT; ++ count) {
+                    Message message = consumer.receive(); //blocking call
+                    if (message == null) {
+                        logger.info("Received empty message, count = " + count);
+                    } else {
+                        logger.info("Received message, id = " + message.getJMSMessageID());
+                        receivedMessages.add((MapMessage) message);
+                    }
+                }
+                if (count >= MAX_MESSAGE_COUNT) {
+                    logger.warn("Not reading more messages, already read " + count + " messages.");
+                }
+            } finally {
+                logger.info("Stopping to receive messages.");
+                connection.close();
+            }
+        } catch (Exception e) {
+            logger.info("caught exception: " + ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
new file mode 100644
index 0000000..db73eda
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.testng.Assert;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Util methods for assert.
+ */
+public final class AssertUtil {
+
+    private AssertUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    private static final Logger LOGGER = Logger.getLogger(AssertUtil.class);
+
+    /**
+     * Checks that any path in list doesn't contains a string.
+     *
+     * @param paths list of paths
+     * @param shouldNotBePresent string that shouldn't be present
+     */
+    public static void failIfStringFoundInPath(
+        List<Path> paths, String... shouldNotBePresent) {
+        for (Path path : paths) {
+            for (String aShouldNotBePresent : shouldNotBePresent) {
+                if (path.toUri().toString().contains(aShouldNotBePresent)) {
+                    Assert.fail("String " + aShouldNotBePresent + " was not expected in path "
+                            +
+                            path.toUri().toString());
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks that two lists have same size.
+     *
+     * @param expected expected list
+     * @param actual   actual list
+     */
+    public static void checkForListSizes(List<?> expected, List<?> actual) {
+        if (expected.size() != actual.size()) {
+            LOGGER.info("expected = " + expected);
+        }
+        checkForListSize(actual, expected.size());
+    }
+
+    /**
+     * Checks that two lists have same size.
+     *
+     * @param elements list of elements
+     * @param expectedSize expected size of the list
+     */
+    public static void checkForListSize(List<?> elements, int expectedSize) {
+        if (elements.size() != expectedSize) {
+            LOGGER.info("expectedSize = " + expectedSize);
+            LOGGER.info("elements.size() = " + elements.size());
+            LOGGER.info("elements = " + elements);
+        }
+        Assert.assertEquals(elements.size(), expectedSize,
+            "Size of expected and actual list don't match.");
+    }
+
+    /**
+     * Checks that two lists has expected diff element.
+     *
+     * @param initialState first list
+     * @param finalState   second list
+     * @param filename     expected diff element
+     * @param expectedDiff diff count (positive for new elements)
+     */
+    public static void compareDataStoreStates(List<String> initialState,
+                                              List<String> finalState, String filename,
+                                              int expectedDiff) {
+
+        if (expectedDiff > -1) {
+            finalState.removeAll(initialState);
+            Assert.assertEquals(finalState.size(), expectedDiff);
+            if (expectedDiff != 0) {
+                Assert.assertTrue(finalState.get(0).contains(filename));
+            }
+        } else {
+            expectedDiff = expectedDiff * -1;
+            initialState.removeAll(finalState);
+            Assert.assertEquals(initialState.size(), expectedDiff);
+            if (expectedDiff != 0) {
+                Assert.assertTrue(initialState.get(0).contains(filename));
+            }
+        }
+
+
+    }
+
+    /**
+     * Checks that two lists has expected diff element.
+     *
+     * @param initialState first list
+     * @param finalState   second list
+     * @param expectedDiff diff count (positive for new elements)
+     */
+    public static void compareDataStoreStates(List<String> initialState,
+                                              List<String> finalState, int expectedDiff) {
+
+        if (expectedDiff > -1) {
+            finalState.removeAll(initialState);
+            Assert.assertEquals(finalState.size(), expectedDiff);
+
+        } else {
+            expectedDiff = expectedDiff * -1;
+            initialState.removeAll(finalState);
+            Assert.assertEquals(initialState.size(), expectedDiff);
+
+        }
+
+
+    }
+
+    /**
+     * Checks that ServiceResponse status is SUCCEEDED.
+     *
+     * @param response ServiceResponse
+     * @throws JAXBException
+     */
+    public static void assertSucceeded(ServiceResponse response) throws JAXBException {
+        Assert.assertEquals(Util.parseResponse(response).getStatus(),
+            APIResult.Status.SUCCEEDED, "Status should be SUCCEEDED");
+        Assert.assertEquals(Util.parseResponse(response).getStatusCode(), 200,
+            "Status code should be 200");
+        Assert.assertNotNull(Util.parseResponse(response).getMessage(), "Status message is null");
+    }
+
+    /**
+     * Checks that ProcessInstancesResult status is SUCCEEDED.
+     *
+     * @param response ProcessInstancesResult
+     */
+    public static void assertSucceeded(InstancesResult response) {
+        Assert.assertNotNull(response.getMessage());
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED,
+            "Status should be SUCCEEDED");
+    }
+
+    /**
+     * Checks that ServiceResponse status is status FAILED.
+     *
+     * @param response ServiceResponse
+     * @param message  message for exception
+     * @throws JAXBException
+     */
+    public static void assertFailed(final ServiceResponse response, final String message)
+        throws JAXBException {
+        assertFailedWithStatus(response, 400, message);
+    }
+
+    /**
+     * Checks that ServiceResponse status is status FAILED with some status code.
+     *
+     * @param response   ServiceResponse
+     * @param statusCode expected status code
+     * @param message    message for exception
+     * @throws JAXBException
+     */
+    public static void assertFailedWithStatus(final ServiceResponse response, final int statusCode,
+                                              final String message) throws JAXBException {
+        Assert.assertNotEquals(response.message, "null", "response message should not be null");
+        Assert.assertEquals(Util.parseResponse(response).getStatus(),
+            APIResult.Status.FAILED, message);
+        Assert.assertEquals(Util.parseResponse(response).getStatusCode(), statusCode,
+            message);
+        Assert.assertNotNull(Util.parseResponse(response).getRequestId(), "RequestId is null");
+    }
+
+    /**
+     * Checks that ServiceResponse status is status PARTIAL.
+     *
+     * @param response ServiceResponse
+     * @throws JAXBException
+     */
+    public static void assertPartial(ServiceResponse response) throws JAXBException {
+        Assert.assertEquals(Util.parseResponse(response).getStatus(), APIResult.Status.PARTIAL);
+        Assert.assertEquals(Util.parseResponse(response).getStatusCode(), 400);
+        Assert.assertNotNull(Util.parseResponse(response).getMessage());
+    }
+
+    /**
+     * Checks that ServiceResponse status is status FAILED with status code 400.
+     *
+     * @param response ServiceResponse
+     * @throws JAXBException
+     */
+    public static void assertFailed(ServiceResponse response) throws JAXBException {
+        Assert.assertNotEquals(response.message, "null", "response message should not be null");
+
+        Assert.assertEquals(Util.parseResponse(response).getStatus(), APIResult.Status.FAILED);
+        Assert.assertEquals(Util.parseResponse(response).getStatusCode(), 400);
+    }
+
+    /**
+     * Checks that status of some entity job is equal to expected. Method can wait
+     * 100 seconds for expected status.
+     *
+     * @param oozieClient    OozieClient
+     * @param entityType     FEED or PROCESS
+     * @param data           feed or proceess XML
+     * @param expectedStatus expected Job.Status of entity
+     * @throws OozieClientException
+     */
+    public static void checkStatus(OozieClient oozieClient, EntityType entityType, String data,
+                                   Job.Status expectedStatus)
+        throws OozieClientException {
+        String name = null;
+        if (entityType == EntityType.FEED) {
+            name = Util.readEntityName(data);
+        } else if (entityType == EntityType.PROCESS) {
+            name = Util.readEntityName(data);
+        }
+        Assert.assertEquals(
+            OozieUtil.verifyOozieJobStatus(oozieClient, name, entityType, expectedStatus), true,
+            "Status should be " + expectedStatus);
+    }
+
+    /**
+     * Checks that status of some entity job is equal to expected. Method can wait
+     * 100 seconds for expected status.
+     *
+     * @param oozieClient    OozieClient
+     * @param entityType     FEED or PROCESS
+     * @param bundle         Bundle with feed or process data
+     * @param expectedStatus expected Job.Status of entity
+     * @throws OozieClientException
+     */
+    public static void checkStatus(OozieClient oozieClient, EntityType entityType, Bundle bundle,
+                                   Job.Status expectedStatus)
+        throws OozieClientException {
+        String data = null;
+        if (entityType == EntityType.FEED) {
+            data = bundle.getDataSets().get(0);
+        } else if (entityType == EntityType.PROCESS) {
+            data = bundle.getProcessData();
+        }
+        checkStatus(oozieClient, entityType, data, expectedStatus);
+    }
+
+    /**
+     * Checks that status of some entity job is NOT equal to expected.
+     *
+     * @param oozieClient    OozieClient
+     * @param entityType     FEED or PROCESS
+     * @param data           feed or proceess XML
+     * @param expectedStatus expected Job.Status of entity
+     * @throws OozieClientException
+     */
+    public static void checkNotStatus(OozieClient oozieClient, EntityType entityType, String data,
+                                      Job.Status expectedStatus)
+        throws OozieClientException {
+        String processName = null;
+        if (entityType == EntityType.FEED) {
+            processName = Util.readEntityName(data);
+        } else if (entityType == EntityType.PROCESS) {
+            processName = Util.readEntityName(data);
+        }
+        Assert.assertNotEquals(OozieUtil.getOozieJobStatus(oozieClient, processName,
+            entityType), expectedStatus, "Status should not be " + expectedStatus);
+    }
+
+    /**
+     * Checks that status of some entity job is NOT equal to expected.
+     *
+     * @param oozieClient    OozieClient
+     * @param entityType     FEED or PROCESS
+     * @param bundle         Bundle with feed or process data
+     * @param expectedStatus expected Job.Status of entity
+     * @throws OozieClientException
+     */
+    public static void checkNotStatus(OozieClient oozieClient, EntityType entityType,
+                                      Bundle bundle, Job.Status expectedStatus)
+        throws OozieClientException {
+        String data = null;
+        if (entityType == EntityType.FEED) {
+            data = bundle.getDataSets().get(0);
+        } else if (entityType == EntityType.PROCESS) {
+            data = bundle.getProcessData();
+        }
+        checkNotStatus(oozieClient, entityType, data, expectedStatus);
+    }
+
+    /**
+     * Checks size of the content a two locations.
+     *
+     * @param firstPath  path to the first location
+     * @param secondPath path to the second location
+     * @param fs         hadoop file system for the locations
+     * @throws IOException
+     */
+    public static void checkContentSize(String firstPath, String secondPath, FileSystem fs) throws
+        IOException {
+        final ContentSummary firstSummary = fs.getContentSummary(new Path(firstPath));
+        final ContentSummary secondSummary = fs.getContentSummary(new Path(secondPath));
+        LOGGER.info(firstPath + " : firstSummary = " + firstSummary.toString(false));
+        LOGGER.info(secondPath + " : secondSummary = " + secondSummary.toString(false));
+        Assert.assertEquals(firstSummary.getLength(), secondSummary.getLength(),
+            "Contents at the two locations don't have same size.");
+    }
+
+    /**
+     * Fail the test because of the supplied exception.
+     * @param e exception
+     */
+    public static void fail(Exception e) {
+        LOGGER.info("Got exception: " + ExceptionUtils.getStackTrace(e));
+        Assert.fail("Failing because of exception.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
new file mode 100644
index 0000000..1f73523
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * util methods related to bundle.
+ */
+public final class BundleUtil {
+    private BundleUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(BundleUtil.class);
+
+    public static Bundle readLateDataBundle() throws IOException {
+        return readBundleFromFolder("LateDataBundles");
+    }
+
+    public static Bundle readRetryBundle() throws IOException {
+        return readBundleFromFolder("RetryTests");
+    }
+
+    public static Bundle readRetentionBundle() throws IOException {
+        return readBundleFromFolder("RetentionBundles");
+    }
+
+    public static Bundle readELBundle() throws IOException {
+        return readBundleFromFolder("ELbundle");
+    }
+
+    public static Bundle readHCatBundle() throws IOException {
+        return readBundleFromFolder("hcat");
+    }
+
+    public static Bundle readHCat2Bundle() throws IOException {
+        return readBundleFromFolder("hcat_2");
+    }
+
+    public static Bundle readLocalDCBundle() throws IOException {
+        return readBundleFromFolder("LocalDC_feedReplicaltion_BillingRC");
+    }
+
+    public static Bundle readImpressionRCBundle() throws IOException {
+        return readBundleFromFolder("impressionRC");
+    }
+
+    public static Bundle readUpdateBundle() throws IOException {
+        return readBundleFromFolder("updateBundle");
+    }
+
+    private static Bundle readBundleFromFolder(final String folderPath) throws IOException {
+        LOGGER.info("Loading xmls from directory: " + folderPath);
+        File directory = null;
+        try {
+            directory = new File(BundleUtil.class.getResource("/" + folderPath).toURI());
+        } catch (URISyntaxException e) {
+            Assert.fail("could not find dir: " + folderPath);
+        }
+        final Collection<File> list = FileUtils.listFiles(directory, new String[] {"xml"}, true);
+        File[] files = list.toArray(new File[list.size()]);
+        Arrays.sort(files);
+        String clusterData = "";
+        final List<String> dataSets = new ArrayList<String>();
+        String processData = "";
+
+        for (File file : files) {
+            LOGGER.info("Loading data from path: " + file.getAbsolutePath());
+            final String data = IOUtils.toString(file.toURI());
+
+            if (data.contains("uri:ivory:cluster:0.1") || data.contains("uri:falcon:cluster:0.1")) {
+                LOGGER.info("data been added to cluster");
+                clusterData = data;
+            } else if (data.contains("uri:ivory:feed:0.1")
+                    ||
+                data.contains("uri:falcon:feed:0.1")) {
+                LOGGER.info("data been added to feed");
+                dataSets.add(InstanceUtil.setFeedACL(data));
+            } else if (data.contains("uri:ivory:process:0.1")
+                    ||
+                data.contains("uri:falcon:process:0.1")) {
+                LOGGER.info("data been added to process");
+                processData = data;
+            }
+        }
+        Assert.assertNotNull(clusterData, "expecting cluster data to be non-empty");
+        Assert.assertTrue(!dataSets.isEmpty(), "expecting feed data to be non-empty");
+        return new Bundle(clusterData, dataSets, processData);
+    }
+
+    public static void submitAllClusters(ColoHelper prismHelper, Bundle... b)
+        throws IOException, URISyntaxException, AuthenticationException {
+        for (Bundle aB : b) {
+            ServiceResponse r = prismHelper.getClusterHelper()
+                .submitEntity(Util.URLS.SUBMIT_URL, aB.getClusters().get(0));
+            Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java
new file mode 100644
index 0000000..5415083
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.EntitiesResult;
+import org.apache.falcon.regression.core.response.EntityResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * util methods related to conf.
+ */
+public final class CleanupUtil {
+    private CleanupUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(CleanupUtil.class);
+
+    public static List<String> getAllProcesses(ColoHelper prism)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        return getAllEntitiesOfOneType(prism.getProcessHelper());
+    }
+
+    public static List<String> getAllFeeds(ColoHelper prism)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        return getAllEntitiesOfOneType(prism.getFeedHelper());
+    }
+
+    public static List<String> getAllClusters(ColoHelper prism)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        return getAllEntitiesOfOneType(prism.getClusterHelper());
+    }
+
+    private static List<String> getAllEntitiesOfOneType(IEntityManagerHelper iEntityManagerHelper)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        final EntitiesResult entitiesResult = getEntitiesResultOfOneType(iEntityManagerHelper);
+        List<String> clusters = new ArrayList<String>();
+        for (EntityResult entity : entitiesResult.getEntities()) {
+            clusters.add(entity.getName());
+        }
+        return clusters;
+    }
+
+    private static EntitiesResult getEntitiesResultOfOneType(
+        IEntityManagerHelper iEntityManagerHelper)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        final ServiceResponse clusterResponse =
+            iEntityManagerHelper.listEntities(Util.URLS.LIST_URL);
+        JAXBContext jc = JAXBContext.newInstance(EntitiesResult.class);
+        Unmarshaller u = jc.createUnmarshaller();
+        return (EntitiesResult) u.unmarshal(
+            new StringReader(clusterResponse.getMessage()));
+    }
+
+    public static void cleanAllClustersQuietly(ColoHelper prism) {
+        try {
+            final List<String> clusters = getAllClusters(prism);
+            for (String cluster : clusters) {
+                try {
+                    prism.getClusterHelper().deleteByName(Util.URLS.DELETE_URL, cluster, null);
+                } catch (Exception e) {
+                    LOGGER.warn("Caught exception: " + ExceptionUtils.getStackTrace(e));
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get a list of clusters because of exception: "
+                    +
+                ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    public static void cleanAllFeedsQuietly(ColoHelper prism) {
+        try {
+            final List<String> feeds = getAllFeeds(prism);
+            for (String feed : feeds) {
+                try {
+                    prism.getFeedHelper().deleteByName(Util.URLS.DELETE_URL, feed, null);
+                } catch (Exception e) {
+                    LOGGER.warn("Caught exception: " + ExceptionUtils.getStackTrace(e));
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get a list of feeds because of exception: "
+                    +
+                ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    public static void cleanAllProcessesQuietly(ColoHelper prism,
+                                                IEntityManagerHelper entityManagerHelper) {
+        try {
+            final List<String> processes = getAllProcesses(prism);
+            for (String process : processes) {
+                try {
+                    entityManagerHelper.deleteByName(Util.URLS.DELETE_URL, process, null);
+                } catch (Exception e) {
+                    LOGGER.warn("Caught exception: " + ExceptionUtils.getStackTrace(e));
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get a list of feeds because of exception: "
+                    +
+                ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    public static void cleanAllEntities(ColoHelper prism) {
+        cleanAllProcessesQuietly(prism, prism.getProcessHelper());
+        cleanAllFeedsQuietly(prism);
+        cleanAllClustersQuietly(prism);
+    }
+
+    public static void deleteQuietly(IEntityManagerHelper helper, String feed) {
+        try {
+            helper.delete(Util.URLS.DELETE_URL, feed);
+        } catch (Exception e) {
+            LOGGER.info("Caught exception: " + ExceptionUtils.getStackTrace(e));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java
new file mode 100644
index 0000000..a6e83ef
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+public class Config {
+    private static final Logger logger = Logger.getLogger(Config.class);
+
+    private static final String MERLIN_PROPERTIES = "Merlin.properties";
+    private static final Config INSTANCE = new Config(MERLIN_PROPERTIES);
+
+    private PropertiesConfiguration confObj;
+    private Config(String propFileName) {
+        try {
+            logger.info("Going to read properties from: " + propFileName);
+            confObj = new PropertiesConfiguration(Config.class.getResource("/" + propFileName));
+        } catch (ConfigurationException e) {
+            Assert.fail("Could not read properties because of exception: " + e);
+        }
+    }
+
+    public static String getProperty(String key) {
+        return INSTANCE.confObj.getString(key);
+    }
+
+    public static String[] getStringArray(String key) {
+        return INSTANCE.confObj.getStringArray(key);
+    }
+
+    public static String getProperty(String key, String defaultValue) {
+        return INSTANCE.confObj.getString(key, defaultValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java
new file mode 100644
index 0000000..d240e76
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * util methods related to exec.
+ */
+public final class ExecUtil {
+    private ExecUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(ExecUtil.class);
+
+    static List<String> runRemoteScriptAsSudo(final String hostName, final String userName,
+                                              final String password, final String command,
+                                              final String runAs, final String identityFile) throws
+        JSchException, IOException {
+        JSch jsch = new JSch();
+        Session session = jsch.getSession(userName, hostName, 22);
+        // only set the password if its not empty
+        if (null != password && !password.isEmpty()) {
+            session.setUserInfo(new HardcodedUserInfo(password));
+        }
+        Properties config = new Properties();
+        config.setProperty("StrictHostKeyChecking", "no");
+        config.setProperty("UserKnownHostsFile", "/dev/null");
+        // only set the password if its not empty
+        if (null == password || password.isEmpty()) {
+            jsch.addIdentity(identityFile);
+        }
+        session.setConfig(config);
+        session.connect();
+        Assert.assertTrue(session.isConnected(), "The session was not connected correctly!");
+
+        List<String> data = new ArrayList<String>();
+
+        ChannelExec channel = (ChannelExec) session.openChannel("exec");
+        channel.setPty(true);
+        String runCmd;
+        if (null == runAs || runAs.isEmpty()) {
+            runCmd = "sudo -S -p '' " + command;
+        } else {
+            runCmd = String.format("sudo su - %s -c '%s'", runAs, command);
+        }
+        if (userName.equals(runAs)) {
+            runCmd = command;
+        }
+        LOGGER.info(
+            "host_name: " + hostName + " user_name: " + userName + " password: " + password
+                    +
+                " command: " +runCmd);
+        channel.setCommand(runCmd);
+        InputStream in = channel.getInputStream();
+        OutputStream out = channel.getOutputStream();
+        channel.setErrStream(System.err);
+        channel.connect();
+        TimeUtil.sleepSeconds(20);
+        // only print the password if its not empty
+        if (null != password && !password.isEmpty()) {
+            out.write((password + "\n").getBytes());
+            out.flush();
+        }
+
+        //save console output to data
+        BufferedReader r = new BufferedReader(new InputStreamReader(in));
+        String line;
+        while (true) {
+            while ((line=r.readLine())!=null) {
+                LOGGER.debug(line);
+                data.add(line);
+            }
+            if (channel.isClosed()) {
+                break;
+            }
+        }
+
+        byte[] tmp = new byte[1024];
+        while (true) {
+            while (in.available() > 0) {
+                int i = in.read(tmp, 0, 1024);
+                if (i < 0) {
+                    break;
+                }
+                LOGGER.info(new String(tmp, 0, i));
+            }
+            if (channel.isClosed()) {
+                LOGGER.info("exit-status: " + channel.getExitStatus());
+                break;
+            }
+            TimeUtil.sleepSeconds(1);
+        }
+
+        in.close();
+        channel.disconnect();
+        session.disconnect();
+        out.close();
+        return data;
+    }
+
+    public static ExecResult executeCommand(String command) {
+        LOGGER.info("Command to be executed: " + command);
+        StringBuilder errors = new StringBuilder();
+        StringBuilder output = new StringBuilder();
+
+        try {
+            Process process = Runtime.getRuntime().exec(command);
+
+            BufferedReader errorReader =
+                new BufferedReader(new InputStreamReader(process.getErrorStream()));
+            BufferedReader consoleReader =
+                new BufferedReader(new InputStreamReader(process.getInputStream()));
+
+            String line;
+            while ((line = errorReader.readLine()) != null) {
+                errors.append(line).append("\n");
+            }
+
+            while ((line = consoleReader.readLine()) != null) {
+                output.append(line).append("\n");
+            }
+            final int exitVal = process.waitFor();
+            LOGGER.info("exitVal: " + exitVal);
+            LOGGER.info("output: " + output);
+            LOGGER.info("errors: " + errors);
+            return new ExecResult(exitVal, output.toString().trim(), errors.toString().trim());
+        } catch (InterruptedException e) {
+            Assert.fail("Process execution failed:" + ExceptionUtils.getStackTrace(e));
+        } catch (IOException e) {
+            Assert.fail("Process execution failed:" + ExceptionUtils.getStackTrace(e));
+        }
+        return null;
+    }
+
+    public static int executeCommandGetExitCode(String command) {
+        return executeCommand(command).getExitVal();
+    }
+
+    public static String executeCommandGetOutput(String command) {
+        return executeCommand(command).getOutput();
+    }
+
+    private  static final class HardcodedUserInfo implements UserInfo {
+
+        private final String password;
+
+        private HardcodedUserInfo(String password) {
+            this.password = password;
+        }
+
+        public String getPassphrase() {
+            return null;
+        }
+
+        public String getPassword() {
+            return password;
+        }
+
+        public boolean promptPassword(String s) {
+            return true;
+        }
+
+        public boolean promptPassphrase(String s) {
+            return true;
+        }
+
+        public boolean promptYesNo(String s) {
+            return true;
+        }
+
+        public void showMessage(String s) {
+            LOGGER.info("message = " + s);
+        }
+    }
+
+    private static final class ExecResult {
+
+        private final int exitVal;
+        private final String output;
+        private final String error;
+
+        private ExecResult(final int exitVal, final String output, final String error) {
+            this.exitVal = exitVal;
+            this.output = output;
+            this.error = error;
+        }
+
+        public int getExitVal() {
+            return exitVal;
+        }
+
+        public String getOutput() {
+            return output;
+        }
+
+        public String getError() {
+            return error;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java
new file mode 100644
index 0000000..86f9d6f
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+public class Generator {
+    final String prefix;
+    final String postfix;
+    final String formatString;
+    private int count;
+
+
+    private Generator(String prefix, String postfix, String formatString) {
+        this.prefix = prefix;
+        this.postfix = postfix;
+        this.count = 0;
+        this.formatString = formatString;
+    }
+
+    public String generate() {
+        count++;
+        return String.format(formatString, prefix, count, postfix);
+    }
+
+    public static Generator getNameGenerator(String prefix, String postfix) {
+        return new Generator(prefix, postfix, "%s%03d-%s");
+    }
+
+    public static Generator getHadoopPathGenerator(String prefix, String postfix) {
+        return new Generator(prefix, postfix, "%s_%03d%s");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
new file mode 100644
index 0000000..ed0ea48
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.response.lineage.Edge;
+import org.apache.falcon.regression.core.response.lineage.EdgesResult;
+import org.apache.falcon.regression.core.response.lineage.GraphResult;
+import org.apache.falcon.regression.core.response.lineage.NODE_TYPE;
+import org.apache.falcon.regression.core.response.lineage.Vertex;
+import org.apache.falcon.regression.core.response.lineage.VerticesResult;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+/**
+ * util methods for Graph Asserts.
+ */
+public final class GraphAssert {
+    private GraphAssert() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(GraphAssert.class);
+
+    /**
+     * Check that the result has certain minimum number of vertices.
+     * @param graphResult the result to be checked
+     * @param minNumOfVertices required number of vertices
+     */
+    public static void checkVerticesPresence(final GraphResult graphResult,
+                                             final int minNumOfVertices) {
+        Assert.assertTrue(graphResult.getTotalSize() >= minNumOfVertices,
+            "graphResult should have at least " + minNumOfVertices + " vertex");
+    }
+
+    /**
+     * Check that the vertices in the result are sane.
+     * @param verticesResult the result to be checked
+     */
+    public static void assertVertexSanity(final VerticesResult verticesResult) {
+        Assert.assertEquals(verticesResult.getResults().size(), verticesResult.getTotalSize(),
+            "Size of vertices don't match");
+        for (Vertex vertex : verticesResult.getResults()) {
+            Assert.assertNotNull(vertex.get_id(),
+                "id of the vertex should be non-null: " + vertex);
+            Assert.assertEquals(vertex.get_type(), NODE_TYPE.VERTEX,
+                "_type of the vertex should be non-null: " + vertex);
+            Assert.assertNotNull(vertex.getName(),
+                "name of the vertex should be non-null: " + vertex);
+            Assert.assertNotNull(vertex.getType(),
+                "id of the vertex should be non-null: " + vertex);
+            Assert.assertNotNull(vertex.getTimestamp(),
+                "id of the vertex should be non-null: " + vertex);
+        }
+    }
+
+    /**
+     * Check that edges in the result are sane.
+     * @param edgesResult result to be checked
+     */
+    public static void assertEdgeSanity(final EdgesResult edgesResult) {
+        Assert.assertEquals(edgesResult.getResults().size(), edgesResult.getTotalSize(),
+            "Size of edges don't match");
+        for (Edge edge : edgesResult.getResults()) {
+            assertEdgeSanity(edge);
+        }
+    }
+
+    /**
+     * Check that edge is sane.
+     * @param edge edge to be checked
+     */
+    public static void assertEdgeSanity(Edge edge) {
+        Assert.assertNotNull(edge.get_id(), "id of an edge can't be null: " + edge);
+        Assert.assertEquals(edge.get_type(), NODE_TYPE.EDGE,
+            "_type of an edge can't be null: " + edge);
+        Assert.assertNotNull(edge.get_label(), "_label of an edge can't be null: " + edge);
+        Assert.assertTrue(edge.get_inV() > 0, "_inV of an edge can't be null: " + edge);
+        Assert.assertTrue(edge.get_outV() > 0, "_outV of an edge can't be null: " + edge);
+    }
+
+    /**
+     * Check that user vertex is present.
+     * @param verticesResult the result to be checked
+     */
+    public static void assertUserVertexPresence(final VerticesResult verticesResult) {
+        checkVerticesPresence(verticesResult, 1);
+        for(Vertex vertex : verticesResult.getResults()) {
+            if (vertex.getType() == Vertex.VERTEX_TYPE.USER
+                    && vertex.getName().equals(MerlinConstants.CURRENT_USER_NAME)) {
+                return;
+            }
+        }
+        Assert.fail(String.format("Vertex corresponding to user: %s is not present.",
+            MerlinConstants.CURRENT_USER_NAME));
+    }
+
+    /**
+     * Check that a vertex of a certain name is present.
+     * @param verticesResult the result to be checked
+     * @param name expected name
+     */
+    public static void assertVertexPresence(final VerticesResult verticesResult, final String name) {
+        checkVerticesPresence(verticesResult, 1);
+        for (Vertex vertex : verticesResult.getResults()) {
+            if (vertex.getName().equals(name)) {
+                return;
+            }
+        }
+        Assert.fail(String.format("Vertex of name: %s is not present.", name));
+    }
+
+    /**
+     * Check that the result has at least a certain number of vertices of a certain type.
+     * @param verticesResult the result to be checked
+     * @param vertexType vertex type
+     * @param minOccurrence required number of vertices
+     */
+    public static void assertVerticesPresenceMinOccur(final VerticesResult verticesResult,
+                                                      final Vertex.VERTEX_TYPE vertexType,
+                                                      final int minOccurrence) {
+        int occurrence = 0;
+        for(Vertex vertex : verticesResult.getResults()) {
+            if (vertex.getType() == vertexType) {
+                LOGGER.info("Found vertex: " + vertex);
+                occurrence++;
+                if (occurrence >= minOccurrence) {
+                    return;
+                }
+            }
+        }
+        Assert.fail(String.format("Expected at least %d vertices of type %s. But found only %d",
+            minOccurrence, vertexType, occurrence));
+    }
+
+    /**
+     * Check result to contain at least a certain number of edges of a certain type.
+     * @param edgesResult result to be checked
+     * @param edgeLabel edge label
+     * @param minOccurrence required number of edges
+     */
+    public static void assertEdgePresenceMinOccur(final EdgesResult edgesResult,
+                                                  final Edge.LEBEL_TYPE edgeLabel,
+                                                  final int minOccurrence) {
+        int occurrence = 0;
+        for(Edge edge : edgesResult.getResults()) {
+            if (edge.get_label() == edgeLabel) {
+                LOGGER.info("Found edge: " + edge);
+                occurrence++;
+                if (occurrence >= minOccurrence) {
+                    return;
+                }
+            }
+        }
+        Assert.fail(String.format("Expected at least %d vertices of type %s. But found only %d",
+            minOccurrence, edgeLabel, occurrence));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
new file mode 100644
index 0000000..d878ecb
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+
+/**
+ * util methods for HCat.
+ */
+public final class HCatUtil {
+    private HCatUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    public static HCatClient getHCatClient(String hCatEndPoint, String hiveMetaStorePrinciple)
+        throws HCatException {
+        HiveConf hcatConf = new HiveConf();
+        hcatConf.set("hive.metastore.local", "false");
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, hCatEndPoint);
+        hcatConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, hiveMetaStorePrinciple);
+        hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, MerlinConstants.IS_SECURE);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+            HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        return HCatClient.create(hcatConf);
+    }
+
+    @SuppressWarnings("deprecation")
+    public static HCatFieldSchema getStringSchema(String fieldName, String comment) throws HCatException {
+        return new HCatFieldSchema(fieldName, HCatFieldSchema.Type.STRING, comment);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
new file mode 100644
index 0000000..c33700c
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Util methods related to hadoop.
+ */
+public final class HadoopUtil {
+
+    public static final String SOMETHING_RANDOM = "somethingRandom";
+    private static final Logger LOGGER = Logger.getLogger(HadoopUtil.class);
+
+    private HadoopUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    public static List<String> getAllFilesHDFS(FileSystem fs, Path location) throws IOException {
+
+        List<String> files = new ArrayList<String>();
+        if (!fs.exists(location)) {
+            return files;
+        }
+        FileStatus[] stats = fs.listStatus(location);
+
+        for (FileStatus stat : stats) {
+            if (!isDir(stat)) {
+                files.add(stat.getPath().toString());
+            }
+        }
+        return files;
+    }
+
+    public static List<Path> getAllDirsRecursivelyHDFS(
+        FileSystem fs, Path location, int depth) throws IOException {
+
+        List<Path> returnList = new ArrayList<Path>();
+
+        FileStatus[] stats = fs.listStatus(location);
+
+        for (FileStatus stat : stats) {
+            if (isDir(stat)) {
+                returnList.add(stat.getPath());
+                if (depth > 0) {
+                    returnList.addAll(getAllDirsRecursivelyHDFS(fs, stat.getPath(), depth - 1));
+                }
+            }
+
+        }
+
+        return returnList;
+    }
+
+    public static List<Path> getAllFilesRecursivelyHDFS(
+        FileSystem fs, Path location) throws IOException {
+
+        List<Path> returnList = new ArrayList<Path>();
+
+        FileStatus[] stats;
+        try {
+            stats = fs.listStatus(location);
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+            return new ArrayList<Path>();
+        }
+
+        if (stats == null) {
+            return returnList;
+        }
+        for (FileStatus stat : stats) {
+
+            if (!isDir(stat)) {
+                if (!stat.getPath().toUri().toString().contains("_SUCCESS")) {
+                    returnList.add(stat.getPath());
+                }
+            } else {
+                returnList.addAll(getAllFilesRecursivelyHDFS(fs, stat.getPath()));
+            }
+        }
+
+        return returnList;
+
+    }
+
+    @SuppressWarnings("deprecation")
+    private static boolean isDir(FileStatus stat) {
+        return stat.isDir();
+    }
+
+    public static void copyDataToFolder(final FileSystem fs, final String dstHdfsDir,
+                                        final String srcFileLocation)
+        throws IOException {
+        LOGGER.info(String.format("Copying local dir %s to hdfs location %s on %s",
+            srcFileLocation, dstHdfsDir, fs.getUri()));
+        fs.copyFromLocalFile(new Path(srcFileLocation), new Path(dstHdfsDir));
+    }
+
+    public static void uploadDir(final FileSystem fs, final String dstHdfsDir,
+                                 final String localLocation)
+        throws IOException {
+        LOGGER.info(String.format("Uploading local dir %s to hdfs location %s", localLocation,
+            dstHdfsDir));
+        HadoopUtil.deleteDirIfExists(dstHdfsDir, fs);
+        HadoopUtil.copyDataToFolder(fs, dstHdfsDir, localLocation);
+    }
+
+    public static List<String> getHDFSSubFoldersName(FileSystem fs,
+                                                     String baseDir) throws IOException {
+
+        List<String> returnList = new ArrayList<String>();
+
+        FileStatus[] stats = fs.listStatus(new Path(baseDir));
+
+
+        for (FileStatus stat : stats) {
+            if (isDir(stat)) {
+                returnList.add(stat.getPath().getName());
+            }
+
+        }
+
+
+        return returnList;
+    }
+
+    public static boolean isFilePresentHDFS(FileSystem fs, String hdfsPath, String fileToCheckFor)
+        throws IOException {
+
+        LOGGER.info("getting file from folder: " + hdfsPath);
+
+        List<String> fileNames = getAllFileNamesFromHDFS(fs, hdfsPath);
+
+        for (String filePath : fileNames) {
+
+            if (filePath.contains(fileToCheckFor)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static List<String> getAllFileNamesFromHDFS(
+        FileSystem fs, String hdfsPath) throws IOException {
+
+        List<String> returnList = new ArrayList<String>();
+
+        LOGGER.info("getting file from folder: " + hdfsPath);
+        FileStatus[] stats = fs.listStatus(new Path(hdfsPath));
+
+        for (FileStatus stat : stats) {
+            String currentPath = stat.getPath().toUri().getPath(); // gives directory name
+            if (!isDir(stat)) {
+                returnList.add(currentPath);
+            }
+
+
+        }
+        return returnList;
+    }
+
+    public static void recreateDir(FileSystem fs, String path) throws IOException {
+
+        deleteDirIfExists(path, fs);
+        LOGGER.info("creating hdfs dir: " + path + " on " + fs.getConf().get("fs.default.name"));
+        fs.mkdirs(new Path(path));
+
+    }
+
+    public static void recreateDir(List<FileSystem> fileSystems, String path) throws IOException {
+
+        for (FileSystem fs : fileSystems) {
+            recreateDir(fs, path);
+        }
+    }
+
+    public static void deleteDirIfExists(String hdfsPath, FileSystem fs) throws IOException {
+        Path path = new Path(hdfsPath);
+        if (fs.exists(path)) {
+            LOGGER.info(String.format("Deleting HDFS path: %s on %s", path, fs.getUri()));
+            fs.delete(path, true);
+        } else {
+            LOGGER.info(String.format(
+                "Not deleting non-existing HDFS path: %s on %s", path, fs.getUri()));
+        }
+    }
+
+    public static void flattenAndPutDataInFolder(FileSystem fs, String inputPath,
+                                                 List<String> remoteLocations) throws IOException {
+        flattenAndPutDataInFolder(fs, inputPath, "", remoteLocations);
+    }
+
+    public static List<String> flattenAndPutDataInFolder(FileSystem fs, String inputPath,
+                                                 String remotePathPrefix,
+                                                 List<String> remoteLocations) throws IOException {
+        if (StringUtils.isNotEmpty(remotePathPrefix)) {
+            deleteDirIfExists(remotePathPrefix, fs);
+        }
+        LOGGER.info("Creating data in folders: \n" + remoteLocations);
+        File input = new File(inputPath);
+        File[] files = input.isDirectory() ? input.listFiles() : new File[]{input};
+        List<Path> filePaths = new ArrayList<Path>();
+        assert files != null;
+        for (final File file : files) {
+            if (!file.isDirectory()) {
+                final Path filePath = new Path(file.getAbsolutePath());
+                filePaths.add(filePath);
+            }
+        }
+
+        if (!remotePathPrefix.endsWith("/") && !remoteLocations.get(0).startsWith("/")) {
+            remotePathPrefix += "/";
+        }
+        Pattern pattern = Pattern.compile(":[\\d]+/"); // remove 'hdfs(hftp)://server:port'
+        List<String> locations = new ArrayList<String>();
+        for (String remoteDir : remoteLocations) {
+            String remoteLocation = remotePathPrefix + remoteDir;
+            if (pattern.matcher(remoteLocation).find()) {
+                remoteLocation = remoteLocation.split(":[\\d]+")[1];
+            }
+            locations.add(remoteLocation);
+            LOGGER.info(String.format("copying to: %s files: %s",
+                fs.getUri() + remoteLocation, Arrays.toString(files)));
+            if (!fs.exists(new Path(remoteLocation))) {
+                fs.mkdirs(new Path(remoteLocation));
+            }
+
+            fs.copyFromLocalFile(false, true, filePaths.toArray(new Path[filePaths.size()]),
+                new Path(remoteLocation));
+        }
+        return locations;
+    }
+
+    public static void createLateDataFoldersWithRandom(FileSystem fs, String folderPrefix,
+        List<String> folderList) throws IOException {
+        LOGGER.info("creating late data folders.....");
+        folderList.add(SOMETHING_RANDOM);
+
+        for (final String folder : folderList) {
+            fs.mkdirs(new Path(folderPrefix + folder));
+        }
+
+        LOGGER.info("created all late data folders.....");
+    }
+
+    public static void copyDataToFolders(FileSystem fs, List<String> folderList,
+        String directory, String folderPrefix) throws IOException {
+        LOGGER.info("copying data into folders....");
+        List<String> fileLocations = new ArrayList<String>();
+        File[] files = new File(directory).listFiles();
+        if (files != null) {
+            for (final File file : files) {
+                fileLocations.add(file.toString());
+            }
+        }
+        copyDataToFolders(fs, folderPrefix, folderList,
+                fileLocations.toArray(new String[fileLocations.size()]));
+    }
+
+    public static void copyDataToFolders(FileSystem fs, final String folderPrefix,
+        List<String> folderList, String... fileLocations) throws IOException {
+        for (final String folder : folderList) {
+            boolean r;
+            String folderSpace = folder.replaceAll("/", "_");
+            File f = new File(OSUtil.NORMAL_INPUT + folderSpace + ".txt");
+            if (!f.exists()) {
+                r = f.createNewFile();
+                if (!r) {
+                    LOGGER.info("file could not be created");
+                }
+            }
+
+            FileWriter fr = new FileWriter(f);
+            fr.append("folder");
+            fr.close();
+            fs.copyFromLocalFile(new Path(f.getAbsolutePath()), new Path(folderPrefix + folder));
+            r = f.delete();
+            if (!r) {
+                LOGGER.info("delete was not successful");
+            }
+
+            Path[] srcPaths = new Path[fileLocations.length];
+            for (int i = 0; i < srcPaths.length; ++i) {
+                srcPaths[i] = new Path(fileLocations[i]);
+            }
+            LOGGER.info(String.format("copying  %s to %s%s on %s", Arrays.toString(srcPaths),
+                folderPrefix, folder, fs.getUri()));
+            fs.copyFromLocalFile(false, true, srcPaths, new Path(folderPrefix + folder));
+        }
+    }
+
+    public static void lateDataReplenish(FileSystem fs, int interval,
+        int minuteSkip, String folderPrefix) throws IOException {
+        List<String> folderData = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+
+        createLateDataFoldersWithRandom(fs, folderPrefix, folderData);
+        copyDataToFolders(fs, folderData, OSUtil.NORMAL_INPUT, folderPrefix);
+    }
+
+    public static void createLateDataFolders(FileSystem fs, final String folderPrefix,
+                                             List<String> folderList) throws IOException {
+        for (final String folder : folderList) {
+            fs.mkdirs(new Path(folderPrefix + folder));
+        }
+    }
+
+    public static void injectMoreData(FileSystem fs, final String remoteLocation,
+                                      String localLocation) throws IOException {
+        File[] files = new File(localLocation).listFiles();
+        assert files != null;
+        for (final File file : files) {
+            if (!file.isDirectory()) {
+                String path = remoteLocation + "/" + System.currentTimeMillis() / 1000 + "/";
+                LOGGER.info("inserting data@ " + path);
+                fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(path));
+            }
+        }
+
+    }
+
+    public static void putFileInFolderHDFS(FileSystem fs, int interval, int minuteSkip,
+                                           String folderPrefix, String fileToBePut)
+        throws IOException {
+        List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        LOGGER.info("folderData: " + folderPaths.toString());
+
+        createLateDataFolders(fs, folderPrefix, folderPaths);
+
+        if (fileToBePut.equals("_SUCCESS")) {
+            copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.NORMAL_INPUT + "_SUCCESS");
+        } else {
+            copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.NORMAL_INPUT + "log_01.txt");
+        }
+
+    }
+
+    public static void lateDataReplenishWithoutSuccess(FileSystem fs, int interval,
+        int minuteSkip, String folderPrefix, String postFix) throws IOException {
+        List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        LOGGER.info("folderData: " + folderPaths.toString());
+
+        if (postFix != null) {
+            for (int i = 0; i < folderPaths.size(); i++) {
+                folderPaths.set(i, folderPaths.get(i) + postFix);
+            }
+        }
+
+        createLateDataFolders(fs, folderPrefix, folderPaths);
+        copyDataToFolders(fs, folderPrefix, folderPaths,
+                OSUtil.NORMAL_INPUT + "log_01.txt");
+    }
+
+    public static void lateDataReplenish(FileSystem fs, int interval, int minuteSkip,
+                                         String folderPrefix, String postFix) throws IOException {
+        List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        LOGGER.info("folderData: " + folderPaths.toString());
+
+        if (postFix != null) {
+            for (int i = 0; i < folderPaths.size(); i++) {
+                folderPaths.set(i, folderPaths.get(i) + postFix);
+            }
+        }
+
+        createLateDataFolders(fs, folderPrefix, folderPaths);
+        copyDataToFolders(fs, folderPrefix, folderPaths,
+            OSUtil.NORMAL_INPUT + "_SUCCESS", OSUtil.NORMAL_INPUT + "log_01.txt");
+    }
+
+    public static void replenishData(FileSystem fileSystem, String prefix, List<String> folderList,
+        boolean uploadData) throws IOException {
+        //purge data first
+        deleteDirIfExists(prefix, fileSystem);
+
+        folderList.add(SOMETHING_RANDOM);
+
+        for (final String folder : folderList) {
+            final String pathString = prefix + folder;
+            LOGGER.info(fileSystem.getUri() + pathString);
+            fileSystem.mkdirs(new Path(pathString));
+            if (uploadData) {
+                fileSystem.copyFromLocalFile(new Path(OSUtil.RESOURCES + "log_01.txt"),
+                        new Path(pathString));
+            }
+        }
+    }
+}