You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 02:29:54 UTC

svn commit: r1157303 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java

Author: esammer
Date: Sat Aug 13 00:29:54 2011
New Revision: 1157303

URL: http://svn.apache.org/viewvc?rev=1157303&view=rev
Log:
- Changed synchronization points (which fixed a few subtle bugs).

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1157303&r1=1157302&r2=1157303&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Sat Aug 13 00:29:54 2011
@@ -1,6 +1,5 @@
 package org.apache.flume.lifecycle;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -35,28 +34,22 @@ public class LifecycleSupervisor impleme
   }
 
   @Override
-  public void start(Context context) throws LifecycleException,
+  public synchronized void start(Context context) throws LifecycleException,
       InterruptedException {
 
     logger.info("Starting lifecycle supervisor {}", Thread.currentThread()
         .getId());
 
-    /* FIXME: Teasing this apart into manageable chunks. */
+    for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+        .entrySet()) {
 
-    synchronized (supervisedProcesses) {
-      logger.debug("Checking status of all processes");
+      MonitorRunnable monitorCheckRunnable = new MonitorRunnable();
 
-      for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
-          .entrySet()) {
+      monitorCheckRunnable.lifecycleAware = entry.getKey();
+      monitorCheckRunnable.supervisoree = entry.getValue();
 
-        MonitorRunnable monitorCheckRunnable = new MonitorRunnable();
-
-        monitorCheckRunnable.lifecycleAware = entry.getKey();
-        monitorCheckRunnable.supervisoree = entry.getValue();
-
-        monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
-            TimeUnit.SECONDS);
-      }
+      monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
+          TimeUnit.SECONDS);
     }
 
     lifecycleState = LifecycleState.START;
@@ -65,44 +58,43 @@ public class LifecycleSupervisor impleme
   }
 
   @Override
-  public void stop(Context context) throws LifecycleException,
+  public synchronized void stop(Context context) throws LifecycleException,
       InterruptedException {
 
     logger.info("Stopping lifecycle supervisor {}", Thread.currentThread()
         .getId());
 
-    monitorService.shutdown();
+    if (monitorService != null) {
+      monitorService.shutdown();
 
-    while (!monitorService.isTerminated()) {
-      monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+      while (!monitorService.isTerminated()) {
+        monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+      }
     }
 
-    synchronized (supervisedProcesses) {
-      for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
-          .entrySet()) {
+    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+        .entrySet()) {
+
+      if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
 
-        if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
-          entry.getKey().stop(context);
-          LifecycleController.waitForOneOf(entry.getKey(),
-              LifecycleState.STOP_OR_ERROR, 5000);
+        entry.getKey().stop(context);
+
+        if (!LifecycleController.waitForOneOf(entry.getKey(),
+            LifecycleState.STOP_OR_ERROR, 5000)) {
+          fail();
         }
       }
     }
 
-    lifecycleState = LifecycleState.STOP;
+    /* If we've failed, preserve the error state. */
+    if (lifecycleState.equals(LifecycleState.START)) {
+      lifecycleState = LifecycleState.STOP;
+    }
 
     logger.debug("Lifecycle supervisor stopped");
   }
 
-  public void fail() {
-    try {
-      LifecycleController.stopAll(new ArrayList<LifecycleAware>(
-          supervisedProcesses.keySet()));
-    } catch (InterruptedException e) {
-      logger
-          .warn("Interrupted while stopping all outstanding supervised processes");
-    }
-
+  public synchronized void fail() {
     lifecycleState = LifecycleState.ERROR;
   }
 
@@ -124,7 +116,7 @@ public class LifecycleSupervisor impleme
   }
 
   @Override
-  public LifecycleState getLifecycleState() {
+  public synchronized LifecycleState getLifecycleState() {
     return lifecycleState;
   }