You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:50:25 UTC

[09/47] Fixes for Checkstyle

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/process/src/test/resources/config/late/late-process2.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/late/late-process2.xml b/process/src/test/resources/config/late/late-process2.xml
index a9d3576..bc507ad 100644
--- a/process/src/test/resources/config/late/late-process2.xml
+++ b/process/src/test/resources/config/late/late-process2.xml
@@ -16,8 +16,8 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-<process name="late-process2"  xmlns="uri:falcon:process:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-	<!-- where -->
+<process name="late-process2" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
     <clusters>
         <cluster name="late-cluster">
             <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
@@ -32,25 +32,26 @@
 
     <!-- what -->
     <inputs>
-        <input name="impression" feed="late-feed1" start-instance="today(0,0)" end-instance="today(0,2)" /> 
-        <input name="clicks" feed="late-feed2" start-instance="yesterday(0,0)" end-instance="today(0,0)" partition="*/US"/>
+        <input name="impression" feed="late-feed1" start-instance="today(0,0)" end-instance="today(0,2)"/>
+        <input name="clicks" feed="late-feed2" start-instance="yesterday(0,0)" end-instance="today(0,0)"
+               partition="*/US"/>
     </inputs>
 
     <outputs>
-        <output name="clicksummary" feed="late-feed3" instance="today(0,0)" />
+        <output name="clicksummary" feed="late-feed3" instance="today(0,0)"/>
     </outputs>
 
     <!-- how -->
     <properties>
-    	<property name="procprop" value="procprop"/>
+        <property name="procprop" value="procprop"/>
     </properties>
-    
-    <workflow engine="oozie" path="/user/guest/workflow" />
 
-    <retry policy="periodic" delay="hours(10)" attempts="3" />
+    <workflow engine="oozie" path="/user/guest/workflow"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
 
     <late-process policy="exp-backoff" delay="hours(1)">
-        <late-input feed="impression" workflow-path="hdfs://impression/late/workflow" />
-        <late-input feed="clicks" workflow-path="hdfs://clicks/late/workflow" />
+        <late-input feed="impression" workflow-path="hdfs://impression/late/workflow"/>
+        <late-input feed="clicks" workflow-path="hdfs://clicks/late/workflow"/>
     </late-process>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index b5a7f50..91d5e0f 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -4,11 +4,11 @@
     License. You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed 
     under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing 
     permissions and ~ limitations under the License. -->
-<process name="sample" xmlns="uri:falcon:process:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<process name="sample" xmlns="uri:falcon:process:0.1">
     <!-- where -->
     <clusters>
         <cluster name="corp">
-            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z" />
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
         </cluster>
     </clusters>
 
@@ -20,26 +20,26 @@
 
     <!-- what -->
     <inputs>
-        <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)" />
-        <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US" />
+        <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)"/>
+        <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
     </inputs>
 
     <outputs>
-        <output name="clicksummary" feed="impressions" instance="today(0,0)" />
+        <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
     </outputs>
 
     <!-- how -->
     <properties>
-        <property name="procprop" value="procprop" />
-        <property name="mapred.job.priority" value="LOW" />
+        <property name="procprop" value="procprop"/>
+        <property name="mapred.job.priority" value="LOW"/>
     </properties>
 
-    <workflow engine="oozie" path="/user/guest/workflow" />
+    <workflow engine="oozie" path="/user/guest/workflow"/>
 
-    <retry policy="periodic" delay="hours(10)" attempts="3" />
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
 
     <late-process policy="exp-backoff" delay="hours(1)">
-        <late-input input="impression" workflow-path="hdfs://impression/late/workflow" />
-        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow" />
+        <late-input input="impression" workflow-path="hdfs://impression/late/workflow"/>
+        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
     </late-process>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
index 969a59c..7dfd406 100644
--- a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 public class CustomReplicator extends DistCp {
 
     private static Logger LOG = Logger.getLogger(CustomReplicator.class);
+
     /**
      * Public Constructor. Creates DistCp object with specified input-parameters.
      * (E.g. source-paths, target-location, etc.)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index a693d75..fc0b5ac 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -17,16 +17,7 @@
  */
 package org.apache.falcon.replication;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -39,31 +30,36 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
 public class FeedReplicator extends Configured implements Tool {
 
-	private static Logger LOG = Logger.getLogger(FeedReplicator.class);
+    private static Logger LOG = Logger.getLogger(FeedReplicator.class);
 
     public static void main(String[] args) throws Exception {
-		ToolRunner.run(new Configuration(), new FeedReplicator(), args);
-	}
+        ToolRunner.run(new Configuration(), new FeedReplicator(), args);
+    }
 
-	@Override
-	public int run(String[] args) throws Exception {
+    @Override
+    public int run(String[] args) throws Exception {
 
         DistCpOptions options = getDistCpOptions(args);
-        
+
         Configuration conf = this.getConf();
-		// inject wf configs
-		Path confPath = new Path("file:///"
-				+ System.getProperty("oozie.action.conf.xml"));
-
-		LOG.info(confPath + " found conf ? "
-				+ confPath.getFileSystem(conf).exists(confPath));
-		conf.addResource(confPath);
-        
-		DistCp distCp = new CustomReplicator(conf, options);
-		LOG.info("Started DistCp");
-		distCp.execute();
+        // inject wf configs
+        Path confPath = new Path("file:///"
+                + System.getProperty("oozie.action.conf.xml"));
+
+        LOG.info(confPath + " found conf ? "
+                + confPath.getFileSystem(conf).exists(confPath));
+        conf.addResource(confPath);
+
+        DistCp distCp = new CustomReplicator(conf, options);
+        LOG.info("Started DistCp");
+        distCp.execute();
 
         Path targetPath = options.getTargetPath();
         FileSystem fs = targetPath.getFileSystem(getConf());
@@ -79,28 +75,30 @@ public class FeedReplicator extends Configured implements Tool {
         String fixedPath = getFixedPath(relativePath);
 
         FileStatus[] files = fs.globStatus(new Path(targetPath.toString() + "/" + fixedPath));
-		if (files != null) {
-			for (FileStatus file : files) {
-            fs.create(new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME)).close();
-            LOG.info("Created " + new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME));
-			}
-		} else {
-			LOG.info("No files present in path: "
-					+ new Path(targetPath.toString() + "/" + fixedPath)
-							.toString());
-		}
-		LOG.info("Completed DistCp");
-		return 0;
-	}
+        if (files != null) {
+            for (FileStatus file : files) {
+                fs.create(new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME)).close();
+                LOG.info("Created " + new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME));
+            }
+        } else {
+            LOG.info("No files present in path: "
+                    + new Path(targetPath.toString() + "/" + fixedPath)
+                    .toString());
+        }
+        LOG.info("Completed DistCp");
+        return 0;
+    }
 
     private String getFixedPath(String relativePath) throws IOException {
         String[] patterns = relativePath.split("/");
         int part = patterns.length - 1;
         for (int index = patterns.length - 1; index >= 0; index--) {
             String pattern = patterns[index];
-            if (pattern.isEmpty()) continue;
+            if (pattern.isEmpty()) {
+                continue;
+            }
             Pattern r = FilteredCopyListing.getRegEx(pattern);
-            if (!r.toString().equals("(" + pattern + "/)|(" + pattern + "$)"))  {
+            if (!r.toString().equals("(" + pattern + "/)|(" + pattern + "$)")) {
                 continue;
             }
             part = index;
@@ -114,42 +112,42 @@ public class FeedReplicator extends Configured implements Tool {
     }
 
     public DistCpOptions getDistCpOptions(String[] args) throws ParseException {
-		Options options = new Options();
-		Option opt;
-		opt = new Option("maxMaps", true,
-				"max number of maps to use for this copy");
-		opt.setRequired(true);
-		options.addOption(opt);
+        Options options = new Options();
+        Option opt;
+        opt = new Option("maxMaps", true,
+                "max number of maps to use for this copy");
+        opt.setRequired(true);
+        options.addOption(opt);
 
         opt = new Option("sourcePaths", true,
-				"comma separtated list of source paths to be copied");
-		opt.setRequired(true);
-		options.addOption(opt);
+                "comma separtated list of source paths to be copied");
+        opt.setRequired(true);
+        options.addOption(opt);
 
         opt = new Option("targetPath", true, "target path");
-		opt.setRequired(true);
-		options.addOption(opt);
+        opt.setRequired(true);
+        options.addOption(opt);
 
-		CommandLine cmd = new GnuParser().parse(options, args);
-		String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
-		List<Path> srcPaths = getPaths(paths);
-		String trgPath = cmd.getOptionValue("targetPath").trim();
+        CommandLine cmd = new GnuParser().parse(options, args);
+        String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
+        List<Path> srcPaths = getPaths(paths);
+        String trgPath = cmd.getOptionValue("targetPath").trim();
 
-		DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(
-				trgPath));
+        DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(
+                trgPath));
         distcpOptions.setSyncFolder(true);
-		distcpOptions.setBlocking(true);
-		distcpOptions
-				.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
-
-		return distcpOptions;
-	}
-
-	private List<Path> getPaths(String[] paths) {
-		List<Path> listPaths = new ArrayList<Path>();
-		for (String path : paths) {
-			listPaths.add(new Path(path));
-		}
-		return listPaths;
-	}
+        distcpOptions.setBlocking(true);
+        distcpOptions
+                .setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
+
+        return distcpOptions;
+    }
+
+    private List<Path> getPaths(String[] paths) {
+        List<Path> listPaths = new ArrayList<Path>();
+        for (String path : paths) {
+            listPaths.add(new Path(path));
+        }
+        return listPaths;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
index e1b6276..c1698e3 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
@@ -32,12 +32,18 @@ import java.util.regex.Pattern;
 public class FilteredCopyListing extends SimpleCopyListing {
     private static final Logger LOG = Logger.getLogger(FilteredCopyListing.class);
 
-    /** Default pattern character: Escape any special meaning. */
-    private static final char  PAT_ESCAPE = '\\';
-    /** Default pattern character: Any single character. */
-    private static final char  PAT_ANY = '.';
-    /** Default pattern character: Character set close. */
-    private static final char  PAT_SET_CLOSE = ']';
+    /**
+     * Default pattern character: Escape any special meaning.
+     */
+    private static final char PAT_ESCAPE = '\\';
+    /**
+     * Default pattern character: Any single character.
+     */
+    private static final char PAT_ANY = '.';
+    /**
+     * Default pattern character: Character set close.
+     */
+    private static final char PAT_SET_CLOSE = ']';
 
     private Pattern regex;
 
@@ -55,7 +61,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
 
     @Override
     protected boolean shouldCopy(Path path, DistCpOptions options) {
-        if (path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) return false;
+        if (path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+            return false;
+        }
         return regex == null || regex.matcher(path.toString()).find();
     }
 
@@ -74,8 +82,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
 
         // Validate the pattern
         len = filePattern.length();
-        if (len == 0)
+        if (len == 0) {
             return null;
+        }
 
         setOpen = 0;
         setRange = false;
@@ -89,8 +98,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
             if (pCh == PAT_ESCAPE) {
                 fileRegex.append(pCh);
                 i++;
-                if (i >= len)
+                if (i >= len) {
                     error("An escaped character does not present", filePattern, i);
+                }
                 pCh = filePattern.charAt(i);
             } else if (isJavaRegexSpecialChar(pCh)) {
                 fileRegex.append(PAT_ESCAPE);
@@ -121,8 +131,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
                 error("Incomplete character set range", filePattern, i);
             } else if (pCh == PAT_SET_CLOSE && setOpen > 0) {
                 // End of a character set
-                if (setOpen < 2)
+                if (setOpen < 2) {
                     error("Unexpected end of set", filePattern, i);
+                }
                 setOpen = 0;
             } else if (setOpen > 0) {
                 // Normal character, or the end of a character set range
@@ -143,7 +154,7 @@ public class FilteredCopyListing extends SimpleCopyListing {
 
     private static void error(String s, String pattern, int pos) throws IOException {
         throw new IOException("Illegal file pattern: "
-                +s+ " for glob "+ pattern + " at " + pos);
+                + s + " for glob " + pattern + " at " + pos);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
index f2ccfd8..1935e51 100644
--- a/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.falcon.repliation;
 
+import org.apache.falcon.replication.FeedReplicator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.falcon.replication.FeedReplicator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,9 +37,9 @@ public class FeedReplicatorTest {
          * <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
          */
         FeedReplicator replicator = new FeedReplicator();
-        DistCpOptions options = replicator.getDistCpOptions(new String[] { "true", "-maxMaps", "5", "-sourcePaths",
-                "hdfs://localhost:8020/tmp/", "-targetPath",
-                "hdfs://localhost1:8020/tmp/" });
+        DistCpOptions options = replicator.getDistCpOptions(new String[]{"true", "-maxMaps", "5", "-sourcePaths",
+                                                                         "hdfs://localhost:8020/tmp/", "-targetPath",
+                                                                         "hdfs://localhost1:8020/tmp/"});
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
index dc17cc2..5054bf8 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
@@ -34,7 +34,9 @@ import org.testng.annotations.Test;
 
 import java.io.DataOutputStream;
 import java.net.URI;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 public class FilteredCopyListingTest {
 
@@ -69,8 +71,7 @@ public class FilteredCopyListingTest {
             fileSystem = FileSystem.getLocal(new Configuration());
             fileSystem.mkdirs(new Path(path));
             recordInExpectedValues(path);
-        }
-        finally {
+        } finally {
             IOUtils.cleanup(null, fileSystem);
         }
     }
@@ -80,8 +81,7 @@ public class FilteredCopyListingTest {
         try {
             fileSystem = FileSystem.getLocal(new Configuration());
             fileSystem.delete(new Path(path), true);
-        }
-        finally {
+        } finally {
             IOUtils.cleanup(null, fileSystem);
         }
     }
@@ -93,8 +93,7 @@ public class FilteredCopyListingTest {
             fileSystem = FileSystem.getLocal(new Configuration());
             outputStream = fileSystem.create(new Path(path), true, 10);
             recordInExpectedValues(path);
-        }
-        finally {
+        } finally {
             IOUtils.cleanup(null, fileSystem, outputStream);
         }
     }
@@ -209,7 +208,7 @@ public class FilteredCopyListingTest {
     private void verifyContents(Path listingPath, int expected) throws Exception {
         SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.getLocal(new Configuration()),
                 listingPath, new Configuration());
-        Text key   = new Text();
+        Text key = new Text();
         FileStatus value = new FileStatus();
         Map<String, String> actualValues = new HashMap<String, String>();
         while (reader.next(key, value)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 73dc882..00a2d87 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -18,19 +18,7 @@
 
 package org.apache.falcon.latedata;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -40,131 +28,139 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
+import java.io.*;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 public class LateDataHandler extends Configured implements Tool {
 
-	private static Logger LOG = Logger.getLogger(LateDataHandler.class);
-
-	static PrintStream stream = System.out;
-
-	public static void main(String[] args) throws Exception {
-		Configuration conf = new Configuration();
-		Path confPath = new Path("file:///"
-				+ System.getProperty("oozie.action.conf.xml"));
-
-		LOG.info(confPath + " found ? "
-				+ confPath.getFileSystem(conf).exists(confPath));
-		conf.addResource(confPath);
-		ToolRunner.run(conf, new LateDataHandler(), args);
-	}
-
-	private static CommandLine getCommand(String[] args) throws ParseException {
-		Options options = new Options();
-
-		Option opt = new Option("out", true, "Out file name");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("paths", true,
-				"Comma separated path list, further separated by #");
-		opt.setRequired(true);
-		options.addOption(opt);
-		opt = new Option("falconInputFeeds", true,
-				"Input feed names, further separated by #");
-		opt.setRequired(true);
-		options.addOption(opt);
-
-		return new GnuParser().parse(options, args);
-	}
-
-	@Override
-	public int run(String[] args) throws Exception {
-
-		CommandLine command = getCommand(args);
-
-		Path file = new Path(command.getOptionValue("out"));
-		Map<String, Long> map = new LinkedHashMap<String, Long>();
-		String pathStr = getOptionValue(command, "paths");
-		if(pathStr == null)
-		    return 0;
-		
-		String[] pathGroups = pathStr.split("#");
-		String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
-				"#");
-		for (int index = 0; index < pathGroups.length; index++) {
-			long usage = 0;
-			for (String pathElement : pathGroups[index].split(",")) {
-				Path inPath = new Path(pathElement);
-				usage += usage(inPath, getConf());
-			}
-			map.put(inputFeeds[index], usage);
-		}
-		LOG.info("MAP data: " + map);
-
-		OutputStream out = file.getFileSystem(getConf()).create(file);
-		for (Map.Entry<String, Long> entry : map.entrySet()) {
-			out.write((entry.getKey() + "=" + entry.getValue() + "\n")
-					.getBytes());
-		}
-		out.close();
-		return 0;
-	}
-
-	private String getOptionValue(CommandLine command, String option) {
-	    String value = command.getOptionValue(option);
-	    if(value.equals("null"))
-	        return null;
-	    return value;
-	}
-	
-	public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
-			throws Exception {
-
-		StringBuffer buffer = new StringBuffer();
-		BufferedReader in = new BufferedReader(new InputStreamReader(file
-				.getFileSystem(conf).open(file)));
-		String line;
-		try {
-			Map<String, Long> recorded = new LinkedHashMap<String, Long>();
-			while ((line = in.readLine()) != null) {
-				if (line.isEmpty())
-					continue;
-				int index = line.indexOf('=');
-				String key = line.substring(0, index);
-				long size = Long.parseLong(line.substring(index + 1));
-				recorded.put(key, size);
-			}
-
-			for (Map.Entry<String, Long> entry : map.entrySet()) {
-				if (recorded.get(entry.getKey()) == null) {
-					LOG.info("No matching key " + entry.getKey());
-					continue;
-				}
-				if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
-					LOG.info("Recorded size:"+recorded.get(entry.getKey())+"  is different from new size" + entry.getValue());
-					buffer.append(entry.getKey()).append(',');
-				}
-			}
-			if (buffer.length() == 0) {
-				return "";
-			} else {
-				return buffer.substring(0, buffer.length() - 1);
-			}
-
-		} finally {
-			in.close();
-		}
-
-	}
-
-	public long usage(Path inPath, Configuration conf) throws IOException {
-		FileSystem fs = inPath.getFileSystem(conf);
-		FileStatus status[] = fs.globStatus(inPath);
-		if (status == null || status.length == 0) {
-			return 0;
-		}
-		long totalSize = 0;
-		for (FileStatus statu : status) {
-			totalSize += fs.getContentSummary(statu.getPath()).getLength();
-		}
-		return totalSize;
-	}
+    private static Logger LOG = Logger.getLogger(LateDataHandler.class);
+
+    static PrintStream stream = System.out;
+
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        Path confPath = new Path("file:///"
+                + System.getProperty("oozie.action.conf.xml"));
+
+        LOG.info(confPath + " found ? "
+                + confPath.getFileSystem(conf).exists(confPath));
+        conf.addResource(confPath);
+        ToolRunner.run(conf, new LateDataHandler(), args);
+    }
+
+    private static CommandLine getCommand(String[] args) throws ParseException {
+        Options options = new Options();
+
+        Option opt = new Option("out", true, "Out file name");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("paths", true,
+                "Comma separated path list, further separated by #");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option("falconInputFeeds", true,
+                "Input feed names, further separated by #");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return new GnuParser().parse(options, args);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+        CommandLine command = getCommand(args);
+
+        Path file = new Path(command.getOptionValue("out"));
+        Map<String, Long> map = new LinkedHashMap<String, Long>();
+        String pathStr = getOptionValue(command, "paths");
+        if (pathStr == null) {
+            return 0;
+        }
+
+        String[] pathGroups = pathStr.split("#");
+        String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
+                "#");
+        for (int index = 0; index < pathGroups.length; index++) {
+            long usage = 0;
+            for (String pathElement : pathGroups[index].split(",")) {
+                Path inPath = new Path(pathElement);
+                usage += usage(inPath, getConf());
+            }
+            map.put(inputFeeds[index], usage);
+        }
+        LOG.info("MAP data: " + map);
+
+        OutputStream out = file.getFileSystem(getConf()).create(file);
+        for (Map.Entry<String, Long> entry : map.entrySet()) {
+            out.write((entry.getKey() + "=" + entry.getValue() + "\n")
+                    .getBytes());
+        }
+        out.close();
+        return 0;
+    }
+
+    private String getOptionValue(CommandLine command, String option) {
+        String value = command.getOptionValue(option);
+        if (value.equals("null")) {
+            return null;
+        }
+        return value;
+    }
+
+    public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
+            throws Exception {
+
+        StringBuffer buffer = new StringBuffer();
+        BufferedReader in = new BufferedReader(new InputStreamReader(file
+                .getFileSystem(conf).open(file)));
+        String line;
+        try {
+            Map<String, Long> recorded = new LinkedHashMap<String, Long>();
+            while ((line = in.readLine()) != null) {
+                if (line.isEmpty()) {
+                    continue;
+                }
+                int index = line.indexOf('=');
+                String key = line.substring(0, index);
+                long size = Long.parseLong(line.substring(index + 1));
+                recorded.put(key, size);
+            }
+
+            for (Map.Entry<String, Long> entry : map.entrySet()) {
+                if (recorded.get(entry.getKey()) == null) {
+                    LOG.info("No matching key " + entry.getKey());
+                    continue;
+                }
+                if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
+                    LOG.info("Recorded size:" + recorded.get(entry.getKey()) + "  is different from new size"
+                            + entry.getValue());
+                    buffer.append(entry.getKey()).append(',');
+                }
+            }
+            if (buffer.length() == 0) {
+                return "";
+            } else {
+                return buffer.substring(0, buffer.length() - 1);
+            }
+
+        } finally {
+            in.close();
+        }
+
+    }
+
+    public long usage(Path inPath, Configuration conf) throws IOException {
+        FileSystem fs = inPath.getFileSystem(conf);
+        FileStatus status[] = fs.globStatus(inPath);
+        if (status == null || status.length == 0) {
+            return 0;
+        }
+        long totalSize = 0;
+        for (FileStatus statu : status) {
+            totalSize += fs.getContentSummary(statu.getPath()).getLength();
+        }
+        return totalSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 5f050ca..7a22704 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -19,20 +19,20 @@ package org.apache.falcon.rerun.event;
 
 public class LaterunEvent extends RerunEvent {
 
-	public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
-			long delay, String entityType, String entityName, String instance,
-			int runId) {
-		super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
-				instance, runId);
-	}
+    public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
+                        long delay, String entityType, String entityName, String instance,
+                        int runId) {
+        super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+                instance, runId);
+    }
 
-	@Override
-	public String toString() {
-		return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
-				+ "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
-				+ delayInMilliSec + SEP + "entityType=" + entityType + SEP
-				+ "entityName=" + entityName + SEP + "instance=" + instance
-				+ SEP + "runId=" + runId;
-	}
+    @Override
+    public String toString() {
+        return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
+                + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
+                + delayInMilliSec + SEP + "entityType=" + entityType + SEP
+                + "entityName=" + entityName + SEP + "instance=" + instance
+                + SEP + "runId=" + runId;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 9526e0a..5a1e3e1 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -23,91 +23,91 @@ import java.util.concurrent.TimeUnit;
 
 public class RerunEvent implements Delayed {
 
-	protected static final String SEP = "*";
-	
-	public  enum RerunType{
-		RETRY, LATE
-	}
-
-	protected String clusterName;
-	protected String wfId;
-	protected long msgInsertTime;
-	protected long delayInMilliSec;
-	protected String entityType;
-	protected String entityName;
-	protected String instance;
-	protected int runId;
-
-	public RerunEvent(String clusterName, String wfId,
-			long msgInsertTime, long delay, String entityType, String entityName,
-			String instance, int runId) {
-		this.clusterName = clusterName;
-		this.wfId = wfId;
-		this.msgInsertTime = msgInsertTime;
-		this.delayInMilliSec = delay;
-		this.entityName = entityName;
-		this.instance = instance;
-		this.runId = runId;
-		this.entityType=entityType;
-	}
-
-	public String getClusterName() {
-		return clusterName;
-	}
-
-	public String getWfId() {
-		return wfId;
-	}
-
-	public long getDelayInMilliSec() {
-		return delayInMilliSec;
-	}
-
-	public String getEntityName() {
-		return entityName;
-	}
-
-	public String getInstance() {
-		return instance;
-	}
-
-	public int getRunId() {
-		return runId;
-	}
-	
-	public String getEntityType(){
-		return entityType;
-	}
-
-	@Override
-	public int compareTo(Delayed o) {
+    protected static final String SEP = "*";
+
+    public enum RerunType {
+        RETRY, LATE
+    }
+
+    protected String clusterName;
+    protected String wfId;
+    protected long msgInsertTime;
+    protected long delayInMilliSec;
+    protected String entityType;
+    protected String entityName;
+    protected String instance;
+    protected int runId;
+
+    public RerunEvent(String clusterName, String wfId,
+                      long msgInsertTime, long delay, String entityType, String entityName,
+                      String instance, int runId) {
+        this.clusterName = clusterName;
+        this.wfId = wfId;
+        this.msgInsertTime = msgInsertTime;
+        this.delayInMilliSec = delay;
+        this.entityName = entityName;
+        this.instance = instance;
+        this.runId = runId;
+        this.entityType = entityType;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getWfId() {
+        return wfId;
+    }
+
+    public long getDelayInMilliSec() {
+        return delayInMilliSec;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public String getInstance() {
+        return instance;
+    }
+
+    public int getRunId() {
+        return runId;
+    }
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
         RerunEvent event = (RerunEvent) o;
         return new Date(msgInsertTime + delayInMilliSec).
                 compareTo(new Date(event.msgInsertTime + event.delayInMilliSec));
-	}
-
-	@Override
-	public long getDelay(TimeUnit unit) {
-		return unit.convert((msgInsertTime - System.currentTimeMillis())
-				+ delayInMilliSec, TimeUnit.MILLISECONDS);
-	}
-
-	public long getMsgInsertTime() {
-		return msgInsertTime;
-	}
-
-	public void setMsgInsertTime(long msgInsertTime) {
-		this.msgInsertTime = msgInsertTime;
-	}
-
-	public RerunType getType() {
-		if (this instanceof RetryEvent) {
-			return RerunType.RETRY;
-		} else if (this instanceof LaterunEvent) {
-			return RerunType.LATE;
-		} else {
-			return null;
-		}
-	}
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert((msgInsertTime - System.currentTimeMillis())
+                + delayInMilliSec, TimeUnit.MILLISECONDS);
+    }
+
+    public long getMsgInsertTime() {
+        return msgInsertTime;
+    }
+
+    public void setMsgInsertTime(long msgInsertTime) {
+        this.msgInsertTime = msgInsertTime;
+    }
+
+    public RerunType getType() {
+        if (this instanceof RetryEvent) {
+            return RerunType.RETRY;
+        } else if (this instanceof LaterunEvent) {
+            return RerunType.LATE;
+        } else {
+            return null;
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index fcdb836..c5e1e80 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -17,53 +17,54 @@
  */
 package org.apache.falcon.rerun.event;
 
+import org.apache.falcon.rerun.event.RerunEvent.RerunType;
+
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.falcon.rerun.event.RerunEvent.RerunType;
-
 public class RerunEventFactory<T extends RerunEvent> {
 
-	public T getRerunEvent(String type, String line) {
-		if (type.startsWith(RerunType.RETRY.name())) {
-			return retryEventFromString(line);
-		} else if (type.startsWith(RerunType.LATE.name())) {
-			return lateEventFromString(line);
-		} else
-			return null;
-	}
+    public T getRerunEvent(String type, String line) {
+        if (type.startsWith(RerunType.RETRY.name())) {
+            return retryEventFromString(line);
+        } else if (type.startsWith(RerunType.LATE.name())) {
+            return lateEventFromString(line);
+        } else {
+            return null;
+        }
+    }
 
-	@SuppressWarnings("unchecked")
-	private T lateEventFromString(String line) {
-		Map<String, String> map = getMap(line);
-		return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
-				Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
-						.get("delayInMilliSec")), map.get("entityType"),
-				map.get("entityName"), map.get("instance"),
-				Integer.parseInt(map.get("runId")));
-	}
+    @SuppressWarnings("unchecked")
+    private T lateEventFromString(String line) {
+        Map<String, String> map = getMap(line);
+        return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
+                Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
+                .get("delayInMilliSec")), map.get("entityType"),
+                map.get("entityName"), map.get("instance"),
+                Integer.parseInt(map.get("runId")));
+    }
 
-	@SuppressWarnings("unchecked")
-	public T retryEventFromString(String line) {
-		Map<String, String> map = getMap(line);
-		return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
-				Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
-						.get("delayInMilliSec")), map.get("entityType"),
-				map.get("entityName"), map.get("instance"),
-				Integer.parseInt(map.get("runId")), Integer.parseInt(map
-						.get("attempts")), Integer.parseInt(map
-						.get("failRetryCount")));
+    @SuppressWarnings("unchecked")
+    public T retryEventFromString(String line) {
+        Map<String, String> map = getMap(line);
+        return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
+                Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
+                .get("delayInMilliSec")), map.get("entityType"),
+                map.get("entityName"), map.get("instance"),
+                Integer.parseInt(map.get("runId")), Integer.parseInt(map
+                .get("attempts")), Integer.parseInt(map
+                .get("failRetryCount")));
 
-	}
+    }
 
-	private Map<String, String> getMap(String message) {
-		String[] items = message.split("\\" + RerunEvent.SEP);
-		Map<String, String> map = new HashMap<String, String>();
-		for (String item : items) {
-			String[] pair = item.split("=");
-			map.put(pair[0], pair[1]);
-		}
-		return map;
-	}
+    private Map<String, String> getMap(String message) {
+        String[] items = message.split("\\" + RerunEvent.SEP);
+        Map<String, String> map = new HashMap<String, String>();
+        for (String item : items) {
+            String[] pair = item.split("=");
+            map.put(pair[0], pair[1]);
+        }
+        return map;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 7ff4361..33248b8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -19,39 +19,39 @@ package org.apache.falcon.rerun.event;
 
 public class RetryEvent extends RerunEvent {
 
-	private int attempts;
-	private int failRetryCount;
-
-	public RetryEvent(String clusterName, String wfId, long msgInsertTime,
-			long delay, String entityType, String entityName, String instance,
-			int runId, int attempts, int failRetryCount) {
-		super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
-				instance, runId);
-		this.attempts = attempts;
-		this.failRetryCount = failRetryCount;
-	}
-
-	public int getAttempts() {
-		return attempts;
-	}
-
-	public int getFailRetryCount() {
-		return failRetryCount;
-	}
-
-	public void setFailRetryCount(int failRetryCount) {
-		this.failRetryCount = failRetryCount;
-	}
-
-	@Override
-	public String toString() {
-
-		return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
-				+ "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
-				+ delayInMilliSec + SEP + "entityType=" + entityType + SEP
-				+ "entityName=" + entityName + SEP + "instance=" + instance
-				+ SEP + "runId=" + runId + SEP + "attempts=" + attempts + SEP
-				+ "failRetryCount=" + failRetryCount;
-	}
+    private int attempts;
+    private int failRetryCount;
+
+    public RetryEvent(String clusterName, String wfId, long msgInsertTime,
+                      long delay, String entityType, String entityName, String instance,
+                      int runId, int attempts, int failRetryCount) {
+        super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+                instance, runId);
+        this.attempts = attempts;
+        this.failRetryCount = failRetryCount;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public int getFailRetryCount() {
+        return failRetryCount;
+    }
+
+    public void setFailRetryCount(int failRetryCount) {
+        this.failRetryCount = failRetryCount;
+    }
+
+    @Override
+    public String toString() {
+
+        return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
+                + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
+                + delayInMilliSec + SEP + "entityType=" + entityType + SEP
+                + "entityName=" + entityName + SEP + "instance=" + instance
+                + SEP + "runId=" + runId + SEP + "attempts=" + attempts + SEP
+                + "failRetryCount=" + failRetryCount;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 5e2fa50..fa1d9e3 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -27,47 +27,47 @@ import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.log4j.Logger;
 
 public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends AbstractRerunHandler<T, DelayedQueue<T>>>
-		implements Runnable {
+        implements Runnable {
 
-	protected static final Logger LOG = Logger
-			.getLogger(AbstractRerunConsumer.class);
+    protected static final Logger LOG = Logger
+            .getLogger(AbstractRerunConsumer.class);
 
-	protected M handler;
+    protected M handler;
 
-	public AbstractRerunConsumer(M handler) {
-		this.handler = handler;
-	}
+    public AbstractRerunConsumer(M handler) {
+        this.handler = handler;
+    }
 
-	@Override
-	public void run() {
-		int attempt = 1;
-		AbstractRerunPolicy policy = new ExpBackoffPolicy();
-		Frequency frequency = new Frequency("minutes(1)");
-		while (true) {
-			try {
-				T message = null;
-				try {
-					message = handler.takeFromQueue();
-					attempt = 1;
-				} catch (FalconException e) {
-					LOG.error("Error while reading message from the queue: ", e);
-					GenericAlert.alertRerunConsumerFailed(
-							"Error while reading message from the queue: ", e);
-					Thread.sleep(policy.getDelay(frequency, attempt));
-					handler.reconnect();
-					attempt++;
-					continue;
-				}
-				String jobStatus = handler.getWfEngine().getWorkflowStatus(
-						message.getClusterName(), message.getWfId());
-				handleRerun(message.getClusterName(), jobStatus, message);
+    @Override
+    public void run() {
+        int attempt = 1;
+        AbstractRerunPolicy policy = new ExpBackoffPolicy();
+        Frequency frequency = new Frequency("minutes(1)");
+        while (true) {
+            try {
+                T message = null;
+                try {
+                    message = handler.takeFromQueue();
+                    attempt = 1;
+                } catch (FalconException e) {
+                    LOG.error("Error while reading message from the queue: ", e);
+                    GenericAlert.alertRerunConsumerFailed(
+                            "Error while reading message from the queue: ", e);
+                    Thread.sleep(policy.getDelay(frequency, attempt));
+                    handler.reconnect();
+                    attempt++;
+                    continue;
+                }
+                String jobStatus = handler.getWfEngine().getWorkflowStatus(
+                        message.getClusterName(), message.getWfId());
+                handleRerun(message.getClusterName(), jobStatus, message);
 
-			} catch (Throwable e) {
-				LOG.error("Error in rerun consumer:", e);
-			}
-		}
+            } catch (Throwable e) {
+                LOG.error("Error in rerun consumer:", e);
+            }
+        }
 
-	}
+    }
 
-	protected abstract void handleRerun(String cluster, String jobStatus, T message);
+    protected abstract void handleRerun(String cluster, String jobStatus, T message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 66f9c2a..4a90b9f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -29,44 +29,44 @@ import org.apache.log4j.Logger;
 
 public abstract class AbstractRerunHandler<T extends RerunEvent, M extends DelayedQueue<T>> {
 
-	protected static final Logger LOG = Logger
-			.getLogger(LateRerunHandler.class);
-	protected M delayQueue;
-	private AbstractWorkflowEngine wfEngine;
+    protected static final Logger LOG = Logger
+            .getLogger(LateRerunHandler.class);
+    protected M delayQueue;
+    private AbstractWorkflowEngine wfEngine;
 
-	public void init(M delayQueue) throws FalconException {
-		this.wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-		this.delayQueue = delayQueue;
-		this.delayQueue.init();
-	}
+    public void init(M delayQueue) throws FalconException {
+        this.wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+        this.delayQueue = delayQueue;
+        this.delayQueue.init();
+    }
 
-	public abstract void handleRerun(String cluster, String entityType,
-			String entityName, String nominalTime, String runId, String wfId,
-			long msgReceivedTime);
+    public abstract void handleRerun(String cluster, String entityType,
+                                     String entityName, String nominalTime, String runId, String wfId,
+                                     long msgReceivedTime);
 
-	public AbstractWorkflowEngine getWfEngine() {
-		return wfEngine;
-	}
+    public AbstractWorkflowEngine getWfEngine() {
+        return wfEngine;
+    }
 
-	public boolean offerToQueue(T event) throws FalconException {
-		return delayQueue.offer(event);
-	}
+    public boolean offerToQueue(T event) throws FalconException {
+        return delayQueue.offer(event);
+    }
 
-	public T takeFromQueue() throws FalconException {
-		return delayQueue.take();
-	}
-	
-	public void reconnect() throws FalconException {
-		delayQueue.reconnect();
-	}
+    public T takeFromQueue() throws FalconException {
+        return delayQueue.take();
+    }
 
-	public Entity getEntity(String entityType, String entityName)
-			throws FalconException {
-		return EntityUtil.getEntity(entityType, entityName);
-	}
+    public void reconnect() throws FalconException {
+        delayQueue.reconnect();
+    }
 
-	public Retry getRetry(Entity entity) throws FalconException {
-		return EntityUtil.getRetry(entity);
-	}
+    public Entity getEntity(String entityType, String entityName)
+            throws FalconException {
+        return EntityUtil.getEntity(entityType, entityName);
+    }
+
+    public Retry getRetry(Entity entity) throws FalconException {
+        return EntityUtil.getRetry(entity);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index fc88f0e..03561fc 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -17,16 +17,6 @@
  */
 package org.apache.falcon.rerun.handler;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
@@ -35,115 +25,120 @@ import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.latedata.LateDataHandler;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.*;
 
 public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEvent>>>
-		extends AbstractRerunConsumer<LaterunEvent, T> {
+        extends AbstractRerunConsumer<LaterunEvent, T> {
 
-	public LateRerunConsumer(T handler) {
-		super(handler);
-	}
+    public LateRerunConsumer(T handler) {
+        super(handler);
+    }
 
-	@Override
-	protected void handleRerun(String cluster, String jobStatus,
-			LaterunEvent message) {
-		try {
-			if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
-					|| jobStatus.equals("SUSPENDED")) {
-				LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as job status is running:"
-						+ message.getWfId());
-				message.setMsgInsertTime(System.currentTimeMillis());
-				handler.offerToQueue(message);
-				return;
-			}
+    @Override
+    protected void handleRerun(String cluster, String jobStatus,
+                               LaterunEvent message) {
+        try {
+            if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
+                    || jobStatus.equals("SUSPENDED")) {
+                LOG.debug(
+                        "Re-enqueing message in LateRerunHandler for workflow with same delay as job status is running:"
+                                + message.getWfId());
+                message.setMsgInsertTime(System.currentTimeMillis());
+                handler.offerToQueue(message);
+                return;
+            }
 
-			String detectLate = detectLate(message);
+            String detectLate = detectLate(message);
 
-			if (detectLate.equals("")) {
-				LOG.debug("No Late Data Detected, scheduling next late rerun for wf-id: "
-						+ message.getWfId()
-						+ " at "
-						+ SchemaHelper.formatDateUTC(new Date()));
-				handler.handleRerun(cluster, message.getEntityType(),
-						message.getEntityName(), message.getInstance(),
-						Integer.toString(message.getRunId()),
-						message.getWfId(), System.currentTimeMillis());
-				return;
-			}
+            if (detectLate.equals("")) {
+                LOG.debug("No Late Data Detected, scheduling next late rerun for wf-id: "
+                        + message.getWfId()
+                        + " at "
+                        + SchemaHelper.formatDateUTC(new Date()));
+                handler.handleRerun(cluster, message.getEntityType(),
+                        message.getEntityName(), message.getInstance(),
+                        Integer.toString(message.getRunId()),
+                        message.getWfId(), System.currentTimeMillis());
+                return;
+            }
 
-			LOG.info("Late changes detected in the following feeds: "
-					+ detectLate);
+            LOG.info("Late changes detected in the following feeds: "
+                    + detectLate);
 
-			handler.getWfEngine().reRun(message.getClusterName(),
-					message.getWfId(), null);
-			LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
-					+ " on cluster: " + message.getClusterName());
-		} catch (Exception e) {
-			LOG.warn(
-					"Late Re-run failed for instance "
-							+ message.getEntityName() + ":"
-							+ message.getInstance() + " after "
-							+ message.getDelayInMilliSec() + " with message:",
-					e);
-			GenericAlert.alertLateRerunFailed(message.getEntityType(),
-					message.getEntityName(), message.getInstance(),
-					message.getWfId(), Integer.toString(message.getRunId()),
-					e.getMessage());
-		}
+            handler.getWfEngine().reRun(message.getClusterName(),
+                    message.getWfId(), null);
+            LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
+                    + " on cluster: " + message.getClusterName());
+        } catch (Exception e) {
+            LOG.warn(
+                    "Late Re-run failed for instance "
+                            + message.getEntityName() + ":"
+                            + message.getInstance() + " after "
+                            + message.getDelayInMilliSec() + " with message:",
+                    e);
+            GenericAlert.alertLateRerunFailed(message.getEntityType(),
+                    message.getEntityName(), message.getInstance(),
+                    message.getWfId(), Integer.toString(message.getRunId()),
+                    e.getMessage());
+        }
 
-	}
+    }
 
-	public String detectLate(LaterunEvent message) throws Exception {
-		LateDataHandler late = new LateDataHandler();
-		String falconInputFeeds = handler.getWfEngine().getWorkflowProperty(
-				message.getClusterName(), message.getWfId(), "falconInputFeeds");
-		String logDir = handler.getWfEngine().getWorkflowProperty(
-				message.getClusterName(), message.getWfId(), "logDir");
-		String falconInPaths = handler.getWfEngine().getWorkflowProperty(
-				message.getClusterName(), message.getWfId(), "falconInPaths");
-		String nominalTime = handler.getWfEngine().getWorkflowProperty(
-				message.getClusterName(), message.getWfId(), "nominalTime");
-		String srcClusterName = handler.getWfEngine().getWorkflowProperty(
-				message.getClusterName(), message.getWfId(), "srcClusterName");
+    public String detectLate(LaterunEvent message) throws Exception {
+        LateDataHandler late = new LateDataHandler();
+        String falconInputFeeds = handler.getWfEngine().getWorkflowProperty(
+                message.getClusterName(), message.getWfId(), "falconInputFeeds");
+        String logDir = handler.getWfEngine().getWorkflowProperty(
+                message.getClusterName(), message.getWfId(), "logDir");
+        String falconInPaths = handler.getWfEngine().getWorkflowProperty(
+                message.getClusterName(), message.getWfId(), "falconInPaths");
+        String nominalTime = handler.getWfEngine().getWorkflowProperty(
+                message.getClusterName(), message.getWfId(), "nominalTime");
+        String srcClusterName = handler.getWfEngine().getWorkflowProperty(
+                message.getClusterName(), message.getWfId(), "srcClusterName");
 
-		Configuration conf = handler.getConfiguration(message.getClusterName(),
-				message.getWfId());
-		Path lateLogPath = handler.getLateLogPath(logDir, nominalTime,
-				srcClusterName);
-		FileSystem fs = FileSystem.get(conf);
-		if (!fs.exists(lateLogPath)) {
-			LOG.warn("Late log file:" + lateLogPath + " not found:");
-			return "";
-		}
-		Map<String, Long> feedSizes = new LinkedHashMap<String, Long>();
-		String[] pathGroups = falconInPaths.split("#");
-		String[] inputFeeds = falconInputFeeds.split("#");
-		Entity entity = EntityUtil.getEntity(message.getEntityType(),
-				message.getEntityName());
+        Configuration conf = handler.getConfiguration(message.getClusterName(),
+                message.getWfId());
+        Path lateLogPath = handler.getLateLogPath(logDir, nominalTime,
+                srcClusterName);
+        FileSystem fs = FileSystem.get(conf);
+        if (!fs.exists(lateLogPath)) {
+            LOG.warn("Late log file:" + lateLogPath + " not found:");
+            return "";
+        }
+        Map<String, Long> feedSizes = new LinkedHashMap<String, Long>();
+        String[] pathGroups = falconInPaths.split("#");
+        String[] inputFeeds = falconInputFeeds.split("#");
+        Entity entity = EntityUtil.getEntity(message.getEntityType(),
+                message.getEntityName());
 
-		List<String> lateFeed = new ArrayList<String>();
-		if (EntityUtil.getLateProcess(entity) != null) {
-			for (LateInput li : EntityUtil.getLateProcess(entity)
-					.getLateInputs()) {
-				lateFeed.add(li.getInput());
-			}
-			for (int index = 0; index < pathGroups.length; index++) {
-				if (lateFeed.contains(inputFeeds[index])) {
-					long usage = 0;
-					for (String pathElement : pathGroups[index].split(",")) {
-						Path inPath = new Path(pathElement);
-						usage += late.usage(inPath, conf);
-					}
-					feedSizes.put(inputFeeds[index], usage);
-				}
-			}
-		} else {
-			LOG.warn("Late process is not configured for entity: "
-					+ message.getEntityType() + "(" + message.getEntityName()
-					+ ")");
-		}
+        List<String> lateFeed = new ArrayList<String>();
+        if (EntityUtil.getLateProcess(entity) != null) {
+            for (LateInput li : EntityUtil.getLateProcess(entity)
+                    .getLateInputs()) {
+                lateFeed.add(li.getInput());
+            }
+            for (int index = 0; index < pathGroups.length; index++) {
+                if (lateFeed.contains(inputFeeds[index])) {
+                    long usage = 0;
+                    for (String pathElement : pathGroups[index].split(",")) {
+                        Path inPath = new Path(pathElement);
+                        usage += late.usage(inPath, conf);
+                    }
+                    feedSizes.put(inputFeeds[index], usage);
+                }
+            }
+        } else {
+            LOG.warn("Late process is not configured for entity: "
+                    + message.getEntityType() + "(" + message.getEntityName()
+                    + ")");
+        }
 
-		return late.detectChanges(lateLogPath, feedSizes, conf);
-	}
+        return late.detectChanges(lateLogPath, feedSizes, conf);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index e2145cb..ad19157 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -17,12 +17,6 @@
  */
 package org.apache.falcon.rerun.handler;
 
-import java.util.Date;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
@@ -31,10 +25,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.rerun.event.LaterunEvent;
@@ -42,185 +33,192 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Date;
 
 public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
-		AbstractRerunHandler<LaterunEvent, M> {
-
-	@Override
-	public void handleRerun(String cluster, String entityType,
-			String entityName, String nominalTime, String runId, String wfId,
-			long msgReceivedTime) {
-
-		try {
-			Entity entity = EntityUtil.getEntity(entityType, entityName);
-			try {
-				if (EntityUtil.getLateProcess(entity) == null
-						|| EntityUtil.getLateProcess(entity).getLateInputs() == null
-						|| EntityUtil.getLateProcess(entity).getLateInputs()
-								.size() == 0) {
-					LOG.info("Late rerun not configured for entity: " + entityName);
-					return;
-				}
-			} catch (FalconException e) {
-				LOG.error("Unable to get Late Process for entity:" + entityName);
-				return;
-			}
-			int intRunId = Integer.parseInt(runId);
-			Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
-			Long wait = getEventDelay(entity, nominalTime);
-			if (wait == -1) {
-				LOG.info("Late rerun expired for entity: "+entityType+"("+entityName+")");
-				String logDir = this.getWfEngine().getWorkflowProperty(cluster,
-						wfId, "logDir");
-				String srcClusterName = this.getWfEngine().getWorkflowProperty(
-						cluster, wfId, "srcClusterName");
-				Path lateLogPath = this.getLateLogPath(logDir,
-						EntityUtil.UTCtoURIDate(nominalTime), srcClusterName);
-				LOG.info("Going to delete path:" +lateLogPath);
-				FileSystem fs = FileSystem.get(getConfiguration(cluster,
-						wfId));
-				if (fs.exists(lateLogPath)) {
-					boolean deleted = fs.delete(lateLogPath, true);
-					if (deleted == true) {
-						LOG.info("Successfully deleted late file path:"
-								+ lateLogPath);
-					}
-				}
-				return;
-			}
-
-			LOG.debug("Scheduling the late rerun for entity instance : "
-					+ entityType + "(" + entityName + ")" + ":" + nominalTime
-					+ " And WorkflowId: " + wfId);
-			LaterunEvent event = new LaterunEvent(cluster, wfId,
-					msgInsertTime.getTime(), wait, entityType, entityName,
-					nominalTime, intRunId);
-			offerToQueue(event);
-		} catch (Exception e) {
-			LOG.error("Unable to schedule late rerun for entity instance : "
-					+ entityType + "(" + entityName + ")" + ":" + nominalTime
-					+ " And WorkflowId: " + wfId, e);
-			GenericAlert.alertLateRerunFailed(entityType, entityName,
-					nominalTime, wfId, runId, e.getMessage());
-		}
-	}
-
-	private long getEventDelay(Entity entity, String nominalTime)
-			throws FalconException {
-
-		Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
-		LateProcess lateProcess = EntityUtil.getLateProcess(entity);
-		if (lateProcess == null) {
-			LOG.warn("Late run not applicable for entity:"
-					+ entity.getEntityType() + "(" + entity.getName() + ")");
-			return -1;
-		}
-		PolicyType latePolicy = lateProcess.getPolicy();
-		Date cutOffTime = getCutOffTime(entity, nominalTime);
-		Date now = new Date();
-		Long wait = null;
-
-		if (now.after(cutOffTime)) {
-			LOG.warn("Feed Cut Off time: "
-					+ SchemaHelper.formatDateUTC(cutOffTime)
-					+ " has expired, Late Rerun can not be scheduled");
-			return -1;
-		} else {
-			AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
-					.getRetryPolicy(latePolicy);
-			wait = rerunPolicy.getDelay(lateProcess.getDelay(), instanceDate,
-					cutOffTime);
-		}
-		return wait;
-	}
-
-	public static Date addTime(Date date, int milliSecondsToAdd) {
-		return new Date(date.getTime() + milliSecondsToAdd);
-	}
-
-	public static Date getCutOffTime(Entity entity, String nominalTime)
-			throws FalconException {
-
-		ConfigurationStore store = ConfigurationStore.get();
-		ExpressionHelper evaluator = ExpressionHelper.get();
-		Date instanceStart = EntityUtil.parseDateUTC(nominalTime);
-		ExpressionHelper.setReferenceDate(instanceStart);
-		Date endTime = new Date();
-		Date feedCutOff = new Date(0);
-		if (entity.getEntityType() == EntityType.FEED) {
-			if (((Feed) entity).getLateArrival() == null) {
-				LOG.debug("Feed's " + entity.getName()
-						+ " late arrival cut-off is not configured, returning");
-				return feedCutOff;
-			}
-			String lateCutOff = ((Feed) entity).getLateArrival().getCutOff()
-					.toString();
-			endTime = EntityUtil.parseDateUTC(nominalTime);
-			long feedCutOffPeriod = evaluator.evaluate(lateCutOff, Long.class);
-			endTime = addTime(endTime, (int) feedCutOffPeriod);
-			return endTime;
-		} else if (entity.getEntityType() == EntityType.PROCESS) {
-			Process process = (Process) entity;
-			for (LateInput lp : process.getLateProcess().getLateInputs()) {
-				Feed feed = null;
-				String endInstanceTime = "";
-				for (Input input : process.getInputs().getInputs()) {
-					if (input.getName().equals(lp.getInput())) {
-						endInstanceTime = input.getEnd();
-						feed = store.get(EntityType.FEED, input.getFeed());
-						break;
-					}
-				}
-				if (feed.getLateArrival() == null) {
-					LOG.debug("Feed's " + feed.getName()
-							+ " late arrival cut-off is not configured, ignoring this feed");
-					continue;
-				}
-				String lateCutOff = feed.getLateArrival().getCutOff()
-						.toString();
-				endTime = evaluator.evaluate(endInstanceTime, Date.class);
-				long feedCutOffPeriod = evaluator.evaluate(lateCutOff,
-						Long.class);
-				endTime = addTime(endTime, (int) feedCutOffPeriod);
-
-				if (endTime.after(feedCutOff))
-					feedCutOff = endTime;
-			}
-			return feedCutOff;
-		} else {
-			throw new FalconException(
-					"Invalid entity while getting cut-off time:"
-							+ entity.getName());
-		}
-	}
-
-	@Override
-	public void init(M delayQueue) throws FalconException {
-		super.init(delayQueue);
-		Thread daemon = new Thread(new LateRerunConsumer(this));
-		daemon.setName("LaterunHandler");
-		daemon.setDaemon(true);
-		daemon.start();
-		LOG.info("Laterun Handler  thread started");
-	}
-	
-	public Path getLateLogPath(String logDir, String nominalTime,
-			String srcClusterName) {
-		//SrcClusterName valid only in case of feed
-		return new Path(logDir + "/latedata/" + nominalTime + "/"
-				+ (srcClusterName == null
-				? "" : srcClusterName));
-
-	}
-	
-	public Configuration getConfiguration(String cluster, String wfId)
-			throws FalconException {
-		Configuration conf = new Configuration();
-		conf.set(
-				CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
-				this.getWfEngine().getWorkflowProperty(cluster, wfId,
-						AbstractWorkflowEngine.NAME_NODE));
-		return conf;
-	}
+        AbstractRerunHandler<LaterunEvent, M> {
+
+    @Override
+    public void handleRerun(String cluster, String entityType,
+                            String entityName, String nominalTime, String runId, String wfId,
+                            long msgReceivedTime) {
+
+        try {
+            Entity entity = EntityUtil.getEntity(entityType, entityName);
+            try {
+                if (EntityUtil.getLateProcess(entity) == null
+                        || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                        || EntityUtil.getLateProcess(entity).getLateInputs()
+                        .size() == 0) {
+                    LOG.info("Late rerun not configured for entity: " + entityName);
+                    return;
+                }
+            } catch (FalconException e) {
+                LOG.error("Unable to get Late Process for entity:" + entityName);
+                return;
+            }
+            int intRunId = Integer.parseInt(runId);
+            Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
+            Long wait = getEventDelay(entity, nominalTime);
+            if (wait == -1) {
+                LOG.info("Late rerun expired for entity: " + entityType + "(" + entityName + ")");
+                String logDir = this.getWfEngine().getWorkflowProperty(cluster,
+                        wfId, "logDir");
+                String srcClusterName = this.getWfEngine().getWorkflowProperty(
+                        cluster, wfId, "srcClusterName");
+                Path lateLogPath = this.getLateLogPath(logDir,
+                        EntityUtil.UTCtoURIDate(nominalTime), srcClusterName);
+                LOG.info("Going to delete path:" + lateLogPath);
+                FileSystem fs = FileSystem.get(getConfiguration(cluster,
+                        wfId));
+                if (fs.exists(lateLogPath)) {
+                    boolean deleted = fs.delete(lateLogPath, true);
+                    if (deleted == true) {
+                        LOG.info("Successfully deleted late file path:"
+                                + lateLogPath);
+                    }
+                }
+                return;
+            }
+
+            LOG.debug("Scheduling the late rerun for entity instance : "
+                    + entityType + "(" + entityName + ")" + ":" + nominalTime
+                    + " And WorkflowId: " + wfId);
+            LaterunEvent event = new LaterunEvent(cluster, wfId,
+                    msgInsertTime.getTime(), wait, entityType, entityName,
+                    nominalTime, intRunId);
+            offerToQueue(event);
+        } catch (Exception e) {
+            LOG.error("Unable to schedule late rerun for entity instance : "
+                    + entityType + "(" + entityName + ")" + ":" + nominalTime
+                    + " And WorkflowId: " + wfId, e);
+            GenericAlert.alertLateRerunFailed(entityType, entityName,
+                    nominalTime, wfId, runId, e.getMessage());
+        }
+    }
+
+    private long getEventDelay(Entity entity, String nominalTime)
+            throws FalconException {
+
+        Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
+        LateProcess lateProcess = EntityUtil.getLateProcess(entity);
+        if (lateProcess == null) {
+            LOG.warn("Late run not applicable for entity:"
+                    + entity.getEntityType() + "(" + entity.getName() + ")");
+            return -1;
+        }
+        PolicyType latePolicy = lateProcess.getPolicy();
+        Date cutOffTime = getCutOffTime(entity, nominalTime);
+        Date now = new Date();
+        Long wait = null;
+
+        if (now.after(cutOffTime)) {
+            LOG.warn("Feed Cut Off time: "
+                    + SchemaHelper.formatDateUTC(cutOffTime)
+                    + " has expired, Late Rerun can not be scheduled");
+            return -1;
+        } else {
+            AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
+                    .getRetryPolicy(latePolicy);
+            wait = rerunPolicy.getDelay(lateProcess.getDelay(), instanceDate,
+                    cutOffTime);
+        }
+        return wait;
+    }
+
+    public static Date addTime(Date date, int milliSecondsToAdd) {
+        return new Date(date.getTime() + milliSecondsToAdd);
+    }
+
+    public static Date getCutOffTime(Entity entity, String nominalTime)
+            throws FalconException {
+
+        ConfigurationStore store = ConfigurationStore.get();
+        ExpressionHelper evaluator = ExpressionHelper.get();
+        Date instanceStart = EntityUtil.parseDateUTC(nominalTime);
+        ExpressionHelper.setReferenceDate(instanceStart);
+        Date endTime = new Date();
+        Date feedCutOff = new Date(0);
+        if (entity.getEntityType() == EntityType.FEED) {
+            if (((Feed) entity).getLateArrival() == null) {
+                LOG.debug("Feed's " + entity.getName()
+                        + " late arrival cut-off is not configured, returning");
+                return feedCutOff;
+            }
+            String lateCutOff = ((Feed) entity).getLateArrival().getCutOff()
+                    .toString();
+            endTime = EntityUtil.parseDateUTC(nominalTime);
+            long feedCutOffPeriod = evaluator.evaluate(lateCutOff, Long.class);
+            endTime = addTime(endTime, (int) feedCutOffPeriod);
+            return endTime;
+        } else if (entity.getEntityType() == EntityType.PROCESS) {
+            Process process = (Process) entity;
+            for (LateInput lp : process.getLateProcess().getLateInputs()) {
+                Feed feed = null;
+                String endInstanceTime = "";
+                for (Input input : process.getInputs().getInputs()) {
+                    if (input.getName().equals(lp.getInput())) {
+                        endInstanceTime = input.getEnd();
+                        feed = store.get(EntityType.FEED, input.getFeed());
+                        break;
+                    }
+                }
+                if (feed.getLateArrival() == null) {
+                    LOG.debug("Feed's " + feed.getName()
+                            + " late arrival cut-off is not configured, ignoring this feed");
+                    continue;
+                }
+                String lateCutOff = feed.getLateArrival().getCutOff()
+                        .toString();
+                endTime = evaluator.evaluate(endInstanceTime, Date.class);
+                long feedCutOffPeriod = evaluator.evaluate(lateCutOff,
+                        Long.class);
+                endTime = addTime(endTime, (int) feedCutOffPeriod);
+
+                if (endTime.after(feedCutOff)) {
+                    feedCutOff = endTime;
+                }
+            }
+            return feedCutOff;
+        } else {
+            throw new FalconException(
+                    "Invalid entity while getting cut-off time:"
+                            + entity.getName());
+        }
+    }
+
+    @Override
+    public void init(M delayQueue) throws FalconException {
+        super.init(delayQueue);
+        Thread daemon = new Thread(new LateRerunConsumer(this));
+        daemon.setName("LaterunHandler");
+        daemon.setDaemon(true);
+        daemon.start();
+        LOG.info("Laterun Handler  thread started");
+    }
+
+    public Path getLateLogPath(String logDir, String nominalTime,
+                               String srcClusterName) {
+        //SrcClusterName valid only in case of feed
+        return new Path(logDir + "/latedata/" + nominalTime + "/"
+                + (srcClusterName == null
+                ? "" : srcClusterName));
+
+    }
+
+    public Configuration getConfiguration(String cluster, String wfId)
+            throws FalconException {
+        Configuration conf = new Configuration();
+        conf.set(
+                CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
+                this.getWfEngine().getWorkflowProperty(cluster, wfId,
+                        AbstractWorkflowEngine.NAME_NODE));
+        return conf;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
index 1901890..ce76842 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
@@ -17,29 +17,31 @@
  */
 package org.apache.falcon.rerun.handler;
 
-import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import org.apache.falcon.rerun.event.LaterunEvent;
+import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import org.apache.falcon.rerun.event.RetryEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 
 public class RerunHandlerFactory {
 
-	private static final RetryHandler<DelayedQueue<RetryEvent>> retryHandler = new RetryHandler<DelayedQueue<RetryEvent>>();
-	private static final LateRerunHandler<DelayedQueue<LaterunEvent>> lateHandler = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
+    private static final RetryHandler<DelayedQueue<RetryEvent>> retryHandler
+            = new RetryHandler<DelayedQueue<RetryEvent>>();
+    private static final LateRerunHandler<DelayedQueue<LaterunEvent>> lateHandler
+            = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
 
-	private RerunHandlerFactory() {
+    private RerunHandlerFactory() {
 
-	}
+    }
 
-	public static AbstractRerunHandler getRerunHandler(RerunType type) {
-		switch (type) {
-		case RETRY:
-			return retryHandler;
-		case LATE:
-			return lateHandler;
-		default:
-			throw new RuntimeException("Invalid handler:" + type);
-		}
+    public static AbstractRerunHandler getRerunHandler(RerunType type) {
+        switch (type) {
+            case RETRY:
+                return retryHandler;
+            case LATE:
+                return lateHandler;
+            default:
+                throw new RuntimeException("Invalid handler:" + type);
+        }
 
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index a30d2da..c084233 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -17,82 +17,82 @@
  */
 package org.apache.falcon.rerun.handler;
 
-import java.util.Date;
-
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.rerun.event.RetryEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.falcon.util.StartupProperties;
 
+import java.util.Date;
+
 public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
-		extends AbstractRerunConsumer<RetryEvent, T> {
+        extends AbstractRerunConsumer<RetryEvent, T> {
 
-	public RetryConsumer(T handler) {
-		super(handler);
-	}
+    public RetryConsumer(T handler) {
+        super(handler);
+    }
 
-	@Override
-	protected void handleRerun(String cluster, String jobStatus,
-			RetryEvent message) {
-		try {
-			if (!jobStatus.equals("KILLED")) {
-				LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
-						+ message.getWfId());
-				message.setMsgInsertTime(System.currentTimeMillis());
-				handler.offerToQueue(message);
-				return;
-			}
-			LOG.info("Retrying attempt:"
-					+ (message.getRunId() + 1)
-					+ " out of configured: "
-					+ message.getAttempts()
-					+ " attempt for instance::"
-					+ message.getEntityName()
-					+ ":"
-					+ message.getInstance()
-					+ " And WorkflowId: "
-					+ message.getWfId()
-					+ " At time: "
-					+ SchemaHelper.formatDateUTC(new Date(System
-							.currentTimeMillis())));
-			handler.getWfEngine().reRun(message.getClusterName(),
-					message.getWfId(), null);
-		} catch (Exception e) {
-			int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
-					.getProperty("max.retry.failure.count", "1"));
-			if (message.getFailRetryCount() < maxFailRetryCount) {
-				LOG.warn(
-						"Retrying again for process instance "
-								+ message.getEntityName() + ":"
-								+ message.getInstance() + " after "
-								+ message.getDelayInMilliSec()
-								+ " seconds as Retry failed with message:", e);
-				message.setFailRetryCount(message.getFailRetryCount() + 1);
-				try {
-					handler.offerToQueue(message);
-				} catch (Exception ex) {
-					LOG.error("Unable to re-offer to queue:", ex);
-					GenericAlert.alertRetryFailed(message.getEntityType(),
-							message.getEntityName(), message.getInstance(),
-							message.getWfId(),
-							Integer.toString(message.getRunId()),
-							ex.getMessage());
-				}
-			} else {
-				LOG.warn(
-						"Failure retry attempts exhausted for instance: "
-								+ message.getEntityName() + ":"
-								+ message.getInstance(), e);
-				GenericAlert.alertRetryFailed(message.getEntityType(),
-						message.getEntityName(), message.getInstance(),
-						message.getWfId(),
-						Integer.toString(message.getRunId()),
-						"Failure retry attempts exhausted");
-			}
+    @Override
+    protected void handleRerun(String cluster, String jobStatus,
+                               RetryEvent message) {
+        try {
+            if (!jobStatus.equals("KILLED")) {
+                LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
+                        + message.getWfId());
+                message.setMsgInsertTime(System.currentTimeMillis());
+                handler.offerToQueue(message);
+                return;
+            }
+            LOG.info("Retrying attempt:"
+                    + (message.getRunId() + 1)
+                    + " out of configured: "
+                    + message.getAttempts()
+                    + " attempt for instance::"
+                    + message.getEntityName()
+                    + ":"
+                    + message.getInstance()
+                    + " And WorkflowId: "
+                    + message.getWfId()
+                    + " At time: "
+                    + SchemaHelper.formatDateUTC(new Date(System
+                    .currentTimeMillis())));
+            handler.getWfEngine().reRun(message.getClusterName(),
+                    message.getWfId(), null);
+        } catch (Exception e) {
+            int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
+                    .getProperty("max.retry.failure.count", "1"));
+            if (message.getFailRetryCount() < maxFailRetryCount) {
+                LOG.warn(
+                        "Retrying again for process instance "
+                                + message.getEntityName() + ":"
+                                + message.getInstance() + " after "
+                                + message.getDelayInMilliSec()
+                                + " seconds as Retry failed with message:", e);
+                message.setFailRetryCount(message.getFailRetryCount() + 1);
+                try {
+                    handler.offerToQueue(message);
+                } catch (Exception ex) {
+                    LOG.error("Unable to re-offer to queue:", ex);
+                    GenericAlert.alertRetryFailed(message.getEntityType(),
+                            message.getEntityName(), message.getInstance(),
+                            message.getWfId(),
+                            Integer.toString(message.getRunId()),
+                            ex.getMessage());
+                }
+            } else {
+                LOG.warn(
+                        "Failure retry attempts exhausted for instance: "
+                                + message.getEntityName() + ":"
+                                + message.getInstance(), e);
+                GenericAlert.alertRetryFailed(message.getEntityType(),
+                        message.getEntityName(), message.getInstance(),
+                        message.getWfId(),
+                        Integer.toString(message.getRunId()),
+                        "Failure retry attempts exhausted");
+            }
 
-		}
+        }
 
-	}
+    }
 
 }