You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 00:46:35 UTC

svn commit: r1157283 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle: LifecycleController.java LifecycleSupervisor.java

Author: esammer
Date: Fri Aug 12 22:46:34 2011
New Revision: 1157283

URL: http://svn.apache.org/viewvc?rev=1157283&view=rev
Log:
- Added support for stopping a list of services in LifecycleController.
- Moved the monitor check runnable into a named nested class (MonitorRunnable) within LifecycleSupervisor.
- Node monitor checks can now run in parallel (scheduled separately), up to 5 at a time.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java?rev=1157283&r1=1157282&r2=1157283&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java Fri Aug 12 22:46:34 2011
@@ -1,5 +1,7 @@
 package org.apache.flume.lifecycle;
 
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,8 +33,10 @@ public class LifecycleController {
   public static boolean waitForOneOf(LifecycleAware delegate,
       LifecycleState[] states, long timeout) throws InterruptedException {
 
-    logger.debug("Waiting for state {} for delegate:{} up to {}ms",
-        new Object[] { states, delegate, timeout });
+    if (logger.isDebugEnabled()) {
+      logger.debug("Waiting for state {} for delegate:{} up to {}ms",
+          new Object[] { states, delegate, timeout });
+    }
 
     long sleepInterval = Math.max(shortestSleepDuration, timeout
         / maxNumberOfChecks);
@@ -55,4 +59,12 @@ public class LifecycleController {
     return false;
   }
 
+  public static void stopAll(List<LifecycleAware> services)
+      throws InterruptedException {
+
+    for (LifecycleAware service : services) {
+      waitForOneOf(service, LifecycleState.STOP_OR_ERROR);
+    }
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1157283&r1=1157282&r2=1157283&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Fri Aug 12 22:46:34 2011
@@ -1,5 +1,6 @@
 package org.apache.flume.lifecycle;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,7 +28,7 @@ public class LifecycleSupervisor impleme
     lifecycleState = LifecycleState.IDLE;
     supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
     monitorService = Executors.newScheduledThreadPool(
-        3,
+        5,
         new ThreadFactoryBuilder().setNameFormat(
             "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
             .build());
@@ -42,87 +43,21 @@ public class LifecycleSupervisor impleme
 
     /* FIXME: Teasing this apart into manageable chunks. */
 
-    Runnable monitorCheckRunnable = new Runnable() {
+    synchronized (supervisedProcesses) {
+      logger.debug("Checking status of all processes");
 
-      @Override
-      public void run() {
-        synchronized (supervisedProcesses) {
-          logger.debug("Checking status of all processes");
-
-          for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
-              .entrySet()) {
-
-            LifecycleAware lifecycleAware = entry.getKey();
-            Supervisoree supervisoree = entry.getValue();
-
-            logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
-                supervisoree);
-
-            long now = System.currentTimeMillis();
-
-            if (supervisoree.status.firstSeen == null) {
-              logger.debug("first time seeing {}", lifecycleAware);
-
-              supervisoree.status.firstSeen = now;
-            }
-
-            supervisoree.status.lastSeen = now;
-            supervisoree.status.lastSeenState = lifecycleAware
-                .getLifecycleState();
-
-            if (!lifecycleAware.getLifecycleState().equals(
-                supervisoree.status.desiredState)) {
-
-              logger.debug("Want to transition {} from {} to {} (failures:{})",
-                  new Object[] { lifecycleAware,
-                      supervisoree.status.lastSeenState,
-                      supervisoree.status.desiredState,
-                      supervisoree.status.failures });
-
-              Context context = new Context();
-
-              switch (supervisoree.status.desiredState) {
-              case START:
-                try {
-                  lifecycleAware.start(context);
-                } catch (Exception e) {
-                  logger.error("Unable to start " + lifecycleAware
-                      + " - Exception follows.", e);
-                  supervisoree.status.failures++;
-                }
-                break;
-              case STOP:
-                try {
-                  lifecycleAware.stop(context);
-                } catch (Exception e) {
-                  logger.error("Unable to stop " + lifecycleAware
-                      + " - Exception follows.", e);
-                  supervisoree.status.failures++;
-                }
-                break;
-              default:
-                logger.warn("I refuse to acknowledge {} as a desired state",
-                    supervisoree.status.desiredState);
-              }
-
-              if (!supervisoree.policy.isValid(lifecycleAware,
-                  supervisoree.status)) {
-                logger
-                    .error(
-                        "Policy {} of {} has been violated - supervisor should exit!",
-                        supervisoree.policy, lifecycleAware);
-              }
-            }
-          }
+      for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+          .entrySet()) {
 
-          logger.debug("Status check complete");
-        }
-      }
+        MonitorRunnable monitorCheckRunnable = new MonitorRunnable();
 
-    };
+        monitorCheckRunnable.lifecycleAware = entry.getKey();
+        monitorCheckRunnable.supervisoree = entry.getValue();
 
-    monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
-        TimeUnit.SECONDS);
+        monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
+            TimeUnit.SECONDS);
+      }
+    }
 
     lifecycleState = LifecycleState.START;
 
@@ -159,6 +94,18 @@ public class LifecycleSupervisor impleme
     logger.debug("Lifecycle supervisor stopped");
   }
 
+  public void fail() {
+    try {
+      LifecycleController.stopAll(new ArrayList<LifecycleAware>(
+          supervisedProcesses.keySet()));
+    } catch (InterruptedException e) {
+      logger
+          .warn("Interrupted while stopping all outstanding supervised processes");
+    }
+
+    lifecycleState = LifecycleState.ERROR;
+  }
+
   public synchronized void supervise(LifecycleAware lifecycleAware,
       SupervisorPolicy policy, LifecycleState desiredState) {
 
@@ -168,7 +115,12 @@ public class LifecycleSupervisor impleme
     process.policy = policy;
     process.status.desiredState = desiredState;
 
+    MonitorRunnable monitorRunnable = new MonitorRunnable();
+    monitorRunnable.lifecycleAware = lifecycleAware;
+    monitorRunnable.supervisoree = process;
+
     supervisedProcesses.put(lifecycleAware, process);
+    monitorService.scheduleAtFixedRate(monitorRunnable, 0, 3, TimeUnit.SECONDS);
   }
 
   @Override
@@ -176,6 +128,75 @@ public class LifecycleSupervisor impleme
     return lifecycleState;
   }
 
+  public static class MonitorRunnable implements Runnable {
+
+    public LifecycleAware lifecycleAware;
+    public Supervisoree supervisoree;
+
+    @Override
+    public void run() {
+      logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
+          supervisoree);
+
+      long now = System.currentTimeMillis();
+
+      if (supervisoree.status.firstSeen == null) {
+        logger.debug("first time seeing {}", lifecycleAware);
+
+        supervisoree.status.firstSeen = now;
+      }
+
+      supervisoree.status.lastSeen = now;
+      supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
+
+      if (!lifecycleAware.getLifecycleState().equals(
+          supervisoree.status.desiredState)) {
+
+        logger
+            .debug("Want to transition {} from {} to {} (failures:{})",
+                new Object[] { lifecycleAware,
+                    supervisoree.status.lastSeenState,
+                    supervisoree.status.desiredState,
+                    supervisoree.status.failures });
+
+        Context context = new Context();
+
+        switch (supervisoree.status.desiredState) {
+        case START:
+          try {
+            lifecycleAware.start(context);
+          } catch (Exception e) {
+            logger.error("Unable to start " + lifecycleAware
+                + " - Exception follows.", e);
+            supervisoree.status.failures++;
+          }
+          break;
+        case STOP:
+          try {
+            lifecycleAware.stop(context);
+          } catch (Exception e) {
+            logger.error("Unable to stop " + lifecycleAware
+                + " - Exception follows.", e);
+            supervisoree.status.failures++;
+          }
+          break;
+        default:
+          logger.warn("I refuse to acknowledge {} as a desired state",
+              supervisoree.status.desiredState);
+        }
+
+        if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
+          logger.error(
+              "Policy {} of {} has been violated - supervisor should exit!",
+              supervisoree.policy, lifecycleAware);
+        }
+      }
+
+      logger.debug("Status check complete");
+    }
+
+  }
+
   public static class Status {
     public Long firstSeen;
     public Long lastSeen;