You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/11/15 03:37:05 UTC

[2/4] incubator-brooklyn git commit: Terminate UsageManager (and wait for usage listeners)

Terminate UsageManager (and wait for usage listeners)

- also tracks how many uncompleted listener events there are,
  and reports this on termination.

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/ac1ffa72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/ac1ffa72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/ac1ffa72

Branch: refs/heads/master
Commit: ac1ffa7261ad6cd6a12394df1ee6f1856c4edb4f
Parents: e8b2065
Author: Aled Sage <al...@gmail.com>
Authored: Fri Nov 14 19:52:36 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Nov 14 19:52:36 2014 +0000

----------------------------------------------------------------------
 .../internal/LocalManagementContext.java        |   1 +
 .../management/internal/LocalUsageManager.java  | 101 ++++++++++++++-----
 .../management/internal/UsageManager.java       |   7 ++
 3 files changed, 86 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1ffa72/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java b/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
index 7841ec9..098e3af 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java
@@ -318,6 +318,7 @@ public class LocalManagementContext extends AbstractManagementContext {
             osgiManager.stop();
             osgiManager = null;
         }
+        if (usageManager != null) usageManager.terminate();
         if (execution != null) execution.shutdownNow();
         if (gc != null) gc.shutdownNow();
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1ffa72/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java b/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
index 476ea3e..4e7cc98 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalUsageManager.java
@@ -20,13 +20,16 @@ package brooklyn.management.internal;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,11 +42,13 @@ import brooklyn.internal.storage.BrooklynStorage;
 import brooklyn.location.Location;
 import brooklyn.location.basic.LocationConfigKeys;
 import brooklyn.location.basic.LocationInternal;
+import brooklyn.management.ManagementContextInjectable;
 import brooklyn.management.usage.ApplicationUsage;
 import brooklyn.management.usage.LocationUsage;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.flags.TypeCoercions;
 import brooklyn.util.javalang.Reflections;
+import brooklyn.util.time.Duration;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -51,6 +56,10 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LocalUsageManager implements UsageManager {
@@ -93,9 +102,11 @@ public class LocalUsageManager implements UsageManager {
 
     private final List<UsageListener> listeners = Lists.newCopyOnWriteArrayList();
     
-    private ExecutorService listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+    private final AtomicInteger listenerQueueSize = new AtomicInteger();
+    
+    private ListeningExecutorService listenerExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
             .setNameFormat("brooklyn-usagemanager-listener-%d")
-            .build());
+            .build()));
 
     public LocalUsageManager(LocalManagementContext managementContext) {
         this.managementContext = checkNotNull(managementContext, "managementContext");
@@ -103,11 +114,61 @@ public class LocalUsageManager implements UsageManager {
         Collection<UsageListener> listeners = managementContext.getBrooklynProperties().getConfig(UsageManager.USAGE_LISTENERS);
         if (listeners != null) {
             for (UsageListener listener : listeners) {
+                if (listener instanceof ManagementContextInjectable) {
+                    ((ManagementContextInjectable)listener).injectManagementContext(managementContext);
+                }
                 addUsageListener(listener);
             }
         }
     }
 
+    /** Closes every {@link Closeable} usage listener, waits up to the configured termination timeout, then shuts the listener executor down. */
+    public void terminate() {
+        // Close jobs are submitted to the same listenerExecutor as the event jobs, so they are queued behind events submitted earlier.
+        Duration timeout = managementContext.getBrooklynProperties().getConfig(UsageManager.USAGE_LISTENER_TERMINATION_TIMEOUT);
+        if (listenerQueueSize.get() > 0) {
+            log.info("Usage manager waiting for "+listenerQueueSize+" listener events for up to "+timeout);
+        }
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        for (final UsageListener listener : listeners) {
+            futures.add(listenerExecutor.submit(new Runnable() {
+                public void run() {
+                    if (listener instanceof Closeable) {
+                        try {
+                            ((Closeable)listener).close();
+                        } catch (IOException e) {
+                            log.warn("Problem closing usage listener "+listener+" (continuing)", e);
+                        }
+                    }
+                }}));
+        }
+        try {
+            Futures.successfulAsList(futures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); // one bounded wait covering all close jobs
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            log.warn("Problem terminating usage listeners (continuing)", e);
+        } finally {
+            listenerExecutor.shutdownNow(); // always stop the executor, even if the wait timed out or failed
+        }
+    }
+
+    private void execOnListeners(final Function<UsageListener, Void> job) { // queues job once per registered listener on listenerExecutor
+        for (final UsageListener listener : listeners) {
+            listenerQueueSize.incrementAndGet(); // pending-event count; terminate() logs it when non-zero
+            listenerExecutor.execute(new Runnable() { // NOTE(review): if execute() rejects the task (e.g. after shutdownNow in terminate()), the increment above is not undone - confirm
+                public void run() {
+                    try {
+                        job.apply(listener);
+                    } catch (RuntimeException e) {
+                        log.error("Problem notifying listener "+listener+" of "+job, e); // job's toString() supplies the event description
+                        Exceptions.propagateIfFatal(e);
+                    } finally {
+                        listenerQueueSize.decrementAndGet(); // runs whether the job returned normally or threw
+                    }
+                }});
+        }
+    }
+    
     @Override
     public void recordApplicationEvent(final Application app, final Lifecycle state) {
         log.debug("Storing application lifecycle usage event: application {} in state {}", new Object[] {app, state});
@@ -121,17 +182,14 @@ public class LocalUsageManager implements UsageManager {
             usage.addEvent(event);        
             eventMap.put(app.getId(), usage);
             
-            for (final UsageListener listener : listeners) {
-                listenerExecutor.execute(new Runnable() {
-                    public void run() {
-                        try {
-                            listener.onApplicationEvent(app.getId(), app.getDisplayName(), app.getEntityType().getName(), ((EntityInternal)app).toMetadataRecord(), event);
-                        } catch (Exception e) {
-                            log.error("Problem notifying listener "+listener+" of applicationEvent("+app+", "+state+")", e);
-                            Exceptions.propagateIfFatal(e);
-                        }
+            execOnListeners(new Function<UsageListener, Void>() {
+                    public Void apply(UsageListener listener) {
+                        listener.onApplicationEvent(app.getId(), app.getDisplayName(), app.getEntityType().getName(), ((EntityInternal)app).toMetadataRecord(), event);
+                        return null;
+                    }
+                    public String toString() {
+                        return "applicationEvent("+app+", "+state+")";
                     }});
-            }
         }
     }
     
@@ -181,17 +239,14 @@ public class LocalUsageManager implements UsageManager {
                 usage.addEvent(event);
                 usageMap.put(loc.getId(), usage);
                 
-                for (final UsageListener listener : listeners) {
-                    listenerExecutor.execute(new Runnable() {
-                        public void run() {
-                            try {
-                                listener.onLocationEvent(loc.getId(), ((LocationInternal)loc).toMetadataRecord(), event);
-                            } catch (Exception e) {
-                                log.error("Problem notifying listener "+listener+" of locationEvent("+loc+", "+state+")", e);
-                                Exceptions.propagateIfFatal(e);
-                            }
+                execOnListeners(new Function<UsageListener, Void>() {
+                        public Void apply(UsageListener listener) {
+                            listener.onLocationEvent(loc.getId(), ((LocationInternal)loc).toMetadataRecord(), event);
+                            return null;
+                        }
+                        public String toString() {
+                            return "locationEvent("+loc+", "+state+")";
                         }});
-                }
             }
         } else {
             // normal for high-level locations

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1ffa72/core/src/main/java/brooklyn/management/internal/UsageManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/UsageManager.java b/core/src/main/java/brooklyn/management/internal/UsageManager.java
index 56a5ace..450a73b 100644
--- a/core/src/main/java/brooklyn/management/internal/UsageManager.java
+++ b/core/src/main/java/brooklyn/management/internal/UsageManager.java
@@ -31,6 +31,7 @@ import brooklyn.management.usage.ApplicationUsage;
 import brooklyn.management.usage.ApplicationUsage.ApplicationEvent;
 import brooklyn.management.usage.LocationUsage;
 import brooklyn.management.usage.LocationUsage.LocationEvent;
+import brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Predicate;
@@ -45,6 +46,12 @@ public interface UsageManager {
             new TypeToken<List<UsageListener>>() {},
             "brooklyn.usageManager.listeners", "Optional usage listeners (i.e. for metering)",
             ImmutableList.<UsageListener>of());
+    
+    public static final ConfigKey<Duration> USAGE_LISTENER_TERMINATION_TIMEOUT = ConfigKeys.newConfigKey(
+            Duration.class,
+            "brooklyn.usageManager.listeners.timeout",
+            "Timeout on termination, to wait for queue of usage listener events to be processed",
+            Duration.TEN_SECONDS);
 
     public interface UsageListener {
         public static final UsageListener NOOP = new UsageListener() {