You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2015/03/24 16:33:20 UTC

[2/2] falcon git commit: FALCON-1113 Clean up data files in merlin resource directory. Create better names for them. Contributed by Paul Isaychuk

FALCON-1113 Clean up data files in merlin resource directory. Create better names for them. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1aac3a50
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1aac3a50
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1aac3a50

Branch: refs/heads/master
Commit: 1aac3a5027f1f43209cc4f9bcf8844822d6f7b9c
Parents: 969d10e
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Tue Mar 24 17:32:00 2015 +0200
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Tue Mar 24 17:32:00 2015 +0200

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/core/util/OSUtil.java     |   2 -
 .../falcon/regression/FeedLateRerunTest.java    |   4 +-
 .../falcon/regression/FeedReplicationTest.java  |  17 +-
 .../falcon/regression/ProcessLateRerunTest.java |   4 +-
 .../falcon/regression/ProcessLibPathTest.java   |   2 +-
 .../regression/hcat/HCatRetentionTest.java      |   2 +-
 .../PrismFeedReplicationPartitionExpTest.java   |  11 +-
 .../falcon/regression/prism/RetentionTest.java  |   3 +-
 .../2ndLateData/Configuration.java.ignore       |  73 ----
 .../2ndLateData/DateValidator.java.ignore       |  87 ----
 .../2ndLateData/TimeUnit.java.ignore            |  36 --
 .../2ndLateData/dataFile1.txt                   |  73 ++++
 .../2ndLateData/dataFile2.txt                   |  87 ++++
 .../2ndLateData/dataFile3.txt                   |  36 ++
 .../OozieExampleInputData/lateData/log_15.txt   |  19 -
 .../EntityInstanceMessage.java.ignore           | 412 -------------------
 .../EntityInstanceMessageCreator.java.ignore    |  61 ---
 .../normalInput/MessageProducer.java.ignore     | 139 -------
 .../normalInput/dataFile.properties             |  30 ++
 .../normalInput/dataFile.xml                    |  63 +++
 .../normalInput/dataFile1.txt                   |  61 +++
 .../normalInput/dataFile2.txt                   | 412 +++++++++++++++++++
 .../normalInput/dataFile3.txt                   | 139 +++++++
 .../normalInput/dataFile4.txt                   | 174 ++++++++
 .../normalInput/jms-config.properties           |  31 --
 .../OozieExampleInputData/normalInput/log4j.xml |  63 ---
 .../normalInput/log_01.txt                      | 174 --------
 .../ReplicationResources/cluster-0.1.xml        |  41 --
 .../ReplicationResources/feed-s4Replication.xml |  38 --
 .../test/resources/ReplicationResources/id.pig  |  20 -
 .../ReplicationResources/log4testng.properties  |  29 --
 .../ReplicationResources/process-agg.xml        |  52 ---
 .../src/test/resources/feed-s4Replication.xml   |  42 --
 .../merlin/src/test/resources/log_01.txt        |  18 -
 35 files changed, 1096 insertions(+), 1362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 5a897fc..2024016 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-1113 Clean up data files in merlin resource directory. Create better names for them
+   (Paul Isaychuk via Ruslan Ostafiychuk)
+
    FALCON-1103 RetentionTest stabilization - remove check of all retention job actions
    (Paul Isaychuk via Ruslan Ostafiychuk)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
index cc9e7ea..ab27ccf 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
@@ -38,8 +38,6 @@ public final class OSUtil {
     public static final String RESOURCES_OOZIE = String.format(RESOURCES + "oozie%s", SEPARATOR);
     public static final String OOZIE_EXAMPLE_INPUT_DATA =
         String.format(RESOURCES + "OozieExampleInputData%s", SEPARATOR);
-    public static final String OOZIE_EXAMPLE_INPUT_LATE_INPUT =
-        OSUtil.OOZIE_EXAMPLE_INPUT_DATA + "lateData";
     public static final String NORMAL_INPUT =
         String.format(OOZIE_EXAMPLE_INPUT_DATA + "normalInput%s", SEPARATOR);
     public static final String SINGLE_FILE =

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
index 9751003..2c8346d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -194,7 +194,7 @@ public class FeedLateRerunTest extends BaseTestClass {
             for (String location : missingDependencies) {
                 if (tempCount==1) {
                     LOGGER.info("Transferring data to : " + location);
-                    HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+                    HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.NORMAL_INPUT + "dataFile.xml");
                     tempCount++;
                 }
             }
@@ -207,7 +207,7 @@ public class FeedLateRerunTest extends BaseTestClass {
         for (String dependency : missingDependencies) {
             if (tempCounter==1) {
                 LOGGER.info("Transferring late data to : " + dependency);
-                HadoopUtil.copyDataToFolder(clusterFS1, dependency, OSUtil.RESOURCES + "log4j.properties");
+                HadoopUtil.copyDataToFolder(clusterFS1, dependency, OSUtil.NORMAL_INPUT + "dataFile.properties");
             }
             tempCounter++;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index f3a8318..eb8c4fe 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -147,9 +147,8 @@ public class FeedReplicationTest extends BaseTestClass {
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
         if (dataFlag) {
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                    OSUtil.RESOURCES + "feed-s4Replication.xml");
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile1.txt");
         }
 
         //check if coordinator exists
@@ -236,9 +235,8 @@ public class FeedReplicationTest extends BaseTestClass {
         Path toTarget = new Path(targetLocation);
 
         if (dataFlag) {
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                    OSUtil.RESOURCES + "feed-s4Replication.xml");
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile1.txt");
         }
 
         //check if all coordinators exist
@@ -336,9 +334,8 @@ public class FeedReplicationTest extends BaseTestClass {
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
         if (dataFlag) {
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                    OSUtil.RESOURCES + "feed-s4Replication.xml");
-            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile1.txt");
         }
 
         //check while instance is got created
@@ -357,7 +354,7 @@ public class FeedReplicationTest extends BaseTestClass {
 
         //create availability flag on source
         HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-                OSUtil.RESOURCES + availabilityFlagName);
+                OSUtil.OOZIE_EXAMPLE_INPUT_DATA + availabilityFlagName);
 
         //check if instance become running
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index 338d90a..dc032f1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -301,7 +301,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
                 for (String location : missingDependencies) {
                     if (tempCount==1) {
                         LOGGER.info("Transferring data to : " + location);
-                        HadoopUtil.copyDataToFolder(clusterFS, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+                        HadoopUtil.copyDataToFolder(clusterFS, location, OSUtil.NORMAL_INPUT + "dataFile.xml");
                         tempCount++;
                     }
                 }
@@ -320,7 +320,7 @@ public class ProcessLateRerunTest extends BaseTestClass {
             for (String dependency : missingDependencies) {
                 if (tempCounter==dataFolder) {
                     LOGGER.info("Transferring late data to : " + dependency);
-                    HadoopUtil.copyDataToFolder(clusterFS, dependency, OSUtil.RESOURCES + "log4j.properties");
+                    HadoopUtil.copyDataToFolder(clusterFS, dependency, OSUtil.NORMAL_INPUT + "dataFile.properties");
                 }
                 tempCounter++;
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index 7cb98cf..bc2978f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -118,7 +118,7 @@ public class ProcessLibPathTest extends BaseTestClass {
         HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
         HadoopUtil.recreateDir(clusterFS, workflowDir + "/lib");
         HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib/invalid.jar",
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
+            OSUtil.NORMAL_INPUT + "dataFile.xml");
         bundles[0].setProcessWorkflow(workflowDir);
         LOGGER.info("processData: " + Util.prettyPrintXml(process));
         bundles[0].submitFeedsScheduleProcess(prism);

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
index 83554e2..2213c1d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
@@ -117,7 +117,7 @@ public class HCatRetentionTest extends BaseTestClass {
                 freqType.getFormatter());
             AssertUtil.checkForListSizes(dataDates, dataDateStrings);
             final List<String> dataFolders = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
-                OSUtil.OOZIE_EXAMPLE_INPUT_LATE_INPUT, baseTestHDFSDir, dataDateStrings);
+                OSUtil.SINGLE_FILE, baseTestHDFSDir, dataDateStrings);
             addPartitionsToExternalTable(cli, dBName, tableName, freqType, dataDates, dataFolders);
             List<String> initialData =
                 getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, freqType);

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
index b51c4a4..97d4e67 100755
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java
@@ -72,13 +72,10 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass {
     private String testDirWithDate = testBaseDir1 + testDate;
     private String testDirWithDateSourceTarget = testBaseDir4 + testDate;
     private String testDirWithDateSource1 = testBaseDirServer1Source + testDate;
-    private String testFile1 = OSUtil.RESOURCES
-        + OSUtil.getPath("ReplicationResources", "feed-s4Replication.xml");
-    private String testFile2 = OSUtil.RESOURCES + OSUtil.getPath("ReplicationResources", "id.pig");
-    private String testFile3 = OSUtil.RESOURCES
-        + OSUtil.getPath("ReplicationResources", "cluster-0.1.xml");
-    private String testFile4 = OSUtil.RESOURCES
-        + OSUtil.getPath("ReplicationResources", "log4testng.properties");
+    private String testFile1 = OSUtil.NORMAL_INPUT + "dataFile.xml";
+    private String testFile2 = OSUtil.RESOURCES + OSUtil.getPath("pig", "id.pig");
+    private String testFile3 = OSUtil.RESOURCES + OSUtil.getPath("ELbundle", "cluster-0.1.xml");
+    private String testFile4 = OSUtil.NORMAL_INPUT + "dataFile.properties";
     private static final Logger LOGGER =
         Logger.getLogger(PrismFeedReplicationPartitionExpTest.class);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index c049a7e..99e240d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -146,8 +146,7 @@ public class RetentionTest extends BaseTestClass {
         LOGGER.info("dataDates = " + dataDates);
         dataDates.add(HadoopUtil.SOMETHING_RANDOM);
         if (withData) {
-            HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.RESOURCES + "log_01.txt",
-                testHDFSDir, dataDates);
+            HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, testHDFSDir, dataDates);
         } else {
             HadoopUtil.createFolders(clusterFS, testHDFSDir, dataDates);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/Configuration.java.ignore
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/Configuration.java.ignore b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/Configuration.java.ignore
deleted file mode 100644
index 1ab0140..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/Configuration.java.ignore
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ivory.entity.common;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Configuration implements Iterable<Map.Entry<String, String>>, Cloneable {
-
-  private final Map<String, String> properties;
-
-  public Configuration() {
-    properties = new ConcurrentHashMap<String, String>();
-  }
-
-  public Configuration(Map<String, String> properties) {
-    this.properties = properties;
-  }
-
-  public void addConfiguration(Configuration config) {
-    for (Entry<String, String> entry : config) {
-      properties.put(entry.getKey(), entry.getValue());
-    }
-  }
-
-  public Configuration addAndReturnNewConfiguration(Configuration config) {
-    Map<String, String> newProperties = new ConcurrentHashMap<String, String>(properties);
-    for (Entry<String, String> entry : config) {
-      newProperties.put(entry.getKey(), entry.getValue());
-    }
-    return new Configuration(newProperties);
-  }
-
-  public String getConf(String name) {
-    return properties.get(name);
-  }
-
-  public void setConf(String name, String value) {
-    properties.put(name, value);
-  }
-
-  public void setConf(String name, String value, String defaultValue) {
-    if (value == null) {
-      properties.put(name, defaultValue);
-    } else {
-      properties.put(name, value);
-    }
-  }
-
-  @Override
-  public Iterator<Entry<String, String>> iterator() {
-    return properties.entrySet().iterator();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/DateValidator.java.ignore
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/DateValidator.java.ignore b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/DateValidator.java.ignore
deleted file mode 100644
index f02ed34..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/DateValidator.java.ignore
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ivory.entity.common;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class DateValidator {
-
-	private Pattern pattern;
-	private Matcher matcher;
-
-	private static final String DATE_PATTERN = "(2\\d\\d\\d|19\\d\\d)-(0[1-9]|1[012])-(0[1-9]|1[0-9]|2[0-9]|3[01])T([0-1][0-9]|2[0-3]):([0-5][0-9])Z";
-
-	public DateValidator() {
-		pattern = Pattern.compile(DATE_PATTERN);
-	}
-
-	/**
-	 * Validate date format with regular expression
-	 * 
-	 * @param date
-	 *            date address for validation
-	 * @return true valid date fromat, false invalid date format
-	 */
-	public boolean validate(final String date) {
-
-		matcher = pattern.matcher(date);
-
-		if (matcher.matches()) {
-
-			matcher.reset();
-
-			if (matcher.find()) {
-
-				int year = Integer.parseInt(matcher.group(1));
-				String month = matcher.group(2);
-				String day = matcher.group(3);
-
-				if (day.equals("31")
-						&& (month.equals("4") || month.equals("6")
-								|| month.equals("9") || month.equals("11")
-								|| month.equals("04") || month.equals("06") || month
-									.equals("09"))) {
-					return false; // only 1,3,5,7,8,10,12 has 31 days
-				} else if (month.equals("2") || month.equals("02")) {
-					// leap year
-					if (year % 4 == 0) {
-						if (day.equals("30") || day.equals("31")) {
-							return false;
-						} else {
-							return true;
-						}
-					} else {
-						if (day.equals("29") || day.equals("30")
-								|| day.equals("31")) {
-							return false;
-						} else {
-							return true;
-						}
-					}
-				} else {
-					return true;
-				}
-			} else {
-				return false;
-			}
-		} else {
-			return false;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/TimeUnit.java.ignore
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/TimeUnit.java.ignore b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/TimeUnit.java.ignore
deleted file mode 100644
index 67f9d4c..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/TimeUnit.java.ignore
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ivory.entity.common;
-
-import java.util.Calendar;
-
-public enum TimeUnit {
-    MINUTE(Calendar.MINUTE), HOUR(Calendar.HOUR), DAY(Calendar.DATE), MONTH(Calendar.MONTH), END_OF_DAY(Calendar.DATE), END_OF_MONTH(
-        Calendar.MONTH), NONE(-1);
-
-    private int calendarUnit;
-
-    private TimeUnit(int calendarUnit) {
-        this.calendarUnit = calendarUnit;
-    }
-
-    public int getCalendarUnit() {
-        return calendarUnit;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile1.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile1.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile1.txt
new file mode 100644
index 0000000..1ab0140
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile1.txt
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ivory.entity.common;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Configuration implements Iterable<Map.Entry<String, String>>, Cloneable {
+
+  private final Map<String, String> properties;
+
+  public Configuration() {
+    properties = new ConcurrentHashMap<String, String>();
+  }
+
+  public Configuration(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
+  public void addConfiguration(Configuration config) {
+    for (Entry<String, String> entry : config) {
+      properties.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  public Configuration addAndReturnNewConfiguration(Configuration config) {
+    Map<String, String> newProperties = new ConcurrentHashMap<String, String>(properties);
+    for (Entry<String, String> entry : config) {
+      newProperties.put(entry.getKey(), entry.getValue());
+    }
+    return new Configuration(newProperties);
+  }
+
+  public String getConf(String name) {
+    return properties.get(name);
+  }
+
+  public void setConf(String name, String value) {
+    properties.put(name, value);
+  }
+
+  public void setConf(String name, String value, String defaultValue) {
+    if (value == null) {
+      properties.put(name, defaultValue);
+    } else {
+      properties.put(name, value);
+    }
+  }
+
+  @Override
+  public Iterator<Entry<String, String>> iterator() {
+    return properties.entrySet().iterator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile2.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile2.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile2.txt
new file mode 100644
index 0000000..e59db94
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile2.txt
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ivory.entity.common;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class DateValidator {
+
+	private Pattern pattern;
+	private Matcher matcher;
+
+	private static final String DATE_PATTERN = "(2\\d\\d\\d|19\\d\\d)-(0[1-9]|1[012])-(0[1-9]|1[0-9]|2[0-9]|3[01])T([0-1][0-9]|2[0-3]):([0-5][0-9])Z";
+
+	public DateValidator() {
+		pattern = Pattern.compile(DATE_PATTERN);
+	}
+
+	/**
+	 * Validate date format with regular expression
+	 * 
+	 * @param date
+	 *            date address for validation
+	 * @return true valid date fromat, false invalid date format
+	 */
+	public boolean validate(final String date) {
+
+		matcher = pattern.matcher(date);
+
+		if (matcher.matches()) {
+
+			matcher.reset();
+
+			if (matcher.find()) {
+
+				int year = Integer.parseInt(matcher.group(1));
+				String month = matcher.group(2);
+				String day = matcher.group(3);
+
+				if (day.equals("31")
+						&& (month.equals("4") || month.equals("6")
+								|| month.equals("9") || month.equals("11")
+								|| month.equals("04") || month.equals("06") || month
+									.equals("09"))) {
+					return false; // only 1,3,5,7,8,10,12 has 31 days
+				} else if (month.equals("2") || month.equals("02")) {
+					// leap year
+					if (year % 4 == 0) {
+						if (day.equals("30") || day.equals("31")) {
+							return false;
+						} else {
+							return true;
+						}
+					} else {
+						if (day.equals("29") || day.equals("30")
+								|| day.equals("31")) {
+							return false;
+						} else {
+							return true;
+						}
+					}
+				} else {
+					return true;
+				}
+			} else {
+				return false;
+			}
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile3.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile3.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile3.txt
new file mode 100644
index 0000000..67f9d4c
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/2ndLateData/dataFile3.txt
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ivory.entity.common;
+
+import java.util.Calendar;
+
+public enum TimeUnit {
+    MINUTE(Calendar.MINUTE), HOUR(Calendar.HOUR), DAY(Calendar.DATE), MONTH(Calendar.MONTH), END_OF_DAY(Calendar.DATE), END_OF_MONTH(
+        Calendar.MONTH), NONE(-1);
+
+    private int calendarUnit;
+
+    private TimeUnit(int calendarUnit) {
+        this.calendarUnit = calendarUnit;
+    }
+
+    public int getCalendarUnit() {
+        return calendarUnit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/lateData/log_15.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/lateData/log_15.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/lateData/log_15.txt
deleted file mode 100644
index f3f94c5..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/lateData/log_15.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-[15] LOG!!!!!!
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessage.java.ignore
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessage.java.ignore b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessage.java.ignore
deleted file mode 100644
index f992f21..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessage.java.ignore
+++ /dev/null
@@ -1,412 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.ivory.messaging;
-//
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.IOUtils;
-//import org.apache.log4j.Logger;
-//
-//import java.io.ByteArrayOutputStream;
-//import java.io.IOException;
-//import java.io.InputStream;
-//import java.util.HashMap;
-//import java.util.Map;
-//
-///**
-// * Value Object which is stored in JMS Topic as TextMessage
-// *
-// */
-//public class EntityInstanceMessage {
-//
-//	private final Map<ARG, String> msgs = new HashMap<ARG, String>();
-//	private static final String MSG_SEPERATOR = "$";
-//	private static final int ARG_LENGTH = EntityInstanceMessage.ARG.values().length;
-//	private static final Logger LOG = Logger
-//			.getLogger(EntityInstanceMessage.class);
-//	private static final String IVORY_PROCESS_TOPIC_NAME = "IVORY.PROCESS.TOPIC";
-//
-//	/**
-//	 * Enum for arguments that are used in coordinators to pass arguments to
-//	 * parent workflow
-//	 */
-//
-//	public enum entityOperation {
-//		GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
-//	}
-//
-//	public enum ARG {
-//		PROCESS_NAME(0, "entityName"), FEED_NAME(1, "feedNames"), FEED_INSTANCE_PATH(
-//				2, "feedInstancePaths"), WORKFLOW_ID(3, "workflowId"), RUN_ID(
-//				4, "runId"), NOMINAL_TIME(5, "nominalTime"), TIME_STAMP(6,
-//				"timeStamp"), BROKER_URL(7, "brokerUrl"), BROKER_IMPL_CLASS(8,
-//				"brokerImplClass"), ENTITY_TYPE(9, "entityType"), OPERATION(10,
-//				"operation"), LOG_FILE(11, "logFile"), TOPIC_NAME(12,
-//				"topicName"), STATUS(13, "status"), BROKER_TTL(14, "broker.ttlInMins");
-//
-//		private int argOrder;
-//		private String argName;
-//
-//		private ARG(int argOrder, String argName) {
-//			this.argOrder = argOrder;
-//			this.argName = argName;
-//		}
-//
-//		public int ORDER() {
-//			return this.argOrder;
-//		}
-//
-//		public String NAME() {
-//			return this.argName;
-//		}
-//	}
-//
-//	public String getProcessName() {
-//		return this.msgs.get(ARG.PROCESS_NAME);
-//	}
-//
-//	public void setProcessName(String processName) {
-//		this.msgs.put(ARG.PROCESS_NAME, processName);
-//	}
-//
-//	public String getTopicName() {
-//		return this.msgs.get(ARG.TOPIC_NAME);
-//	}
-//
-//	public void setTopicName(String topicName) {
-//		this.msgs.put(ARG.TOPIC_NAME, topicName);
-//	}
-//
-//	public String getFeedName() {
-//		return this.msgs.get(ARG.FEED_NAME);
-//	}
-//
-//	public void setFeedName(String feedName) {
-//		this.msgs.put(ARG.FEED_NAME, feedName);
-//	}
-//
-//	public String getFeedInstancePath() {
-//		return this.msgs.get(ARG.FEED_INSTANCE_PATH);
-//	}
-//
-//	public void setFeedInstancePath(String feedInstancePath) {
-//		this.msgs.put(ARG.FEED_INSTANCE_PATH, feedInstancePath);
-//	}
-//
-//	public String getWorkflowId() {
-//		return this.msgs.get(ARG.WORKFLOW_ID);
-//	}
-//
-//	public void setWorkflowId(String workflowId) {
-//		this.msgs.put(ARG.WORKFLOW_ID, workflowId);
-//	}
-//
-//	public String getRunId() {
-//		return this.msgs.get(ARG.RUN_ID);
-//	}
-//
-//	public void setRunId(String runId) {
-//		this.msgs.put(ARG.RUN_ID, runId);
-//	}
-//
-//	public String getNominalTime() {
-//		return this.msgs.get(ARG.NOMINAL_TIME);
-//	}
-//
-//	public void setNominalTime(String nominalTime) {
-//		this.msgs.put(ARG.NOMINAL_TIME, nominalTime);
-//	}
-//
-//	public String getTimeStamp() {
-//		return this.msgs.get(ARG.TIME_STAMP);
-//	}
-//
-//	public void setTimeStamp(String timeStamp) {
-//		this.msgs.put(ARG.TIME_STAMP, timeStamp);
-//	}
-//
-//	public String getBrokerUrl() {
-//		return this.msgs.get(ARG.BROKER_URL);
-//	}
-//
-//	public void setBrokerUrl(String brokerUrl) {
-//		this.msgs.put(ARG.BROKER_URL, brokerUrl);
-//	}
-//
-//	public String getBrokerImplClass() {
-//		return this.msgs.get(ARG.BROKER_IMPL_CLASS);
-//	}
-//
-//	public void setBrokerImplClass(String brokerImplClass) {
-//		this.msgs.put(ARG.BROKER_IMPL_CLASS, brokerImplClass);
-//	}
-//
-//	public String getEntityType() {
-//		return this.msgs.get(ARG.ENTITY_TYPE);
-//	}
-//
-//	public void setEntityType(String entityType) {
-//		this.msgs.put(ARG.ENTITY_TYPE, entityType);
-//	}
-//
-//	public String getOperation() {
-//		return this.msgs.get(ARG.OPERATION);
-//	}
-//
-//	public void setOperation(String operation) {
-//		this.msgs.put(ARG.OPERATION, operation);
-//	}
-//
-//	public String getLogFile() {
-//		return this.msgs.get(ARG.LOG_FILE);
-//	}
-//
-//	public void setLogFile(String logFile) {
-//		this.msgs.put(ARG.LOG_FILE, logFile);
-//	}
-//
-//	public String getStatus() {
-//		return this.msgs.get(ARG.STATUS);
-//	}
-//
-//	public void setStatus(String status) {
-//		this.msgs.put(ARG.STATUS, status);
-//	}
-//
-//	public String getBrokerTTL() {
-//		return this.msgs.get(ARG.BROKER_TTL);
-//	}
-//
-//	public void setBrokerTTL(String brokerTTL) {
-//		this.msgs.put(ARG.BROKER_TTL, brokerTTL);
-//	}
-//
-//	@Override
-//	public String toString() {
-//		if (getEntityType().equalsIgnoreCase("PROCESS")
-//				&& getTopicName().equals(IVORY_PROCESS_TOPIC_NAME)) {
-//			return getIvoryMessage();
-//		}
-//		if (getEntityType().equalsIgnoreCase("FEED")) {
-//			return getFeedMessage();
-//		}
-//		return getProcessMessage();
-//
-//	}
-//
-//	private String getProcessMessage() {
-//		return getProcessName() + MSG_SEPERATOR + getFeedName() + MSG_SEPERATOR
-//				+ getFeedInstancePath() + MSG_SEPERATOR + getWorkflowId()
-//				+ MSG_SEPERATOR + getRunId() + MSG_SEPERATOR + getNominalTime()
-//				+ MSG_SEPERATOR + getTimeStamp();
-//	}
-//
-//	private String getIvoryMessage() {
-//		return getProcessName() + MSG_SEPERATOR + getFeedName() + MSG_SEPERATOR
-//				+ getFeedInstancePath() + MSG_SEPERATOR + getWorkflowId()
-//				+ MSG_SEPERATOR + getRunId() + MSG_SEPERATOR + getNominalTime()
-//				+ MSG_SEPERATOR + getTimeStamp() + MSG_SEPERATOR + getStatus();
-//	}
-//
-//	private String getFeedMessage() {
-//		return getFeedName() + MSG_SEPERATOR
-//				+ getFeedInstancePath() + MSG_SEPERATOR + getOperation()
-//				+ MSG_SEPERATOR + getWorkflowId() + MSG_SEPERATOR + getRunId()
-//				+ MSG_SEPERATOR + getNominalTime() + MSG_SEPERATOR
-//				+ getTimeStamp();
-//	}
-//
-//	/**
-//	 *
-//	 * @param args
-//	 *            - String array passed from oozie action for jms messaging
-//	 * @return ProcessMessage - Value object which is stored in JMS topic
-//	 */
-//	public static EntityInstanceMessage[] argsToMessage(String[] args) {
-//
-//		assert args.length == ARG_LENGTH : "Required number of arguments: "
-//				+ ARG_LENGTH;
-//
-//		String[] feedNames = getFeedNames(args);
-//
-//		String[] feedPaths;
-//		try {
-//			feedPaths = getFeedPaths(args);
-//		} catch (IOException e) {
-//			LOG.error("Error getting instance paths: ", e);
-//			throw new RuntimeException(e);
-//		}
-//
-//		EntityInstanceMessage[] processMessages = new EntityInstanceMessage[feedPaths.length];
-//		for (int i = 0; i < feedPaths.length; i++) {
-//			EntityInstanceMessage instanceMessage = new EntityInstanceMessage();
-//			instanceMessage
-//					.setProcessName(args[EntityInstanceMessage.ARG.PROCESS_NAME
-//							.ORDER()]);
-//			if (args[EntityInstanceMessage.ARG.ENTITY_TYPE.ORDER()]
-//					.equalsIgnoreCase("PROCESS")) {
-//				instanceMessage.setFeedName(feedNames[i]);
-//			} else {
-//				instanceMessage
-//						.setFeedName(args[EntityInstanceMessage.ARG.FEED_NAME
-//								.ORDER()]);
-//			}
-//			instanceMessage.setFeedInstancePath(feedPaths[i]);
-//			instanceMessage
-//					.setWorkflowId(args[EntityInstanceMessage.ARG.WORKFLOW_ID
-//							.ORDER()]);
-//			instanceMessage.setRunId(args[EntityInstanceMessage.ARG.RUN_ID
-//					.ORDER()]);
-//			instanceMessage
-//					.setNominalTime(args[EntityInstanceMessage.ARG.NOMINAL_TIME
-//							.ORDER()]);
-//			instanceMessage
-//					.setTimeStamp(args[EntityInstanceMessage.ARG.TIME_STAMP
-//							.ORDER()]);
-//			instanceMessage
-//					.setBrokerUrl(args[EntityInstanceMessage.ARG.BROKER_URL
-//							.ORDER()]);
-//			instanceMessage
-//					.setBrokerImplClass(args[EntityInstanceMessage.ARG.BROKER_IMPL_CLASS
-//							.ORDER()]);
-//			instanceMessage
-//					.setEntityType(args[EntityInstanceMessage.ARG.ENTITY_TYPE
-//							.ORDER()]);
-//			instanceMessage
-//					.setOperation(args[EntityInstanceMessage.ARG.OPERATION
-//							.ORDER()]);
-//			instanceMessage.setLogFile(args[EntityInstanceMessage.ARG.LOG_FILE
-//					.ORDER()]);
-//			instanceMessage
-//					.setTopicName(args[EntityInstanceMessage.ARG.TOPIC_NAME
-//							.ORDER()]);
-//			instanceMessage.setStatus(args[EntityInstanceMessage.ARG.STATUS
-//					.ORDER()]);
-//			instanceMessage.setBrokerTTL(args[EntityInstanceMessage.ARG.BROKER_TTL
-//			           					.ORDER()]);
-//
-//			processMessages[i] = instanceMessage;
-//		}
-//		return processMessages;
-//	}
-//
-//	private static String[] getFeedNames(String[] args) {
-//		String topicName = args[ARG.TOPIC_NAME.argOrder];
-//		if (topicName.equals(IVORY_PROCESS_TOPIC_NAME)) {
-//			return new String[] { args[EntityInstanceMessage.ARG.FEED_NAME
-//					.ORDER()] };
-//		}
-//		return args[EntityInstanceMessage.ARG.FEED_NAME.ORDER()].split(",");
-//	}
-//
-//	private static String[] getFeedPaths(String[] args) throws IOException {
-//		String entityType = args[EntityInstanceMessage.ARG.ENTITY_TYPE.ORDER()];
-//		String topicName = args[EntityInstanceMessage.ARG.TOPIC_NAME.ORDER()];
-//
-//		if (entityType.equalsIgnoreCase("PROCESS")
-//				&& topicName.equals(IVORY_PROCESS_TOPIC_NAME)) {
-//			LOG.debug("Returning instance paths for Ivory Topic: "
-//					+ args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()]);
-//			return new String[] { args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH
-//					.ORDER()] };
-//		}
-//
-//		if (entityType.equalsIgnoreCase("PROCESS")) {
-//			LOG.debug("Returning instance paths for process: "
-//					+ args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()]);
-//			return args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()]
-//					.split(",");
-//		}
-//		//
-//		Path logFile = new Path(
-//				args[EntityInstanceMessage.ARG.LOG_FILE.ORDER()]);
-//		FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
-//		ByteArrayOutputStream writer = new ByteArrayOutputStream();
-//		InputStream instance = fs.open(logFile);
-//		IOUtils.copyBytes(instance, writer, 4096, true);
-//		String[] instancePaths = writer.toString().split("=");
-//		if (instancePaths.length == 1) {
-//			LOG.debug("Returning 0 instance paths for feed ");
-//			return new String[0];
-//		} else {
-//			LOG.debug("Returning instance paths for feed " + instancePaths[1]);
-//			return instancePaths[1].split(",");
-//		}
-//
-//	}
-//
-//	/**
-//	 *
-//	 * @param instanceMessages
-//	 *            - value object which is stored in JMS topic as TextMessage
-//	 * @return - String array.
-//	 */
-//	public static String[] messageToArgs(
-//			EntityInstanceMessage[] instanceMessages) {
-//		String[] args = new String[ARG_LENGTH];
-//
-//		args[EntityInstanceMessage.ARG.PROCESS_NAME.ORDER()] = instanceMessages[0]
-//				.getProcessName();
-//		StringBuilder feedNames = new StringBuilder();
-//		StringBuilder feedPaths = new StringBuilder();
-//
-//		for (EntityInstanceMessage instanceMessage : instanceMessages) {
-//			feedNames.append(instanceMessage.getFeedName()).append(",");
-//			feedPaths.append(instanceMessage.getFeedInstancePath()).append(",");
-//		}
-//		if (instanceMessages[0].getEntityType().equalsIgnoreCase("PROCESS")) {
-//			args[EntityInstanceMessage.ARG.FEED_NAME.ORDER()] = feedNames
-//					.toString();
-//
-//		} else {
-//			args[EntityInstanceMessage.ARG.FEED_NAME.ORDER()] = instanceMessages[0]
-//					.getFeedName();
-//		}
-//		args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()] = feedPaths
-//				.toString();
-//		args[EntityInstanceMessage.ARG.WORKFLOW_ID.ORDER()] = instanceMessages[0]
-//				.getWorkflowId();
-//		args[EntityInstanceMessage.ARG.RUN_ID.ORDER()] = instanceMessages[0]
-//				.getRunId();
-//		args[EntityInstanceMessage.ARG.NOMINAL_TIME.ORDER()] = instanceMessages[0]
-//				.getNominalTime();
-//		args[EntityInstanceMessage.ARG.TIME_STAMP.ORDER()] = instanceMessages[0]
-//				.getTimeStamp();
-//		args[EntityInstanceMessage.ARG.BROKER_URL.ORDER()] = instanceMessages[0]
-//				.getBrokerUrl();
-//		args[EntityInstanceMessage.ARG.BROKER_IMPL_CLASS.ORDER()] = instanceMessages[0]
-//				.getBrokerImplClass();
-//		args[EntityInstanceMessage.ARG.ENTITY_TYPE.ORDER()] = instanceMessages[0]
-//				.getEntityType();
-//		args[EntityInstanceMessage.ARG.OPERATION.ORDER()] = instanceMessages[0]
-//				.getOperation();
-//		args[EntityInstanceMessage.ARG.LOG_FILE.ORDER()] = instanceMessages[0]
-//				.getLogFile();
-//		args[EntityInstanceMessage.ARG.TOPIC_NAME.ORDER()] = instanceMessages[0]
-//				.getTopicName();
-//		args[EntityInstanceMessage.ARG.STATUS.ORDER()] = instanceMessages[0]
-//				.getStatus();
-//		args[EntityInstanceMessage.ARG.BROKER_TTL.ORDER()] = instanceMessages[0]
-//				.getBrokerTTL();
-//
-//		return args;
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessageCreator.java.ignore
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessageCreator.java.ignore b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessageCreator.java.ignore
deleted file mode 100644
index f7e8bbb..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/EntityInstanceMessageCreator.java.ignore
+++ /dev/null
@@ -1,61 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.ivory.messaging;
-//
-//import javax.jms.JMSException;
-//import javax.jms.Session;
-//import javax.jms.TextMessage;
-//
-//import org.apache.log4j.Logger;
-//
-///**
-// * Ivory JMS message creator- creates JMS TextMessage
-// */
-//public class EntityInstanceMessageCreator  {
-//
-//	private static final Logger LOG = Logger
-//			.getLogger(EntityInstanceMessageCreator.class);
-//
-//	private TextMessage textMessage;
-//
-//	private final EntityInstanceMessage args;
-//
-//	public EntityInstanceMessageCreator(EntityInstanceMessage args) {
-//		this.args = args;
-//	}
-//
-//	public TextMessage createMessage(Session session) throws JMSException {
-//		this.textMessage = session.createTextMessage();
-//		this.textMessage.setText(this.args.toString());
-//		LOG.debug("Sending Message: " + this.textMessage.getText());
-//		// System.out.println("Sending Message: " + this.textMessage);
-//		return this.textMessage;
-//	}
-//
-//	@Override
-//	public String toString() {
-//		try {
-//			return this.textMessage.getText();
-//		} catch (JMSException e) {
-//			return e.getMessage();
-//		}
-//
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/MessageProducer.java.ignore
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/MessageProducer.java.ignore b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/MessageProducer.java.ignore
deleted file mode 100644
index 7234f52..0000000
--- a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/MessageProducer.java.ignore
+++ /dev/null
@@ -1,139 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.ivory.messaging;
-//
-//import java.lang.reflect.InvocationTargetException;
-//
-//import javax.jms.Connection;
-//import javax.jms.ConnectionFactory;
-//import javax.jms.DeliveryMode;
-//import javax.jms.JMSException;
-//import javax.jms.Session;
-//import javax.jms.Topic;
-//
-//import org.apache.log4j.Logger;
-//
-///**
-// * Default Ivory Message Producer The configuration are loaded from
-// * jms-beans.xml
-// */
-//public class MessageProducer {
-//
-//	private Connection connection;
-//	private static final Logger LOG = Logger.getLogger(MessageProducer.class);
-//	private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
-//
-//	/**
-//	 *
-//	 * @param arguments
-//	 *            - Accepts a Message to be send to JMS topic, creates a new
-//	 *            Topic based on topic name if it does not exist or else
-//	 *            existing topic with the same name is used to send the message.
-//	 * @throws JMSException
-//	 */
-//	protected void sendMessage(EntityInstanceMessage entityInstanceMessage)
-//			throws JMSException {
-//
-//		Session session = connection.createSession(false,
-//				Session.AUTO_ACKNOWLEDGE);
-//		Topic entityTopic = session.createTopic(entityInstanceMessage
-//				.getTopicName());
-//		javax.jms.MessageProducer producer = session
-//				.createProducer(entityTopic);
-//		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-//		long messageTTL = DEFAULT_TTL;
-//		try {
-//			long messageTTLinMins = Long.parseLong(entityInstanceMessage
-//					.getBrokerTTL());
-//			messageTTL = messageTTLinMins * 60 * 1000;
-//		} catch (NumberFormatException e) {
-//			LOG.error("Error in parsing broker.ttl, setting TTL to:"
-//					+ DEFAULT_TTL+ " milli-seconds");
-//		}
-//		producer.setTimeToLive(messageTTL);
-//		producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
-//				.createMessage(session));
-//
-//	}
-//
-//	/**
-//	 *
-//	 * @param args
-//	 *            - array of Strings, which will be used to create TextMessage
-//	 */
-//	public static void main(String[] args) {
-//		debug(args);
-//		EntityInstanceMessage[] entityInstanceMessage = EntityInstanceMessage
-//				.argsToMessage(args);
-//		if (entityInstanceMessage.length == 0) {
-//			LOG.warn("No operation on output feed");
-//			return;
-//		}
-//
-//		MessageProducer ivoryMessageProducer = new MessageProducer();
-//		try {
-//			ivoryMessageProducer.createAndStartConnection(
-//					args[EntityInstanceMessage.ARG.BROKER_IMPL_CLASS.ORDER()],
-//					"", "", entityInstanceMessage[0].getBrokerUrl());
-//			for (EntityInstanceMessage processMessage : entityInstanceMessage) {
-//				ivoryMessageProducer.sendMessage(processMessage);
-//			}
-//		} catch (JMSException e) {
-//			LOG.error(e);
-//			e.printStackTrace();
-//		} catch (Exception e) {
-//			LOG.error(e);
-//			e.printStackTrace();
-//		} finally {
-//			try {
-//				ivoryMessageProducer.connection.close();
-//			} catch (JMSException e) {
-//				e.printStackTrace();
-//			}
-//		}
-//
-//	}
-//
-//	private static void debug(String[] args) {
-//		if (LOG.isDebugEnabled()) {
-//			for (int i = 0; i < args.length; i++) {
-//				LOG.debug(args[i] + "::");
-//			}
-//		}
-//
-//	}
-//
-//	private void createAndStartConnection(String implementation,
-//			String userName, String password, String url) throws JMSException,
-//			ClassNotFoundException, IllegalArgumentException,
-//			SecurityException, InstantiationException, IllegalAccessException,
-//			InvocationTargetException, NoSuchMethodException {
-//
-//		Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) MessageProducer.class
-//				.getClassLoader().loadClass(implementation);
-//
-//		ConnectionFactory connectionFactory = clazz.getConstructor(
-//				String.class, String.class, String.class).newInstance(userName,
-//				password, url);
-//
-//		connection = connectionFactory.createConnection();
-//		connection.start();
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.properties b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.properties
new file mode 100644
index 0000000..c9649d3
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.properties
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+
+#Global configuration for Ivory JMS, default to use embedded ActiveMQ.
+
+#Defualt Active MQ url
+#ivory.broker.url = tcp://localhost:61616?daemon=true
+
+#Embedded
+ivory.broker.url = vm://localhost?broker.useJmx=false&broker.persistent=true
+
+
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.xml b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.xml
new file mode 100644
index 0000000..e047aaf
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out"/>
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d %-5p - %m (%c{1}:%L)%n"/>
+    </layout>
+  </appender>
+
+  <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
+    <param name="File" value="/var/log/ivory/application.log"/>
+    <param name="Append" value="true"/>
+    <param name="Threshold" value="debug"/>
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d %-5p - %m (%c{1}:%L)%n"/>
+    </layout>
+  </appender>
+
+  <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
+    <param name="File" value="/var/log/ivory/audit.log"/>
+    <param name="Append" value="true"/>
+    <param name="Threshold" value="debug"/>
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d %-5p - %m%n"/>
+    </layout>
+  </appender>
+
+  <logger name="org.apache.ivory" additivity="false">
+    <level value="debug"/>
+    <appender-ref ref="console" />
+  </logger>
+
+  <logger name="AUDIT">
+    <level value="info"/>
+    <appender-ref ref="AUDIT" />
+  </logger>
+
+  <root>
+    <priority value ="info" />
+    <appender-ref ref="console" />
+  </root>
+
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile1.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile1.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile1.txt
new file mode 100644
index 0000000..f7e8bbb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile1.txt
@@ -0,0 +1,61 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.ivory.messaging;
+//
+//import javax.jms.JMSException;
+//import javax.jms.Session;
+//import javax.jms.TextMessage;
+//
+//import org.apache.log4j.Logger;
+//
+///**
+// * Ivory JMS message creator- creates JMS TextMessage
+// */
+//public class EntityInstanceMessageCreator  {
+//
+//	private static final Logger LOG = Logger
+//			.getLogger(EntityInstanceMessageCreator.class);
+//
+//	private TextMessage textMessage;
+//
+//	private final EntityInstanceMessage args;
+//
+//	public EntityInstanceMessageCreator(EntityInstanceMessage args) {
+//		this.args = args;
+//	}
+//
+//	public TextMessage createMessage(Session session) throws JMSException {
+//		this.textMessage = session.createTextMessage();
+//		this.textMessage.setText(this.args.toString());
+//		LOG.debug("Sending Message: " + this.textMessage.getText());
+//		// System.out.println("Sending Message: " + this.textMessage);
+//		return this.textMessage;
+//	}
+//
+//	@Override
+//	public String toString() {
+//		try {
+//			return this.textMessage.getText();
+//		} catch (JMSException e) {
+//			return e.getMessage();
+//		}
+//
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile2.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile2.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile2.txt
new file mode 100644
index 0000000..f992f21
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile2.txt
@@ -0,0 +1,412 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.ivory.messaging;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.io.IOUtils;
+//import org.apache.log4j.Logger;
+//
+//import java.io.ByteArrayOutputStream;
+//import java.io.IOException;
+//import java.io.InputStream;
+//import java.util.HashMap;
+//import java.util.Map;
+//
+///**
+// * Value Object which is stored in JMS Topic as TextMessage
+// *
+// */
+//public class EntityInstanceMessage {
+//
+//	private final Map<ARG, String> msgs = new HashMap<ARG, String>();
+//	private static final String MSG_SEPERATOR = "$";
+//	private static final int ARG_LENGTH = EntityInstanceMessage.ARG.values().length;
+//	private static final Logger LOG = Logger
+//			.getLogger(EntityInstanceMessage.class);
+//	private static final String IVORY_PROCESS_TOPIC_NAME = "IVORY.PROCESS.TOPIC";
+//
+//	/**
+//	 * Enum for arguments that are used in coordinators to pass arguments to
+//	 * parent workflow
+//	 */
+//
+//	public enum entityOperation {
+//		GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
+//	}
+//
+//	public enum ARG {
+//		PROCESS_NAME(0, "entityName"), FEED_NAME(1, "feedNames"), FEED_INSTANCE_PATH(
+//				2, "feedInstancePaths"), WORKFLOW_ID(3, "workflowId"), RUN_ID(
+//				4, "runId"), NOMINAL_TIME(5, "nominalTime"), TIME_STAMP(6,
+//				"timeStamp"), BROKER_URL(7, "brokerUrl"), BROKER_IMPL_CLASS(8,
+//				"brokerImplClass"), ENTITY_TYPE(9, "entityType"), OPERATION(10,
+//				"operation"), LOG_FILE(11, "logFile"), TOPIC_NAME(12,
+//				"topicName"), STATUS(13, "status"), BROKER_TTL(14, "broker.ttlInMins");
+//
+//		private int argOrder;
+//		private String argName;
+//
+//		private ARG(int argOrder, String argName) {
+//			this.argOrder = argOrder;
+//			this.argName = argName;
+//		}
+//
+//		public int ORDER() {
+//			return this.argOrder;
+//		}
+//
+//		public String NAME() {
+//			return this.argName;
+//		}
+//	}
+//
+//	public String getProcessName() {
+//		return this.msgs.get(ARG.PROCESS_NAME);
+//	}
+//
+//	public void setProcessName(String processName) {
+//		this.msgs.put(ARG.PROCESS_NAME, processName);
+//	}
+//
+//	public String getTopicName() {
+//		return this.msgs.get(ARG.TOPIC_NAME);
+//	}
+//
+//	public void setTopicName(String topicName) {
+//		this.msgs.put(ARG.TOPIC_NAME, topicName);
+//	}
+//
+//	public String getFeedName() {
+//		return this.msgs.get(ARG.FEED_NAME);
+//	}
+//
+//	public void setFeedName(String feedName) {
+//		this.msgs.put(ARG.FEED_NAME, feedName);
+//	}
+//
+//	public String getFeedInstancePath() {
+//		return this.msgs.get(ARG.FEED_INSTANCE_PATH);
+//	}
+//
+//	public void setFeedInstancePath(String feedInstancePath) {
+//		this.msgs.put(ARG.FEED_INSTANCE_PATH, feedInstancePath);
+//	}
+//
+//	public String getWorkflowId() {
+//		return this.msgs.get(ARG.WORKFLOW_ID);
+//	}
+//
+//	public void setWorkflowId(String workflowId) {
+//		this.msgs.put(ARG.WORKFLOW_ID, workflowId);
+//	}
+//
+//	public String getRunId() {
+//		return this.msgs.get(ARG.RUN_ID);
+//	}
+//
+//	public void setRunId(String runId) {
+//		this.msgs.put(ARG.RUN_ID, runId);
+//	}
+//
+//	public String getNominalTime() {
+//		return this.msgs.get(ARG.NOMINAL_TIME);
+//	}
+//
+//	public void setNominalTime(String nominalTime) {
+//		this.msgs.put(ARG.NOMINAL_TIME, nominalTime);
+//	}
+//
+//	public String getTimeStamp() {
+//		return this.msgs.get(ARG.TIME_STAMP);
+//	}
+//
+//	public void setTimeStamp(String timeStamp) {
+//		this.msgs.put(ARG.TIME_STAMP, timeStamp);
+//	}
+//
+//	public String getBrokerUrl() {
+//		return this.msgs.get(ARG.BROKER_URL);
+//	}
+//
+//	public void setBrokerUrl(String brokerUrl) {
+//		this.msgs.put(ARG.BROKER_URL, brokerUrl);
+//	}
+//
+//	public String getBrokerImplClass() {
+//		return this.msgs.get(ARG.BROKER_IMPL_CLASS);
+//	}
+//
+//	public void setBrokerImplClass(String brokerImplClass) {
+//		this.msgs.put(ARG.BROKER_IMPL_CLASS, brokerImplClass);
+//	}
+//
+//	public String getEntityType() {
+//		return this.msgs.get(ARG.ENTITY_TYPE);
+//	}
+//
+//	public void setEntityType(String entityType) {
+//		this.msgs.put(ARG.ENTITY_TYPE, entityType);
+//	}
+//
+//	public String getOperation() {
+//		return this.msgs.get(ARG.OPERATION);
+//	}
+//
+//	public void setOperation(String operation) {
+//		this.msgs.put(ARG.OPERATION, operation);
+//	}
+//
+//	public String getLogFile() {
+//		return this.msgs.get(ARG.LOG_FILE);
+//	}
+//
+//	public void setLogFile(String logFile) {
+//		this.msgs.put(ARG.LOG_FILE, logFile);
+//	}
+//
+//	public String getStatus() {
+//		return this.msgs.get(ARG.STATUS);
+//	}
+//
+//	public void setStatus(String status) {
+//		this.msgs.put(ARG.STATUS, status);
+//	}
+//
+//	public String getBrokerTTL() {
+//		return this.msgs.get(ARG.BROKER_TTL);
+//	}
+//
+//	public void setBrokerTTL(String brokerTTL) {
+//		this.msgs.put(ARG.BROKER_TTL, brokerTTL);
+//	}
+//
+//	@Override
+//	public String toString() {
+//		if (getEntityType().equalsIgnoreCase("PROCESS")
+//				&& getTopicName().equals(IVORY_PROCESS_TOPIC_NAME)) {
+//			return getIvoryMessage();
+//		}
+//		if (getEntityType().equalsIgnoreCase("FEED")) {
+//			return getFeedMessage();
+//		}
+//		return getProcessMessage();
+//
+//	}
+//
+//	private String getProcessMessage() {
+//		return getProcessName() + MSG_SEPERATOR + getFeedName() + MSG_SEPERATOR
+//				+ getFeedInstancePath() + MSG_SEPERATOR + getWorkflowId()
+//				+ MSG_SEPERATOR + getRunId() + MSG_SEPERATOR + getNominalTime()
+//				+ MSG_SEPERATOR + getTimeStamp();
+//	}
+//
+//	private String getIvoryMessage() {
+//		return getProcessName() + MSG_SEPERATOR + getFeedName() + MSG_SEPERATOR
+//				+ getFeedInstancePath() + MSG_SEPERATOR + getWorkflowId()
+//				+ MSG_SEPERATOR + getRunId() + MSG_SEPERATOR + getNominalTime()
+//				+ MSG_SEPERATOR + getTimeStamp() + MSG_SEPERATOR + getStatus();
+//	}
+//
+//	private String getFeedMessage() {
+//		return getFeedName() + MSG_SEPERATOR
+//				+ getFeedInstancePath() + MSG_SEPERATOR + getOperation()
+//				+ MSG_SEPERATOR + getWorkflowId() + MSG_SEPERATOR + getRunId()
+//				+ MSG_SEPERATOR + getNominalTime() + MSG_SEPERATOR
+//				+ getTimeStamp();
+//	}
+//
+//	/**
+//	 *
+//	 * @param args
+//	 *            - String array passed from oozie action for jms messaging
+//	 * @return ProcessMessage - Value object which is stored in JMS topic
+//	 */
+//	public static EntityInstanceMessage[] argsToMessage(String[] args) {
+//
+//		assert args.length == ARG_LENGTH : "Required number of arguments: "
+//				+ ARG_LENGTH;
+//
+//		String[] feedNames = getFeedNames(args);
+//
+//		String[] feedPaths;
+//		try {
+//			feedPaths = getFeedPaths(args);
+//		} catch (IOException e) {
+//			LOG.error("Error getting instance paths: ", e);
+//			throw new RuntimeException(e);
+//		}
+//
+//		EntityInstanceMessage[] processMessages = new EntityInstanceMessage[feedPaths.length];
+//		for (int i = 0; i < feedPaths.length; i++) {
+//			EntityInstanceMessage instanceMessage = new EntityInstanceMessage();
+//			instanceMessage
+//					.setProcessName(args[EntityInstanceMessage.ARG.PROCESS_NAME
+//							.ORDER()]);
+//			if (args[EntityInstanceMessage.ARG.ENTITY_TYPE.ORDER()]
+//					.equalsIgnoreCase("PROCESS")) {
+//				instanceMessage.setFeedName(feedNames[i]);
+//			} else {
+//				instanceMessage
+//						.setFeedName(args[EntityInstanceMessage.ARG.FEED_NAME
+//								.ORDER()]);
+//			}
+//			instanceMessage.setFeedInstancePath(feedPaths[i]);
+//			instanceMessage
+//					.setWorkflowId(args[EntityInstanceMessage.ARG.WORKFLOW_ID
+//							.ORDER()]);
+//			instanceMessage.setRunId(args[EntityInstanceMessage.ARG.RUN_ID
+//					.ORDER()]);
+//			instanceMessage
+//					.setNominalTime(args[EntityInstanceMessage.ARG.NOMINAL_TIME
+//							.ORDER()]);
+//			instanceMessage
+//					.setTimeStamp(args[EntityInstanceMessage.ARG.TIME_STAMP
+//							.ORDER()]);
+//			instanceMessage
+//					.setBrokerUrl(args[EntityInstanceMessage.ARG.BROKER_URL
+//							.ORDER()]);
+//			instanceMessage
+//					.setBrokerImplClass(args[EntityInstanceMessage.ARG.BROKER_IMPL_CLASS
+//							.ORDER()]);
+//			instanceMessage
+//					.setEntityType(args[EntityInstanceMessage.ARG.ENTITY_TYPE
+//							.ORDER()]);
+//			instanceMessage
+//					.setOperation(args[EntityInstanceMessage.ARG.OPERATION
+//							.ORDER()]);
+//			instanceMessage.setLogFile(args[EntityInstanceMessage.ARG.LOG_FILE
+//					.ORDER()]);
+//			instanceMessage
+//					.setTopicName(args[EntityInstanceMessage.ARG.TOPIC_NAME
+//							.ORDER()]);
+//			instanceMessage.setStatus(args[EntityInstanceMessage.ARG.STATUS
+//					.ORDER()]);
+//			instanceMessage.setBrokerTTL(args[EntityInstanceMessage.ARG.BROKER_TTL
+//			           					.ORDER()]);
+//
+//			processMessages[i] = instanceMessage;
+//		}
+//		return processMessages;
+//	}
+//
+//	private static String[] getFeedNames(String[] args) {
+//		String topicName = args[ARG.TOPIC_NAME.argOrder];
+//		if (topicName.equals(IVORY_PROCESS_TOPIC_NAME)) {
+//			return new String[] { args[EntityInstanceMessage.ARG.FEED_NAME
+//					.ORDER()] };
+//		}
+//		return args[EntityInstanceMessage.ARG.FEED_NAME.ORDER()].split(",");
+//	}
+//
+//	private static String[] getFeedPaths(String[] args) throws IOException {
+//		String entityType = args[EntityInstanceMessage.ARG.ENTITY_TYPE.ORDER()];
+//		String topicName = args[EntityInstanceMessage.ARG.TOPIC_NAME.ORDER()];
+//
+//		if (entityType.equalsIgnoreCase("PROCESS")
+//				&& topicName.equals(IVORY_PROCESS_TOPIC_NAME)) {
+//			LOG.debug("Returning instance paths for Ivory Topic: "
+//					+ args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()]);
+//			return new String[] { args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH
+//					.ORDER()] };
+//		}
+//
+//		if (entityType.equalsIgnoreCase("PROCESS")) {
+//			LOG.debug("Returning instance paths for process: "
+//					+ args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()]);
+//			return args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()]
+//					.split(",");
+//		}
+//		//
+//		Path logFile = new Path(
+//				args[EntityInstanceMessage.ARG.LOG_FILE.ORDER()]);
+//		FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+//		ByteArrayOutputStream writer = new ByteArrayOutputStream();
+//		InputStream instance = fs.open(logFile);
+//		IOUtils.copyBytes(instance, writer, 4096, true);
+//		String[] instancePaths = writer.toString().split("=");
+//		if (instancePaths.length == 1) {
+//			LOG.debug("Returning 0 instance paths for feed ");
+//			return new String[0];
+//		} else {
+//			LOG.debug("Returning instance paths for feed " + instancePaths[1]);
+//			return instancePaths[1].split(",");
+//		}
+//
+//	}
+//
+//	/**
+//	 *
+//	 * @param instanceMessages
+//	 *            - value object which is stored in JMS topic as TextMessage
+//	 * @return - String array.
+//	 */
+//	public static String[] messageToArgs(
+//			EntityInstanceMessage[] instanceMessages) {
+//		String[] args = new String[ARG_LENGTH];
+//
+//		args[EntityInstanceMessage.ARG.PROCESS_NAME.ORDER()] = instanceMessages[0]
+//				.getProcessName();
+//		StringBuilder feedNames = new StringBuilder();
+//		StringBuilder feedPaths = new StringBuilder();
+//
+//		for (EntityInstanceMessage instanceMessage : instanceMessages) {
+//			feedNames.append(instanceMessage.getFeedName()).append(",");
+//			feedPaths.append(instanceMessage.getFeedInstancePath()).append(",");
+//		}
+//		if (instanceMessages[0].getEntityType().equalsIgnoreCase("PROCESS")) {
+//			args[EntityInstanceMessage.ARG.FEED_NAME.ORDER()] = feedNames
+//					.toString();
+//
+//		} else {
+//			args[EntityInstanceMessage.ARG.FEED_NAME.ORDER()] = instanceMessages[0]
+//					.getFeedName();
+//		}
+//		args[EntityInstanceMessage.ARG.FEED_INSTANCE_PATH.ORDER()] = feedPaths
+//				.toString();
+//		args[EntityInstanceMessage.ARG.WORKFLOW_ID.ORDER()] = instanceMessages[0]
+//				.getWorkflowId();
+//		args[EntityInstanceMessage.ARG.RUN_ID.ORDER()] = instanceMessages[0]
+//				.getRunId();
+//		args[EntityInstanceMessage.ARG.NOMINAL_TIME.ORDER()] = instanceMessages[0]
+//				.getNominalTime();
+//		args[EntityInstanceMessage.ARG.TIME_STAMP.ORDER()] = instanceMessages[0]
+//				.getTimeStamp();
+//		args[EntityInstanceMessage.ARG.BROKER_URL.ORDER()] = instanceMessages[0]
+//				.getBrokerUrl();
+//		args[EntityInstanceMessage.ARG.BROKER_IMPL_CLASS.ORDER()] = instanceMessages[0]
+//				.getBrokerImplClass();
+//		args[EntityInstanceMessage.ARG.ENTITY_TYPE.ORDER()] = instanceMessages[0]
+//				.getEntityType();
+//		args[EntityInstanceMessage.ARG.OPERATION.ORDER()] = instanceMessages[0]
+//				.getOperation();
+//		args[EntityInstanceMessage.ARG.LOG_FILE.ORDER()] = instanceMessages[0]
+//				.getLogFile();
+//		args[EntityInstanceMessage.ARG.TOPIC_NAME.ORDER()] = instanceMessages[0]
+//				.getTopicName();
+//		args[EntityInstanceMessage.ARG.STATUS.ORDER()] = instanceMessages[0]
+//				.getStatus();
+//		args[EntityInstanceMessage.ARG.BROKER_TTL.ORDER()] = instanceMessages[0]
+//				.getBrokerTTL();
+//
+//		return args;
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1aac3a50/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile3.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile3.txt b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile3.txt
new file mode 100644
index 0000000..7234f52
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/OozieExampleInputData/normalInput/dataFile3.txt
@@ -0,0 +1,139 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.ivory.messaging;
+//
+//import java.lang.reflect.InvocationTargetException;
+//
+//import javax.jms.Connection;
+//import javax.jms.ConnectionFactory;
+//import javax.jms.DeliveryMode;
+//import javax.jms.JMSException;
+//import javax.jms.Session;
+//import javax.jms.Topic;
+//
+//import org.apache.log4j.Logger;
+//
+///**
+// * Default Ivory Message Producer The configuration are loaded from
+// * jms-beans.xml
+// */
+//public class MessageProducer {
+//
+//	private Connection connection;
+//	private static final Logger LOG = Logger.getLogger(MessageProducer.class);
+//	private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
+//
+//	/**
+//	 *
+//	 * @param arguments
+//	 *            - Accepts a Message to be send to JMS topic, creates a new
+//	 *            Topic based on topic name if it does not exist or else
+//	 *            existing topic with the same name is used to send the message.
+//	 * @throws JMSException
+//	 */
+//	protected void sendMessage(EntityInstanceMessage entityInstanceMessage)
+//			throws JMSException {
+//
+//		Session session = connection.createSession(false,
+//				Session.AUTO_ACKNOWLEDGE);
+//		Topic entityTopic = session.createTopic(entityInstanceMessage
+//				.getTopicName());
+//		javax.jms.MessageProducer producer = session
+//				.createProducer(entityTopic);
+//		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+//		long messageTTL = DEFAULT_TTL;
+//		try {
+//			long messageTTLinMins = Long.parseLong(entityInstanceMessage
+//					.getBrokerTTL());
+//			messageTTL = messageTTLinMins * 60 * 1000;
+//		} catch (NumberFormatException e) {
+//			LOG.error("Error in parsing broker.ttl, setting TTL to:"
+//					+ DEFAULT_TTL+ " milli-seconds");
+//		}
+//		producer.setTimeToLive(messageTTL);
+//		producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
+//				.createMessage(session));
+//
+//	}
+//
+//	/**
+//	 *
+//	 * @param args
+//	 *            - array of Strings, which will be used to create TextMessage
+//	 */
+//	public static void main(String[] args) {
+//		debug(args);
+//		EntityInstanceMessage[] entityInstanceMessage = EntityInstanceMessage
+//				.argsToMessage(args);
+//		if (entityInstanceMessage.length == 0) {
+//			LOG.warn("No operation on output feed");
+//			return;
+//		}
+//
+//		MessageProducer ivoryMessageProducer = new MessageProducer();
+//		try {
+//			ivoryMessageProducer.createAndStartConnection(
+//					args[EntityInstanceMessage.ARG.BROKER_IMPL_CLASS.ORDER()],
+//					"", "", entityInstanceMessage[0].getBrokerUrl());
+//			for (EntityInstanceMessage processMessage : entityInstanceMessage) {
+//				ivoryMessageProducer.sendMessage(processMessage);
+//			}
+//		} catch (JMSException e) {
+//			LOG.error(e);
+//			e.printStackTrace();
+//		} catch (Exception e) {
+//			LOG.error(e);
+//			e.printStackTrace();
+//		} finally {
+//			try {
+//				ivoryMessageProducer.connection.close();
+//			} catch (JMSException e) {
+//				e.printStackTrace();
+//			}
+//		}
+//
+//	}
+//
+//	private static void debug(String[] args) {
+//		if (LOG.isDebugEnabled()) {
+//			for (int i = 0; i < args.length; i++) {
+//				LOG.debug(args[i] + "::");
+//			}
+//		}
+//
+//	}
+//
+//	private void createAndStartConnection(String implementation,
+//			String userName, String password, String url) throws JMSException,
+//			ClassNotFoundException, IllegalArgumentException,
+//			SecurityException, InstantiationException, IllegalAccessException,
+//			InvocationTargetException, NoSuchMethodException {
+//
+//		Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) MessageProducer.class
+//				.getClassLoader().loadClass(implementation);
+//
+//		ConnectionFactory connectionFactory = clazz.getConstructor(
+//				String.class, String.class, String.class).newInstance(userName,
+//				password, url);
+//
+//		connection = connectionFactory.createConnection();
+//		connection.start();
+//	}
+//
+//}