You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/11/29 13:40:19 UTC

[05/13] brooklyn-server git commit: ServiceFailureDetector yaml tests, and fix

ServiceFailureDetector yaml tests, and fix


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c856cd15
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c856cd15
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c856cd15

Branch: refs/heads/master
Commit: c856cd15112c046ae5f0c8b578b84a213b51bcc0
Parents: bcb9689
Author: Aled Sage <al...@gmail.com>
Authored: Wed Nov 16 09:51:43 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 28 21:11:48 2016 +0000

----------------------------------------------------------------------
 .../ServiceFailureDetectorYamlTest.java         | 189 +++++++++++++++++++
 .../core/sensor/SensorEventPredicates.java      |  81 ++++++++
 .../entity/RecordingSensorEventListener.java    |  23 +++
 .../policy/ha/ServiceFailureDetector.java       |  63 ++++---
 4 files changed, 329 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java
new file mode 100644
index 0000000..4374c2b
--- /dev/null
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.camp.brooklyn;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
+import org.apache.brooklyn.core.sensor.SensorEventPredicates;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.policy.ha.HASensors;
+import org.apache.brooklyn.policy.ha.ServiceFailureDetector;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+@Test
+public class ServiceFailureDetectorYamlTest extends AbstractYamlTest {
+    
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorYamlTest.class);
+    
+    static final String INDICATOR_KEY_1 = "test-indicator-1";
+
+    static final String appId = "my-app";
+    static final String appVersion = "1.2.3";
+    static final String appVersionedId = appId + ":" + appVersion;
+    
+    static final String catalogYamlSimple = Joiner.on("\n").join(
+            "brooklyn.catalog:",
+            "  id: " + appId,
+            "  version: " + appVersion,
+            "  itemType: entity",
+            "  item:",
+            "    type: " + TestEntity.class.getName(),
+            "    brooklyn.enrichers:",
+            "    - type: " + ServiceFailureDetector.class.getName());
+
+    static final String catalogYamlWithDsl = Joiner.on("\n").join(
+            "brooklyn.catalog:",
+            "  id: my-app",
+            "  version: 1.2.3",
+            "  itemType: entity",
+            "  item:",
+            "    services:",
+            "    - type: " + TestEntity.class.getName(),//FailingEntity.class.getName(),
+            "      brooklyn.parameters:",
+            "      - name: custom.stabilizationDelay",
+            "        type: " + Duration.class.getName(),
+            "        default: 1ms",
+            "      - name: custom.republishTime",
+            "        type: " + Duration.class.getName(),
+            "        default: 1m",
+            "      brooklyn.enrichers:",
+            "      - type: " + ServiceFailureDetector.class.getName(),
+            "        brooklyn.config:",
+            "          serviceOnFire.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")",
+            "          entityFailed.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")",
+            "          entityRecovered.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")",
+            "          entityFailed.republishTime: $brooklyn:config(\"custom.republishTime\")");
+
+    static final String catalogYamlWithDslReferenceParentDefault = Joiner.on("\n").join(
+            "brooklyn.catalog:",
+            "  id: my-app",
+            "  version: 1.2.3",
+            "  itemType: entity",
+            "  item:",
+            "    brooklyn.parameters:",
+            "    - name: custom.stabilizationDelay",
+            "      type: " + Duration.class.getName(),
+            "      default: 1ms",
+            "    - name: custom.republishTime",
+            "      type: " + Duration.class.getName(),
+            "      default: 1m",
+            "    services:",
+            "    - type: " + TestEntity.class.getName(),//FailingEntity.class.getName(),
+            "      brooklyn.enrichers:",
+            "      - type: " + ServiceFailureDetector.class.getName(),
+            "        brooklyn.config:",
+            "          serviceOnFire.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")",
+            "          entityFailed.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")",
+            "          entityRecovered.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")",
+            "          entityFailed.republishTime: $brooklyn:config(\"custom.republishTime\")");
+
+    @Test
+    public void testDefaults() throws Exception {
+        runTest(catalogYamlSimple, appVersionedId);
+    }
+    
+    @Test
+    public void testWithDslConfig() throws Exception {
+        Entity app = runTest(catalogYamlWithDsl, appVersionedId);
+        
+        TestEntity newEntity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
+        ServiceFailureDetector newEnricher = assertHasEnricher(newEntity, ServiceFailureDetector.class);
+        assertEnricherConfigMatchesDsl(newEnricher);
+    }
+
+    @Test
+    public void testWithDslConfigReferenceParentDefault() throws Exception {
+        Entity app = runTest(catalogYamlWithDslReferenceParentDefault, appVersionedId);
+        
+        TestEntity newEntity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
+        ServiceFailureDetector newEnricher = assertHasEnricher(newEntity, ServiceFailureDetector.class);
+        assertEnricherConfigMatchesDsl(newEnricher);
+    }
+    
+    protected Entity runTest(String catalogYaml, String appId) throws Exception {
+        addCatalogItems(catalogYaml);
+
+        String appYaml = Joiner.on("\n").join(
+                "services:",
+                "- type: " + appId);
+        Entity app = createStartWaitAndLogApplication(appYaml);
+        TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
+        assertHasEnricher(entity, ServiceFailureDetector.class);
+        
+        // Confirm ServiceFailureDetector triggers event
+        RecordingSensorEventListener<Object> listener = subscribeToHaSensors(entity);
+        
+        ServiceNotUpLogic.updateNotUpIndicator(entity, INDICATOR_KEY_1, "Simulate a problem");
+        listener.assertHasEventEventually(SensorEventPredicates.sensorEqualTo(HASensors.ENTITY_FAILED));
+        listener.assertEventCount(1);
+        listener.clearEvents();
+        
+        ServiceNotUpLogic.clearNotUpIndicator(entity, INDICATOR_KEY_1);
+        listener.assertHasEventEventually(SensorEventPredicates.sensorEqualTo(HASensors.ENTITY_RECOVERED));
+        listener.assertEventCount(1);
+        
+        return app;
+    }
+    
+    protected static <T extends Enricher> T assertHasEnricher(Entity entity, Class<T> enricherClazz) {
+        Enricher enricher = Iterables.find(entity.enrichers(), Predicates.instanceOf(enricherClazz));
+        assertNotNull(enricher);
+        return enricherClazz.cast(enricher);
+    }
+    
+    protected static void assertEnricherConfig(Enricher enricher, Map<? extends ConfigKey<?>, ?> expectedVals) {
+        for (Map.Entry<? extends ConfigKey<?>, ?> entry : expectedVals.entrySet()) {
+            ConfigKey<?> key = entry.getKey();
+            Object actual = enricher.config().get(key);
+            assertEquals(actual, entry.getValue(), "key="+key+"; expected="+entry.getValue()+"; actual="+actual);
+        }
+    }
+    
+    protected static void assertEnricherConfigMatchesDsl(Enricher enricher) {
+        assertEnricherConfig(enricher, ImmutableMap.<ConfigKey<?>, Object>of(
+                ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.ONE_MILLISECOND,
+                ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_MILLISECOND,
+                ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_MILLISECOND,
+                ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, Duration.ONE_MINUTE));
+    }
+
+    protected static RecordingSensorEventListener<Object> subscribeToHaSensors(Entity entity) {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        entity.subscriptions().subscribe(entity, HASensors.ENTITY_RECOVERED, listener);
+        entity.subscriptions().subscribe(entity, HASensors.ENTITY_FAILED, listener);
+        return listener;
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java b/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java
new file mode 100644
index 0000000..00eb996
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.sensor;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.util.guava.SerializablePredicate;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+public class SensorEventPredicates {
+
+    public static Predicate<SensorEvent<?>> sensorEqualTo(final Sensor<?> val) {
+        return sensorSatisfies(Predicates.<Sensor<?>>equalTo(val));
+    }
+    
+    public static Predicate<SensorEvent<?>> sensorSatisfies(final Predicate<? super Sensor<?>> condition) {
+        return new SensorSatisfies(condition);
+    }
+    
+    protected static class SensorSatisfies implements SerializablePredicate<SensorEvent<?>> {
+        private static final long serialVersionUID = -3585200249520308941L;
+        
+        protected final Predicate<? super Sensor<?>> condition;
+        protected SensorSatisfies(Predicate<? super Sensor<?>> condition) {
+            this.condition = condition;
+        }
+        @Override
+        public boolean apply(@Nullable SensorEvent<?> input) {
+            return (input != null) && condition.apply(input.getSensor());
+        }
+        @Override
+        public String toString() {
+            return "sensorSatisfies("+condition+")";
+        }
+    }
+
+    public static <T> Predicate<SensorEvent<T>> valueEqualTo(final T val) {
+        return valueSatisfies(Predicates.equalTo(val));
+    }
+    
+    public static <T> Predicate<SensorEvent<T>> valueSatisfies(final Predicate<? super T> condition) {
+        return new ValueSatisfies<T>(condition);
+    }
+    
+    protected static class ValueSatisfies<T> implements SerializablePredicate<SensorEvent<T>> {
+        private static final long serialVersionUID = 2805443606039228221L;
+        
+        protected final Predicate<? super T> condition;
+        protected ValueSatisfies(Predicate<? super T> condition) {
+            this.condition = condition;
+        }
+        @Override
+        public boolean apply(@Nullable SensorEvent<T> input) {
+            return (input != null) && condition.apply(input.getValue());
+        }
+        @Override
+        public String toString() {
+            return "valueSatisfies("+condition+")";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
index e0c4ed0..c54a0fb 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
@@ -28,10 +28,13 @@ import java.util.Objects;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.testng.Assert;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -98,6 +101,26 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
                 .transform(new GetValueFunction<T>());
     }
 
+    public void assertHasEvent(Predicate<? super SensorEvent<T>> filter) {
+        for (SensorEvent<T> event : events) {
+            if (filter.apply(event)) {
+                return;
+            }
+        }
+        Assert.fail("No event matching filter "+ filter);
+    }
+
+    public void assertHasEventEventually(final Predicate<? super SensorEvent<T>> filter) {
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertHasEvent(filter);
+            }});
+    }
+    
+    public void assertEventCount(int expected) {
+        Assert.assertEquals(events.size(), expected, "events="+events);
+    }
+
     /**
      * Clears all events recorded by the listener.
      */

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
index d36f25a..5c5aaeb 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -301,37 +301,46 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
         return description;
     }
     
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings({ "rawtypes" })
     protected void recomputeAfterDelay(long delay) {
-        if (isRunning() && executorQueued.compareAndSet(false, true)) {
-            long now = System.currentTimeMillis();
-            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
-            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
-            
-            Runnable job = new Runnable() {
-                @Override public void run() {
-                    try {
-                        executorTime = System.currentTimeMillis();
-                        executorQueued.set(false);
+        // TODO Execute in same thread as other onEvent calls are done in (i.e. same conceptually 
+        // single-threaded executor as the subscription-manager will use).
+        //
+        // TODO Disabling the use of executorQueued check - it was causing assertions to fail that 
+        // we'd triggered the ENTITY_FAILED/ENTITY_RECOVERED. Previously used:
+        //    if (executorQueued.compareAndSet(false, true)) {
+        // My guess is that the next call to onEvent() didn't always call recomputeAfterDelay with
+        // the recalculated desired delay, as desired by the skipped call. But not sure why.
+        
+        if (!isRunning()) return;
+
+        long now = System.currentTimeMillis();
+        delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
+        if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
+        
+        Runnable job = new Runnable() {
+            @Override public void run() {
+                try {
+                    executorTime = System.currentTimeMillis();
+                    executorQueued.set(false);
 
-                        onEvent(null);
-                        
-                    } catch (Exception e) {
-                        if (isRunning()) {
-                            LOG.error("Error in enricher "+this+": "+e, e);
-                        } else {
-                            if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+this+" (but no longer running): "+e, e);
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Error in enricher "+this+": "+t, t);
-                        throw Exceptions.propagate(t);
+                    onEvent(null);
+                    
+                } catch (Exception e) {
+                    if (isRunning()) {
+                        LOG.error("Error in enricher "+this+": "+e, e);
+                    } else {
+                        if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+this+" (but no longer running): "+e, e);
                     }
+                } catch (Throwable t) {
+                    LOG.error("Error in enricher "+this+": "+t, t);
+                    throw Exceptions.propagate(t);
                 }
-            };
-            
-            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
-            ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
+            }
+        };
+        
+        ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
+        ((EntityInternal)entity).getExecutionContext().submit(task);
     }
     
     private String getTimeStringSince(Long time) {