You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/02/18 01:00:11 UTC

[1/3] brooklyn-server git commit: better presentation for sensor publishing tasks

Repository: brooklyn-server
Updated Branches:
  refs/heads/master 110482861 -> ebcea30f9


better presentation for sensor publishing tasks

previously they didn't even have a name; now they have a nice name, description, and a tag.
there is some attempt to optimize the use of toString so it isn't hugely computationally expensive,
although this will increase expense a bit; i tend to think it's worth it for increased visibility.
(if we are publishing vast numbers of sensor events maybe we are doing something wrong, and if this
is identified as the bottleneck we can parameterise it, and in any case when we come to be multi-host
we'll need to revisit how we do this as currently we're thinking a good datastore is good enough for
the intended volumes, as opposed to a dedicated message bus.)


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2c2f3fcd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2c2f3fcd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2c2f3fcd

Branch: refs/heads/master
Commit: 2c2f3fcdc456358dff108a62e0cd5ce09783048c
Parents: 1104828
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Feb 16 10:31:50 2017 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Thu Feb 16 10:31:50 2017 +0000

----------------------------------------------------------------------
 .../brooklyn/core/mgmt/BrooklynTaskTags.java    |   2 +
 .../mgmt/internal/LocalSubscriptionManager.java | 144 +++++++++++--------
 2 files changed, 83 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2c2f3fcd/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 39b2f70..2280be7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -71,6 +71,8 @@ public class BrooklynTaskTags extends TaskTags {
     public static final String BROOKLYN_SERVER_TASK_TAG = "BROOKLYN-SERVER";
     /** Tag for a task which represents an effector */
     public static final String EFFECTOR_TAG = "EFFECTOR";
+    /** Tag for a task which represents a sensor being published */
+    public static final String SENSOR_TAG = "SENSOR";
     /** Tag for a task which *is* interesting, in contrast to {@link #TRANSIENT_TASK_TAG} */
     public static final String NON_TRANSIENT_TASK_TAG = "NON-TRANSIENT";
     /** indicates a task is transient, roughly that is to say it is uninteresting -- 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2c2f3fcd/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 9de2d3b..7039ee2 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -42,6 +42,7 @@ import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
@@ -86,10 +87,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         return allSubscriptions.size();
     }
 
+    /** The total number of sensor change events generated (irrespective of number subscribers, see {@link #getTotalEventsDelivered()}) */
     public long getTotalEventsPublished() {
         return totalEventsPublishedCount.get();
     }
     
+    /** The total number of sensor change events submitted for delivery, counting multiple deliveries for multipe subscribers (see {@link #getTotalEventsPublished()}),
+     * but excluding initial notifications, and incremented when submitted ie prior to delivery */
     public long getTotalEventsDelivered() {
         return totalEventsDeliveredCount.get();
     }
@@ -111,7 +115,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             s.subscriberExecutionManagerTagSupplied = true;
         } else {
             s.subscriberExecutionManagerTag = 
-                s.subscriber instanceof Entity ? "subscription-delivery-entity-"+((Entity)s.subscriber).getId()+"["+s.subscriber+"]" : 
+                s.subscriber instanceof Entity ? "subscription-delivery-entity-"+((Entity)s.subscriber).getId() : 
                 s.subscriber instanceof String ? "subscription-delivery-string["+s.subscriber+"]" : 
                 "subscription-delivery-object["+s.subscriber+"]";
             s.subscriberExecutionManagerTagSupplied = false;
@@ -139,37 +143,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
-                List<Object> tags = MutableList.builder()
-                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
-                        .add(s.subscriberExecutionManagerTag)
-                        .build()
-                        .asUnmodifiable();
-                Map<String, ?> execFlags = MutableMap.of("tags", tags);
-                em.submit(execFlags, new Runnable() {
-                    @Override
-                    public String toString() {
-                        return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
-                    }
-                    @Override
-                    public void run() {
-                        Object val = s.producer.getAttribute((AttributeSensor<?>) s.sensor);
-                        @SuppressWarnings("rawtypes") // TODO s.listener.onEvent gives compilation error if try to use <T>
-                        SensorEvent event = new BasicSensorEvent(s.sensor, s.producer, val);
-                        if (s.eventFilter!=null && !s.eventFilter.apply(event))
-                            return;
-                        try {
-                            int count = s.eventCount.incrementAndGet();
-                            if (count > 0 && count % 1000 == 0) LOG.debug("{} events for subscriber {}", count, s);
-                            
-                            s.listener.onEvent(event);
-                        } catch (Throwable t) {
-                            if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) {
-                                LOG.debug("Error processing initial-value subscription to "+LocalSubscriptionManager.this+", after entity unmanaged: "+t, t);
-                            } else {
-                                LOG.warn("Error processing initial-value subscription to "+LocalSubscriptionManager.this+": "+t, t);
-                            }
-                        }
-                    }});
+                T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
+                submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
             }
         }
         
@@ -237,42 +212,85 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         if (groovyTruth(subs)) {
             if (LOG.isTraceEnabled()) LOG.trace("sending {}, {} to {}", new Object[] {event.getSensor().getName(), event, join(subs, ",")});
             for (Subscription s : subs) {
-                if (s.eventFilter!=null && !s.eventFilter.apply(event))
-                    continue;
-                final Subscription sAtClosureCreation = s;
-                
-                List<Object> tags = MutableList.builder()
-                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
-                        .add(s.subscriberExecutionManagerTag)
-                        .build()
-                        .asUnmodifiable();
-                Map<String, ?> execFlags = MutableMap.of("tags", tags);
-                
-                em.submit(execFlags, new Runnable() {
-                    @Override
-                    public String toString() {
-                        return "LSM.publish("+event+")";
-                    }
-                    @Override
-                    public void run() {
-                        try {
-                            int count = sAtClosureCreation.eventCount.incrementAndGet();
-                            if (count > 0 && count % 1000 == 0) LOG.debug("{} events for subscriber {}", count, sAtClosureCreation);
-                            
-                            sAtClosureCreation.listener.onEvent(event);
-                        } catch (Throwable t) {
-                            if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) {
-                                LOG.debug("Error processing subscriptions to "+this+", after entity unmanaged: "+t, t);
-                            } else {
-                                LOG.warn("Error processing subscriptions to "+this+": "+t, t);
-                            }
-                        }
-                    }});
+                submitPublishEvent(s, event, false);
+                // excludes initial so only do it here
                 totalEventsDeliveredCount.incrementAndGet();
             }
         }
     }
     
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitial) {
+        if (s.eventFilter!=null && !s.eventFilter.apply(event))
+            return;
+        
+        List<Object> tags = MutableList.builder()
+            .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+            .add(s.subscriberExecutionManagerTag)
+            .add(BrooklynTaskTags.SENSOR_TAG)
+            .build()
+            .asUnmodifiable();
+        
+        StringBuilder name = new StringBuilder("sensor ");
+        StringBuilder description = new StringBuilder("Sensor ");
+        String sensorName = s.sensor==null ? null : s.sensor.getName();
+        String sourceName = event.getSource()==null ? null : event.getSource().getId();
+        name.append(sourceName);
+        name.append(":");
+        name.append(sensorName);
+        
+        description.append(sensorName);
+        description.append(" on ");
+        description.append(sourceName);
+        description.append(" publishing to ");
+        description.append(s.subscriber instanceof Entity ? ((Entity)s.subscriber).getId() : s.subscriber);
+        
+        if (includeDescriptionForSensorTask(event)) {
+            name.append(" ");
+            name.append(event.getValue());
+            description.append(", value: ");
+            description.append(event.getValue());
+        }
+        Map<String, Object> execFlags = MutableMap.of("tags", tags, 
+            "displayName", name.toString(),
+            "description", description.toString());
+        
+        em.submit(execFlags, new Runnable() {
+            @Override
+            public String toString() {
+                if (isInitial) {
+                    return "LSM.publishInitial("+event+")";
+                } else {
+                    return "LSM.publish("+event+")";
+                }
+            }
+            @Override
+            public void run() {
+                try {
+                    int count = s.eventCount.incrementAndGet();
+                    if (count > 0 && count % 1000 == 0) LOG.debug("{} events for subscriber {}", count, s);
+                    
+                    s.listener.onEvent(event);
+                } catch (Throwable t) {
+                    if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) {
+                        LOG.debug("Error processing subscriptions to "+this+", after entity unmanaged: "+t, t);
+                    } else {
+                        LOG.warn("Error processing subscriptions to "+this+": "+t, t);
+                    }
+                }
+            }});
+    }
+    
+    protected boolean includeDescriptionForSensorTask(SensorEvent<?> event) {
+        // just do it for simple/quick things to avoid expensive toStrings
+        // (info is rarely useful, but occasionally it will be)
+        if (event.getValue()==null) return true;
+        Class<?> clazz = event.getValue().getClass();
+        if (clazz.isEnum() || clazz.isPrimitive() || Number.class.isAssignableFrom(clazz) || 
+            clazz.equals(String.class)) return true;
+        return false;
+    }
+
     @Override
     public String toString() {
         return tostring;


[3/3] brooklyn-server git commit: This closes #561

Posted by he...@apache.org.
This closes #561


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/ebcea30f
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/ebcea30f
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/ebcea30f

Branch: refs/heads/master
Commit: ebcea30f9b47657ca5a852f7a4368b37cfa57076
Parents: 1104828 870a4f8
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Sat Feb 18 00:59:55 2017 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Sat Feb 18 00:59:55 2017 +0000

----------------------------------------------------------------------
 .../brooklyn/core/mgmt/BrooklynTaskTags.java    |   2 +
 .../mgmt/internal/LocalSubscriptionManager.java | 147 +++++++++++--------
 2 files changed, 86 insertions(+), 63 deletions(-)
----------------------------------------------------------------------



[2/3] brooklyn-server git commit: address PR comments

Posted by he...@apache.org.
address PR comments


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/870a4f85
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/870a4f85
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/870a4f85

Branch: refs/heads/master
Commit: 870a4f853b79520c3019b8adb7279c005a4fc27a
Parents: 2c2f3fc
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Sat Feb 18 00:59:37 2017 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Sat Feb 18 00:59:37 2017 +0000

----------------------------------------------------------------------
 .../core/mgmt/internal/LocalSubscriptionManager.java     | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/870a4f85/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 7039ee2..2349c73 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -49,6 +49,7 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
 import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,15 +234,17 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         
         StringBuilder name = new StringBuilder("sensor ");
         StringBuilder description = new StringBuilder("Sensor ");
-        String sensorName = s.sensor==null ? null : s.sensor.getName();
+        String sensorName = s.sensor==null ? "<null-sensor>" : s.sensor.getName();
         String sourceName = event.getSource()==null ? null : event.getSource().getId();
-        name.append(sourceName);
-        name.append(":");
+        if (Strings.isNonBlank(sourceName)) {
+            name.append(sourceName);
+            name.append(":");
+        }
         name.append(sensorName);
         
         description.append(sensorName);
         description.append(" on ");
-        description.append(sourceName);
+        description.append(sourceName==null ? "<null-source>" : sourceName);
         description.append(" publishing to ");
         description.append(s.subscriber instanceof Entity ? ((Entity)s.subscriber).getId() : s.subscriber);