You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/01/13 11:42:49 UTC
[3/8] incubator-brooklyn git commit: BROOKLYN-214: fix cancelling of AttributeWhenReady task
BROOKLYN-214: fix cancelling of AttributeWhenReady task
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d5c07225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d5c07225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d5c07225
Branch: refs/heads/master
Commit: d5c072257250e1b54e08efe2ac31b4b1ff03a6e0
Parents: d058158
Author: Aled Sage <al...@gmail.com>
Authored: Tue Jan 12 13:24:43 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Jan 12 13:24:43 2016 +0000
----------------------------------------------------------------------
.../spi/dsl/BrooklynDslDeferredSupplier.java | 40 ++++++-
.../DependentConfigPollingYamlTest.java | 117 +++++++++++++++++++
2 files changed, 155 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5c07225/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index 65bf561..a417e32 100644
--- a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -18,7 +18,10 @@
*/
package org.apache.brooklyn.camp.brooklyn.spi.dsl;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
@@ -55,6 +58,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
* and should not accessed until after the components / entities are created
* and are being started.
* (TODO the precise semantics of this are under development.)
+ *
+ * The threading model is that only one thread can call {@link #get()} at a time. An interruptible
+ * lock is obtained using {@link #lock} for the duration of that method. It is important to not
+ * use {@code synchronized} because that is not interruptible - if someone tries to get the value
+ * and interrupts after a short wait, then we must release the lock immediately and return.
* <p>
**/
public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, TaskFactory<Task<T>>, Serializable {
@@ -63,6 +71,15 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
private static final Logger log = LoggerFactory.getLogger(BrooklynDslDeferredSupplier.class);
+ /**
+ * Lock to be used, rather than {@code synchronized} blocks, for anything long-running.
+ * Use {@link #getLock()} rather than this field directly, to ensure it is reinitialised
+ * after rebinding.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/BROOKLYN-214">BROOKLYN-214</a>
+ */
+ private transient ReentrantLock lock;
+
// TODO json of this object should *be* this, not wrapped this ($brooklyn:literal is a bit of a hack, though it might work!)
@JsonInclude
@JsonProperty(value="$brooklyn:literal")
@@ -72,8 +89,9 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
public BrooklynDslDeferredSupplier() {
PlanInterpretationNode sourceNode = BrooklynDslInterpreter.currentNode();
dsl = sourceNode!=null ? sourceNode.getOriginalValue() : null;
+ lock = new ReentrantLock();
}
-
+
/** returns the current entity; for use in implementations of {@link #get()} */
protected final static EntityInternal entity() {
return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
@@ -88,7 +106,13 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
}
@Override
- public final synchronized T get() {
+ public final T get() {
+ try {
+ getLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+
try {
if (log.isDebugEnabled())
log.debug("Queuing task to resolve "+dsl);
@@ -110,7 +134,19 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
} catch (Exception e) {
throw Exceptions.propagate(e);
+ } finally {
+ getLock().unlock();
+ }
+ }
+
+ // Use this method, rather than the direct field, to ensure it is initialised after rebinding.
+ protected ReentrantLock getLock() {
+ synchronized (this) {
+ if (lock == null) {
+ lock = new ReentrantLock();
+ }
}
+ return lock;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5c07225/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java b/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java
new file mode 100644
index 0000000..10df5f0
--- /dev/null
+++ b/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.camp.brooklyn;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+@Test
+public class DependentConfigPollingYamlTest extends AbstractYamlTest {
+ private static final Logger log = LoggerFactory.getLogger(DependentConfigPollingYamlTest.class);
+
+ private ExecutorService executor;
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setUp() {
+ super.setUp();
+ executor = Executors.newCachedThreadPool();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void tearDown() {
+ if (executor != null) executor.shutdownNow();
+ super.tearDown();
+ }
+
+ // Test for BROOKLYN-214. Previously, the brief Tasks.resolving would cause a thread to be
+ // leaked. This was because it would call into BrooklynDslDeferredSupplier.get, which would
+ // wait on a synchronized block and thus not be interruptible - the thread would be consumed
+ // forever, until the attributeWhenReady returned true!
+ //
+ // Integration test, because it takes several seconds.
+ @Test(groups="Integration")
+ public void testResolveAttributeWhenReadyWithTimeoutDoesNotLeaveThreadRunning() throws Exception {
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " id: myentity",
+ " brooklyn.config:",
+ " test.confName: $brooklyn:entity(\"myentity\").attributeWhenReady(\"mysensor\")");
+
+ final Entity app = createAndStartApplication(yaml);
+ final TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
+
+ // Cause a thread to block, getting the config - previously (before fixing BROOKLYN-214) this
+ // would be in the synchronized block of BrooklynDslDeferredSupplier.get().
+ // The sleep is to ensure we really did get into the locking code.
+ executor.submit(new Callable<Object>() {
+ public Object call() {
+ return entity.config().get(TestEntity.CONF_NAME);
+ }});
+ Thread.sleep(100);
+
+ // Try to resolve the value many times, each in its own task, but with a short timeout for each.
+ final int numIterations = 20;
+ final int preNumThreads = Thread.activeCount();
+
+ for (int i = 0; i < numIterations; i++) {
+ // Same as RestValueResolver.getImmediateValue
+ Tasks.resolving(entity.config().getRaw(TestEntity.CONF_NAME).get())
+ .as(Object.class)
+ .defaultValue("UNRESOLVED")
+ .timeout(Duration.millis(100))
+ .context(entity)
+ .swallowExceptions()
+ .get();
+ }
+
+ // Confirm we haven't left threads behind.
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ int postNumThreads = Thread.activeCount();
+ String msg = "pre="+preNumThreads+"; post="+postNumThreads+"; iterations="+numIterations;
+ log.info(msg);
+ assertTrue(postNumThreads < preNumThreads + (numIterations / 2), msg);
+ }});
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return log;
+ }
+}