You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/01/13 11:42:49 UTC

[3/8] incubator-brooklyn git commit: BROOKLYN-214: fix cancelling of AttributeWhenReady task

BROOKLYN-214: fix cancelling of AttributeWhenReady task

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d5c07225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d5c07225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d5c07225

Branch: refs/heads/master
Commit: d5c072257250e1b54e08efe2ac31b4b1ff03a6e0
Parents: d058158
Author: Aled Sage <al...@gmail.com>
Authored: Tue Jan 12 13:24:43 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Jan 12 13:24:43 2016 +0000

----------------------------------------------------------------------
 .../spi/dsl/BrooklynDslDeferredSupplier.java    |  40 ++++++-
 .../DependentConfigPollingYamlTest.java         | 117 +++++++++++++++++++
 2 files changed, 155 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5c07225/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index 65bf561..a417e32 100644
--- a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -18,7 +18,10 @@
  */
 package org.apache.brooklyn.camp.brooklyn.spi.dsl;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
@@ -55,6 +58,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  * and should not accessed until after the components / entities are created 
  * and are being started.
  * (TODO the precise semantics of this are under development.)
+ * 
+ * The threading model is that only one thread can call {@link #get()} at a time. An interruptible
+ * lock is obtained using {@link #lock} for the duration of that method. It is important to not
+ * use {@code synchronized} because that is not interruptible - if someone tries to get the value
+ * and interrupts after a short wait, then we must release the lock immediately and return.
  * <p>
  **/
 public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, TaskFactory<Task<T>>, Serializable {
@@ -63,6 +71,15 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
 
     private static final Logger log = LoggerFactory.getLogger(BrooklynDslDeferredSupplier.class);
 
+    /**
+     * Lock to be used, rather than {@code synchronized} blocks, for anything long-running.
+     * Use {@link #getLock()} rather than this field directly, to ensure it is reinitialised 
+     * after rebinding.
+     * 
+     * @see https://issues.apache.org/jira/browse/BROOKLYN-214
+     */
+    private transient ReentrantLock lock;
+    
     // TODO json of this object should *be* this, not wrapped this ($brooklyn:literal is a bit of a hack, though it might work!)
     @JsonInclude
     @JsonProperty(value="$brooklyn:literal")
@@ -72,8 +89,9 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
     public BrooklynDslDeferredSupplier() {
         PlanInterpretationNode sourceNode = BrooklynDslInterpreter.currentNode();
         dsl = sourceNode!=null ? sourceNode.getOriginalValue() : null;
+        lock = new ReentrantLock();
     }
-
+    
     /** returns the current entity; for use in implementations of {@link #get()} */
     protected final static EntityInternal entity() {
         return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
@@ -88,7 +106,13 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
     }
 
     @Override
-    public final synchronized T get() {
+    public final T get() {
+        try {
+            getLock().lockInterruptibly();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        
         try {
             if (log.isDebugEnabled())
                 log.debug("Queuing task to resolve "+dsl);
@@ -110,7 +134,19 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
 
         } catch (Exception e) {
             throw Exceptions.propagate(e);
+        } finally {
+            getLock().unlock();
+        }
+    }
+
+    // Use this method, rather than the direct field, to ensure it is initialised after rebinding.
+    protected ReentrantLock getLock() {
+        synchronized (this) {
+            if (lock == null) {
+                lock = new ReentrantLock();
+            }
         }
+        return lock;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5c07225/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java b/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java
new file mode 100644
index 0000000..10df5f0
--- /dev/null
+++ b/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.camp.brooklyn;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+@Test
+public class DependentConfigPollingYamlTest extends AbstractYamlTest {
+    private static final Logger log = LoggerFactory.getLogger(DependentConfigPollingYamlTest.class);
+    
+    private ExecutorService executor;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void setUp() {
+        super.setUp();
+        executor = Executors.newCachedThreadPool();
+    }
+            
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void tearDown() {
+        if (executor != null) executor.shutdownNow();
+        super.tearDown();
+    }
+            
+    // Test for BROOKLYN-214. Previously, the brief Tasks.resolving would cause a thread to be
+    // leaked. This was because it would call into BrooklynDslDeferredSupplier.get, which would
+    // wait on a synchronized block and thus not be interruptible - the thread would be consumed
+    // forever, until the attributeWhenReady returned true!
+    //
+    // Integration test, because takes several seconds.
+    @Test(groups="Integration")
+    public void testResolveAttributeWhenReadyWithTimeoutDoesNotLeaveThreadRunning() throws Exception {
+        String yaml = Joiner.on("\n").join(
+                "services:",
+                "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+                "  id: myentity",
+                "  brooklyn.config:",
+                "    test.confName: $brooklyn:entity(\"myentity\").attributeWhenReady(\"mysensor\")");
+        
+        final Entity app = createAndStartApplication(yaml);
+        final TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
+
+        // Cause a thread to block, getting the config - previousy (before fixing 214) this would be in
+        // the synchronized block if BrooklynDslDeferredSupplier.get().
+        // The sleep is to ensure we really did get into the locking code.
+        executor.submit(new Callable<Object>() {
+            public Object call() {
+                return entity.config().get(TestEntity.CONF_NAME);
+            }});
+        Thread.sleep(100);
+        
+        // Try to resolve the value many times, each in its own task, but with a short timeout for each.
+        final int numIterations = 20;
+        final int preNumThreads = Thread.activeCount();
+        
+        for (int i = 0; i < numIterations; i++) {
+            // Same as RestValueResolver.getImmediateValue
+            Tasks.resolving(entity.config().getRaw(TestEntity.CONF_NAME).get())
+                    .as(Object.class)
+                    .defaultValue("UNRESOLVED")
+                    .timeout(Duration.millis(100))
+                    .context(entity)
+                    .swallowExceptions()
+                    .get();
+        }
+
+        // Confirm we haven't left threads behind.
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                int postNumThreads = Thread.activeCount();
+                String msg = "pre="+preNumThreads+"; post="+postNumThreads+"; iterations="+numIterations;
+                log.info(msg);
+                assertTrue(postNumThreads < preNumThreads + (numIterations / 2), msg);
+            }});
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return log;
+    }
+}