You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:49:06 UTC
svn commit: r1156923 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
Author: esammer
Date: Fri Aug 12 00:49:06 2011
New Revision: 1156923
URL: http://svn.apache.org/viewvc?rev=1156923&view=rev
Log:
- Added LifecycleSupervisor - like an Akka / Erlang OTP supervisor but for LifecycleAware services.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1156923&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Fri Aug 12 00:49:06 2011
@@ -0,0 +1,229 @@
+package org.apache.flume.lifecycle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class LifecycleSupervisor implements LifecycleAware {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(LifecycleSupervisor.class);
+
+ private Map<LifecycleAware, Supervisoree> supervisedProcesses;
+ private ScheduledExecutorService monitorService;
+
+ private LifecycleState lifecycleState;
+
+ public LifecycleSupervisor() {
+ lifecycleState = LifecycleState.IDLE;
+ supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
+ monitorService = Executors.newScheduledThreadPool(
+ 3,
+ new ThreadFactoryBuilder().setNameFormat(
+ "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
+ .build());
+ }
+
+ @Override
+ public void start(Context context) throws LifecycleException,
+ InterruptedException {
+
+ logger.info("Starting lifecycle supervisor {}", Thread.currentThread()
+ .getId());
+
+ /* FIXME: Teasing this apart into manageable chunks. */
+
+ Runnable monitorCheckRunnable = new Runnable() {
+
+ @Override
+ public void run() {
+ synchronized (supervisedProcesses) {
+ logger.debug("Checking status of all processes");
+
+ for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+ .entrySet()) {
+
+ LifecycleAware lifecycleAware = entry.getKey();
+ Supervisoree supervisoree = entry.getValue();
+
+ logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
+ supervisoree);
+
+ long now = System.currentTimeMillis();
+
+ if (supervisoree.status.firstSeen == null) {
+ logger.debug("first time seeing {}", lifecycleAware);
+
+ supervisoree.status.firstSeen = now;
+ }
+
+ supervisoree.status.lastSeen = now;
+ supervisoree.status.lastSeenState = lifecycleAware
+ .getLifecycleState();
+
+ if (!lifecycleAware.getLifecycleState().equals(
+ supervisoree.status.desiredState)) {
+
+ logger.debug("Want to transition {} from {} to {} (failures:{})",
+ new Object[] { lifecycleAware,
+ supervisoree.status.lastSeenState,
+ supervisoree.status.desiredState,
+ supervisoree.status.failures });
+
+ Context context = new Context();
+
+ switch (supervisoree.status.desiredState) {
+ case START:
+ try {
+ lifecycleAware.start(context);
+ } catch (Exception e) {
+ logger.error("Unable to start " + lifecycleAware
+ + " - Exception follows.", e);
+ supervisoree.status.failures++;
+ }
+ break;
+ case STOP:
+ try {
+ lifecycleAware.stop(context);
+ } catch (Exception e) {
+ logger.error("Unable to stop " + lifecycleAware
+ + " - Exception follows.", e);
+ supervisoree.status.failures++;
+ }
+ break;
+ default:
+ logger.warn("I refuse to acknowledge {} as a desired state",
+ supervisoree.status.desiredState);
+ }
+
+ if (!supervisoree.policy.isValid(lifecycleAware,
+ supervisoree.status)) {
+ logger
+ .error(
+ "Policy {} of {} has been violated - supervisor should exit!",
+ supervisoree.policy, lifecycleAware);
+ }
+ }
+ }
+
+ logger.debug("Status check complete");
+ }
+ }
+
+ };
+
+ monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
+ TimeUnit.SECONDS);
+
+ lifecycleState = LifecycleState.START;
+
+ logger.debug("Lifecycle supervisor started");
+ }
+
+ @Override
+ public void stop(Context context) throws LifecycleException,
+ InterruptedException {
+
+ logger.info("Stopping lifecycle supervisor {}", Thread.currentThread()
+ .getId());
+
+ monitorService.shutdown();
+
+ while (!monitorService.isTerminated()) {
+ monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+ synchronized (supervisedProcesses) {
+ for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+ .entrySet()) {
+
+ if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
+ entry.getKey().stop(context);
+ LifecycleController.waitForOneOf(entry.getKey(),
+ LifecycleState.STOP_OR_ERROR, 5000);
+ }
+ }
+ }
+
+ lifecycleState = LifecycleState.STOP;
+
+ logger.debug("Lifecycle supervisor stopped");
+ }
+
+ public synchronized void supervise(LifecycleAware lifecycleAware,
+ SupervisorPolicy policy, LifecycleState desiredState) {
+
+ Supervisoree process = new Supervisoree();
+ process.status = new Status();
+
+ process.policy = policy;
+ process.status.desiredState = desiredState;
+
+ supervisedProcesses.put(lifecycleAware, process);
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ public static class Status {
+ public Long firstSeen;
+ public Long lastSeen;
+ public LifecycleState lastSeenState;
+ public LifecycleState desiredState;
+ public int failures;
+
+ @Override
+ public String toString() {
+ return "{ lastSeen:" + lastSeen + " lastSeenState:" + lastSeenState
+ + " desiredState:" + desiredState + " firstSeen:" + firstSeen
+ + " failures:" + failures + " }";
+ }
+
+ }
+
+ public static abstract class SupervisorPolicy {
+
+ abstract boolean isValid(LifecycleAware object, Status status);
+
+ public static class AlwaysRestartPolicy extends SupervisorPolicy {
+
+ @Override
+ boolean isValid(LifecycleAware object, Status status) {
+ return true;
+ }
+ }
+
+ public static class OnceOnlyPolicy extends SupervisorPolicy {
+
+ @Override
+ boolean isValid(LifecycleAware object, Status status) {
+ return status.failures == 0;
+ }
+ }
+
+ }
+
+ private static class Supervisoree {
+
+ public SupervisorPolicy policy;
+ public Status status;
+
+ @Override
+ public String toString() {
+ return "{ status:" + status + " policy:" + policy + " }";
+ }
+
+ }
+
+}