You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/11/15 03:37:05 UTC
[2/4] incubator-brooklyn git commit: Terminate UsageManager (and wait for usage listeners)
Terminate UsageManager (and wait for usage listeners)
- also tracks how many uncompleted listener events there are,
and reports this on termination.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/ac1ffa72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/ac1ffa72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/ac1ffa72
Branch: refs/heads/master
Commit: ac1ffa7261ad6cd6a12394df1ee6f1856c4edb4f
Parents: e8b2065
Author: Aled Sage <al...@gmail.com>
Authored: Fri Nov 14 19:52:36 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Nov 14 19:52:36 2014 +0000
----------------------------------------------------------------------
.../internal/LocalManagementContext.java | 1 +
.../management/internal/LocalUsageManager.java | 101 ++++++++++++++-----
.../management/internal/UsageManager.java | 7 ++
3 files changed, 86 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1ffa72/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java b/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
index 7841ec9..098e3af 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
@@ -318,6 +318,7 @@ public class LocalManagementContext extends AbstractManagementContext {
osgiManager.stop();
osgiManager = null;
}
+ if (usageManager != null) usageManager.terminate();
if (execution != null) execution.shutdownNow();
if (gc != null) gc.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1ffa72/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java b/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
index 476ea3e..4e7cc98 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
@@ -20,13 +20,16 @@ package brooklyn.management.internal;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +42,13 @@ import brooklyn.internal.storage.BrooklynStorage;
import brooklyn.location.Location;
import brooklyn.location.basic.LocationConfigKeys;
import brooklyn.location.basic.LocationInternal;
+import brooklyn.management.ManagementContextInjectable;
import brooklyn.management.usage.ApplicationUsage;
import brooklyn.management.usage.LocationUsage;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.flags.TypeCoercions;
import brooklyn.util.javalang.Reflections;
+import brooklyn.util.time.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -51,6 +56,10 @@ import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LocalUsageManager implements UsageManager {
@@ -93,9 +102,11 @@ public class LocalUsageManager implements UsageManager {
private final List<UsageListener> listeners = Lists.newCopyOnWriteArrayList();
- private ExecutorService listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ private final AtomicInteger listenerQueueSize = new AtomicInteger();
+
+ private ListeningExecutorService listenerExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("brooklyn-usagemanager-listener-%d")
- .build());
+ .build()));
public LocalUsageManager(LocalManagementContext managementContext) {
this.managementContext = checkNotNull(managementContext, "managementContext");
@@ -103,11 +114,61 @@ public class LocalUsageManager implements UsageManager {
Collection<UsageListener> listeners = managementContext.getBrooklynProperties().getConfig(UsageManager.USAGE_LISTENERS);
if (listeners != null) {
for (UsageListener listener : listeners) {
+ if (listener instanceof ManagementContextInjectable) {
+ ((ManagementContextInjectable)listener).injectManagementContext(managementContext);
+ }
addUsageListener(listener);
}
}
}
+ public void terminate() {
+ // Wait for the listeners to finish + close the listeners
+ Duration timeout = managementContext.getBrooklynProperties().getConfig(UsageManager.USAGE_LISTENER_TERMINATION_TIMEOUT);
+ if (listenerQueueSize.get() > 0) {
+ log.info("Usage manager waiting for "+listenerQueueSize+" listener events for up to "+timeout);
+ }
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+ for (final UsageListener listener : listeners) {
+ ListenableFuture<?> future = listenerExecutor.submit(new Runnable() {
+ public void run() {
+ if (listener instanceof Closeable) {
+ try {
+ ((Closeable)listener).close();
+ } catch (IOException e) {
+ log.warn("Problem closing usage listener "+listener+" (continuing)", e);
+ }
+ }
+ }});
+ futures.add(future);
+ }
+ try {
+ Futures.successfulAsList(futures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ log.warn("Problem terminating usage listeners (continuing)", e);
+ } finally {
+ listenerExecutor.shutdownNow();
+ }
+ }
+
+ private void execOnListeners(final Function<UsageListener, Void> job) {
+ for (final UsageListener listener : listeners) {
+ listenerQueueSize.incrementAndGet();
+ listenerExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ job.apply(listener);
+ } catch (RuntimeException e) {
+ log.error("Problem notifying listener "+listener+" of "+job, e);
+ Exceptions.propagateIfFatal(e);
+ } finally {
+ listenerQueueSize.decrementAndGet();
+ }
+ }});
+ }
+ }
+
@Override
public void recordApplicationEvent(final Application app, final Lifecycle state) {
log.debug("Storing application lifecycle usage event: application {} in state {}", new Object[] {app, state});
@@ -121,17 +182,14 @@ public class LocalUsageManager implements UsageManager {
usage.addEvent(event);
eventMap.put(app.getId(), usage);
- for (final UsageListener listener : listeners) {
- listenerExecutor.execute(new Runnable() {
- public void run() {
- try {
- listener.onApplicationEvent(app.getId(), app.getDisplayName(), app.getEntityType().getName(), ((EntityInternal)app).toMetadataRecord(), event);
- } catch (Exception e) {
- log.error("Problem notifying listener "+listener+" of applicationEvent("+app+", "+state+")", e);
- Exceptions.propagateIfFatal(e);
- }
+ execOnListeners(new Function<UsageListener, Void>() {
+ public Void apply(UsageListener listener) {
+ listener.onApplicationEvent(app.getId(), app.getDisplayName(), app.getEntityType().getName(), ((EntityInternal)app).toMetadataRecord(), event);
+ return null;
+ }
+ public String toString() {
+ return "applicationEvent("+app+", "+state+")";
}});
- }
}
}
@@ -181,17 +239,14 @@ public class LocalUsageManager implements UsageManager {
usage.addEvent(event);
usageMap.put(loc.getId(), usage);
- for (final UsageListener listener : listeners) {
- listenerExecutor.execute(new Runnable() {
- public void run() {
- try {
- listener.onLocationEvent(loc.getId(), ((LocationInternal)loc).toMetadataRecord(), event);
- } catch (Exception e) {
- log.error("Problem notifying listener "+listener+" of locationEvent("+loc+", "+state+")", e);
- Exceptions.propagateIfFatal(e);
- }
+ execOnListeners(new Function<UsageListener, Void>() {
+ public Void apply(UsageListener listener) {
+ listener.onLocationEvent(loc.getId(), ((LocationInternal)loc).toMetadataRecord(), event);
+ return null;
+ }
+ public String toString() {
+ return "locationEvent("+loc+", "+state+")";
}});
- }
}
} else {
// normal for high-level locations
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1ffa72/core/src/main/java/brooklyn/management/internal/UsageManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/UsageManager.java b/core/src/main/java/brooklyn/management/internal/UsageManager.java
index 56a5ace..450a73b 100644
--- a/core/src/main/java/brooklyn/management/internal/UsageManager.java
+++ b/core/src/main/java/brooklyn/management/internal/UsageManager.java
@@ -31,6 +31,7 @@ import brooklyn.management.usage.ApplicationUsage;
import brooklyn.management.usage.ApplicationUsage.ApplicationEvent;
import brooklyn.management.usage.LocationUsage;
import brooklyn.management.usage.LocationUsage.LocationEvent;
+import brooklyn.util.time.Duration;
import com.google.common.annotations.Beta;
import com.google.common.base.Predicate;
@@ -45,6 +46,12 @@ public interface UsageManager {
new TypeToken<List<UsageListener>>() {},
"brooklyn.usageManager.listeners", "Optional usage listeners (i.e. for metering)",
ImmutableList.<UsageListener>of());
+
+ public static final ConfigKey<Duration> USAGE_LISTENER_TERMINATION_TIMEOUT = ConfigKeys.newConfigKey(
+ Duration.class,
+ "brooklyn.usageManager.listeners.timeout",
+ "Timeout on termination, to wait for queue of usage listener events to be processed",
+ Duration.TEN_SECONDS);
public interface UsageListener {
public static final UsageListener NOOP = new UsageListener() {