You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/09/09 11:01:35 UTC

falcon git commit: FALCON-348 Add shutdown hook for Falcon (Contributed by Sandeep Samudrala)

Repository: falcon
Updated Branches:
  refs/heads/master 4ad28f630 -> 6bbfe2366


FALCON-348 Add shutdown hook for Falcon (Contributed by Sandeep Samudrala)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6bbfe236
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6bbfe236
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6bbfe236

Branch: refs/heads/master
Commit: 6bbfe2366b32d885a5b67fc4accb11ed76b889ca
Parents: 4ad28f6
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Sep 9 14:31:19 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Sep 9 14:31:19 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 prism/src/main/java/org/apache/falcon/Main.java | 19 +++++++++++++++--
 .../rerun/handler/AbstractRerunConsumer.java    | 22 +++++++++++++-------
 .../rerun/handler/AbstractRerunHandler.java     |  4 ++++
 .../falcon/rerun/handler/LateRerunHandler.java  | 10 ++++++++-
 .../falcon/rerun/handler/RetryHandler.java      |  9 +++++++-
 .../apache/falcon/rerun/queue/ActiveMQueue.java |  3 ---
 .../falcon/rerun/service/LateRunService.java    |  6 ++++--
 8 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 30f2b8c..196490d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ Trunk (Unreleased)
     
     FALCON-1250 Throw error when keys in startup.properties do not start with "*." or domain+"."(Narayan Periwal via Ajay Yadava)
 
+    FALCON-348 Add shutdown hook for Falcon (Sandeep Samudrala via Pallavi Rao)
  
   OPTIMIZATIONS
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/prism/src/main/java/org/apache/falcon/Main.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/Main.java b/prism/src/main/java/org/apache/falcon/Main.java
index 96e003c..d8bbfbd 100644
--- a/prism/src/main/java/org/apache/falcon/Main.java
+++ b/prism/src/main/java/org/apache/falcon/Main.java
@@ -38,6 +38,8 @@ public final class Main {
     private static final Logger LOG = LoggerFactory.getLogger(Main.class);
     private static final String APP_PATH = "app";
     private static final String APP_PORT = "port";
+    private static EmbeddedServer server;
+    private static BrokerService broker;
 
     /**
      * Prevent users from constructing this.
@@ -60,7 +62,19 @@ public final class Main {
         return new GnuParser().parse(options, args);
     }
 
+    static class ShutDown extends Thread {
+        public void run() {
+            try {
+                LOG.info("calling shutdown hook");
+                server.stop();
+                broker.stop();
+            } catch (Exception e) {
+                LOG.error("Server shutdown failed with " , e);
+            }
+        }
+    }
     public static void main(String[] args) throws Exception {
+        Runtime.getRuntime().addShutdownHook(new ShutDown());
         CommandLine cmd = parseArgs(args);
         String projectVersion = BuildProperties.get().getProperty("project.version");
         String appPath = "webapp/target/falcon-webapp-" + projectVersion;
@@ -79,7 +93,7 @@ public final class Main {
         LOG.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
         LOG.info("Server starting with TLS ? {} on port {}", enableTLS, appPort);
         LOG.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
-        EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
+        server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
         server.start();
     }
 
@@ -109,12 +123,13 @@ public final class Main {
             int mqport = Integer.valueOf(System.getProperty("falcon.embeddedmq.port", "61616"));
             LOG.info("Starting ActiveMQ at port {} with data dir {}", mqport, dataDir);
 
-            BrokerService broker = new BrokerService();
+            broker = new BrokerService();
             broker.setUseJmx(false);
             broker.setDataDirectory(dataDir);
             broker.addConnector("vm://localhost");
             broker.addConnector("tcp://0.0.0.0:" + mqport);
             broker.setSchedulerSupport(true);
+            broker.setUseShutdownHook(false);
             broker.start();
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 9ee94c5..582cb15 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.v0.Frequency;
@@ -50,20 +51,25 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
         int attempt = 1;
         AbstractRerunPolicy policy = new ExpBackoffPolicy();
         Frequency frequency = new Frequency("minutes(1)");
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 T message;
                 try {
                     message = handler.takeFromQueue();
                     attempt = 1;
                 } catch (FalconException e) {
-                    LOG.error("Error while reading message from the queue", e);
-                    GenericAlert.alertRerunConsumerFailed(
-                            "Error while reading message from the queue: ", e);
-                    Thread.sleep(policy.getDelay(frequency, attempt));
-                    handler.reconnect();
-                    attempt++;
-                    continue;
+                    if (ExceptionUtils.getRootCause(e) instanceof InterruptedException){
+                        LOG.info("Rerun handler daemon has been interrupted");
+                        return;
+                    } else {
+                        LOG.error("Error while reading message from the queue", e);
+                        GenericAlert.alertRerunConsumerFailed(
+                                "Error while reading message from the queue: ", e);
+                        Thread.sleep(policy.getDelay(frequency, attempt));
+                        handler.reconnect();
+                        attempt++;
+                        continue;
+                    }
                 }
 
                 // Login the user to access WfEngine as this user

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index f019737..64c566e 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -48,6 +48,10 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
         this.delayQueue.init();
     }
 
+    public void close() throws FalconException {
+        this.delayQueue.close();
+    }
+
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public abstract void handleRerun(String clusterName, String entityType,
                                      String entityName, String nominalTime, String runId,

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index c2cb09e..785dce8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -52,6 +52,7 @@ import java.util.Date;
  */
 public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         AbstractRerunHandler<LaterunEvent, M> {
+    private Thread daemon;
 
     @Override
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@@ -188,13 +189,20 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
     @Override
     public void init(M aDelayQueue) throws FalconException {
         super.init(aDelayQueue);
-        Thread daemon = new Thread(new LateRerunConsumer(this));
+        daemon = new Thread(new LateRerunConsumer(this));
         daemon.setName("LaterunHandler");
         daemon.setDaemon(true);
         daemon.start();
         LOG.info("Laterun Handler thread started");
     }
 
+    @Override
+    public void close() throws FalconException {
+        daemon.interrupt();
+        super.close();
+    }
+
+
     public Path getLateLogPath(String logDir, String nominalTime,
                                String srcClusterName) {
         //SrcClusterName valid only in case of feed

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index c6bc36f..b952bbe 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,6 +38,7 @@ import org.apache.falcon.workflow.WorkflowExecutionContext;
  */
 public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
         AbstractRerunHandler<RetryEvent, M> {
+    private Thread daemon;
 
     @Override
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@@ -85,7 +86,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
     @Override
     public void init(M aDelayQueue) throws FalconException {
         super.init(aDelayQueue);
-        Thread daemon = new Thread(new RetryConsumer(this));
+        daemon = new Thread(new RetryConsumer(this));
         daemon.setName("RetryHandler");
         daemon.setDaemon(true);
         daemon.start();
@@ -93,6 +94,12 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
     }
 
     @Override
+    public void close() throws FalconException {
+        daemon.interrupt();
+        super.close();
+    }
+
+    @Override
     public void onSuccess(WorkflowExecutionContext context) throws FalconException {
         // do nothing since retry does not apply for failed workflows
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
index 021e4cc..3168c31 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
@@ -68,7 +68,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
                     event.toString(), event.getDelay(TimeUnit.MILLISECONDS));
             return true;
         } catch (Exception e) {
-            LOG.error("Unable to offer event: {} to ActiveMQ", event, e);
             throw new FalconException("Unable to offer event:" + event + " to ActiveMQ", e);
         }
     }
@@ -91,7 +90,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
             LOG.debug("Dequeued Message: {}", event.toString());
             return event;
         } catch (Exception e) {
-            LOG.error("Error getting the message from ActiveMQ", e);
             throw new FalconException("Error getting the message from ActiveMQ: ", e);
         }
     }
@@ -111,7 +109,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
             consumer = session.createConsumer(destination);
             LOG.info("Initialized Queue on ActiveMQ: {}", destinationName);
         } catch (Exception e) {
-            LOG.error("Error starting ActiveMQ connection for delayed queue", e);
             throw new RuntimeException("Error starting ActiveMQ connection for delayed queue", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
index 2bb198b..8be6810 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
@@ -39,6 +39,8 @@ public class LateRunService implements FalconService {
 
     private ActiveMQueue<LaterunEvent> queue;
 
+    private AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler;
+
     @Override
     public String getName() {
         return LateRunService.class.getName();
@@ -50,8 +52,7 @@ public class LateRunService implements FalconService {
             throw new FalconException("WorkflowJobEndNotificationService must be configured ahead");
         }
 
-        AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler =
-            RerunHandlerFactory.getRerunHandler(RerunType.LATE);
+        rerunHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE);
         queue = new ActiveMQueue<LaterunEvent>(
                 StartupProperties.get()
                     .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
@@ -64,6 +65,7 @@ public class LateRunService implements FalconService {
 
     @Override
     public void destroy() throws FalconException {
+        rerunHandler.close();
         closeQuietly();
         LOG.info("LateRun thread destroyed");
     }