You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 00:46:35 UTC
svn commit: r1157283 - in
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle:
LifecycleController.java LifecycleSupervisor.java
Author: esammer
Date: Fri Aug 12 22:46:34 2011
New Revision: 1157283
URL: http://svn.apache.org/viewvc?rev=1157283&view=rev
Log:
- Added LifecycleController.stopAll(), which waits for each service in a list to reach one of the LifecycleState.STOP_OR_ERROR states.
- Moved the monitor check runnable out of an anonymous class into a named static nested class, LifecycleSupervisor.MonitorRunnable.
- Each supervised process now gets its own separately scheduled monitor check, so checks can run in parallel, up to 5 at a time (monitor thread pool raised from 3 to 5 threads).
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java?rev=1157283&r1=1157282&r2=1157283&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java Fri Aug 12 22:46:34 2011
@@ -1,5 +1,7 @@
package org.apache.flume.lifecycle;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +33,10 @@ public class LifecycleController {
public static boolean waitForOneOf(LifecycleAware delegate,
LifecycleState[] states, long timeout) throws InterruptedException {
- logger.debug("Waiting for state {} for delegate:{} up to {}ms",
- new Object[] { states, delegate, timeout });
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for state {} for delegate:{} up to {}ms",
+ new Object[] { states, delegate, timeout });
+ }
long sleepInterval = Math.max(shortestSleepDuration, timeout
/ maxNumberOfChecks);
@@ -55,4 +59,12 @@ public class LifecycleController {
return false;
}
+ public static void stopAll(List<LifecycleAware> services)
+ throws InterruptedException {
+
+ for (LifecycleAware service : services) {
+ waitForOneOf(service, LifecycleState.STOP_OR_ERROR);
+ }
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1157283&r1=1157282&r2=1157283&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Fri Aug 12 22:46:34 2011
@@ -1,5 +1,6 @@
package org.apache.flume.lifecycle;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -27,7 +28,7 @@ public class LifecycleSupervisor impleme
lifecycleState = LifecycleState.IDLE;
supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
monitorService = Executors.newScheduledThreadPool(
- 3,
+ 5,
new ThreadFactoryBuilder().setNameFormat(
"lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
.build());
@@ -42,87 +43,21 @@ public class LifecycleSupervisor impleme
/* FIXME: Teasing this apart into manageable chunks. */
- Runnable monitorCheckRunnable = new Runnable() {
+ synchronized (supervisedProcesses) {
+ logger.debug("Checking status of all processes");
- @Override
- public void run() {
- synchronized (supervisedProcesses) {
- logger.debug("Checking status of all processes");
-
- for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
- .entrySet()) {
-
- LifecycleAware lifecycleAware = entry.getKey();
- Supervisoree supervisoree = entry.getValue();
-
- logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
- supervisoree);
-
- long now = System.currentTimeMillis();
-
- if (supervisoree.status.firstSeen == null) {
- logger.debug("first time seeing {}", lifecycleAware);
-
- supervisoree.status.firstSeen = now;
- }
-
- supervisoree.status.lastSeen = now;
- supervisoree.status.lastSeenState = lifecycleAware
- .getLifecycleState();
-
- if (!lifecycleAware.getLifecycleState().equals(
- supervisoree.status.desiredState)) {
-
- logger.debug("Want to transition {} from {} to {} (failures:{})",
- new Object[] { lifecycleAware,
- supervisoree.status.lastSeenState,
- supervisoree.status.desiredState,
- supervisoree.status.failures });
-
- Context context = new Context();
-
- switch (supervisoree.status.desiredState) {
- case START:
- try {
- lifecycleAware.start(context);
- } catch (Exception e) {
- logger.error("Unable to start " + lifecycleAware
- + " - Exception follows.", e);
- supervisoree.status.failures++;
- }
- break;
- case STOP:
- try {
- lifecycleAware.stop(context);
- } catch (Exception e) {
- logger.error("Unable to stop " + lifecycleAware
- + " - Exception follows.", e);
- supervisoree.status.failures++;
- }
- break;
- default:
- logger.warn("I refuse to acknowledge {} as a desired state",
- supervisoree.status.desiredState);
- }
-
- if (!supervisoree.policy.isValid(lifecycleAware,
- supervisoree.status)) {
- logger
- .error(
- "Policy {} of {} has been violated - supervisor should exit!",
- supervisoree.policy, lifecycleAware);
- }
- }
- }
+ for (Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
+ .entrySet()) {
- logger.debug("Status check complete");
- }
- }
+ MonitorRunnable monitorCheckRunnable = new MonitorRunnable();
- };
+ monitorCheckRunnable.lifecycleAware = entry.getKey();
+ monitorCheckRunnable.supervisoree = entry.getValue();
- monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
- TimeUnit.SECONDS);
+ monitorService.scheduleAtFixedRate(monitorCheckRunnable, 0, 3,
+ TimeUnit.SECONDS);
+ }
+ }
lifecycleState = LifecycleState.START;
@@ -159,6 +94,18 @@ public class LifecycleSupervisor impleme
logger.debug("Lifecycle supervisor stopped");
}
+ public void fail() {
+ try {
+ LifecycleController.stopAll(new ArrayList<LifecycleAware>(
+ supervisedProcesses.keySet()));
+ } catch (InterruptedException e) {
+ logger
+ .warn("Interrupted while stopping all outstanding supervised processes");
+ }
+
+ lifecycleState = LifecycleState.ERROR;
+ }
+
public synchronized void supervise(LifecycleAware lifecycleAware,
SupervisorPolicy policy, LifecycleState desiredState) {
@@ -168,7 +115,12 @@ public class LifecycleSupervisor impleme
process.policy = policy;
process.status.desiredState = desiredState;
+ MonitorRunnable monitorRunnable = new MonitorRunnable();
+ monitorRunnable.lifecycleAware = lifecycleAware;
+ monitorRunnable.supervisoree = process;
+
supervisedProcesses.put(lifecycleAware, process);
+ monitorService.scheduleAtFixedRate(monitorRunnable, 0, 3, TimeUnit.SECONDS);
}
@Override
@@ -176,6 +128,75 @@ public class LifecycleSupervisor impleme
return lifecycleState;
}
+ public static class MonitorRunnable implements Runnable {
+
+ public LifecycleAware lifecycleAware;
+ public Supervisoree supervisoree;
+
+ @Override
+ public void run() {
+ logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
+ supervisoree);
+
+ long now = System.currentTimeMillis();
+
+ if (supervisoree.status.firstSeen == null) {
+ logger.debug("first time seeing {}", lifecycleAware);
+
+ supervisoree.status.firstSeen = now;
+ }
+
+ supervisoree.status.lastSeen = now;
+ supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
+
+ if (!lifecycleAware.getLifecycleState().equals(
+ supervisoree.status.desiredState)) {
+
+ logger
+ .debug("Want to transition {} from {} to {} (failures:{})",
+ new Object[] { lifecycleAware,
+ supervisoree.status.lastSeenState,
+ supervisoree.status.desiredState,
+ supervisoree.status.failures });
+
+ Context context = new Context();
+
+ switch (supervisoree.status.desiredState) {
+ case START:
+ try {
+ lifecycleAware.start(context);
+ } catch (Exception e) {
+ logger.error("Unable to start " + lifecycleAware
+ + " - Exception follows.", e);
+ supervisoree.status.failures++;
+ }
+ break;
+ case STOP:
+ try {
+ lifecycleAware.stop(context);
+ } catch (Exception e) {
+ logger.error("Unable to stop " + lifecycleAware
+ + " - Exception follows.", e);
+ supervisoree.status.failures++;
+ }
+ break;
+ default:
+ logger.warn("I refuse to acknowledge {} as a desired state",
+ supervisoree.status.desiredState);
+ }
+
+ if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
+ logger.error(
+ "Policy {} of {} has been violated - supervisor should exit!",
+ supervisoree.policy, lifecycleAware);
+ }
+ }
+
+ logger.debug("Status check complete");
+ }
+
+ }
+
public static class Status {
public Long firstSeen;
public Long lastSeen;