You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/11/19 15:36:31 UTC
[4/5] incubator-brooklyn git commit: Pollers set cancelOnException=false when scheduling tasks
Pollers set cancelOnException=false when scheduling tasks
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/5832a15f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/5832a15f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/5832a15f
Branch: refs/heads/master
Commit: 5832a15f2a30ba81f3b5b104b17795a5520b06af
Parents: e0014b1
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Wed Nov 18 14:36:32 2015 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Thu Nov 19 14:01:54 2015 +0000
----------------------------------------------------------------------
.../org/apache/brooklyn/core/feed/Poller.java | 9 +-
.../apache/brooklyn/core/feed/PollerTest.java | 191 ++++++++++++-------
2 files changed, 125 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5832a15f/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index d57f826..a9a34d9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -19,6 +19,7 @@
package org.apache.brooklyn.core.feed;
import java.util.LinkedHashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -31,6 +32,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
@@ -155,8 +157,11 @@ public class Poller<V> {
return task;
}
};
- ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory);
- tasks.add((ScheduledTask)Entities.submit(entity, task));
+ Map<String, ?> taskFlags = MutableMap.of("displayName", "scheduled:" + scheduleName);
+ ScheduledTask task = new ScheduledTask(taskFlags, pollingTaskFactory)
+ .period(pollJob.pollPeriod)
+ .cancelOnException(false);
+ tasks.add(Entities.submit(entity, task));
} else {
if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5832a15f/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
index 0f2c1ce..6495f566 100644
--- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -18,91 +18,136 @@
*/
package org.apache.brooklyn.core.feed;
-import static org.testng.Assert.assertTrue;
-
+import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.core.feed.PollHandler;
-import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.api.entity.ImplementedBy;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.AbstractEntity;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
-import org.apache.brooklyn.core.test.entity.TestEntity;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
public class PollerTest extends BrooklynAppUnitTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(PollerTest.class);
-
- private TestEntity entity;
- private Poller<Integer> poller;
-
- @BeforeMethod(alwaysRun=true)
- @Override
- public void setUp() throws Exception {
- super.setUp();
- entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
- poller = new Poller<Integer>(entity, false);
+ @DataProvider(name = "specProvider")
+ public Object[][] specProvider() {
+ EntitySpec<FeedExceptionEntity> pollFailer = EntitySpec.create(FeedExceptionEntity.class)
+ .configure(FeedExceptionEntity.POLLER, new PollFailer());
+ EntitySpec<FeedExceptionEntity> taskFailer = EntitySpec.create(FeedExceptionEntity.class)
+ .configure(FeedExceptionEntity.POLLER, new TaskFailer());
+ return new Object[][]{{pollFailer}, {taskFailer}};
+ }
+
+ @Test(dataProvider = "specProvider")
+ public void testFeedContinuesWhenPollerThrows(EntitySpec<FeedExceptionEntity> spec) {
+ Map<?, ?> timeoutFlags = ImmutableMap.of("timeout", "100ms");
+ FeedExceptionEntity fee = app.createAndManageChild(spec);
+ app.start(ImmutableList.of(app.newSimulatedLocation()));
+ EntityAsserts.assertAttributeEqualsEventually(timeoutFlags, fee, FeedExceptionEntity.FLAG, true);
+
+ fee.startThrowingPollExceptions();
+ EntityAsserts.assertAttributeEqualsEventually(timeoutFlags, fee, FeedExceptionEntity.FLAG, false);
+ EntityAsserts.assertAttributeEqualsContinually(timeoutFlags, fee, FeedExceptionEntity.FLAG, false);
+
+ fee.stopThrowingPollExceptions();
+ EntityAsserts.assertAttributeEqualsEventually(timeoutFlags, fee, FeedExceptionEntity.FLAG, true);
+ EntityAsserts.assertAttributeEqualsContinually(timeoutFlags, fee, FeedExceptionEntity.FLAG, true);
}
-
- @AfterMethod(alwaysRun=true)
- @Override
- public void tearDown() throws Exception {
- if (poller != null) poller.stop();
- super.tearDown();
+
+ @ImplementedBy(FeedExceptionEntityImpl.class)
+ public static interface FeedExceptionEntity extends Entity {
+ ConfigKey<ThrowingPoller> POLLER = ConfigKeys.newConfigKey(ThrowingPoller.class, "poller");
+ AttributeSensor<Boolean> FLAG = Sensors.newBooleanSensor("flag");
+
+ void startThrowingPollExceptions();
+ void stopThrowingPollExceptions();
+ }
+
+ public static class FeedExceptionEntityImpl extends AbstractEntity implements FeedExceptionEntity {
+ private ThrowingPoller poller;
+
+ @Override
+ public void init() {
+ super.init();
+ poller = config().get(POLLER);
+ FunctionFeed.builder()
+ .entity(this)
+ .period(1L)
+ .poll(new FunctionPollConfig<Boolean, Boolean>(FLAG)
+ .callable(poller)
+ .onException(Functions.constant(false)))
+ .build();
+ }
+
+ public void startThrowingPollExceptions() {
+ this.poller.setShouldThrow(true);
+ }
+
+ public void stopThrowingPollExceptions() {
+ this.poller.setShouldThrow(false);
+ }
}
-
- @Test(groups={"Integration", "WIP"}) // because takes > 1 second
- public void testPollingSubTaskFailsOnceKeepsGoing() throws Exception {
- final AtomicInteger counter = new AtomicInteger();
- poller.scheduleAtFixedRate(
- new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- int result = counter.incrementAndGet();
- if (result % 2 == 0) {
- DynamicTasks.queue("in-poll", new Runnable() {
- public void run() {
- throw new IllegalStateException("Simulating error in sub-task for poll");
- }});
+
+ private static class TaskFailer extends ThrowingPoller {
+ public Boolean execute(final boolean shouldThrow) {
+ Task<Boolean> t = Tasks.<Boolean>builder()
+ .body(new Callable<Boolean>() {
+ @Override
+ public Boolean call() {
+ if (shouldThrow) {
+ throw new IllegalArgumentException("exception in feed task");
+ }
+ return true;
}
- return result;
- }
- },
- new PollHandler<Integer>() {
- @Override public boolean checkSuccess(Integer val) {
- return true;
- }
- @Override public void onSuccess(Integer val) {
-
- }
- @Override public void onFailure(Integer val) {
- }
- @Override
- public void onException(Exception exception) {
- LOG.info("Exception in test poller", exception);
- }
- @Override public String getDescription() {
- return "mypollhandler";
- }
- },
- new Duration(10, TimeUnit.MILLISECONDS));
- poller.start();
-
- Asserts.succeedsContinually(MutableMap.of("timeout", 2*1000, "period", 500), new Runnable() {
- int oldCounter = -1;
- @Override public void run() {
- assertTrue(counter.get() > oldCounter);
- oldCounter = counter.get();
+ })
+ .build();
+ return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked();
+ }
+ }
+
+ private static class PollFailer extends ThrowingPoller {
+ public Boolean execute(final boolean shouldThrow) {
+ if (shouldThrow) {
+ throw new IllegalArgumentException("exception in poller");
+ }
+ return true;
+ }
+ }
+
+ private static abstract class ThrowingPoller implements Callable<Boolean> {
+ protected final Object throwLock = new Object[0];
+ boolean shouldThrow = false;
+
+ abstract Boolean execute(boolean shouldThrow);
+
+ @Override
+ public Boolean call() throws Exception {
+ synchronized (throwLock) {
+ return execute(shouldThrow);
}
- });
+ }
+
+ public void setShouldThrow(boolean shouldThrow) {
+ synchronized (throwLock) {
+ this.shouldThrow = shouldThrow;
+ }
+ }
}
+
}