You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 02:29:54 UTC
svn commit: r1157303 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
Author: esammer
Date: Sat Aug 13 00:29:54 2011
New Revision: 1157303
URL: http://svn.apache.org/viewvc?rev=1157303&view=rev
Log:
- Changed synchronization points (which fixed a few subtle bugs).
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1157303&r1=1157302&r2=1157303&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Sat Aug 13 00:29:54 2011
@@ -1,6 +1,5 @@
package org.apache.flume.lifecycle;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -35,28 +34,22 @@ public class LifecycleSupervisor impleme
}
@Override
- public void start(Context context) throws LifecycleException,
+ public synchronized void start(Context context) throws LifecycleException,
InterruptedException {
logger.info("Starting lifecycle supervisor {}", Thread.currentThread()
.getId());
- /* FIXME: Teasing this apart into manageable chunks. */
+ for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+ .entrySet()) {
- synchronized (supervisedProcesses) {
- logger.debug("Checking status of all processes");
+ MonitorRunnable monitorCheckRunnable = new MonitorRunnable();
- for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
- .entrySet()) {
+ monitorCheckRunnable.lifecycleAware = entry.getKey();
+ monitorCheckRunnable.supervisoree = entry.getValue();
- MonitorRunnable monitorCheckRunnable = new MonitorRunnable();
-
- monitorCheckRunnable.lifecycleAware = entry.getKey();
- monitorCheckRunnable.supervisoree = entry.getValue();
-
- monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
- TimeUnit.SECONDS);
- }
+ monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
+ TimeUnit.SECONDS);
}
lifecycleState = LifecycleState.START;
@@ -65,44 +58,43 @@ public class LifecycleSupervisor impleme
}
@Override
- public void stop(Context context) throws LifecycleException,
+ public synchronized void stop(Context context) throws LifecycleException,
InterruptedException {
logger.info("Stopping lifecycle supervisor {}", Thread.currentThread()
.getId());
- monitorService.shutdown();
+ if (monitorService != null) {
+ monitorService.shutdown();
- while (!monitorService.isTerminated()) {
- monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+ while (!monitorService.isTerminated()) {
+ monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
}
- synchronized (supervisedProcesses) {
- for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
- .entrySet()) {
+ for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+ .entrySet()) {
+
+ if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
- if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
- entry.getKey().stop(context);
- LifecycleController.waitForOneOf(entry.getKey(),
- LifecycleState.STOP_OR_ERROR, 5000);
+ entry.getKey().stop(context);
+
+ if (!LifecycleController.waitForOneOf(entry.getKey(),
+ LifecycleState.STOP_OR_ERROR, 5000)) {
+ fail();
}
}
}
- lifecycleState = LifecycleState.STOP;
+ /* If we've failed, preserve the error state. */
+ if (lifecycleState.equals(LifecycleState.START)) {
+ lifecycleState = LifecycleState.STOP;
+ }
logger.debug("Lifecycle supervisor stopped");
}
- public void fail() {
- try {
- LifecycleController.stopAll(new ArrayList<LifecycleAware>(
- supervisedProcesses.keySet()));
- } catch (InterruptedException e) {
- logger
- .warn("Interrupted while stopping all outstanding supervised processes");
- }
-
+ public synchronized void fail() {
lifecycleState = LifecycleState.ERROR;
}
@@ -124,7 +116,7 @@ public class LifecycleSupervisor impleme
}
@Override
- public LifecycleState getLifecycleState() {
+ public synchronized LifecycleState getLifecycleState() {
return lifecycleState;
}