You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:11 UTC
[23/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to
o.a.b.feed and o.a.b.core.feed
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java
deleted file mode 100644
index e9767d9..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterFeed.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.windows;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import io.cloudsoft.winrm4j.winrm.WinRmToolResponse;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.EffectorTasks;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.PollHandler;
-import org.apache.brooklyn.sensor.feed.Poller;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-
-/**
- * A sensor feed that retrieves performance counters from a Windows host and posts the values to sensors.
- *
- * <p>To use this feed, you must provide the entity, and a collection of mappings between Windows performance counter
- * names and Brooklyn attribute sensors.</p>
- *
- * <p>This feed uses SSH to invoke the windows utility <tt>typeperf</tt> to query for a specific set of performance
- * counters, by name. The values are extracted from the response, and published to the entity's sensors.</p>
- *
- * <p>Example:</p>
- *
- * {@code
- * @Override
- * protected void connectSensors() {
- * WindowsPerformanceCounterFeed feed = WindowsPerformanceCounterFeed.builder()
- * .entity(entity)
- * .addSensor("\\Processor(_total)\\% Idle Time", CPU_IDLE_TIME)
- * .addSensor("\\Memory\\Available MBytes", AVAILABLE_MEMORY)
- * .build();
- * }
- * }
- *
- * @since 0.6.0
- * @author richardcloudsoft
- */
-public class WindowsPerformanceCounterFeed extends AbstractFeed {
-
- private static final Logger log = LoggerFactory.getLogger(WindowsPerformanceCounterFeed.class);
-
- // This pattern matches CSV line(s) with the date in the first field, and at least one further field.
- protected static final Pattern lineWithPerfData = Pattern.compile("^\"[\\d:/\\-. ]+\",\".*\"$", Pattern.MULTILINE);
- private static final Joiner JOINER_ON_SPACE = Joiner.on(' ');
- private static final Joiner JOINER_ON_COMMA = Joiner.on(',');
- private static final int OUTPUT_COLUMN_WIDTH = 100;
-
- @SuppressWarnings("serial")
- public static final ConfigKey<Collection<WindowsPerformanceCounterPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
- new TypeToken<Collection<WindowsPerformanceCounterPollConfig<?>>>() {},
- "polls");
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private EntityLocal entity;
- private Set<WindowsPerformanceCounterPollConfig<?>> polls = Sets.newLinkedHashSet();
- private Duration period = Duration.of(30, TimeUnit.SECONDS);
- private String uniqueTag;
- private volatile boolean built;
-
- public Builder entity(EntityLocal val) {
- this.entity = checkNotNull(val, "entity");
- return this;
- }
- public Builder addSensor(WindowsPerformanceCounterPollConfig<?> config) {
- polls.add(config);
- return this;
- }
- public Builder addSensor(String performanceCounterName, AttributeSensor<?> sensor) {
- return addSensor(new WindowsPerformanceCounterPollConfig(sensor).performanceCounterName(checkNotNull(performanceCounterName, "performanceCounterName")));
- }
- public Builder addSensors(Map<String, AttributeSensor> sensors) {
- for (Map.Entry<String, AttributeSensor> entry : sensors.entrySet()) {
- addSensor(entry.getKey(), entry.getValue());
- }
- return this;
- }
- public Builder period(Duration period) {
- this.period = checkNotNull(period, "period");
- return this;
- }
- public Builder period(long millis) {
- return period(millis, TimeUnit.MILLISECONDS);
- }
- public Builder period(long val, TimeUnit units) {
- return period(Duration.of(val, units));
- }
- public Builder uniqueTag(String uniqueTag) {
- this.uniqueTag = uniqueTag;
- return this;
- }
- public WindowsPerformanceCounterFeed build() {
- built = true;
- WindowsPerformanceCounterFeed result = new WindowsPerformanceCounterFeed(this);
- result.setEntity(checkNotNull(entity, "entity"));
- result.start();
- return result;
- }
- @Override
- protected void finalize() {
- if (!built) log.warn("WindowsPerformanceCounterFeed.Builder created, but build() never called");
- }
- }
-
- /**
- * For rebind; do not call directly; use builder
- */
- public WindowsPerformanceCounterFeed() {
- }
-
- protected WindowsPerformanceCounterFeed(Builder builder) {
- List<WindowsPerformanceCounterPollConfig<?>> polls = Lists.newArrayList();
- for (WindowsPerformanceCounterPollConfig<?> config : builder.polls) {
- if (!config.isEnabled()) continue;
- @SuppressWarnings({ "unchecked", "rawtypes" })
- WindowsPerformanceCounterPollConfig<?> configCopy = new WindowsPerformanceCounterPollConfig(config);
- if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
- polls.add(configCopy);
- }
- config().set(POLLS, polls);
- initUniqueTag(builder.uniqueTag, polls);
- }
-
- @Override
- protected void preStart() {
- Collection<WindowsPerformanceCounterPollConfig<?>> polls = getConfig(POLLS);
-
- long minPeriod = Integer.MAX_VALUE;
- List<String> performanceCounterNames = Lists.newArrayList();
- for (WindowsPerformanceCounterPollConfig<?> config : polls) {
- minPeriod = Math.min(minPeriod, config.getPeriod());
- performanceCounterNames.add(config.getPerformanceCounterName());
- }
-
- Iterable<String> allParams = ImmutableList.<String>builder()
- .add("(Get-Counter")
- .add("-Counter")
- .add(JOINER_ON_COMMA.join(Iterables.transform(performanceCounterNames, QuoteStringFunction.INSTANCE)))
- .add("-SampleInterval")
- .add("2") // TODO: extract SampleInterval as a config key
- .add(").CounterSamples")
- .add("|")
- .add("Format-Table")
- .add(String.format("@{Expression={$_.Path};width=%d},@{Expression={$_.CookedValue};width=%<d}", OUTPUT_COLUMN_WIDTH))
- .add("-HideTableHeaders")
- .add("|")
- .add("Out-String")
- .add("-Width")
- .add(String.valueOf(OUTPUT_COLUMN_WIDTH * 2))
- .build();
- String command = JOINER_ON_SPACE.join(allParams);
- log.debug("Windows performance counter poll command for {} will be: {}", entity, command);
-
- GetPerformanceCountersJob<WinRmToolResponse> job = new GetPerformanceCountersJob(getEntity(), command);
- getPoller().scheduleAtFixedRate(
- new CallInEntityExecutionContext(entity, job),
- new SendPerfCountersToSensors(getEntity(), polls),
- minPeriod);
- }
-
- private static class GetPerformanceCountersJob<T> implements Callable<T> {
-
- private final Entity entity;
- private final String command;
-
- GetPerformanceCountersJob(Entity entity, String command) {
- this.entity = entity;
- this.command = command;
- }
-
- @Override
- public T call() throws Exception {
- WinRmMachineLocation machine = EffectorTasks.getWinRmMachine(entity);
- WinRmToolResponse response = machine.executePsScript(command);
- return (T)response;
- }
- }
-
- @SuppressWarnings("unchecked")
- protected Poller<WinRmToolResponse> getPoller() {
- return (Poller<WinRmToolResponse>) super.getPoller();
- }
-
- /**
- * A {@link java.util.concurrent.Callable} that wraps another {@link java.util.concurrent.Callable}, where the
- * inner {@link java.util.concurrent.Callable} is executed in the context of a
- * specific entity.
- *
- * @param <T> The type of the {@link java.util.concurrent.Callable}.
- */
- private static class CallInEntityExecutionContext<T> implements Callable<T> {
- private final Callable<T> job;
- private EntityLocal entity;
-
- private CallInEntityExecutionContext(EntityLocal entity, Callable<T> job) {
- this.job = job;
- this.entity = entity;
- }
-
- @Override
- public T call() throws Exception {
- ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext();
- return executionContext.submit(Maps.newHashMap(), job).get();
- }
- }
-
- @VisibleForTesting
- static class SendPerfCountersToSensors implements PollHandler<WinRmToolResponse> {
- private final EntityLocal entity;
- private final List<WindowsPerformanceCounterPollConfig<?>> polls;
- private final Set<AttributeSensor<?>> failedAttributes = Sets.newLinkedHashSet();
- private static final Pattern MACHINE_NAME_LOOKBACK_PATTERN = Pattern.compile(String.format("(?<=\\\\\\\\.{0,%d})\\\\.*", OUTPUT_COLUMN_WIDTH));
-
- public SendPerfCountersToSensors(EntityLocal entity, Collection<WindowsPerformanceCounterPollConfig<?>> polls) {
- this.entity = entity;
- this.polls = ImmutableList.copyOf(polls);
- }
-
- @Override
- public boolean checkSuccess(WinRmToolResponse val) {
- // TODO not just using statusCode; also looking at absence of stderr.
- // Status code is (empirically) unreliable: it returns 0 sometimes even when failed
- // (but never returns non-zero on success).
- if (val.getStatusCode() != 0) return false;
- String stderr = val.getStdErr();
- if (stderr == null || stderr.length() != 0) return false;
- String out = val.getStdOut();
- if (out == null || out.length() == 0) return false;
- return true;
- }
-
- @Override
- public void onSuccess(WinRmToolResponse val) {
- for (String pollResponse : val.getStdOut().split("\r\n")) {
- if (Strings.isNullOrEmpty(pollResponse)) {
- continue;
- }
- String path = pollResponse.substring(0, OUTPUT_COLUMN_WIDTH - 1);
- // The performance counter output prepends the sensor name with "\\<machinename>" so we need to remove it
- Matcher machineNameLookbackMatcher = MACHINE_NAME_LOOKBACK_PATTERN.matcher(path);
- if (!machineNameLookbackMatcher.find()) {
- continue;
- }
- String name = machineNameLookbackMatcher.group(0).trim();
- String rawValue = pollResponse.substring(OUTPUT_COLUMN_WIDTH).replaceAll("^\\s+", "");
- WindowsPerformanceCounterPollConfig<?> config = getPollConfig(name);
- Class<?> clazz = config.getSensor().getType();
- AttributeSensor<Object> attribute = (AttributeSensor<Object>) Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
- try {
- Object value = TypeCoercions.coerce(rawValue, TypeToken.of(clazz));
- entity.setAttribute(attribute, value);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- if (failedAttributes.add(attribute)) {
- log.warn("Failed to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
- } else {
- if (log.isTraceEnabled()) log.trace("Failed (repeatedly) to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
- }
- }
- }
- }
-
- @Override
- public void onFailure(WinRmToolResponse val) {
- log.error("Windows Performance Counter query did not respond as expected. exitcode={} stdout={} stderr={}",
- new Object[]{val.getStatusCode(), val.getStdOut(), val.getStdErr()});
- for (WindowsPerformanceCounterPollConfig<?> config : polls) {
- Class<?> clazz = config.getSensor().getType();
- AttributeSensor<?> attribute = Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
- entity.setAttribute(attribute, null);
- }
- }
-
- @Override
- public void onException(Exception exception) {
- log.error("Detected exception while retrieving Windows Performance Counters from entity " +
- entity.getDisplayName(), exception);
- for (WindowsPerformanceCounterPollConfig<?> config : polls) {
- entity.setAttribute(Sensors.newSensor(config.getSensor().getClass(), config.getPerformanceCounterName(), config.getDescription()), null);
- }
- }
-
- @Override
- public String getDescription() {
- return "" + polls;
- }
-
- @Override
- public String toString() {
- return super.toString()+"["+getDescription()+"]";
- }
-
- private WindowsPerformanceCounterPollConfig<?> getPollConfig(String sensorName) {
- for (WindowsPerformanceCounterPollConfig<?> poll : polls) {
- if (poll.getPerformanceCounterName().equalsIgnoreCase(sensorName)) {
- return poll;
- }
- }
- throw new IllegalStateException(String.format("%s not found in configured polls: %s", sensorName, polls));
- }
- }
-
- static class PerfCounterValueIterator implements Iterator<String> {
-
- // This pattern matches the contents of the first field, and optionally matches the rest of the line as
- // further fields. Feed the second match back into the pattern again to get the next field, and repeat until
- // all fields are discovered.
- protected static final Pattern splitPerfData = Pattern.compile("^\"([^\\\"]*)\"((,\"[^\\\"]*\")*)$");
-
- private Matcher matcher;
-
- public PerfCounterValueIterator(String input) {
- matcher = splitPerfData.matcher(input);
- // Throw away the first element (the timestamp) (and also confirm that we have a pattern match)
- checkArgument(hasNext(), "input "+input+" does not match expected pattern "+splitPerfData.pattern());
- next();
- }
-
- @Override
- public boolean hasNext() {
- return matcher != null && matcher.find();
- }
-
- @Override
- public String next() {
- String next = matcher.group(1);
-
- String remainder = matcher.group(2);
- if (!Strings.isNullOrEmpty(remainder)) {
- assert remainder.startsWith(",");
- remainder = remainder.substring(1);
- matcher = splitPerfData.matcher(remainder);
- } else {
- matcher = null;
- }
-
- return next;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private static enum QuoteStringFunction implements Function<String, String> {
- INSTANCE;
-
- @Nullable
- @Override
- public String apply(@Nullable String input) {
- return input != null ? "\"" + input + "\"" : null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java
deleted file mode 100644
index 34bc08c..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/windows/WindowsPerformanceCounterPollConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.windows;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.PollConfig;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-
-public class WindowsPerformanceCounterPollConfig<T> extends PollConfig<Object, T, WindowsPerformanceCounterPollConfig<T>>{
-
- private String performanceCounterName;
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public WindowsPerformanceCounterPollConfig(AttributeSensor<T> sensor) {
- super(sensor);
- description(sensor.getDescription());
- onSuccess((Function)Functions.identity());
- }
-
- public WindowsPerformanceCounterPollConfig(WindowsPerformanceCounterPollConfig<T> other) {
- super(other);
- this.performanceCounterName = other.performanceCounterName;
- }
-
- public String getPerformanceCounterName() {
- return performanceCounterName;
- }
-
- public WindowsPerformanceCounterPollConfig<T> performanceCounterName(String val) {
- this.performanceCounterName = val; return this;
- }
-
- @Override protected String toStringPollSource() { return performanceCounterName; }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java b/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
index 9e8e061..ad768f7 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/http/HttpToolResponse.java
@@ -26,7 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.brooklyn.sensor.feed.http.HttpPollValue;
+import org.apache.brooklyn.feed.http.HttpPollValue;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.stream.Streams;
import org.apache.brooklyn.util.time.Duration;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java
new file mode 100644
index 0000000..1cc48df
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/ConfigToAttributesTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ConfigToAttributesTest {
+
+ private ManagementContextInternal managementContext;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ managementContext = new LocalManagementContext();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ }
+
+ @Test
+ public void testApplyTemplatedConfigWithEntity() {
+ TestApplication app = managementContext.getEntityManager().createEntity(EntitySpec.create(TestApplication.class)
+ .configure(TestEntity.CONF_NAME, "myval"));
+ Entities.startManagement(app, managementContext);
+
+ BasicAttributeSensorAndConfigKey<String> key = new TemplatedStringAttributeSensorAndConfigKey("mykey", "my descr", "${config['test.confName']!'notfound'}");
+ String val = ConfigToAttributes.apply(app, key);
+ assertEquals(app.getAttribute(key), val);
+ assertEquals(val, "myval");
+
+ }
+
+ @Test
+ public void testApplyTemplatedConfigWithManagementContext() {
+ managementContext.getBrooklynProperties().put(TestEntity.CONF_NAME, "myglobalval");
+ BasicAttributeSensorAndConfigKey<String> key = new TemplatedStringAttributeSensorAndConfigKey("mykey", "my descr", "${config['test.confName']!'notfound'}");
+ String val = ConfigToAttributes.transform(managementContext, key);
+ assertEquals(val, "myglobalval");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
new file mode 100644
index 0000000..0f2c1ce
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.feed.PollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PollerTest extends BrooklynAppUnitTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PollerTest.class);
+
+ private TestEntity entity;
+ private Poller<Integer> poller;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ poller = new Poller<Integer>(entity, false);
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ if (poller != null) poller.stop();
+ super.tearDown();
+ }
+
+ @Test(groups={"Integration", "WIP"}) // because takes > 1 second
+ public void testPollingSubTaskFailsOnceKeepsGoing() throws Exception {
+ final AtomicInteger counter = new AtomicInteger();
+ poller.scheduleAtFixedRate(
+ new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ int result = counter.incrementAndGet();
+ if (result % 2 == 0) {
+ DynamicTasks.queue("in-poll", new Runnable() {
+ public void run() {
+ throw new IllegalStateException("Simulating error in sub-task for poll");
+ }});
+ }
+ return result;
+ }
+ },
+ new PollHandler<Integer>() {
+ @Override public boolean checkSuccess(Integer val) {
+ return true;
+ }
+ @Override public void onSuccess(Integer val) {
+
+ }
+ @Override public void onFailure(Integer val) {
+ }
+ @Override
+ public void onException(Exception exception) {
+ LOG.info("Exception in test poller", exception);
+ }
+ @Override public String getDescription() {
+ return "mypollhandler";
+ }
+ },
+ new Duration(10, TimeUnit.MILLISECONDS));
+ poller.start();
+
+ Asserts.succeedsContinually(MutableMap.of("timeout", 2*1000, "period", 500), new Runnable() {
+ int oldCounter = -1;
+ @Override public void run() {
+ assertTrue(counter.get() > oldCounter);
+ oldCounter = counter.get();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java b/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
index a5b4294..b8d8c35 100644
--- a/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
+++ b/core/src/test/java/org/apache/brooklyn/core/location/TestPortSupplierLocation.java
@@ -21,11 +21,11 @@ package org.apache.brooklyn.core.location;
import static org.testng.Assert.assertEquals;
import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
import org.apache.brooklyn.core.location.PortRanges;
import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
-import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
index f0c6551..4a724e4 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedTest.java
@@ -35,14 +35,14 @@ import org.apache.brooklyn.core.mgmt.internal.BrooklynGarbageCollector;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.core.test.entity.TestEntityImpl.TestEntityWithoutEnrichers;
-import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
-import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
-import org.apache.brooklyn.sensor.feed.http.HttpFeed;
-import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
-import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions;
-import org.apache.brooklyn.sensor.feed.ssh.SshFeed;
-import org.apache.brooklyn.sensor.feed.ssh.SshPollConfig;
-import org.apache.brooklyn.sensor.feed.ssh.SshValueFunctions;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.feed.ssh.SshFeed;
+import org.apache.brooklyn.feed.ssh.SshPollConfig;
+import org.apache.brooklyn.feed.ssh.SshValueFunctions;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.test.EntityTestUtils;
import org.apache.brooklyn.util.collections.MutableList;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java b/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java
new file mode 100644
index 0000000..c362e4e6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/function/FunctionFeedTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.function;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionFeedTest;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Callables;
+
+/**
+ * Tests for {@link FunctionFeed}: each test attaches a feed that polls a callable or supplier
+ * against a {@link TestEntity} and then checks which value ends up on the target sensor
+ * (via the success, failure or exception handler).
+ */
+public class FunctionFeedTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(FunctionFeedTest.class);
+
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+    private Location loc;
+    private EntityLocal entity;
+    /** The feed under test; stopped (and cleared) in {@link #tearDown()}. */
+    private FunctionFeed feed;
+
+    /** Creates a started app containing a single {@link TestEntity} for the feeds to write to. */
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // Ask the app for the location (as HttpFeedTest does) instead of constructing it directly.
+        loc = app.newLocalhostProvisioningLocation();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(loc));
+    }
+
+    /** Stops the feed created by the test, if any, and always tears down the app. */
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (feed != null) feed.stop();
+        } finally {
+            // Clear the field so a feed from one test is never stopped again after a later test,
+            // and make sure the app is torn down even if stopping the feed failed.
+            feed = null;
+            super.tearDown();
+        }
+    }
+
+    /** Polls an incrementing callable and waits until the sensor value has advanced past 2. */
+    @Test
+    public void testPollsFunctionRepeatedlyToSetAttribute() throws Exception {
+        // No onSuccess handler is configured: the assertion below relies on the callable's
+        // Integer result reaching the sensor unchanged.
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer,Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(new IncrementingCallable())
+                )
+                .build();
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                Integer val = entity.getAttribute(SENSOR_INT);
+                assertTrue(val != null && val > 2, "val=" + val);
+            }
+        });
+    }
+
+    /** Adding a second, equivalent feed must replace the first one rather than accumulate. */
+    @Test
+    public void testFeedDeDupe() throws Exception {
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        log.info("Feed 0 is: "+feed);
+        Feed feed0 = feed;
+
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        log.info("Feed 1 is: "+feed);
+        Feed feed1 = feed;
+        Assert.assertNotSame(feed1, feed0);
+
+        FeedSupport feeds = ((EntityInternal)entity).feeds();
+        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+
+        // a couple extra checks, compared to the de-dupe test in other *FeedTest classes
+        Feed feedAdded = Iterables.getOnlyElement(feeds.getFeeds());
+        Assert.assertSame(feedAdded, feed1);
+        Assert.assertNotSame(feedAdded, feed0);
+    }
+
+    /** Re-adding the very same feed object must leave it polling. */
+    @Test
+    public void testFeedDeDupeIgnoresSameObject() throws Exception {
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        assertFeedIsPolling();
+        entity.addFeed(feed);
+        assertFeedIsPollingContinuously();
+    }
+
+    /** The onSuccess function receives the callable's result (123) and its output (124) is published. */
+    @Test
+    public void testCallsOnSuccessWithResultOfCallable() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(Callables.returning(123))
+                        .onSuccess(new AddOneFunction()))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 124);
+    }
+
+    /** When the callable throws, the onException function's output (containing the message) is published. */
+    @Test
+    public void testCallsOnExceptionWithExceptionFromCallable() throws Exception {
+        final String errMsg = "my err msg";
+
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Object, String>(SENSOR_STRING)
+                        .period(1)
+                        .callable(new ExceptionCallable(errMsg))
+                        .onException(new ToStringFunction()))
+                .build();
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.contains(errMsg), "val=" + val);
+            }
+        });
+    }
+
+    /** When checkSuccess rejects the result, the onFailure value (-1) is published instead of onSuccess. */
+    @Test
+    public void testCallsOnFailureWithResultOfCallable() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(Callables.returning(1))
+                        .checkSuccess(Predicates.alwaysFalse())
+                        .onSuccess(new AddOneFunction())
+                        .onFailure(Functions.constant(-1)))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, -1);
+    }
+
+    /** With checkSuccess rejecting the result and no onFailure handler, the onException value (-1) is published. */
+    @Test
+    public void testCallsOnExceptionWhenCheckSuccessIsFalseButNoFailureHandler() throws Exception {
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(1)
+                        .callable(Callables.returning(1))
+                        .checkSuccess(Predicates.alwaysFalse())
+                        .onSuccess(new AddOneFunction())
+                        .onException(Functions.constant(-1)))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, -1);
+    }
+
+    /**
+     * Two poll configs using the same callable and period: both sensors are expected to see the
+     * sequence 0, 1, ... (as Integer and as String respectively), which only holds if each
+     * invocation of the callable is shared between them.
+     */
+    @Test
+    public void testSharesFunctionWhenMultiplePostProcessors() throws Exception {
+        final IncrementingCallable incrementingCallable = new IncrementingCallable();
+        final List<Integer> ints = new CopyOnWriteArrayList<Integer>();
+        final List<String> strings = new CopyOnWriteArrayList<String>();
+
+        entity.subscribe(entity, SENSOR_INT, new SensorEventListener<Integer>() {
+            @Override public void onEvent(SensorEvent<Integer> event) {
+                ints.add(event.getValue());
+            }});
+        entity.subscribe(entity, SENSOR_STRING, new SensorEventListener<String>() {
+            @Override public void onEvent(SensorEvent<String> event) {
+                strings.add(event.getValue());
+            }});
+
+        feed = FunctionFeed.builder()
+                .entity(entity)
+                .poll(new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                        .period(10)
+                        .callable(incrementingCallable))
+                .poll(new FunctionPollConfig<Integer, String>(SENSOR_STRING)
+                        .period(10)
+                        .callable(incrementingCallable)
+                        .onSuccess(new ToStringFunction()))
+                .build();
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                // Guard both subList calls so a short list reports its contents rather than
+                // surfacing as an IndexOutOfBoundsException.
+                assertTrue(ints.size()>=2, "wrong ints list: "+ints);
+                assertEquals(ints.subList(0, 2), ImmutableList.of(0, 1), "wrong ints list: "+ints);
+                assertTrue(strings.size()>=2, "wrong strings list: "+strings);
+                assertEquals(strings.subList(0, 2), ImmutableList.of("0", "1"), "wrong strings list: "+strings);
+            }});
+    }
+
+    /**
+     * Compile-time check only: the various ways of building a {@link FunctionPollConfig} must
+     * type-check. The locals are deliberately unused.
+     */
+    @Test
+    @SuppressWarnings("unused")
+    public void testFunctionPollConfigBuilding() throws Exception {
+        FunctionPollConfig<Integer, Integer> typeFromCallable = FunctionPollConfig.forSensor(SENSOR_INT)
+                .period(1)
+                .callable(Callables.returning(1))
+                .onSuccess(Functions.constant(-1));
+
+        FunctionPollConfig<Integer, Integer> typeFromSupplier = FunctionPollConfig.forSensor(SENSOR_INT)
+                .period(1)
+                .supplier(Suppliers.ofInstance(1))
+                .onSuccess(Functions.constant(-1));
+
+        FunctionPollConfig<Integer, Integer> usingConstructor = new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                .period(1)
+                .supplier(Suppliers.ofInstance(1))
+                .onSuccess(Functions.constant(-1));
+
+        FunctionPollConfig<Integer, Integer> usingConstructorWithFailureOrException = new FunctionPollConfig<Integer, Integer>(SENSOR_INT)
+                .period(1)
+                .supplier(Suppliers.ofInstance(1))
+                .onFailureOrException(Functions.<Integer>constant(null));
+    }
+
+
+    /** Snapshots the current SENSOR_INT value and waits until the sensor reports a different one. */
+    private void assertFeedIsPolling() {
+        final Integer val = entity.getAttribute(SENSOR_INT);
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                assertNotEquals(val, entity.getAttribute(SENSOR_INT));
+            }
+        });
+    }
+
+    /** Repeatedly checks, via {@link #assertFeedIsPolling()}, that the sensor value keeps changing. */
+    private void assertFeedIsPollingContinuously() {
+        Asserts.succeedsContinually(new Runnable() {
+            @Override
+            public void run() {
+                assertFeedIsPolling();
+            }
+        });
+    }
+
+    /** Returns 0, 1, 2, ... on successive calls. */
+    private static class IncrementingCallable implements Callable<Integer> {
+        private final AtomicInteger next = new AtomicInteger(0);
+
+        @Override public Integer call() {
+            return next.getAndIncrement();
+        }
+    }
+
+    /** Maps a non-null input to input + 1; maps null to null. */
+    private static class AddOneFunction implements Function<Integer, Integer> {
+        @Override public Integer apply(@Nullable Integer input) {
+            return (input != null) ? (input + 1) : null;
+        }
+    }
+
+    /** Always throws a {@link RuntimeException} carrying the configured message. */
+    private static class ExceptionCallable implements Callable<Void> {
+        private final String msg;
+        ExceptionCallable(String msg) {
+            this.msg = msg;
+        }
+        @Override public Void call() {
+            throw new RuntimeException(msg);
+        }
+    }
+
+    /** Maps a non-null input to its {@code toString()}; maps null to null. */
+    public static class ToStringFunction implements Function<Object, String> {
+        @Override public String apply(@Nullable Object input) {
+            return (input != null) ? (input.toString()) : null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java
new file mode 100644
index 0000000..ee7e226
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedIntegrationTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.net.URI;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.location.PortRanges;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.HttpService;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Integration tests for {@link HttpFeed} against a locally started {@link HttpService},
+ * covering an https endpoint and basic authentication with valid and invalid credentials.
+ */
+public class HttpFeedIntegrationTest extends BrooklynAppUnitTestSupport {
+
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+    /** Started by each test; shut down in {@link #tearDown()}. */
+    private HttpService httpService;
+
+    private Location loc;
+    private EntityLocal entity;
+    /** The feed under test; stopped in {@link #tearDown()}. */
+    private HttpFeed feed;
+
+    /** Creates a started app containing a single {@link TestEntity} for the feeds to write to. */
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // Ask the app for the location (as HttpFeedTest does) instead of constructing it directly.
+        loc = app.newLocalhostProvisioningLocation();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(loc));
+    }
+
+    /**
+     * Stops the feed, then the HTTP service, then the app. Each step runs even if an earlier
+     * one throws, so a failing feed cannot leave the service or the app behind.
+     */
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (feed != null) feed.stop();
+        } finally {
+            try {
+                if (httpService != null) httpService.shutdown();
+            } finally {
+                feed = null;
+                httpService = null;
+                super.tearDown();
+            }
+        }
+    }
+
+    /** Polls a service whose URL scheme is asserted to be https and checks the status code and body. */
+    @Test(groups = {"Integration"})
+    public void testPollsAndParsesHttpGetResponseWithSsl() throws Exception {
+        httpService = new HttpService(PortRanges.fromString("9000+"), true).start();
+        URI baseUrl = new URI(httpService.getUrl());
+
+        assertEquals(baseUrl.getScheme(), "https", "baseUrl="+baseUrl);
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 200);
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.contains("Hello, World"), "val="+val);
+            }});
+    }
+
+    /** Polls a basic-auth protected http service using matching credentials; expects 200 and the body. */
+    @Test(groups = {"Integration"})
+    public void testPollsAndParsesHttpGetResponseWithBasicAuthentication() throws Exception {
+        final String username = "brooklyn";
+        final String password = "hunter2";
+        httpService = new HttpService(PortRanges.fromString("9000+"))
+                .basicAuthentication(username, password)
+                .start();
+        URI baseUrl = new URI(httpService.getUrl());
+        assertEquals(baseUrl.getScheme(), "http", "baseUrl="+baseUrl);
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri(baseUrl)
+                .credentials(username, password)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 200);
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.contains("Hello, World"), "val="+val);
+            }});
+    }
+
+    /**
+     * Polls a basic-auth protected service with the wrong password: the int sensor is expected
+     * to receive 401 through onFailure, and the string sensor (which has no onFailure handler)
+     * to receive the onException constant.
+     */
+    @Test(groups = {"Integration"})
+    public void testPollWithInvalidCredentialsFails() throws Exception {
+        httpService = new HttpService(PortRanges.fromString("9000+"))
+                .basicAuthentication("brooklyn", "hunter2")
+                .start();
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri(httpService.getUrl())
+                .credentials("brooklyn", "9876543210")
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onFailure(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction())
+                        .onException(Functions.constant("Failed!")))
+                .build();
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, SENSOR_INT, 401);
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                String val = entity.getAttribute(SENSOR_STRING);
+                assertTrue(val != null && val.equals("Failed!"), "val=" + val);
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java
new file mode 100644
index 0000000..d8ac492
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/HttpFeedTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityFunctions;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.EntityInternal.FeedSupport;
+import org.apache.brooklyn.core.feed.FeedConfig;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.http.BetterMockWebServer;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.net.Networking;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.mockwebserver.MockResponse;
+import com.google.mockwebserver.SocketPolicy;
+
+/**
+ * Tests for {@link HttpFeed} against a {@link BetterMockWebServer} with pre-enqueued responses.
+ * {@link #setUp()} starts a server answering 200 with a small JSON body; tests that need other
+ * responses swap it out through {@link #replaceServer()}.
+ */
+public class HttpFeedTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(HttpFeedTest.class);
+
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString", "");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor("aLong", "");
+
+    private static final long TIMEOUT_MS = 10*1000;
+
+    /** The mock server currently in use; shut down in {@link #tearDown()}. */
+    private BetterMockWebServer server;
+    /** Root URL of the server started in {@link #setUp()} (or by a test that replaced it). */
+    private URL baseUrl;
+
+    private Location loc;
+    private EntityLocal entity;
+    /** The feed under test; stopped and cleared in {@link #tearDown()}. */
+    private HttpFeed feed;
+
+    /** Starts a mock server with 100 identical 200/JSON responses and a started app with one {@link TestEntity}. */
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse().setResponseCode(200).addHeader("content-type: application/json").setBody("{\"foo\":\"myfoo\"}"));
+        }
+        server.play();
+        baseUrl = server.getUrl("/");
+
+        loc = app.newLocalhostProvisioningLocation();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        app.start(ImmutableList.of(loc));
+    }
+
+    /**
+     * Stops the feed first, then the mock server, then the app. Each step runs even if an
+     * earlier one throws.
+     */
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (feed != null) feed.stop();
+        } finally {
+            try {
+                if (server != null) server.shutdown();
+            } finally {
+                feed = null;
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Shuts down the server currently held in {@link #server} (if any) and replaces it with a
+     * fresh instance that has not been started yet. Callers enqueue their responses and then
+     * call {@code server.play()}. Without the shutdown, the server started in {@link #setUp()}
+     * would be orphaned when the field is overwritten.
+     */
+    private void replaceServer() throws Exception {
+        if (server != null) server.shutdown();
+        server = BetterMockWebServer.newInstanceLocalhost();
+    }
+
+    /** GET polling: the status code and the response body are published to their sensors. */
+    @Test
+    public void testPollsAndParsesHttpGetResponse() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(HttpPollConfig.forSensor(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(HttpPollConfig.forSensor(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "{\"foo\":\"myfoo\"}", TIMEOUT_MS);
+    }
+
+    /** Adding a second, equivalent feed must leave exactly one feed on the entity. */
+    @Test
+    public void testFeedDeDupe() throws Exception {
+        testPollsAndParsesHttpGetResponse();
+        entity.addFeed(feed);
+        log.info("Feed 0 is: "+feed);
+
+        testPollsAndParsesHttpGetResponse();
+        log.info("Feed 1 is: "+feed);
+        entity.addFeed(feed);
+
+        FeedSupport feeds = ((EntityInternal)entity).feeds();
+        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+    }
+
+    /** A poll configured with explicit connection and socket timeouts still succeeds against a healthy server. */
+    @Test
+    public void testSetsConnectionTimeout() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .connectionTimeout(Duration.TEN_SECONDS)
+                        .socketTimeout(Duration.TEN_SECONDS)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+    }
+
+    // TODO How to cause the other end to just freeze (similar to aws-ec2 when securityGroup port is not open)?
+    /** When the server drops every connection at start, the onException value (-1) is published. */
+    @Test
+    public void testSetsConnectionTimeoutWhenServerDisconnects() throws Exception {
+        replaceServer();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START));
+        }
+        server.play();
+        baseUrl = server.getUrl("/");
+
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .connectionTimeout(Duration.TEN_SECONDS)
+                        .socketTimeout(Duration.TEN_SECONDS)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onException(Functions.constant(-1)))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, -1, TIMEOUT_MS);
+    }
+
+
+    /** POST polling: the status code and the response body are published to their sensors. */
+    @Test
+    public void testPollsAndParsesHttpPostResponse() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .method("post")
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .method("post")
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "{\"foo\":\"myfoo\"}", TIMEOUT_MS);
+    }
+
+    /** A 401 response is routed to the onFailure handlers. */
+    @Test
+    public void testUsesFailureHandlerOn4xx() throws Exception {
+        replaceServer();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(401)
+                    .setBody("Unauthorised"));
+        }
+        server.play();
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(server.getUrl("/"))
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onFailure(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction())
+                        .onFailure(Functions.constant("Failed")))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, 401, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "Failed", TIMEOUT_MS);
+        // The server is shut down in tearDown(), after the feed has been stopped.
+    }
+
+    /** A 401 response with no onFailure handler falls through to the onException handler. */
+    @Test
+    public void testUsesExceptionHandlerOn4xxAndNoFailureHandler() throws Exception {
+        replaceServer();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(401)
+                    .setBody("Unauthorised"));
+        }
+        server.play();
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(server.getUrl("/"))
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode())
+                        .onException(Functions.constant(-1)))
+                .build();
+
+        assertSensorEventually(SENSOR_INT, -1, TIMEOUT_MS);
+        // The server is shut down in tearDown(), after the feed has been stopped.
+    }
+
+    /** While suspended the request count must stay (almost) flat; after resume it must grow again. */
+    @Test(groups="Integration")
+    // marked integration as it takes a wee while
+    public void testSuspendResume() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(new HttpPollConfig<Integer>(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .build();
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        feed.suspend();
+        final int countWhenSuspended = server.getRequestCount();
+
+        Thread.sleep(500);
+        // Allow one extra request: a poll may already have been in flight when suspend() was called.
+        if (server.getRequestCount() > countWhenSuspended+1)
+            Assert.fail("Request count continued to increment while feed was suspended, from "+countWhenSuspended+" to "+server.getRequestCount());
+
+        feed.resume();
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                assertTrue(server.getRequestCount() > countWhenSuspended + 1,
+                        "Request count failed to increment when feed was resumed, from " + countWhenSuspended + ", still at " + server.getRequestCount());
+            }
+        });
+    }
+
+    /** A feed built with suspended() publishes nothing until resume() is called. */
+    @Test(groups="Integration")
+    // marked integration as it takes a wee while
+    public void testStartSuspended() throws Exception {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+                .poll(HttpPollConfig.forSensor(SENSOR_INT)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.responseCode()))
+                .poll(HttpPollConfig.forSensor(SENSOR_STRING)
+                        .period(100)
+                        .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                .suspended()
+                .build();
+        Asserts.continually(MutableMap.of("timeout", 500),
+                Entities.attributeSupplier(entity, SENSOR_INT), Predicates.<Integer>equalTo(null));
+        int countWhenSuspended = server.getRequestCount();
+        feed.resume();
+        Asserts.eventually(Entities.attributeSupplier(entity, SENSOR_INT), Predicates.<Integer>equalTo(200));
+        if (server.getRequestCount() <= countWhenSuspended)
+            Assert.fail("Request count failed to increment when feed was resumed, from "+countWhenSuspended+", still at "+server.getRequestCount());
+        log.info("RUN: "+countWhenSuspended+" - "+server.getRequestCount());
+    }
+
+
+    /** Polling a local port with no listener is expected to end up in the onException handler. */
+    @Test
+    public void testPollsAndParsesHttpErrorResponseLocal() throws Exception {
+        int unboundPort = Networking.nextAvailablePort(10000);
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUri("http://localhost:" + unboundPort + "/path/should/not/exist")
+                .poll(new HttpPollConfig<String>(SENSOR_STRING)
+                        .onSuccess(Functions.constant("success"))
+                        .onFailure(Functions.constant("failure"))
+                        .onException(Functions.constant("error")))
+                .build();
+
+        assertSensorEventually(SENSOR_STRING, "error", TIMEOUT_MS);
+    }
+
+    /** A single multi-sensor poll sets both sensors from one response. */
+    @Test
+    public void testPollsMulti() throws Exception {
+        newMultiFeed(baseUrl);
+        assertSensorEventually(SENSOR_INT, (Integer)200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "{\"foo\":\"myfoo\"}", TIMEOUT_MS);
+    }
+
+    /**
+     * Serves ten 200 responses followed by ten 401 responses to a multi-sensor poll: the sensors
+     * are first set from the successful response, then the failure handler sets SENSOR_INT to -1
+     * and removes SENSOR_STRING from the entity's attributes.
+     */
+    // because takes a wee while
+    @SuppressWarnings("rawtypes")
+    @Test(groups="Integration")
+    public void testPollsMultiClearsOnSubsequentFailure() throws Exception {
+        replaceServer();
+        for (int i = 0; i < 10; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(200)
+                    .setBody("Hello World"));
+        }
+        for (int i = 0; i < 10; i++) {
+            server.enqueue(new MockResponse()
+                    .setResponseCode(401)
+                    .setBody("Unauthorised"));
+        }
+        server.play();
+
+        newMultiFeed(server.getUrl("/"));
+
+        assertSensorEventually(SENSOR_INT, 200, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, "Hello World", TIMEOUT_MS);
+
+        assertSensorEventually(SENSOR_INT, -1, TIMEOUT_MS);
+        assertSensorEventually(SENSOR_STRING, null, TIMEOUT_MS);
+
+        List<String> attrs = Lists.transform(MutableList.copyOf( ((EntityInternal)entity).getAllAttributes().keySet() ),
+                new Function<AttributeSensor,String>() {
+                    @Override public String apply(AttributeSensor input) { return input.getName(); } });
+        Assert.assertFalse(attrs.contains(SENSOR_STRING.getName()), "attrs contained "+SENSOR_STRING);
+        Assert.assertFalse(attrs.contains(FeedConfig.NO_SENSOR.getName()), "attrs contained "+FeedConfig.NO_SENSOR);
+        // The server is shut down in tearDown(), after the feed has been stopped.
+    }
+
+    /**
+     * Installs in {@link #feed} a feed with one multi-sensor poll against {@code baseUrl}.
+     * On success it sets SENSOR_INT to the response code and, for a 200, SENSOR_STRING to the
+     * body; on failure or exception it sets SENSOR_INT to -1 and maps SENSOR_STRING to
+     * {@link PollConfig#REMOVE}.
+     */
+    private void newMultiFeed(URL baseUrl) {
+        feed = HttpFeed.builder()
+                .entity(entity)
+                .baseUrl(baseUrl)
+
+                .poll(HttpPollConfig.forMultiple()
+                        .onSuccess(new Function<HttpToolResponse,Void>() {
+                            @Override
+                            public Void apply(HttpToolResponse response) {
+                                entity.setAttribute(SENSOR_INT, response.getResponseCode());
+                                if (response.getResponseCode()==200)
+                                    entity.setAttribute(SENSOR_STRING, response.getContentAsString());
+                                return null;
+                            }
+                        })
+                        .onFailureOrException(Functionals.function(EntityFunctions.settingSensorsConstant(entity, MutableMap.<AttributeSensor<?>,Object>of(
+                                SENSOR_INT, -1,
+                                SENSOR_STRING, PollConfig.REMOVE))))
+                        .period(100))
+                .build();
+    }
+
+
+    /** Waits, up to the {@code timeout} passed to {@link Asserts#succeedsEventually}, for the sensor to equal {@code expectedVal}. */
+    private <T> void assertSensorEventually(final AttributeSensor<T> sensor, final T expectedVal, long timeout) {
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", timeout), new Callable<Void>() {
+            @Override
+            public Void call() {
+                assertEquals(entity.getAttribute(sensor), expectedVal);
+                return null;
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java
new file mode 100644
index 0000000..23ffae3
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/HttpValueFunctionsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.NoSuchElementException;
+
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.util.core.http.HttpToolResponse;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+
+public class HttpValueFunctionsTest {
+
+ private int responseCode = 200;
+ private long fullLatency = 1000;
+ private String headerName = "my_header";
+ private String headerVal = "my_header_val";
+ private String bodyKey = "mykey";
+ private String bodyVal = "myvalue";
+ private String body = "{"+bodyKey+":"+bodyVal+"}";
+ private long now;
+ private HttpToolResponse response;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ now = System.currentTimeMillis();
+ response = new HttpToolResponse(responseCode, ImmutableMap.of(headerName, ImmutableList.of(headerVal)),
+ body.getBytes(), now-fullLatency, fullLatency / 2, fullLatency);
+ }
+
+ @Test
+ public void testResponseCode() throws Exception {
+ assertEquals(HttpValueFunctions.responseCode().apply(response), Integer.valueOf(responseCode));
+ }
+
+ @Test
+ public void testContainsHeader() throws Exception {
+ assertTrue(HttpValueFunctions.containsHeader(headerName).apply(response));
+ assertFalse(HttpValueFunctions.containsHeader("wrong_header").apply(response));
+ }
+
+ @Test
+ public void testStringContents() throws Exception {
+ assertEquals(HttpValueFunctions.stringContentsFunction().apply(response), body);
+ }
+
+ @Test
+ public void testJsonContents() throws Exception {
+ JsonElement json = HttpValueFunctions.jsonContents().apply(response);
+ assertTrue(json.isJsonObject());
+ assertEquals(json.getAsJsonObject().entrySet(), ImmutableMap.of(bodyKey, new JsonPrimitive(bodyVal)).entrySet());
+ }
+
+ @Test
+ public void testJsonContentsGettingElement() throws Exception {
+ assertEquals(HttpValueFunctions.jsonContents(bodyKey, String.class).apply(response), bodyVal);
+ }
+
+ @Test(expectedExceptions=NoSuchElementException.class)
+ public void testJsonContentsGettingMissingElement() throws Exception {
+ assertNull(HttpValueFunctions.jsonContents("wrongkey", String.class).apply(response));
+ }
+
+ @Test
+ public void testLatency() throws Exception {
+ assertEquals(HttpValueFunctions.latency().apply(response), Long.valueOf(fullLatency));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java b/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java
new file mode 100644
index 0000000..928035e
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/feed/http/JsonFunctionsTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.http;
+
+import java.util.NoSuchElementException;
+
+import org.apache.brooklyn.feed.http.JsonFunctions;
+import org.apache.brooklyn.util.collections.Jsonya;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.Jsonya.Navigator;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.jayway.jsonpath.PathNotFoundException;
+
+/** Tests for the {@link JsonFunctions} walk, walkM, walkN, cast and getPath helpers, all run against the document built by {@link #europeMap()}. */
+public class JsonFunctionsTest {
+
+ public static JsonElement europeMap() {
+ Navigator<MutableMap<Object, Object>> europe = Jsonya.newInstance().at("europe", "uk", "edinburgh")
+ .put("population", 500*1000)
+ .put("weather", "wet", "lighting", "dark")
+ .root().at("europe").at("france").put("population", 80*1000*1000)
+ .root();
+ return new JsonParser().parse( europe.toString() );
+ }
+
+ @Test
+ public void testWalk1() {
+ JsonElement pop = JsonFunctions.walk("europe", "france", "population").apply(europeMap());
+ Assert.assertEquals( (int)JsonFunctions.cast(Integer.class).apply(pop), 80*1000*1000 );
+ }
+
+ @Test
+ public void testWalk2() {
+ String weather = Functionals.chain(
+ JsonFunctions.walk("europe.uk.edinburgh.weather"),
+ JsonFunctions.cast(String.class) ).apply(europeMap());
+ Assert.assertEquals(weather, "wet");
+ }
+
+ @Test(expectedExceptions=NoSuchElementException.class)
+ public void testWalkWrong() {
+ Functionals.chain(
+ JsonFunctions.walk("europe", "spain", "barcelona"),
+ JsonFunctions.cast(String.class) ).apply(europeMap());
+ }
+
+ @Test
+ public void testWalkM() {
+ Maybe<JsonElement> pop = JsonFunctions.walkM("europe", "france", "population").apply( Maybe.of(europeMap()) );
+ Assert.assertEquals( (int)JsonFunctions.castM(Integer.class).apply(pop), 80*1000*1000 );
+ }
+
+ @Test
+ public void testWalkMWrong1() {
+ Maybe<JsonElement> m = JsonFunctions.walkM("europe", "spain", "barcelona").apply( Maybe.of( europeMap()) );
+ Assert.assertTrue(m.isAbsent());
+ }
+
+ @Test(expectedExceptions=Exception.class)
+ public void testWalkMWrong2() {
+ Maybe<JsonElement> m = JsonFunctions.walkM("europe", "spain", "barcelona").apply( Maybe.of( europeMap()) );
+ JsonFunctions.castM(String.class).apply(m);
+ }
+
+ @Test
+ public void testWalkN() {
+ JsonElement pop = JsonFunctions.walkN("europe", "france", "population").apply( europeMap() );
+ Assert.assertEquals( (int)JsonFunctions.cast(Integer.class).apply(pop), 80*1000*1000 );
+ }
+
+ @Test
+ public void testWalkNWrong1() {
+ JsonElement m = JsonFunctions.walkN("europe", "spain", "barcelona").apply( europeMap() );
+ Assert.assertNull(m);
+ }
+
+ @Test // was unannotated, so TestNG never ran it; NOTE(review): confirm cast(...) returns null for a null element
+ public void testWalkNWrong2() {
+ JsonElement m = JsonFunctions.walkN("europe", "spain", "barcelona").apply( europeMap() );
+ String n = JsonFunctions.cast(String.class).apply(m);
+ Assert.assertNull(n);
+ }
+
+ @Test
+ public void testGetPath1(){
+ Integer obj = (Integer) JsonFunctions.getPath("$.europe.uk.edinburgh.population").apply(europeMap());
+ Assert.assertEquals((int) obj, 500*1000);
+ }
+
+ @Test
+ public void testGetPath2(){
+ String obj = (String) JsonFunctions.getPath("$.europe.uk.edinburgh.lighting").apply(europeMap());
+ Assert.assertEquals(obj, "dark");
+ }
+
+ @Test
+ public void testGetMissingPathIsNullOrThrows(){
+ try {
+ // TODO is there a way to force this to return null if not found?
+ // for me (Alex) it throws but for others it seems to return null
+ Object obj = JsonFunctions.getPath("$.europe.spain.malaga").apply(europeMap());
+ Assert.assertNull(obj);
+ } catch (PathNotFoundException e) {
+ // not unexpected
+ }
+ }
+
+}