You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:49:06 UTC

svn commit: r1156923 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java

Author: esammer
Date: Fri Aug 12 00:49:06 2011
New Revision: 1156923

URL: http://svn.apache.org/viewvc?rev=1156923&view=rev
Log:
- Added LifecycleSupervisor - like an Akka / Erlang OTP supervisor but for LifecycleAware services.

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1156923&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Fri Aug 12 00:49:06 2011
@@ -0,0 +1,229 @@
+package org.apache.flume.lifecycle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class LifecycleSupervisor implements LifecycleAware {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(LifecycleSupervisor.class);
+
+  private Map<LifecycleAware, Supervisoree> supervisedProcesses;
+  private ScheduledExecutorService monitorService;
+
+  private LifecycleState lifecycleState;
+
+  public LifecycleSupervisor() {
+    lifecycleState = LifecycleState.IDLE;
+    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
+    monitorService = Executors.newScheduledThreadPool(
+        3,
+        new ThreadFactoryBuilder().setNameFormat(
+            "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
+            .build());
+  }
+
+  @Override
+  public void start(Context context) throws LifecycleException,
+      InterruptedException {
+
+    logger.info("Starting lifecycle supervisor {}", Thread.currentThread()
+        .getId());
+
+    /* FIXME: Teasing this apart into manageable chunks. */
+
+    Runnable monitorCheckRunnable = new Runnable() {
+
+      @Override
+      public void run() {
+        synchronized (supervisedProcesses) {
+          logger.debug("Checking status of all processes");
+
+          for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+              .entrySet()) {
+
+            LifecycleAware lifecycleAware = entry.getKey();
+            Supervisoree supervisoree = entry.getValue();
+
+            logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
+                supervisoree);
+
+            long now = System.currentTimeMillis();
+
+            if (supervisoree.status.firstSeen == null) {
+              logger.debug("first time seeing {}", lifecycleAware);
+
+              supervisoree.status.firstSeen = now;
+            }
+
+            supervisoree.status.lastSeen = now;
+            supervisoree.status.lastSeenState = lifecycleAware
+                .getLifecycleState();
+
+            if (!lifecycleAware.getLifecycleState().equals(
+                supervisoree.status.desiredState)) {
+
+              logger.debug("Want to transition {} from {} to {} (failures:{})",
+                  new Object[] { lifecycleAware,
+                      supervisoree.status.lastSeenState,
+                      supervisoree.status.desiredState,
+                      supervisoree.status.failures });
+
+              Context context = new Context();
+
+              switch (supervisoree.status.desiredState) {
+              case START:
+                try {
+                  lifecycleAware.start(context);
+                } catch (Exception e) {
+                  logger.error("Unable to start " + lifecycleAware
+                      + " - Exception follows.", e);
+                  supervisoree.status.failures++;
+                }
+                break;
+              case STOP:
+                try {
+                  lifecycleAware.stop(context);
+                } catch (Exception e) {
+                  logger.error("Unable to stop " + lifecycleAware
+                      + " - Exception follows.", e);
+                  supervisoree.status.failures++;
+                }
+                break;
+              default:
+                logger.warn("I refuse to acknowledge {} as a desired state",
+                    supervisoree.status.desiredState);
+              }
+
+              if (!supervisoree.policy.isValid(lifecycleAware,
+                  supervisoree.status)) {
+                logger
+                    .error(
+                        "Policy {} of {} has been violated - supervisor should exit!",
+                        supervisoree.policy, lifecycleAware);
+              }
+            }
+          }
+
+          logger.debug("Status check complete");
+        }
+      }
+
+    };
+
+    monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
+        TimeUnit.SECONDS);
+
+    lifecycleState = LifecycleState.START;
+
+    logger.debug("Lifecycle supervisor started");
+  }
+
+  @Override
+  public void stop(Context context) throws LifecycleException,
+      InterruptedException {
+
+    logger.info("Stopping lifecycle supervisor {}", Thread.currentThread()
+        .getId());
+
+    monitorService.shutdown();
+
+    while (!monitorService.isTerminated()) {
+      monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+    synchronized (supervisedProcesses) {
+      for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+          .entrySet()) {
+
+        if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
+          entry.getKey().stop(context);
+          LifecycleController.waitForOneOf(entry.getKey(),
+              LifecycleState.STOP_OR_ERROR, 5000);
+        }
+      }
+    }
+
+    lifecycleState = LifecycleState.STOP;
+
+    logger.debug("Lifecycle supervisor stopped");
+  }
+
+  public synchronized void supervise(LifecycleAware lifecycleAware,
+      SupervisorPolicy policy, LifecycleState desiredState) {
+
+    Supervisoree process = new Supervisoree();
+    process.status = new Status();
+
+    process.policy = policy;
+    process.status.desiredState = desiredState;
+
+    supervisedProcesses.put(lifecycleAware, process);
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  public static class Status {
+    public Long firstSeen;
+    public Long lastSeen;
+    public LifecycleState lastSeenState;
+    public LifecycleState desiredState;
+    public int failures;
+
+    @Override
+    public String toString() {
+      return "{ lastSeen:" + lastSeen + " lastSeenState:" + lastSeenState
+          + " desiredState:" + desiredState + " firstSeen:" + firstSeen
+          + " failures:" + failures + " }";
+    }
+
+  }
+
+  public static abstract class SupervisorPolicy {
+
+    abstract boolean isValid(LifecycleAware object, Status status);
+
+    public static class AlwaysRestartPolicy extends SupervisorPolicy {
+
+      @Override
+      boolean isValid(LifecycleAware object, Status status) {
+        return true;
+      }
+    }
+
+    public static class OnceOnlyPolicy extends SupervisorPolicy {
+
+      @Override
+      boolean isValid(LifecycleAware object, Status status) {
+        return status.failures == 0;
+      }
+    }
+
+  }
+
+  private static class Supervisoree {
+
+    public SupervisorPolicy policy;
+    public Status status;
+
+    @Override
+    public String toString() {
+      return "{ status:" + status + " policy:" + policy + " }";
+    }
+
+  }
+
+}