You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:51:00 UTC
[44/47] git commit: checkstyle related fixes for rerun module.
checkstyle related fixes for rerun module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/6a39baf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/6a39baf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/6a39baf3
Branch: refs/heads/master
Commit: 6a39baf35709c9757c36952c8f1d9c13fed8da45
Parents: dbd980e
Author: venkatesh <ve...@hortonworks.com>
Authored: Tue Apr 23 13:27:20 2013 -0700
Committer: venkatesh <ve...@hortonworks.com>
Committed: Tue Apr 23 13:27:20 2013 -0700
----------------------------------------------------------------------
.../apache/falcon/latedata/LateDataHandler.java | 14 +-
.../apache/falcon/rerun/event/LaterunEvent.java | 6 +-
.../org/apache/falcon/rerun/event/RerunEvent.java | 13 +-
.../falcon/rerun/event/RerunEventFactory.java | 20 +-
.../org/apache/falcon/rerun/event/RetryEvent.java | 5 +
.../rerun/handler/AbstractRerunConsumer.java | 6 +
.../falcon/rerun/handler/AbstractRerunHandler.java | 14 +-
.../falcon/rerun/handler/LateRerunConsumer.java | 6 +-
.../falcon/rerun/handler/LateRerunHandler.java | 24 +-
.../falcon/rerun/handler/RerunHandlerFactory.java | 29 ++-
.../apache/falcon/rerun/handler/RetryConsumer.java | 8 +-
.../apache/falcon/rerun/handler/RetryHandler.java | 14 +-
.../falcon/rerun/policy/AbstractRerunPolicy.java | 10 +-
.../falcon/rerun/policy/ExpBackoffPolicy.java | 18 +-
.../apache/falcon/rerun/policy/FinalPolicy.java | 3 +
.../apache/falcon/rerun/policy/PeriodicPolicy.java | 4 +
.../falcon/rerun/policy/RerunPolicyFactory.java | 21 +-
.../apache/falcon/rerun/queue/ActiveMQueue.java | 17 +-
.../apache/falcon/rerun/queue/DelayedQueue.java | 5 +-
.../apache/falcon/rerun/queue/InMemoryQueue.java | 17 +-
.../falcon/rerun/service/LateRunService.java | 17 +-
.../apache/falcon/rerun/service/RetryService.java | 14 +-
.../falcon/rerun/AbstractRerunPolicyTest.java | 32 ++--
.../apache/falcon/rerun/handler/TestLateData.java | 184 +++++++-------
.../apache/falcon/rerun/queue/ActiveMQTest.java | 10 +-
.../falcon/rerun/queue/InMemoryQueueTest.java | 9 +-
26 files changed, 297 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 00a2d87..95a3511 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -32,11 +32,12 @@ import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;
+/**
+ * A tool for late data handling.
+ */
public class LateDataHandler extends Configured implements Tool {
- private static Logger LOG = Logger.getLogger(LateDataHandler.class);
-
- static PrintStream stream = System.out;
+ private static final Logger LOG = Logger.getLogger(LateDataHandler.class);
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@@ -110,9 +111,9 @@ public class LateDataHandler extends Configured implements Tool {
}
public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
- throws Exception {
+ throws Exception {
- StringBuffer buffer = new StringBuffer();
+ StringBuilder buffer = new StringBuilder();
BufferedReader in = new BufferedReader(new InputStreamReader(file
.getFileSystem(conf).open(file)));
String line;
@@ -148,12 +149,11 @@ public class LateDataHandler extends Configured implements Tool {
} finally {
in.close();
}
-
}
public long usage(Path inPath, Configuration conf) throws IOException {
FileSystem fs = inPath.getFileSystem(conf);
- FileStatus status[] = fs.globStatus(inPath);
+ FileStatus[] status = fs.globStatus(inPath);
if (status == null || status.length == 0) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 7a22704..dcde876 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -17,14 +17,19 @@
*/
package org.apache.falcon.rerun.event;
+/**
+ * Event representing a late rerun.
+ */
public class LaterunEvent extends RerunEvent {
+ //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
long delay, String entityType, String entityName, String instance,
int runId) {
super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
instance, runId);
}
+ //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
@Override
public String toString() {
@@ -34,5 +39,4 @@ public class LaterunEvent extends RerunEvent {
+ "entityName=" + entityName + SEP + "instance=" + instance
+ SEP + "runId=" + runId;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 5a1e3e1..9ae6458 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -21,10 +21,16 @@ import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
+/**
+ * Event representing a rerun.
+ */
public class RerunEvent implements Delayed {
protected static final String SEP = "*";
+ /**
+ * Rerun Event type.
+ */
public enum RerunType {
RETRY, LATE
}
@@ -38,9 +44,9 @@ public class RerunEvent implements Delayed {
protected String instance;
protected int runId;
- public RerunEvent(String clusterName, String wfId,
- long msgInsertTime, long delay, String entityType, String entityName,
- String instance, int runId) {
+ //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+ public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
+ String entityType, String entityName, String instance, int runId) {
this.clusterName = clusterName;
this.wfId = wfId;
this.msgInsertTime = msgInsertTime;
@@ -50,6 +56,7 @@ public class RerunEvent implements Delayed {
this.runId = runId;
this.entityType = entityType;
}
+ //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
public String getClusterName() {
return clusterName;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index c5e1e80..54bbecf 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -22,6 +22,11 @@ import org.apache.falcon.rerun.event.RerunEvent.RerunType;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Factory implementation to dole out specific implementations of RerunEvent.
+ *
+ * @param <T>
+ */
public class RerunEventFactory<T extends RerunEvent> {
public T getRerunEvent(String type, String line) {
@@ -38,9 +43,8 @@ public class RerunEventFactory<T extends RerunEvent> {
private T lateEventFromString(String line) {
Map<String, String> map = getMap(line);
return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
- Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
- .get("delayInMilliSec")), map.get("entityType"),
- map.get("entityName"), map.get("instance"),
+ Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
+ map.get("entityType"), map.get("entityName"), map.get("instance"),
Integer.parseInt(map.get("runId")));
}
@@ -48,12 +52,10 @@ public class RerunEventFactory<T extends RerunEvent> {
public T retryEventFromString(String line) {
Map<String, String> map = getMap(line);
return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
- Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
- .get("delayInMilliSec")), map.get("entityType"),
- map.get("entityName"), map.get("instance"),
- Integer.parseInt(map.get("runId")), Integer.parseInt(map
- .get("attempts")), Integer.parseInt(map
- .get("failRetryCount")));
+ Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
+ map.get("entityType"), map.get("entityName"), map.get("instance"),
+ Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")),
+ Integer.parseInt(map.get("failRetryCount")));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 33248b8..44bf96e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -17,11 +17,15 @@
*/
package org.apache.falcon.rerun.event;
+/**
+ * Event representing a retry.
+ */
public class RetryEvent extends RerunEvent {
private int attempts;
private int failRetryCount;
+ //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
public RetryEvent(String clusterName, String wfId, long msgInsertTime,
long delay, String entityType, String entityName, String instance,
int runId, int attempts, int failRetryCount) {
@@ -30,6 +34,7 @@ public class RetryEvent extends RerunEvent {
this.attempts = attempts;
this.failRetryCount = failRetryCount;
}
+ //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
public int getAttempts() {
return attempts;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index fa1d9e3..b073117 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -26,6 +26,12 @@ import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.log4j.Logger;
+/**
+ * Base class for a rerun consumer.
+ *
+ * @param <T> a rerun event
+ * @param <M> a rerun handler
+ */
public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends AbstractRerunHandler<T, DelayedQueue<T>>>
implements Runnable {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 4a90b9f..8a41bf8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -27,6 +27,12 @@ import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.log4j.Logger;
+/**
+ * Base class for handling reruns.
+ *
+ * @param <T> a rerun event
+ * @param <M> queue
+ */
public abstract class AbstractRerunHandler<T extends RerunEvent, M extends DelayedQueue<T>> {
protected static final Logger LOG = Logger
@@ -34,9 +40,9 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
protected M delayQueue;
private AbstractWorkflowEngine wfEngine;
- public void init(M delayQueue) throws FalconException {
+ public void init(M aDelayQueue) throws FalconException {
this.wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- this.delayQueue = delayQueue;
+ this.delayQueue = aDelayQueue;
this.delayQueue.init();
}
@@ -60,13 +66,11 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
delayQueue.reconnect();
}
- public Entity getEntity(String entityType, String entityName)
- throws FalconException {
+ public Entity getEntity(String entityType, String entityName) throws FalconException {
return EntityUtil.getEntity(entityType, entityName);
}
public Retry getRetry(Entity entity) throws FalconException {
return EntityUtil.getRetry(entity);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 03561fc..4088a59 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -31,6 +31,11 @@ import org.apache.hadoop.fs.Path;
import java.util.*;
+/**
+ * A consumer of late reruns.
+ *
+ * @param <T>
+ */
public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEvent>>>
extends AbstractRerunConsumer<LaterunEvent, T> {
@@ -140,5 +145,4 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
return late.detectChanges(lateLogPath, feedSizes, conf);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index e24cc69..40c5d1c 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -40,6 +40,11 @@ import org.apache.hadoop.fs.Path;
import java.util.Date;
+/**
+ * An implementation of handler for late reruns.
+ *
+ * @param <M>
+ */
public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
AbstractRerunHandler<LaterunEvent, M> {
@@ -78,9 +83,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
wfId));
if (fs.exists(lateLogPath)) {
boolean deleted = fs.delete(lateLogPath, true);
- if (deleted == true) {
- LOG.info("Successfully deleted late file path:"
- + lateLogPath);
+ if (deleted) {
+ LOG.info("Successfully deleted late file path:" + lateLogPath);
}
}
return;
@@ -102,8 +106,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
}
}
- private long getEventDelay(Entity entity, String nominalTime)
- throws FalconException {
+ private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
LateProcess lateProcess = EntityUtil.getLateProcess(entity);
@@ -135,8 +138,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
return new Date(date.getTime() + milliSecondsToAdd);
}
- public static Date getCutOffTime(Entity entity, String nominalTime)
- throws FalconException {
+ public static Date getCutOffTime(Entity entity, String nominalTime) throws FalconException {
ConfigurationStore store = ConfigurationStore.get();
ExpressionHelper evaluator = ExpressionHelper.get();
@@ -193,8 +195,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
}
@Override
- public void init(M delayQueue) throws FalconException {
- super.init(delayQueue);
+ public void init(M aDelayQueue) throws FalconException {
+ super.init(aDelayQueue);
Thread daemon = new Thread(new LateRerunConsumer(this));
daemon.setName("LaterunHandler");
daemon.setDaemon(true);
@@ -211,8 +213,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
}
- public Configuration getConfiguration(String cluster, String wfId)
- throws FalconException {
+ public Configuration getConfiguration(String cluster, String wfId) throws FalconException {
Configuration conf = new Configuration();
conf.set(
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
@@ -220,5 +221,4 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
AbstractWorkflowEngine.NAME_NODE));
return conf;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
index ce76842..3abfe5a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
@@ -22,26 +22,29 @@ import org.apache.falcon.rerun.event.RerunEvent.RerunType;
import org.apache.falcon.rerun.event.RetryEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
-public class RerunHandlerFactory {
+/**
+ * A factory implementation to dole out rerun handlers.
+ */
+public final class RerunHandlerFactory {
- private static final RetryHandler<DelayedQueue<RetryEvent>> retryHandler
- = new RetryHandler<DelayedQueue<RetryEvent>>();
- private static final LateRerunHandler<DelayedQueue<LaterunEvent>> lateHandler
- = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
+ private static final RetryHandler<DelayedQueue<RetryEvent>> RETRY_HANDLER
+ = new RetryHandler<DelayedQueue<RetryEvent>>();
+ private static final LateRerunHandler<DelayedQueue<LaterunEvent>> LATE_HANDLER
+ = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
private RerunHandlerFactory() {
-
}
public static AbstractRerunHandler getRerunHandler(RerunType type) {
switch (type) {
- case RETRY:
- return retryHandler;
- case LATE:
- return lateHandler;
- default:
- throw new RuntimeException("Invalid handler:" + type);
- }
+ case RETRY:
+ return RETRY_HANDLER;
+
+ case LATE:
+ return LATE_HANDLER;
+ default:
+ throw new RuntimeException("Invalid handler:" + type);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index c084233..63dade8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -25,6 +25,11 @@ import org.apache.falcon.util.StartupProperties;
import java.util.Date;
+/**
+ * A consumer of retry events which reruns the workflow in the workflow engine.
+ *
+ * @param <T>
+ */
public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
extends AbstractRerunConsumer<RetryEvent, T> {
@@ -90,9 +95,6 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
Integer.toString(message.getRunId()),
"Failure retry attempts exhausted");
}
-
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 188c397..5bd8fd8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -28,6 +28,12 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.RerunPolicyFactory;
import org.apache.falcon.rerun.queue.DelayedQueue;
+/**
+ * An implementation of retry handler that kicks off retries until the
+ * configured attempts are exhausted.
+ *
+ * @param <M>
+ */
public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
AbstractRerunHandler<RetryEvent, M> {
@@ -80,14 +86,12 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
}
@Override
- public void init(M queue) throws FalconException {
- super.init(queue);
+ public void init(M aDelayQueue) throws FalconException {
+ super.init(aDelayQueue);
Thread daemon = new Thread(new RetryConsumer(this));
daemon.setName("RetryHandler");
daemon.setDaemon(true);
daemon.start();
- LOG.info("RetryHandler thread started");
-
+ LOG.info("RetryHandler thread started.");
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
index 4bfbef2..6a30294 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/AbstractRerunPolicy.java
@@ -23,21 +23,21 @@ import org.apache.falcon.expression.ExpressionHelper;
import java.util.Date;
+/**
+ * Base class for Rerun Policy.
+ */
public abstract class AbstractRerunPolicy {
- public long getDurationInMilliSec(Frequency frequency)
- throws FalconException {
+ public long getDurationInMilliSec(Frequency frequency) throws FalconException {
ExpressionHelper helper = ExpressionHelper.get();
return helper.evaluate(frequency.toString(), Long.class);
-
}
public static Date addTime(Date date, int milliSecondsToAdd) {
return new Date(date.getTime() + milliSecondsToAdd);
}
- public abstract long getDelay(Frequency delay, int eventNumber)
- throws FalconException;
+ public abstract long getDelay(Frequency delay, int eventNumber) throws FalconException;
public abstract long getDelay(Frequency delay, Date nominaltime,
Date cutOffTime) throws FalconException;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
index b23e014..86832ef 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/ExpBackoffPolicy.java
@@ -23,23 +23,22 @@ import org.apache.falcon.expression.ExpressionHelper;
import java.util.Date;
+/**
+ * An implementation of the Rerun policy that backs off exponentially.
+ */
public class ExpBackoffPolicy extends AbstractRerunPolicy {
@Override
- public long getDelay(Frequency delay, int eventNumber)
- throws FalconException {
- return (long) (getDurationInMilliSec(delay) * Math.pow(getPower(),
- eventNumber));
+ public long getDelay(Frequency delay, int eventNumber) throws FalconException {
+ return (long) (getDurationInMilliSec(delay) * Math.pow(getPower(), eventNumber));
}
@Override
- public long getDelay(Frequency delay, Date nominalTime, Date cutOffTime)
- throws FalconException {
+ public long getDelay(Frequency delay, Date nominalTime, Date cutOffTime) throws FalconException {
ExpressionHelper evaluator = ExpressionHelper.get();
Date now = new Date();
Date lateTime = nominalTime;
- long delayMilliSeconds = evaluator.evaluate(delay.toString(),
- Long.class);
+ long delayMilliSeconds = evaluator.evaluate(delay.toString(), Long.class);
int factor = 1;
// TODO we can get rid of this using formula
while (lateTime.compareTo(now) <= 0) {
@@ -49,12 +48,11 @@ public class ExpBackoffPolicy extends AbstractRerunPolicy {
if (lateTime.after(cutOffTime)) {
lateTime = cutOffTime;
}
- return (lateTime.getTime() - nominalTime.getTime());
+ return (lateTime.getTime() - nominalTime.getTime());
}
protected int getPower() {
return 2;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
index 19fe8f7..8dd9c29 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/FinalPolicy.java
@@ -23,6 +23,9 @@ import org.apache.falcon.entity.v0.Frequency;
import java.util.Date;
+/**
+ * An implementation of the Rerun policy that does rerun only once.
+ */
public class FinalPolicy extends AbstractRerunPolicy {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
index 9bcca26..b8c69d2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/PeriodicPolicy.java
@@ -17,7 +17,11 @@
*/
package org.apache.falcon.rerun.policy;
+/**
+ * An implementation of the Rerun policy that is periodic.
+ */
public class PeriodicPolicy extends ExpBackoffPolicy {
+
@Override
protected int getPower() {
return 1;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
index 5ee902a..945b6c5 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/policy/RerunPolicyFactory.java
@@ -19,7 +19,10 @@ package org.apache.falcon.rerun.policy;
import org.apache.falcon.entity.v0.process.PolicyType;
-public class RerunPolicyFactory {
+/**
+ * A factory implementation to dole out rerun/retry policy.
+ */
+public final class RerunPolicyFactory {
private RerunPolicyFactory() {
// factory
@@ -27,15 +30,17 @@ public class RerunPolicyFactory {
public static AbstractRerunPolicy getRetryPolicy(PolicyType latePolicy) {
switch (latePolicy) {
- case PERIODIC:
- return new PeriodicPolicy();
+ case PERIODIC:
+ return new PeriodicPolicy();
+
+ case EXP_BACKOFF:
+ return new ExpBackoffPolicy();
- case EXP_BACKOFF:
- return new ExpBackoffPolicy();
+ case FINAL:
+ return new FinalPolicy();
- case FINAL:
- return new FinalPolicy();
+ default:
+ throw new IllegalArgumentException("Unhandled Retry policy: " + latePolicy);
}
- throw new IllegalArgumentException("Unhandled Retry policy: " + latePolicy);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
index 82dfc22..3fa5282 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
@@ -28,6 +28,11 @@ import javax.jms.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
+/**
+ * An ActiveMQ implementation for DelayedQueue.
+ *
+ * @param <T>
+ */
public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
private ActiveMQConnection connection;
@@ -65,8 +70,8 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
if (connection == null) {
init();
}
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session;
}
@@ -87,8 +92,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
@Override
public void populateQueue(List<T> events) {
- // TODO Auto-generated method stub
-
}
@Override
@@ -127,21 +130,25 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
producer.close();
LOG.info("Producer closed successfully");
} catch (Exception ignore) {
+ LOG.info("Producer could not be closed");
}
+
try {
LOG.info("Attempting to close consumer");
consumer.close();
LOG.info("Consumer closed successfully");
} catch (Exception ignore) {
+ LOG.info("Consumer could not be closed");
}
+
try {
LOG.info("Attempting to close connection");
connection.close();
LOG.info("Connection closed successfully");
} catch (Exception ignore) {
+ LOG.info("Connection could not be closed");
}
init();
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
index 7e55206..393a8e5 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
@@ -23,6 +23,10 @@ import org.apache.log4j.Logger;
import java.util.List;
+/**
+ * Base class for a Queue implementation.
+ * @param <T>
+ */
public abstract class DelayedQueue<T extends RerunEvent> {
public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
@@ -35,5 +39,4 @@ public abstract class DelayedQueue<T extends RerunEvent> {
public abstract void init();
public abstract void reconnect() throws FalconException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index 7243f4d..06feb92 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -28,14 +28,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
+/**
+ * An in-memory implementation of a DelayedQueue.
+ * @param <T> the type of RerunEvent stored in the underlying delay queue
+ */
public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
- protected DelayQueue<T> QUEUE = new DelayQueue<T>();
+
+ protected DelayQueue<T> delayQueue = new DelayQueue<T>();
private File serializeFilePath;
@Override
public boolean offer(T event) {
- boolean flag = QUEUE.offer(event);
+ boolean flag = delayQueue.offer(event);
beforeRetry(event);
LOG.debug("Enqueued Message:" + event.toString());
return flag;
@@ -45,7 +50,7 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
public T take() throws FalconException {
T event;
try {
- event = QUEUE.take();
+ event = delayQueue.take();
LOG.debug("Dequeued Message:" + event.toString());
afterRetry(event);
} catch (InterruptedException e) {
@@ -60,16 +65,14 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
public void populateQueue(List<T> events) {
for (T event : events) {
- QUEUE.offer(event);
+ delayQueue.offer(event);
}
-
}
@Override
public void init() {
List<T> events = bootstrap();
populateQueue(events);
-
}
@Override
@@ -135,7 +138,7 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
+ rerunFile.getAbsolutePath(), e);
}
}
- return rerunEvents;
+ return rerunEvents;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
index f8edfbc..cdbadaa 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
@@ -29,6 +29,9 @@ import org.apache.log4j.Logger;
import java.io.File;
+/**
+ * A service implementation for Late Rerun initialized at startup.
+ */
public class LateRunService implements FalconService {
private static final Logger LOG = Logger.getLogger(LateRunService.class);
@@ -40,20 +43,18 @@ public class LateRunService implements FalconService {
@Override
public void init() throws FalconException {
- AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler = RerunHandlerFactory
- .getRerunHandler(RerunType.LATE);
+ AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler =
+ RerunHandlerFactory.getRerunHandler(RerunType.LATE);
ActiveMQueue<LaterunEvent> queue = new ActiveMQueue<LaterunEvent>(
- StartupProperties
- .get()
- .getProperty("broker.url",
- "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
+ StartupProperties.get()
+ .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
"falcon.late.queue");
rerunHandler.init(queue);
}
@Override
public void destroy() throws FalconException {
- LOG.info("LateRun thread destroyed");
+ LOG.info("LateRun thread destroyed");
}
private File getBasePath() {
@@ -64,7 +65,7 @@ public class LateRunService implements FalconService {
throw new RuntimeException("Unable to initialize late recorder @"
+ basePath);
}
+
return basePath;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
index 8a902be..b989acd 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/RetryService.java
@@ -30,6 +30,9 @@ import org.apache.log4j.Logger;
import java.io.File;
+/**
+ * A service implementation for Retry initialized at startup.
+ */
public class RetryService implements FalconService {
private static final Logger LOG = Logger.getLogger(RetryService.class);
@@ -41,16 +44,15 @@ public class RetryService implements FalconService {
@Override
public void init() throws FalconException {
- AbstractRerunHandler<RetryEvent, DelayedQueue<RetryEvent>> rerunHandler = RerunHandlerFactory
- .getRerunHandler(RerunType.RETRY);
- InMemoryQueue<RetryEvent> queue = new InMemoryQueue<RetryEvent>(
- getBasePath());
+ AbstractRerunHandler<RetryEvent, DelayedQueue<RetryEvent>> rerunHandler =
+ RerunHandlerFactory.getRerunHandler(RerunType.RETRY);
+ InMemoryQueue<RetryEvent> queue = new InMemoryQueue<RetryEvent>(getBasePath());
rerunHandler.init(queue);
}
@Override
public void destroy() throws FalconException {
- LOG.info("RetryHandler thread destroyed");
+ LOG.info("RetryHandler thread destroyed");
}
private File getBasePath() {
@@ -61,7 +63,7 @@ public class RetryService implements FalconService {
throw new RuntimeException("Unable to initialize retry recorder @"
+ basePath);
}
+
return basePath;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java b/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
index be74032..630ef00 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/AbstractRerunPolicyTest.java
@@ -27,23 +27,23 @@ import org.testng.annotations.Test;
import java.util.Date;
+/**
+ * Base test class for Rerun Policy.
+ */
public class AbstractRerunPolicyTest {
@Test
- public void TestGetDurationInMillis() throws FalconException {
+ public void testGetDurationInMillis() throws FalconException {
AbstractRerunPolicy policy = new AbstractRerunPolicy() {
@Override
public long getDelay(Frequency delay, Date nominaltime,
Date cutOffTime) throws FalconException {
- // TODO Auto-generated method stub
return 0;
}
@Override
- public long getDelay(Frequency delay, int eventNumber)
- throws FalconException {
- // TODO Auto-generated method stub
+ public long getDelay(Frequency delay, int eventNumber) throws FalconException {
return 0;
}
};
@@ -57,26 +57,26 @@ public class AbstractRerunPolicyTest {
}
@Test
- public void TestExpBackoffPolicy() throws FalconException {
+ public void testExpBackoffPolicy() throws FalconException {
AbstractRerunPolicy backoff = new ExpBackoffPolicy();
long delay = backoff.getDelay(new Frequency("minutes(2)"), 2);
Assert.assertEquals(delay, 480000);
long currentTime = System.currentTimeMillis();
- delay = backoff.getDelay(new Frequency("minutes(2)"), new Date(
- currentTime - 1 * 4 * 60 * 1000), new Date(currentTime + 1 * 60
- * 60 * 1000));
+ delay = backoff.getDelay(new Frequency("minutes(2)"),
+ new Date(currentTime - 1 * 4 * 60 * 1000),
+ new Date(currentTime + 1 * 60 * 60 * 1000));
Assert.assertEquals(delay, 1 * 6 * 60 * 1000);
currentTime = System.currentTimeMillis();
- delay = backoff.getDelay(new Frequency("minutes(1)"), new Date(
- currentTime - 1 * 9 * 60 * 1000), new Date(currentTime + 1 * 60
- * 60 * 1000));
+ delay = backoff.getDelay(new Frequency("minutes(1)"),
+ new Date(currentTime - 1 * 9 * 60 * 1000),
+ new Date(currentTime + 1 * 60 * 60 * 1000));
Assert.assertEquals(delay, 900000);
}
@Test
- public void TestPeriodicPolicy() throws FalconException, InterruptedException {
+ public void testPeriodicPolicy() throws FalconException, InterruptedException {
AbstractRerunPolicy periodic = new PeriodicPolicy();
long delay = periodic.getDelay(new Frequency("minutes(2)"), 2);
Assert.assertEquals(delay, 120000);
@@ -85,9 +85,9 @@ public class AbstractRerunPolicyTest {
long currentTime = System.currentTimeMillis();
//Thread.sleep(1000);
- delay = periodic.getDelay(new Frequency("minutes(3)"), new Date(
- currentTime), new Date(currentTime + 1 * 60
- * 60 * 1000));
+ delay = periodic.getDelay(new Frequency("minutes(3)"),
+ new Date(currentTime),
+ new Date(currentTime + 1 * 60 * 60 * 1000));
Assert.assertEquals(delay, 180000);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
index 6028e10..efd51b1 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/handler/TestLateData.java
@@ -18,7 +18,6 @@
package org.apache.falcon.rerun.handler;
-import com.sun.jersey.api.client.WebResource;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -39,14 +38,16 @@ import javax.xml.bind.Unmarshaller;
import java.io.StringWriter;
import java.util.Collection;
+/**
+ * Test class for Late data handling.
+ */
public class TestLateData {
protected static final String FEED_XML = "/feed-template.xml";
- protected static String CLUSTER_XML = "/cluster-template.xml";
+ protected static final String CLUSTER_XML = "/cluster-template.xml";
protected static final String PROCESS_XML = "/process-template.xml";
protected static final String PROCESS_XML2 = "/process-template2.xml";
- protected WebResource service = null;
protected Configuration conf = new Configuration();
@BeforeClass
@@ -75,24 +76,27 @@ public class TestLateData {
ConfigurationStore store = ConfigurationStore.get();
store.remove(type, name);
switch (type) {
- case CLUSTER:
- Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
- cluster.setName(name);
- ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
- store.publish(type, cluster);
- break;
-
- case FEED:
- Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
- feed.setName(name);
- store.publish(type, feed);
- break;
-
- case PROCESS:
- Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(PROCESS_XML));
- process.setName(name);
- store.publish(type, process);
- break;
+ case CLUSTER:
+ Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+ cluster.setName(name);
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name"));
+ store.publish(type, cluster);
+ break;
+
+ case FEED:
+ Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
+ feed.setName(name);
+ store.publish(type, feed);
+ break;
+
+ case PROCESS:
+ Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(PROCESS_XML));
+ process.setName(name);
+ store.publish(type, process);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Invalid entity type: " + type);
}
}
@@ -113,82 +117,78 @@ public class TestLateData {
store.publish(EntityType.PROCESS, process);
}
- public String marshallEntity(final Entity entity) throws FalconException,
- JAXBException {
+ public String marshallEntity(final Entity entity) throws FalconException, JAXBException {
Marshaller marshaller = entity.getEntityType().getMarshaller();
StringWriter stringWriter = new StringWriter();
marshaller.marshal(entity, stringWriter);
return stringWriter.toString();
}
-// @Test
-// private void TestLateWhenInstanceRunning() throws Exception
-// {
-// try{
-// WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
-// when(engine.instanceStatus("testCluster", "123")).thenReturn("RUNNING");
-//
-// ConfigurationStore store = ConfigurationStore.get();
-// setup();
-// String nominalTime = EntityUtil.formatDateUTC(new Date(System.currentTimeMillis() - 1800000));
-// InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(new File("target/late"));
-// latedataHandler.init(queue);
-//
-// AbstractRerunHandler handle = RerunHandlerFactory.getRerunHandler(RerunEvent.RerunType.LATE);
-// handle.handleRerun("sample", nominalTime, "123", "123", engine, System.currentTimeMillis());
-//
-// File directory = new File("target/late");
-// File[] files = directory.listFiles();
-// int noFilesBefore = files.length;
-//
-// Thread.sleep(90000);
-//
-// files = directory.listFiles();
-// int noFilesAfterRetry = files.length;
-// Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
-// }
-// catch (Exception e){
-// Assert.fail("Not expecting any exception");
-// }
-//
-// }
-//
-//
-// @Test
-// private void TestLateWhenDataPresent() throws Exception {
-// WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
-// when(engine.instanceStatus("testCluster", "123")).thenReturn(
-// "SUCCEEDED");
-//
-// LateRerunConsumer consumer = Mockito.mock(LateRerunConsumer.class);
-// when(consumer.detectLate(Mockito.any(LaterunEvent.class))).thenReturn(
-// "new data found");
-//
-// String nominalTime = EntityUtil.formatDateUTC(new Date(System
-// .currentTimeMillis() - 1800000));
-// AbstractRerunHandler handle = RerunHandlerFactory
-// .getRerunHandler(RerunEvent.RerunType.LATE);
-//
-// ConfigurationStore store = ConfigurationStore.get();
-// setup();
-//
-// InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(
-// new File("target/late"));
-// latedataHandler.init(queue);
-//
-// handle.handleRerun("sample", nominalTime, "123", "123", engine,
-// System.currentTimeMillis());
-//
-// File directory = new File("target/late");
-// File[] files = directory.listFiles();
-// int noFilesBefore = files.length;
-//
-// Thread.sleep(90000);
-//
-// files = directory.listFiles();
-// int noFilesAfterRetry = files.length;
-// Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
-//
-// }
+/*
+ @Test
+ private void TestLateWhenInstanceRunning() throws Exception {
+ try {
+ WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
+ when(engine.instanceStatus("testCluster", "123")).thenReturn("RUNNING");
+
+ ConfigurationStore store = ConfigurationStore.get();
+ setup();
+ String nominalTime = EntityUtil.formatDateUTC(new Date(System.currentTimeMillis() - 1800000));
+ InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(new File("target/late"));
+ latedataHandler.init(queue);
+
+ AbstractRerunHandler handle = RerunHandlerFactory.getRerunHandler(RerunEvent.RerunType.LATE);
+ handle.handleRerun("sample", nominalTime, "123", "123", engine, System.currentTimeMillis());
+
+ File directory = new File("target/late");
+ File[] files = directory.listFiles();
+ int noFilesBefore = files.length;
+
+ Thread.sleep(90000);
+
+ files = directory.listFiles();
+ int noFilesAfterRetry = files.length;
+ Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
+ } catch (Exception e) {
+ Assert.fail("Not expecting any exception");
+ }
+ }
+
+ @Test
+ private void TestLateWhenDataPresent() throws Exception {
+ WorkflowEngine engine = Mockito.mock(WorkflowEngine.class);
+ when(engine.instanceStatus("testCluster", "123")).thenReturn(
+ "SUCCEEDED");
+
+ LateRerunConsumer consumer = Mockito.mock(LateRerunConsumer.class);
+ when(consumer.detectLate(Mockito.any(LaterunEvent.class))).thenReturn(
+ "new data found");
+ String nominalTime = EntityUtil.formatDateUTC(new Date(System
+ .currentTimeMillis() - 1800000));
+ AbstractRerunHandler handle = RerunHandlerFactory
+ .getRerunHandler(RerunEvent.RerunType.LATE);
+
+ ConfigurationStore store = ConfigurationStore.get();
+ setup();
+
+ InMemoryQueue<LaterunEvent> queue = new InMemoryQueue<LaterunEvent>(
+ new File("target/late"));
+ latedataHandler.init(queue);
+
+ handle.handleRerun("sample", nominalTime, "123", "123", engine,
+ System.currentTimeMillis());
+
+ File directory = new File("target/late");
+ File[] files = directory.listFiles();
+ int noFilesBefore = files.length;
+
+ Thread.sleep(90000);
+
+ files = directory.listFiles();
+ int noFilesAfterRetry = files.length;
+ Assert.assertNotSame(noFilesBefore, noFilesAfterRetry);
+
+ }
+*/
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index a13aa02..a8f3885 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -24,11 +24,14 @@ import org.apache.falcon.rerun.event.RerunEvent;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+/**
+ * Test class for ActiveMQ delayed queue implementation.
+ */
public class ActiveMQTest {
private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
private BrokerService broker;
- private static final String DESTI = "activemq.topic";
+ private static final String DESTINATION = "activemq.topic";
@BeforeClass
private void setup() throws Exception {
@@ -43,8 +46,9 @@ public class ActiveMQTest {
@Test
public void testBrokerStartAndEnqueue() {
ActiveMQueue<RerunEvent> activeMQueue = new ActiveMQueue<RerunEvent>(
- BROKER_URL, DESTI);
+ BROKER_URL, DESTINATION);
activeMQueue.init();
+
RerunEvent event = new LaterunEvent("clusterName", "wfId",
System.currentTimeMillis(), 60 * 1000, "entityType",
"entityName", "instance", 0);
@@ -58,7 +62,5 @@ public class ActiveMQTest {
} catch (Exception e) {
Assert.fail();
}
-
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6a39baf3/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index d41453b..6aafaa5 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -25,6 +25,9 @@ import org.testng.annotations.Test;
import java.io.File;
import java.util.LinkedList;
+/**
+ * Test class for the InMemory Queue implementation of the DelayedQueue.
+ */
public class InMemoryQueueTest {
@Test(timeOut = 10000)
@@ -47,8 +50,8 @@ public class InMemoryQueueTest {
boolean inserted = false;
for (int posn = 0; posn < events.size(); posn++) {
MyEvent thisEvent = events.get(posn);
- if (thisEvent.getDelayInMilliSec() + thisEvent.getMsgInsertTime() >
- event.getDelayInMilliSec() + event.getMsgInsertTime()) {
+ if (thisEvent.getDelayInMilliSec() + thisEvent.getMsgInsertTime()
+ > event.getDelayInMilliSec() + event.getMsgInsertTime()) {
events.add(posn, event);
inserted = true;
break;
@@ -67,12 +70,14 @@ public class InMemoryQueueTest {
private class MyEvent extends RerunEvent {
+ //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
public MyEvent(String clusterName, String wfId,
long msgInsertTime, long delay, String entityType,
String entityName, String instance, int runId) {
super(clusterName, wfId, msgInsertTime, delay,
entityType, entityName, instance, runId);
}
+ //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
@Override
public RerunType getType() {