You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/09/09 11:01:35 UTC
falcon git commit: FALCON-348 Add shutdown hook for Falcon
(Contributed by Sandeep Samudrala)
Repository: falcon
Updated Branches:
refs/heads/master 4ad28f630 -> 6bbfe2366
FALCON-348 Add shutdown hook for Falcon (Contributed by Sandeep Samudrala)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6bbfe236
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6bbfe236
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6bbfe236
Branch: refs/heads/master
Commit: 6bbfe2366b32d885a5b67fc4accb11ed76b889ca
Parents: 4ad28f6
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Sep 9 14:31:19 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Sep 9 14:31:19 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
prism/src/main/java/org/apache/falcon/Main.java | 19 +++++++++++++++--
.../rerun/handler/AbstractRerunConsumer.java | 22 +++++++++++++-------
.../rerun/handler/AbstractRerunHandler.java | 4 ++++
.../falcon/rerun/handler/LateRerunHandler.java | 10 ++++++++-
.../falcon/rerun/handler/RetryHandler.java | 9 +++++++-
.../apache/falcon/rerun/queue/ActiveMQueue.java | 3 ---
.../falcon/rerun/service/LateRunService.java | 6 ++++--
8 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 30f2b8c..196490d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ Trunk (Unreleased)
FALCON-1250 Throw error when keys in startup.properties do not start with "*." or domain+"."(Narayan Periwal via Ajay Yadava)
+ FALCON-348 Add shutdown hook for Falcon (Sandeep Samudrala via Pallavi Rao)
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/prism/src/main/java/org/apache/falcon/Main.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/Main.java b/prism/src/main/java/org/apache/falcon/Main.java
index 96e003c..d8bbfbd 100644
--- a/prism/src/main/java/org/apache/falcon/Main.java
+++ b/prism/src/main/java/org/apache/falcon/Main.java
@@ -38,6 +38,8 @@ public final class Main {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
private static final String APP_PATH = "app";
private static final String APP_PORT = "port";
+ private static EmbeddedServer server;
+ private static BrokerService broker;
/**
* Prevent users from constructing this.
@@ -60,7 +62,19 @@ public final class Main {
return new GnuParser().parse(options, args);
}
+ static class ShutDown extends Thread {
+ public void run() {
+ try {
+ LOG.info("calling shutdown hook");
+ server.stop();
+ broker.stop();
+ } catch (Exception e) {
+ LOG.error("Server shutdown failed with " , e);
+ }
+ }
+ }
public static void main(String[] args) throws Exception {
+ Runtime.getRuntime().addShutdownHook(new ShutDown());
CommandLine cmd = parseArgs(args);
String projectVersion = BuildProperties.get().getProperty("project.version");
String appPath = "webapp/target/falcon-webapp-" + projectVersion;
@@ -79,7 +93,7 @@ public final class Main {
LOG.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
LOG.info("Server starting with TLS ? {} on port {}", enableTLS, appPort);
LOG.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
- EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
+ server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
server.start();
}
@@ -109,12 +123,13 @@ public final class Main {
int mqport = Integer.valueOf(System.getProperty("falcon.embeddedmq.port", "61616"));
LOG.info("Starting ActiveMQ at port {} with data dir {}", mqport, dataDir);
- BrokerService broker = new BrokerService();
+ broker = new BrokerService();
broker.setUseJmx(false);
broker.setDataDirectory(dataDir);
broker.addConnector("vm://localhost");
broker.addConnector("tcp://0.0.0.0:" + mqport);
broker.setSchedulerSupport(true);
+ broker.setUseShutdownHook(false);
broker.start();
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 9ee94c5..582cb15 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.rerun.handler;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.v0.Frequency;
@@ -50,20 +51,25 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
int attempt = 1;
AbstractRerunPolicy policy = new ExpBackoffPolicy();
Frequency frequency = new Frequency("minutes(1)");
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
T message;
try {
message = handler.takeFromQueue();
attempt = 1;
} catch (FalconException e) {
- LOG.error("Error while reading message from the queue", e);
- GenericAlert.alertRerunConsumerFailed(
- "Error while reading message from the queue: ", e);
- Thread.sleep(policy.getDelay(frequency, attempt));
- handler.reconnect();
- attempt++;
- continue;
+ if (ExceptionUtils.getRootCause(e) instanceof InterruptedException){
+ LOG.info("Rerun handler daemon has been interrupted");
+ return;
+ } else {
+ LOG.error("Error while reading message from the queue", e);
+ GenericAlert.alertRerunConsumerFailed(
+ "Error while reading message from the queue: ", e);
+ Thread.sleep(policy.getDelay(frequency, attempt));
+ handler.reconnect();
+ attempt++;
+ continue;
+ }
}
// Login the user to access WfEngine as this user
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index f019737..64c566e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -48,6 +48,10 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
this.delayQueue.init();
}
+ public void close() throws FalconException {
+ this.delayQueue.close();
+ }
+
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public abstract void handleRerun(String clusterName, String entityType,
String entityName, String nominalTime, String runId,
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index c2cb09e..785dce8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -52,6 +52,7 @@ import java.util.Date;
*/
public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
AbstractRerunHandler<LaterunEvent, M> {
+ private Thread daemon;
@Override
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@@ -188,13 +189,20 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
@Override
public void init(M aDelayQueue) throws FalconException {
super.init(aDelayQueue);
- Thread daemon = new Thread(new LateRerunConsumer(this));
+ daemon = new Thread(new LateRerunConsumer(this));
daemon.setName("LaterunHandler");
daemon.setDaemon(true);
daemon.start();
LOG.info("Laterun Handler thread started");
}
+ @Override
+ public void close() throws FalconException {
+ daemon.interrupt();
+ super.close();
+ }
+
+
public Path getLateLogPath(String logDir, String nominalTime,
String srcClusterName) {
//SrcClusterName valid only in case of feed
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index c6bc36f..b952bbe 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,6 +38,7 @@ import org.apache.falcon.workflow.WorkflowExecutionContext;
*/
public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
AbstractRerunHandler<RetryEvent, M> {
+ private Thread daemon;
@Override
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@@ -85,7 +86,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
@Override
public void init(M aDelayQueue) throws FalconException {
super.init(aDelayQueue);
- Thread daemon = new Thread(new RetryConsumer(this));
+ daemon = new Thread(new RetryConsumer(this));
daemon.setName("RetryHandler");
daemon.setDaemon(true);
daemon.start();
@@ -93,6 +94,12 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
}
@Override
+ public void close() throws FalconException {
+ daemon.interrupt();
+ super.close();
+ }
+
+ @Override
public void onSuccess(WorkflowExecutionContext context) throws FalconException {
// do nothing since retry does not apply for failed workflows
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
index 021e4cc..3168c31 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
@@ -68,7 +68,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
event.toString(), event.getDelay(TimeUnit.MILLISECONDS));
return true;
} catch (Exception e) {
- LOG.error("Unable to offer event: {} to ActiveMQ", event, e);
throw new FalconException("Unable to offer event:" + event + " to ActiveMQ", e);
}
}
@@ -91,7 +90,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
LOG.debug("Dequeued Message: {}", event.toString());
return event;
} catch (Exception e) {
- LOG.error("Error getting the message from ActiveMQ", e);
throw new FalconException("Error getting the message from ActiveMQ: ", e);
}
}
@@ -111,7 +109,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
consumer = session.createConsumer(destination);
LOG.info("Initialized Queue on ActiveMQ: {}", destinationName);
} catch (Exception e) {
- LOG.error("Error starting ActiveMQ connection for delayed queue", e);
throw new RuntimeException("Error starting ActiveMQ connection for delayed queue", e);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
index 2bb198b..8be6810 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
@@ -39,6 +39,8 @@ public class LateRunService implements FalconService {
private ActiveMQueue<LaterunEvent> queue;
+ private AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler;
+
@Override
public String getName() {
return LateRunService.class.getName();
@@ -50,8 +52,7 @@ public class LateRunService implements FalconService {
throw new FalconException("WorkflowJobEndNotificationService must be configured ahead");
}
- AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler =
- RerunHandlerFactory.getRerunHandler(RerunType.LATE);
+ rerunHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE);
queue = new ActiveMQueue<LaterunEvent>(
StartupProperties.get()
.getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
@@ -64,6 +65,7 @@ public class LateRunService implements FalconService {
@Override
public void destroy() throws FalconException {
+ rerunHandler.close();
closeQuietly();
LOG.info("LateRun thread destroyed");
}