You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:51:00 UTC

[44/47] git commit: Checkstyle-related fixes for the rerun module.

Checkstyle-related fixes for the rerun module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/6a39baf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/6a39baf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/6a39baf3

Branch: refs/heads/master
Commit: 6a39baf35709c9757c36952c8f1d9c13fed8da45
Parents: dbd980e
Author: venkatesh <ve...@hortonworks.com>
Authored: Tue Apr 23 13:27:20 2013 -0700
Committer: venkatesh <ve...@hortonworks.com>
Committed: Tue Apr 23 13:27:20 2013 -0700

----------------------------------------------------------------------
 .../apache/falcon/latedata/LateDataHandler.java    |   14 +-
 .../apache/falcon/rerun/event/LaterunEvent.java    |    6 +-
 .../org/apache/falcon/rerun/event/RerunEvent.java  |   13 +-
 .../falcon/rerun/event/RerunEventFactory.java      |   20 +-
 .../org/apache/falcon/rerun/event/RetryEvent.java  |    5 +
 .../rerun/handler/AbstractRerunConsumer.java       |    6 +
 .../falcon/rerun/handler/AbstractRerunHandler.java |   14 +-
 .../falcon/rerun/handler/LateRerunConsumer.java    |    6 +-
 .../falcon/rerun/handler/LateRerunHandler.java     |   24 +-
 .../falcon/rerun/handler/RerunHandlerFactory.java  |   29 ++-
 .../apache/falcon/rerun/handler/RetryConsumer.java |    8 +-
 .../apache/falcon/rerun/handler/RetryHandler.java  |   14 +-
 .../falcon/rerun/policy/AbstractRerunPolicy.java   |   10 +-
 .../falcon/rerun/policy/ExpBackoffPolicy.java      |   18 +-
 .../apache/falcon/rerun/policy/FinalPolicy.java    |    3 +
 .../apache/falcon/rerun/policy/PeriodicPolicy.java |    4 +
 .../falcon/rerun/policy/RerunPolicyFactory.java    |   21 +-
 .../apache/falcon/rerun/queue/ActiveMQueue.java    |   17 +-
 .../apache/falcon/rerun/queue/DelayedQueue.java    |    5 +-
 .../apache/falcon/rerun/queue/InMemoryQueue.java   |   17 +-
 .../falcon/rerun/service/LateRunService.java       |   17 +-
 .../apache/falcon/rerun/service/RetryService.java  |   14 +-
 .../falcon/rerun/AbstractRerunPolicyTest.java      |   32 ++--
 .../apache/falcon/rerun/handler/TestLateData.java  |  184 +++++++-------
 .../apache/falcon/rerun/queue/ActiveMQTest.java    |   10 +-
 .../falcon/rerun/queue/InMemoryQueueTest.java      |    9 +-
 26 files changed, 297 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 00a2d87..95a3511 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -32,11 +32,12 @@ import java.io.*;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+/**
+ * A tool for late data handling.
+ */
 public class LateDataHandler extends Configured implements Tool {
 
-    private static Logger LOG = Logger.getLogger(LateDataHandler.class);
-
-    static PrintStream stream = System.out;
+    private static final Logger LOG = Logger.getLogger(LateDataHandler.class);
 
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
@@ -110,9 +111,9 @@ public class LateDataHandler extends Configured implements Tool {
     }
 
     public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
-            throws Exception {
+        throws Exception {
 
-        StringBuffer buffer = new StringBuffer();
+        StringBuilder buffer = new StringBuilder();
         BufferedReader in = new BufferedReader(new InputStreamReader(file
                 .getFileSystem(conf).open(file)));
         String line;
@@ -148,12 +149,11 @@ public class LateDataHandler extends Configured implements Tool {
         } finally {
             in.close();
         }
-
     }
 
     public long usage(Path inPath, Configuration conf) throws IOException {
         FileSystem fs = inPath.getFileSystem(conf);
-        FileStatus status[] = fs.globStatus(inPath);
+        FileStatus[] status = fs.globStatus(inPath);
         if (status == null || status.length == 0) {
             return 0;
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 7a22704..dcde876 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -17,14 +17,19 @@
  */
 package org.apache.falcon.rerun.event;
 
+/**
+ * Event representing a late rerun.
+ */
 public class LaterunEvent extends RerunEvent {
 
+    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
     public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
                         long delay, String entityType, String entityName, String instance,
                         int runId) {
         super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
                 instance, runId);
     }
+    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
     @Override
     public String toString() {
@@ -34,5 +39,4 @@ public class LaterunEvent extends RerunEvent {
                 + "entityName=" + entityName + SEP + "instance=" + instance
                 + SEP + "runId=" + runId;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 5a1e3e1..9ae6458 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -21,10 +21,16 @@ import java.util.Date;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * Event representing a rerun.
+ */
 public class RerunEvent implements Delayed {
 
     protected static final String SEP = "*";
 
+    /**
+     * Rerun Event type.
+     */
     public enum RerunType {
         RETRY, LATE
     }
@@ -38,9 +44,9 @@ public class RerunEvent implements Delayed {
     protected String instance;
     protected int runId;
 
-    public RerunEvent(String clusterName, String wfId,
-                      long msgInsertTime, long delay, String entityType, String entityName,
-                      String instance, int runId) {
+    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
+                      String entityType, String entityName, String instance, int runId) {
         this.clusterName = clusterName;
         this.wfId = wfId;
         this.msgInsertTime = msgInsertTime;
@@ -50,6 +56,7 @@ public class RerunEvent implements Delayed {
         this.runId = runId;
         this.entityType = entityType;
     }
+    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
     public String getClusterName() {
         return clusterName;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index c5e1e80..54bbecf 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -22,6 +22,11 @@ import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Factory implementation to dole out specific implementations of RerunEvent.
+ *
+ * @param <T>
+ */
 public class RerunEventFactory<T extends RerunEvent> {
 
     public T getRerunEvent(String type, String line) {
@@ -38,9 +43,8 @@ public class RerunEventFactory<T extends RerunEvent> {
     private T lateEventFromString(String line) {
         Map<String, String> map = getMap(line);
         return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
-                Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
-                .get("delayInMilliSec")), map.get("entityType"),
-                map.get("entityName"), map.get("instance"),
+                Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
+                map.get("entityType"), map.get("entityName"), map.get("instance"),
                 Integer.parseInt(map.get("runId")));
     }
 
@@ -48,12 +52,10 @@ public class RerunEventFactory<T extends RerunEvent> {
     public T retryEventFromString(String line) {
         Map<String, String> map = getMap(line);
         return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
-                Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
-                .get("delayInMilliSec")), map.get("entityType"),
-                map.get("entityName"), map.get("instance"),
-                Integer.parseInt(map.get("runId")), Integer.parseInt(map
-                .get("attempts")), Integer.parseInt(map
-                .get("failRetryCount")));
+                Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
+                map.get("entityType"), map.get("entityName"), map.get("instance"),
+                Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")),
+                Integer.parseInt(map.get("failRetryCount")));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 33248b8..44bf96e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -17,11 +17,15 @@
  */
 package org.apache.falcon.rerun.event;
 
+/**
+ * Event representing a retry.
+ */
 public class RetryEvent extends RerunEvent {
 
     private int attempts;
     private int failRetryCount;
 
+    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
     public RetryEvent(String clusterName, String wfId, long msgInsertTime,
                       long delay, String entityType, String entityName, String instance,
                       int runId, int attempts, int failRetryCount) {
@@ -30,6 +34,7 @@ public class RetryEvent extends RerunEvent {
         this.attempts = attempts;
         this.failRetryCount = failRetryCount;
     }
+    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
     public int getAttempts() {
         return attempts;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index fa1d9e3..b073117 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -26,6 +26,12 @@ import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.log4j.Logger;
 
+/**
+ * Base class for a rerun consumer.
+ *
+ * @param <T> a rerun event
+ * @param <M> a rerun handler
+ */
 public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends AbstractRerunHandler<T, DelayedQueue<T>>>
         implements Runnable {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 4a90b9f..8a41bf8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -27,6 +27,12 @@ import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.log4j.Logger;
 
+/**
+ * Base class for handling reruns.
+ *
+ * @param <T> a rerun event
+ * @param <M> queue
+ */
 public abstract class AbstractRerunHandler<T extends RerunEvent, M extends DelayedQueue<T>> {
 
     protected static final Logger LOG = Logger
@@ -34,9 +40,9 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
     protected M delayQueue;
     private AbstractWorkflowEngine wfEngine;
 
-    public void init(M delayQueue) throws FalconException {
+    public void init(M aDelayQueue) throws FalconException {
         this.wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-        this.delayQueue = delayQueue;
+        this.delayQueue = aDelayQueue;
         this.delayQueue.init();
     }
 
@@ -60,13 +66,11 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
         delayQueue.reconnect();
     }
 
-    public Entity getEntity(String entityType, String entityName)
-            throws FalconException {
+    public Entity getEntity(String entityType, String entityName) throws FalconException {
         return EntityUtil.getEntity(entityType, entityName);
     }
 
     public Retry getRetry(Entity entity) throws FalconException {
         return EntityUtil.getRetry(entity);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 03561fc..4088a59 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -31,6 +31,11 @@ import org.apache.hadoop.fs.Path;
 
 import java.util.*;
 
+/**
+ * A consumer of late reruns.
+ *
+ * @param <T>
+ */
 public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEvent>>>
         extends AbstractRerunConsumer<LaterunEvent, T> {
 
@@ -140,5 +145,4 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
         return late.detectChanges(lateLogPath, feedSizes, conf);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index e24cc69..40c5d1c 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -40,6 +40,11 @@ import org.apache.hadoop.fs.Path;
 
 import java.util.Date;
 
+/**
+ * An implementation of handler for late reruns.
+ *
+ * @param <M>
+ */
 public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         AbstractRerunHandler<LaterunEvent, M> {
 
@@ -78,9 +83,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                         wfId));
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
-                    if (deleted == true) {
-                        LOG.info("Successfully deleted late file path:"
-                                + lateLogPath);
+                    if (deleted) {
+                        LOG.info("Successfully deleted late file path:" + lateLogPath);
                     }
                 }
                 return;
@@ -102,8 +106,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         }
     }
 
-    private long getEventDelay(Entity entity, String nominalTime)
-            throws FalconException {
+    private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
 
         Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
         LateProcess lateProcess = EntityUtil.getLateProcess(entity);
@@ -135,8 +138,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         return new Date(date.getTime() + milliSecondsToAdd);
     }
 
-    public static Date getCutOffTime(Entity entity, String nominalTime)
-            throws FalconException {
+    public static Date getCutOffTime(Entity entity, String nominalTime) throws FalconException {
 
         ConfigurationStore store = ConfigurationStore.get();
         ExpressionHelper evaluator = ExpressionHelper.get();
@@ -193,8 +195,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
     }
 
     @Override
-    public void init(M delayQueue) throws FalconException {
-        super.init(delayQueue);
+    public void init(M aDelayQueue) throws FalconException {
+        super.init(aDelayQueue);
         Thread daemon = new Thread(new LateRerunConsumer(this));
         daemon.setName("LaterunHandler");
         daemon.setDaemon(true);
@@ -211,8 +213,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
 
     }
 
-    public Configuration getConfiguration(String cluster, String wfId)
-            throws FalconException {
+    public Configuration getConfiguration(String cluster, String wfId) throws FalconException {
         Configuration conf = new Configuration();
         conf.set(
                 CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
@@ -220,5 +221,4 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                         AbstractWorkflowEngine.NAME_NODE));
         return conf;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
index ce76842..3abfe5a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
@@ -22,26 +22,29 @@ import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import org.apache.falcon.rerun.event.RetryEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 
-public class RerunHandlerFactory {
+/**
+ * A factory implementation to dole out rerun handlers.
+ */
+public final class RerunHandlerFactory {
 
-    private static final RetryHandler<DelayedQueue<RetryEvent>> retryHandler
-            = new RetryHandler<DelayedQueue<RetryEvent>>();
-    private static final LateRerunHandler<DelayedQueue<LaterunEvent>> lateHandler
-            = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
+    private static final RetryHandler<DelayedQueue<RetryEvent>> RETRY_HANDLER
+        = new RetryHandler<DelayedQueue<RetryEvent>>();
+    private static final LateRerunHandler<DelayedQueue<LaterunEvent>> LATE_HANDLER
+        = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
 
     private RerunHandlerFactory() {
-
     }
 
     public static AbstractRerunHandler getRerunHandler(RerunType type) {
         switch (type) {
-            case RETRY:
-                return retryHandler;
-            case LATE:
-                return lateHandler;
-            default:
-                throw new RuntimeException("Invalid handler:" + type);
-        }
+        case RETRY:
+            return RETRY_HANDLER;
+
+        case LATE:
+            return LATE_HANDLER;
 
+        default:
+            throw new RuntimeException("Invalid handler:" + type);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index c084233..63dade8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -25,6 +25,11 @@ import org.apache.falcon.util.StartupProperties;
 
 import java.util.Date;
 
+/**
+ * A consumer of retry events which reruns the workflow in the workflow engine.
+ *
+ * @param <T>
+ */
 public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
         extends AbstractRerunConsumer<RetryEvent, T> {
 
@@ -90,9 +95,6 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                         Integer.toString(message.getRunId()),
                         "Failure retry attempts exhausted");
             }
-
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 188c397..5bd8fd8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -28,6 +28,12 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.RerunPolicyFactory;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 
+/**
+ * An implementation of retry handler that kicks off retries until the
+ * configured attempts are exhausted.
+ *
+ * @param <M>
+ */
 public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
         AbstractRerunHandler<RetryEvent, M> {
 
@@ -80,14 +86,12 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
     }
 
     @Override
-    public void init(M queue) throws FalconException {
-        super.init(queue);
+    public void init(M aDelayQueue) throws FalconException {
+        super.init(aDelayQueue);
         Thread daemon = new Thread(new RetryConsumer(this));
         daemon.setName("RetryHandler");
         daemon.setDaemon(true);
         daemon.start();
-        LOG.info("RetryHandler  thread started");
-
+        LOG.info("RetryHandler thread started.");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
index 4bfbef2..6a30294 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
@@ -23,21 +23,21 @@ import org.apache.falcon.expression.ExpressionHelper;
 
 import java.util.Date;
 
+/**
+ * Base class for Rerun Policy.
+ */
 public abstract class AbstractRerunPolicy {
 
-    public long getDurationInMilliSec(Frequency frequency)
-            throws FalconException {
+    public long getDurationInMilliSec(Frequency frequency) throws FalconException {
         ExpressionHelper helper = ExpressionHelper.get();
         return helper.evaluate(frequency.toString(), Long.class);
-
     }
 
     public static Date addTime(Date date, int milliSecondsToAdd) {
         return new Date(date.getTime() + milliSecondsToAdd);
     }
 
-    public abstract long getDelay(Frequency delay, int eventNumber)
-            throws FalconException;
+    public abstract long getDelay(Frequency delay, int eventNumber) throws FalconException;
 
     public abstract long getDelay(Frequency delay, Date nominaltime,
                                   Date cutOffTime) throws FalconException;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
index b23e014..86832ef 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
@@ -23,23 +23,22 @@ import org.apache.falcon.expression.ExpressionHelper;
 
 import java.util.Date;
 
+/**
+ * An implementation of the Rerun policy that backs off exponentially.
+ */
 public class ExpBackoffPolicy extends AbstractRerunPolicy {
 
     @Override
-    public long getDelay(Frequency delay, int eventNumber)
-            throws FalconException {
-        return (long) (getDurationInMilliSec(delay) * Math.pow(getPower(),
-                eventNumber));
+    public long getDelay(Frequency delay, int eventNumber) throws FalconException {
+        return (long) (getDurationInMilliSec(delay) * Math.pow(getPower(), eventNumber));
     }
 
     @Override
-    public long getDelay(Frequency delay, Date nominalTime, Date cutOffTime)
-            throws FalconException {
+    public long getDelay(Frequency delay, Date nominalTime, Date cutOffTime) throws FalconException {
         ExpressionHelper evaluator = ExpressionHelper.get();
         Date now = new Date();
         Date lateTime = nominalTime;
-        long delayMilliSeconds = evaluator.evaluate(delay.toString(),
-                Long.class);
+        long delayMilliSeconds = evaluator.evaluate(delay.toString(), Long.class);
         int factor = 1;
         // TODO we can get rid of this using formula
         while (lateTime.compareTo(now) <= 0) {
@@ -49,12 +48,11 @@ public class ExpBackoffPolicy extends AbstractRerunPolicy {
         if (lateTime.after(cutOffTime)) {
             lateTime = cutOffTime;
         }
-        return (lateTime.getTime() - nominalTime.getTime());
 
+        return (lateTime.getTime() - nominalTime.getTime());
     }
 
     protected int getPower() {
         return 2;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
index 19fe8f7..8dd9c29 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
@@ -23,6 +23,9 @@ import org.apache.falcon.entity.v0.Frequency;
 
 import java.util.Date;
 
+/**
+ * An implementation of the Rerun policy that does rerun only once.
+ */
 public class FinalPolicy extends AbstractRerunPolicy {
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
index 9bcca26..b8c69d2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
@@ -17,7 +17,11 @@
  */
 package org.apache.falcon.rerun.policy;
 
+/**
+ * An implementation of the Rerun policy that is periodic.
+ */
 public class PeriodicPolicy extends ExpBackoffPolicy {
+
     @Override
     protected int getPower() {
         return 1;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
index 5ee902a..945b6c5 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
@@ -19,7 +19,10 @@ package org.apache.falcon.rerun.policy;
 
 import org.apache.falcon.entity.v0.process.PolicyType;
 
-public class RerunPolicyFactory {
+/**
+ * A factory implementation to dole out rerun/retry policy.
+ */
+public final class RerunPolicyFactory {
 
     private RerunPolicyFactory() {
         // factory
@@ -27,15 +30,17 @@ public class RerunPolicyFactory {
 
     public static AbstractRerunPolicy getRetryPolicy(PolicyType latePolicy) {
         switch (latePolicy) {
-            case PERIODIC:
-                return new PeriodicPolicy();
+        case PERIODIC:
+            return new PeriodicPolicy();
+
+        case EXP_BACKOFF:
+            return new ExpBackoffPolicy();
 
-            case EXP_BACKOFF:
-                return new ExpBackoffPolicy();
+        case FINAL:
+            return new FinalPolicy();
 
-            case FINAL:
-                return new FinalPolicy();
+        default:
+            throw new IllegalArgumentException("Unhandled Retry policy: " + latePolicy);
         }
-        throw new IllegalArgumentException("Unhandled Retry policy: " + latePolicy);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
index 82dfc22..3fa5282 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
@@ -28,6 +28,11 @@ import javax.jms.*;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * An ActiveMQ implementation for DelayedQueue.
+ *
+ * @param <T>
+ */
 public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
 
     private ActiveMQConnection connection;
@@ -65,8 +70,8 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
         if (connection == null) {
             init();
         }
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         return session;
     }
 
@@ -87,8 +92,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
 
     @Override
     public void populateQueue(List<T> events) {
-        // TODO Auto-generated method stub
-
     }
 
     @Override
@@ -127,21 +130,25 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
             producer.close();
             LOG.info("Producer closed successfully");
         } catch (Exception ignore) {
+            LOG.info("Producer could not be closed");
         }
+
         try {
             LOG.info("Attempting to close consumer");
             consumer.close();
             LOG.info("Consumer closed successfully");
         } catch (Exception ignore) {
+            LOG.info("Consumer could not be closed");
         }
+
         try {
             LOG.info("Attempting to close connection");
             connection.close();
             LOG.info("Connection closed successfully");
         } catch (Exception ignore) {
+            LOG.info("Connection could not be closed");
         }
 
         init();
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
index 7e55206..393a8e5 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
@@ -23,6 +23,10 @@ import org.apache.log4j.Logger;
 
 import java.util.List;
 
+/**
+ * Base class for a Queue implementation.
+ * @param <T>
+ */
 public abstract class DelayedQueue<T extends RerunEvent> {
     public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
 
@@ -35,5 +39,4 @@ public abstract class DelayedQueue<T extends RerunEvent> {
     public abstract void init();
 
     public abstract void reconnect() throws FalconException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index 7243f4d..06feb92 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -28,14 +28,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.DelayQueue;
 
+/**
+ * An in-memory implementation of a DelayedQueue.
+ * @param <T>
+ */
 public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
     public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
-    protected DelayQueue<T> QUEUE = new DelayQueue<T>();
+
+    protected DelayQueue<T> delayQueue = new DelayQueue<T>();
     private File serializeFilePath;
 
     @Override
     public boolean offer(T event) {
-        boolean flag = QUEUE.offer(event);
+        boolean flag = delayQueue.offer(event);
         beforeRetry(event);
         LOG.debug("Enqueued Message:" + event.toString());
         return flag;
@@ -45,7 +50,7 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
     public T take() throws FalconException {
         T event;
         try {
-            event = QUEUE.take();
+            event = delayQueue.take();
             LOG.debug("Dequeued Message:" + event.toString());
             afterRetry(event);
         } catch (InterruptedException e) {
@@ -60,16 +65,14 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
 
     public void populateQueue(List<T> events) {
         for (T event : events) {
-            QUEUE.offer(event);
+            delayQueue.offer(event);
         }
-
     }
 
     @Override
     public void init() {
         List<T> events = bootstrap();
         populateQueue(events);
-
     }
 
     @Override
@@ -135,7 +138,7 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
                                 + rerunFile.getAbsolutePath(), e);
             }
         }
-        return rerunEvents;
 
+        return rerunEvents;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
index f8edfbc..cdbadaa 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
@@ -29,6 +29,9 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 
+/**
+ * A service implementation for Late Rerun initialized at startup.
+ */
 public class LateRunService implements FalconService {
 
     private static final Logger LOG = Logger.getLogger(LateRunService.class);
@@ -40,20 +43,18 @@ public class LateRunService implements FalconService {
 
     @Override
     public void init() throws FalconException {
-        AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler = RerunHandlerFactory
-                .getRerunHandler(RerunType.LATE);
+        AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler =
+            RerunHandlerFactory.getRerunHandler(RerunType.LATE);
         ActiveMQueue<LaterunEvent> queue = new ActiveMQueue<LaterunEvent>(
-                StartupProperties
-                        .get()
-                        .getProperty("broker.url",
-                                "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
+                StartupProperties.get()
+                    .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
                 "falcon.late.queue");
         rerunHandler.init(queue);
     }
 
     @Override
     public void destroy() throws FalconException {
-        LOG.info("LateRun  thread destroyed");
+        LOG.info("LateRun thread destroyed");
     }
 
     private File getBasePath() {
@@ -64,7 +65,7 @@ public class LateRunService implements FalconService {
             throw new RuntimeException("Unable to initialize late recorder @"
                     + basePath);
         }
+
         return basePath;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
index 8a902be..b989acd 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
@@ -30,6 +30,9 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 
+/**
+ * A service implementation for Retry initialized at startup.
+ */
 public class RetryService implements FalconService {
 
     private static final Logger LOG = Logger.getLogger(RetryService.class);
@@ -41,16 +44,15 @@ public class RetryService implements FalconService {
 
     @Override
     public void init() throws FalconException {
-        AbstractRerunHandler<RetryEvent, DelayedQueue<RetryEvent>> rerunHandler = RerunHandlerFactory
-                .getRerunHandler(RerunType.RETRY);
-        InMemoryQueue<RetryEvent> queue = new InMemoryQueue<RetryEvent>(
-                getBasePath());
+        AbstractRerunHandler<RetryEvent, DelayedQueue<RetryEvent>> rerunHandler =
+            RerunHandlerFactory.getRerunHandler(RerunType.RETRY);
+        InMemoryQueue<RetryEvent> queue = new InMemoryQueue<RetryEvent>(getBasePath());
         rerunHandler.init(queue);
     }
 
     @Override
     public void destroy() throws FalconException {
-        LOG.info("RetryHandler  thread destroyed");
+        LOG.info("RetryHandler thread destroyed");
     }
 
     private File getBasePath() {
@@ -61,7 +63,7 @@ public class RetryService implements FalconService {
             throw new RuntimeException("Unable to initialize retry recorder @"
                     + basePath);
         }
+
         return basePath;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java b/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
index be74032..630ef00 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
@@ -27,23 +27,23 @@ import org.testng.annotations.Test;
 
 import java.util.Date;
 
+/**
+ * Base test class for Rerun Policy.
+ */
 public class AbstractRerunPolicyTest {
 
     @Test
-    public void TestGetDurationInMillis() throws FalconException {
+    public void testGetDurationInMillis() throws FalconException {
         AbstractRerunPolicy policy = new AbstractRerunPolicy() {
 
             @Override
             public long getDelay(Frequency delay, Date nominaltime,
                                  Date cutOffTime) throws FalconException {
-                // TODO Auto-generated method stub
                 return 0;
             }
 
             @Override
-            public long getDelay(Frequency delay, int eventNumber)
-                    throws FalconException {
-                // TODO Auto-generated method stub
+            public long getDelay(Frequency delay, int eventNumber) throws FalconException {
                 return 0;
             }
         };
@@ -57,26 +57,26 @@ public class AbstractRerunPolicyTest {
     }
 
     @Test
-    public void TestExpBackoffPolicy() throws FalconException {
+    public void testExpBackoffPolicy() throws FalconException {
         AbstractRerunPolicy backoff = new ExpBackoffPolicy();
         long delay = backoff.getDelay(new Frequency("minutes(2)"), 2);
         Assert.assertEquals(delay, 480000);
 
         long currentTime = System.currentTimeMillis();
-        delay = backoff.getDelay(new Frequency("minutes(2)"), new Date(
-                currentTime - 1 * 4 * 60 * 1000), new Date(currentTime + 1 * 60
-                * 60 * 1000));
+        delay = backoff.getDelay(new Frequency("minutes(2)"),
+                new Date(currentTime - 1 * 4 * 60 * 1000),
+                new Date(currentTime + 1 * 60 * 60 * 1000));
         Assert.assertEquals(delay, 1 * 6 * 60 * 1000);
 
         currentTime = System.currentTimeMillis();
-        delay = backoff.getDelay(new Frequency("minutes(1)"), new Date(
-                currentTime - 1 * 9 * 60 * 1000), new Date(currentTime + 1 * 60
-                * 60 * 1000));
+        delay = backoff.getDelay(new Frequency("minutes(1)"),
+                new Date(currentTime - 1 * 9 * 60 * 1000),
+                new Date(currentTime + 1 * 60 * 60 * 1000));
         Assert.assertEquals(delay, 900000);
     }
 
     @Test
-    public void TestPeriodicPolicy() throws FalconException, InterruptedException {
+    public void testPeriodicPolicy() throws FalconException, InterruptedException {
         AbstractRerunPolicy periodic = new PeriodicPolicy();
         long delay = periodic.getDelay(new Frequency("minutes(2)"), 2);
         Assert.assertEquals(delay, 120000);
@@ -85,9 +85,9 @@ public class AbstractRerunPolicyTest {
 
         long currentTime = System.currentTimeMillis();
         //Thread.sleep(1000);
-        delay = periodic.getDelay(new Frequency("minutes(3)"), new Date(
-                currentTime), new Date(currentTime + 1 * 60
-                * 60 * 1000));
+        delay = periodic.getDelay(new Frequency("minutes(3)"),
+                new Date(currentTime),
+                new Date(currentTime + 1 * 60 * 60 * 1000));
         Assert.assertEquals(delay, 180000);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
index 6028e10..efd51b1 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.rerun.handler;
 
-import com.sun.jersey.api.client.WebResource;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -39,14 +38,16 @@ import javax.xml.bind.Unmarshaller;
 import java.io.StringWriter;
 import java.util.Collection;
 
+/**
+ * Test class for Late data handling.
+ */
 public class TestLateData {
 
     protected static final String FEED_XML = "/feed-template.xml";
-    protected static String CLUSTER_XML = "/cluster-template.xml";
+    protected static final String CLUSTER_XML = "/cluster-template.xml";
     protected static final String PROCESS_XML = "/process-template.xml";
     protected static final String PROCESS_XML2 = "/process-template2.xml";
 
-    protected WebResource service = null;
     protected Configuration conf = new Configuration();
 
     @BeforeClass
@@ -75,24 +76,27 @@ public class TestLateData {
         ConfigurationStore store = ConfigurationStore.get();
         store.remove(type, name);
         switch (type) {
-            case CLUSTER:
-                Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
-                cluster.setName(name);
-                ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
-                store.publish(type, cluster);
-                break;
-
-            case FEED:
-                Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
-                feed.setName(name);
-                store.publish(type, feed);
-                break;
-
-            case PROCESS:
-                Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(PROCESS_XML));
-                process.setName(name);
-                store.publish(type, process);
-                break;
+        case CLUSTER:
+            Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+            cluster.setName(name);
+            ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
+            store.publish(type, cluster);
+            break;
+
+        case FEED:
+            Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
+            feed.setName(name);
+            store.publish(type, feed);
+            break;
+
+        case PROCESS:
+            Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(PROCESS_XML));
+            process.setName(name);
+            store.publish(type, process);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Invalid entity type: " + type);
         }
     }
 
@@ -113,82 +117,78 @@ public class TestLateData {
         store.publish(EntityType.PROCESS, process);
     }
 
-    public String marshallEntity(final Entity entity) throws FalconException,
-                                                             JAXBException {
+    public String marshallEntity(final Entity entity) throws FalconException, JAXBException {
         Marshaller marshaller = entity.getEntityType().getMarshaller();
         StringWriter stringWriter = new StringWriter();
         marshaller.marshal(entity, stringWriter);
         return stringWriter.toString();
     }
 
-//	@Test
-//	private void TestLateWhenInstanceRunning() throws Exception
-//	{
-//		try{
-//        WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
-//        when(engine.instanceStatus("testCluster", "123")).thenReturn("RUNNING");
-//        
-//		ConfigurationStore store = ConfigurationStore.get();
-//		setup();
-//		String nominalTime = EntityUtil.formatDateUTC(new Date(System.currentTimeMillis() - 1800000));
-//        InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(new File("target/late"));
-//        latedataHandler.init(queue);
-//        
-//        AbstractRerunHandler handle = RerunHandlerFactory.getRerunHandler(RerunEvent.RerunType.LATE);
-//        handle.handleRerun("sample", nominalTime, "123", "123", engine, System.currentTimeMillis());
-//        
-//        File directory = new File("target/late");
-//        File[] files = directory.listFiles();
-//        int noFilesBefore = files.length;
-//        
-//        Thread.sleep(90000);
-//        
-//        files = directory.listFiles();
-//        int noFilesAfterRetry = files.length;        
-//        Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
-//		}
-//		catch (Exception e){
-//			Assert.fail("Not expecting any exception");
-//		}
-//        
-//	}
-//	
-//	
-//	@Test
-//	private void TestLateWhenDataPresent() throws Exception {
-//		WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
-//		when(engine.instanceStatus("testCluster", "123")).thenReturn(
-//				"SUCCEEDED");
-//
-//		LateRerunConsumer consumer = Mockito.mock(LateRerunConsumer.class);
-//		when(consumer.detectLate(Mockito.any(LaterunEvent.class))).thenReturn(
-//				"new data found");
-//
-//		String nominalTime = EntityUtil.formatDateUTC(new Date(System
-//				.currentTimeMillis() - 1800000));
-//		AbstractRerunHandler handle = RerunHandlerFactory
-//				.getRerunHandler(RerunEvent.RerunType.LATE);
-//
-//		ConfigurationStore store = ConfigurationStore.get();
-//		setup();
-//
-//		InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(
-//				new File("target/late"));
-//		latedataHandler.init(queue);
-//
-//		handle.handleRerun("sample", nominalTime, "123", "123", engine,
-//				System.currentTimeMillis());
-//
-//		File directory = new File("target/late");
-//		File[] files = directory.listFiles();
-//		int noFilesBefore = files.length;
-//
-//		Thread.sleep(90000);
-//
-//		files = directory.listFiles();
-//		int noFilesAfterRetry = files.length;
-//		Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
-//
-//	}
+/*
+    @Test
+    private void TestLateWhenInstanceRunning() throws Exception {
+        try {
+            WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
+            when(engine.instanceStatus("testCluster", "123")).thenReturn("RUNNING");
+
+            ConfigurationStore store = ConfigurationStore.get();
+            setup();
+            String nominalTime = EntityUtil.formatDateUTC(new Date(System.currentTimeMillis() - 1800000));
+            InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(new File("target/late"));
+            latedataHandler.init(queue);
+
+            AbstractRerunHandler handle = RerunHandlerFactory.getRerunHandler(RerunEvent.RerunType.LATE);
+            handle.handleRerun("sample", nominalTime, "123", "123", engine, System.currentTimeMillis());
+
+            File directory = new File("target/late");
+            File[] files = directory.listFiles();
+            int noFilesBefore = files.length;
+
+            Thread.sleep(90000);
+
+            files = directory.listFiles();
+            int noFilesAfterRetry = files.length;
+            Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
+        } catch (Exception e) {
+            Assert.fail("Not expecting any exception");
+        }
+    }
+
+    @Test
+    private void TestLateWhenDataPresent() throws Exception {
+        WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
+        when(engine.instanceStatus("testCluster", "123")).thenReturn(
+                "SUCCEEDED");
+
+        LateRerunConsumer consumer = Mockito.mock(LateRerunConsumer.class);
+        when(consumer.detectLate(Mockito.any(LaterunEvent.class))).thenReturn(
+                "new data found");
 
+        String nominalTime = EntityUtil.formatDateUTC(new Date(System
+                .currentTimeMillis() - 1800000));
+        AbstractRerunHandler handle = RerunHandlerFactory
+                .getRerunHandler(RerunEvent.RerunType.LATE);
+
+        ConfigurationStore store = ConfigurationStore.get();
+        setup();
+
+        InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(
+                new File("target/late"));
+        latedataHandler.init(queue);
+
+        handle.handleRerun("sample", nominalTime, "123", "123", engine,
+                System.currentTimeMillis());
+
+        File directory = new File("target/late");
+        File[] files = directory.listFiles();
+        int noFilesBefore = files.length;
+
+        Thread.sleep(90000);
+
+        files = directory.listFiles();
+        int noFilesAfterRetry = files.length;
+        Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
+
+    }
+*/
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index a13aa02..a8f3885 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -24,11 +24,14 @@ import org.apache.falcon.rerun.event.RerunEvent;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+/**
+ * Test class for ActiveMQ delayed queue implementation.
+ */
 public class ActiveMQTest {
 
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
     private BrokerService broker;
-    private static final String DESTI = "activemq.topic";
+    private static final String DESTINATION = "activemq.topic";
 
     @BeforeClass
     private void setup() throws Exception {
@@ -43,8 +46,9 @@ public class ActiveMQTest {
     @Test
     public void testBrokerStartAndEnqueue() {
         ActiveMQueue<RerunEvent> activeMQueue = new ActiveMQueue<RerunEvent>(
-                BROKER_URL, DESTI);
+                BROKER_URL, DESTINATION);
         activeMQueue.init();
+
         RerunEvent event = new LaterunEvent("clusterName", "wfId",
                 System.currentTimeMillis(), 60 * 1000, "entityType",
                 "entityName", "instance", 0);
@@ -58,7 +62,5 @@ public class ActiveMQTest {
         } catch (Exception e) {
             Assert.fail();
         }
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index d41453b..6aafaa5 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -25,6 +25,9 @@ import org.testng.annotations.Test;
 import java.io.File;
 import java.util.LinkedList;
 
+/**
+ * Test class for the InMemory Queue implementation of the DelayedQueue.
+ */
 public class InMemoryQueueTest {
 
     @Test(timeOut = 10000)
@@ -47,8 +50,8 @@ public class InMemoryQueueTest {
             boolean inserted = false;
             for (int posn = 0; posn < events.size(); posn++) {
                 MyEvent thisEvent = events.get(posn);
-                if (thisEvent.getDelayInMilliSec() + thisEvent.getMsgInsertTime() >
-                        event.getDelayInMilliSec() + event.getMsgInsertTime()) {
+                if (thisEvent.getDelayInMilliSec() + thisEvent.getMsgInsertTime()
+                        > event.getDelayInMilliSec() + event.getMsgInsertTime()) {
                     events.add(posn, event);
                     inserted = true;
                     break;
@@ -67,12 +70,14 @@ public class InMemoryQueueTest {
 
     private class MyEvent extends RerunEvent {
 
+        //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
         public MyEvent(String clusterName, String wfId,
                        long msgInsertTime, long delay, String entityType,
                        String entityName, String instance, int runId) {
             super(clusterName, wfId, msgInsertTime, delay,
                     entityType, entityName, instance, runId);
         }
+        //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
         @Override
         public RerunType getType() {