You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 02:29:58 UTC

svn commit: r1157304 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java test/java/org/apache/flume/lifecycle/TestLifecycleSupervisor.java

Author: esammer
Date: Sat Aug 13 00:29:57 2011
New Revision: 1157304

URL: http://svn.apache.org/viewvc?rev=1157304&view=rev
Log:
- Added support for "unsupervising" services as well as changing the desired
  state (which in turn effects a state transition by the monitor thread).
- Switched from a fixed-rate schedule to chained scheduling: each monitor run
  now schedules the next run when it finishes. We need to do this because
  ScheduledExecutorService provides no (polite) method of unscheduling
  something, so we go old school.
- Added hinky unit tests for unsupervise() and setDesiredState() in
  LifecycleSupervisor.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleSupervisor.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1157304&r1=1157303&r2=1157304&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Sat Aug 13 00:29:57 2011
@@ -11,6 +11,7 @@ import org.apache.flume.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LifecycleSupervisor implements LifecycleAware {
@@ -101,6 +102,14 @@ public class LifecycleSupervisor impleme
   public synchronized void supervise(LifecycleAware lifecycleAware,
       SupervisorPolicy policy, LifecycleState desiredState) {
 
+    Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
+        "Refusing to supervise " + lifecycleAware + " more than once");
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Supervising service:{} policy:{} desiredState:{}",
+          new Object[] { lifecycleAware, policy, desiredState });
+    }
+
     Supervisoree process = new Supervisoree();
     process.status = new Status();
 
@@ -110,9 +119,35 @@ public class LifecycleSupervisor impleme
     MonitorRunnable monitorRunnable = new MonitorRunnable();
     monitorRunnable.lifecycleAware = lifecycleAware;
     monitorRunnable.supervisoree = process;
+    monitorRunnable.monitorService = monitorService;
 
     supervisedProcesses.put(lifecycleAware, process);
-    monitorService.scheduleAtFixedRate(monitorRunnable, 0, 3, TimeUnit.SECONDS);
+    monitorService.schedule(monitorRunnable, 0, TimeUnit.SECONDS);
+  }
+
+  public synchronized void unsupervise(LifecycleAware lifecycleAware) {
+
+    Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
+        "Unaware of " + lifecycleAware + " - can not unsupervise");
+
+    logger.debug("Unsupervising service:{}", lifecycleAware);
+
+    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
+    supervisoree.status.discard = true;
+  }
+
+  public synchronized void setDesiredState(LifecycleAware lifecycleAware,
+      LifecycleState desiredState) {
+
+    Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
+        "Unaware of " + lifecycleAware + " - can not set desired state to "
+            + desiredState);
+
+    logger.debug("Setting desiredState:{} on service:{}", desiredState,
+        lifecycleAware);
+
+    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
+    supervisoree.status.desiredState = desiredState;
   }
 
   @Override
@@ -122,6 +157,7 @@ public class LifecycleSupervisor impleme
 
   public static class MonitorRunnable implements Runnable {
 
+    public ScheduledExecutorService monitorService;
     public LifecycleAware lifecycleAware;
     public Supervisoree supervisoree;
 
@@ -130,6 +166,11 @@ public class LifecycleSupervisor impleme
       logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
           supervisoree);
 
+      if (supervisoree.status.discard) {
+        logger.debug("Halting monitoring on {}", supervisoree);
+        return;
+      }
+
       long now = System.currentTimeMillis();
 
       if (supervisoree.status.firstSeen == null) {
@@ -184,6 +225,8 @@ public class LifecycleSupervisor impleme
         }
       }
 
+      monitorService.schedule(this, 3, TimeUnit.SECONDS);
+
       logger.debug("Status check complete");
     }
 
@@ -195,12 +238,13 @@ public class LifecycleSupervisor impleme
     public LifecycleState lastSeenState;
     public LifecycleState desiredState;
     public int failures;
+    public boolean discard;
 
     @Override
     public String toString() {
       return "{ lastSeen:" + lastSeen + " lastSeenState:" + lastSeenState
           + " desiredState:" + desiredState + " firstSeen:" + firstSeen
-          + " failures:" + failures + " }";
+          + " failures:" + failures + " discard:" + discard + " }";
     }
 
   }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleSupervisor.java?rev=1157304&r1=1157303&r2=1157304&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleSupervisor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleSupervisor.java Sat Aug 13 00:29:57 2011
@@ -1,10 +1,12 @@
 package org.apache.flume.lifecycle;
 
 import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
 import org.apache.flume.LogicalNode;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.apache.flume.sink.NullSink;
 import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -105,4 +107,86 @@ public class TestLifecycleSupervisor {
     supervisor.stop(context);
   }
 
+  @Test
+  public void testUnsuperviseServce() throws LifecycleException,
+      InterruptedException {
+    Context context = new Context();
+
+    supervisor.start(context);
+
+    LifecycleAware service = new CountingLifecycleAware();
+    SupervisorPolicy policy = new SupervisorPolicy.OnceOnlyPolicy();
+
+    supervisor.supervise(service, policy, LifecycleState.START);
+    supervisor.unsupervise(service);
+
+    service.stop(context);
+
+    supervisor.stop(context);
+  }
+
+  @Test
+  public void testStopServce() throws LifecycleException, InterruptedException {
+    Context context = new Context();
+
+    supervisor.start(context);
+
+    CountingLifecycleAware service = new CountingLifecycleAware();
+    SupervisorPolicy policy = new SupervisorPolicy.OnceOnlyPolicy();
+
+    Assert.assertEquals(Long.valueOf(0), service.counterGroup.get("start"));
+    Assert.assertEquals(Long.valueOf(0), service.counterGroup.get("stop"));
+
+    supervisor.supervise(service, policy, LifecycleState.START);
+
+    Thread.sleep(3200);
+
+    Assert.assertEquals(Long.valueOf(1), service.counterGroup.get("start"));
+    Assert.assertEquals(Long.valueOf(0), service.counterGroup.get("stop"));
+
+    supervisor.setDesiredState(service, LifecycleState.STOP);
+
+    Thread.sleep(3200);
+
+    Assert.assertEquals(Long.valueOf(1), service.counterGroup.get("start"));
+    Assert.assertEquals(Long.valueOf(1), service.counterGroup.get("stop"));
+
+    supervisor.stop(context);
+  }
+
+  public static class CountingLifecycleAware implements LifecycleAware {
+
+    public CounterGroup counterGroup;
+    private LifecycleState lifecycleState;
+
+    public CountingLifecycleAware() {
+      lifecycleState = LifecycleState.IDLE;
+      counterGroup = new CounterGroup();
+    }
+
+    @Override
+    public void start(Context context) throws LifecycleException,
+        InterruptedException {
+
+      counterGroup.incrementAndGet("start");
+
+      lifecycleState = LifecycleState.START;
+    }
+
+    @Override
+    public void stop(Context context) throws LifecycleException,
+        InterruptedException {
+
+      counterGroup.incrementAndGet("stop");
+
+      lifecycleState = LifecycleState.STOP;
+    }
+
+    @Override
+    public LifecycleState getLifecycleState() {
+      return lifecycleState;
+    }
+
+  }
+
 }