You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ge...@apache.org on 2017/03/03 12:01:32 UTC
[3/6] brooklyn-server git commit: Move SshFeed to AbstractCommandFeed
Move SshFeed to AbstractCommandFeed
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/a12f4a76
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/a12f4a76
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/a12f4a76
Branch: refs/heads/master
Commit: a12f4a761475c0622bbdf642f9f4b80ad4a2729d
Parents: 1aa222a
Author: Valentin Aitken <bo...@gmail.com>
Authored: Tue Feb 14 19:23:21 2017 +0200
Committer: Valentin Aitken <bo...@gmail.com>
Committed: Fri Mar 3 10:03:39 2017 +0200
----------------------------------------------------------------------
.../brooklyn/feed/AbstractCommandFeed.java | 285 +++++++++++++++++++
.../org/apache/brooklyn/feed/ssh/SshFeed.java | 285 -------------------
2 files changed, 285 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a12f4a76/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
new file mode 100644
index 0000000..f21d8da
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.ssh;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.location.Locations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by polling over ssh.
+ *
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private SshFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ * super.connectSensors();
+ *
+ * feed = SshFeed.builder()
+ * .entity(this)
+ * .machine(mySshMachineLachine)
+ * .poll(new SshPollConfig<Boolean>(SERVICE_UP)
+ * .command("rabbitmqctl -q status")
+ * .onSuccess(new Function<SshPollValue, Boolean>() {
+ * public Boolean apply(SshPollValue input) {
+ * return (input.getExitStatus() == 0);
+ * }}))
+ * .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ * super.disconnectSensors();
+ * if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ *
+ * @author aled
+ */
+public class SshFeed extends AbstractFeed {
+
+ public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
+
+ @SuppressWarnings("serial")
+ public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey(
+ new TypeToken<Supplier<SshMachineLocation>>() {},
+ "machine");
+
+ public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
+
+ @SuppressWarnings("serial")
+ public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {},
+ "polls");
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Entity entity;
+ private boolean onlyIfServiceUp = false;
+ private Supplier<SshMachineLocation> machine;
+ private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
+ private List<SshPollConfig<?>> polls = Lists.newArrayList();
+ private boolean execAsCommand = false;
+ private String uniqueTag;
+ private volatile boolean built;
+
+ public Builder entity(Entity val) {
+ this.entity = val;
+ return this;
+ }
+ public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
+ public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
+ this.onlyIfServiceUp = onlyIfServiceUp;
+ return this;
+ }
+ /** optional, to force a machine; otherwise it is inferred from the entity */
+ public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); }
+ /** optional, to force a machine; otherwise it is inferred from the entity */
+ public Builder machine(Supplier<SshMachineLocation> val) {
+ this.machine = val;
+ return this;
+ }
+ public Builder period(Duration period) {
+ this.period = period;
+ return this;
+ }
+ public Builder period(long millis) {
+ return period(Duration.of(millis, TimeUnit.MILLISECONDS));
+ }
+ public Builder period(long val, TimeUnit units) {
+ return period(Duration.of(val, units));
+ }
+ public Builder poll(SshPollConfig<?> config) {
+ polls.add(config);
+ return this;
+ }
+ public Builder execAsCommand() {
+ execAsCommand = true;
+ return this;
+ }
+ public Builder execAsScript() {
+ execAsCommand = false;
+ return this;
+ }
+ public Builder uniqueTag(String uniqueTag) {
+ this.uniqueTag = uniqueTag;
+ return this;
+ }
+ public SshFeed build() {
+ built = true;
+ SshFeed result = new SshFeed(this);
+ result.setEntity(checkNotNull((EntityLocal)entity, "entity"));
+ result.start();
+ return result;
+ }
+ @Override
+ protected void finalize() {
+ if (!built) log.warn("SshFeed.Builder created, but build() never called");
+ }
+ }
+
+ private static class SshPollIdentifier {
+ final Supplier<String> command;
+ final Supplier<Map<String, String>> env;
+
+ private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
+ this.command = checkNotNull(command, "command");
+ this.env = checkNotNull(env, "env");
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(command, env);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof SshPollIdentifier)) {
+ return false;
+ }
+ SshPollIdentifier o = (SshPollIdentifier) other;
+ return Objects.equal(command, o.command) &&
+ Objects.equal(env, o.env);
+ }
+ }
+
+ /**
+ * For rebind; do not call directly; use builder
+ */
+ public SshFeed() {
+ }
+
+ protected SshFeed(final Builder builder) {
+ config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+ config().set(MACHINE, builder.machine);
+ config().set(EXEC_AS_COMMAND, builder.execAsCommand);
+
+ SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create();
+ for (SshPollConfig<?> config : builder.polls) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SshPollConfig<?> configCopy = new SshPollConfig(config);
+ if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
+ polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
+ }
+ config().set(POLLS, polls);
+ initUniqueTag(builder.uniqueTag, polls.values());
+ }
+
+ protected SshMachineLocation getMachine() {
+ Supplier<SshMachineLocation> supplier = config().get(MACHINE);
+ if (supplier != null) {
+ return supplier.get();
+ } else {
+ return Locations.findUniqueSshMachineLocation(entity.getLocations()).get();
+ }
+ }
+
+ @Override
+ protected void preStart() {
+ SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = config().get(POLLS);
+
+ for (final SshPollIdentifier pollInfo : polls.keySet()) {
+ Set<SshPollConfig<?>> configs = polls.get(pollInfo);
+ long minPeriod = Integer.MAX_VALUE;
+ Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
+
+ for (SshPollConfig<?> config : configs) {
+ handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
+ if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+ }
+
+ getPoller().scheduleAtFixedRate(
+ new Callable<SshPollValue>() {
+ @Override
+ public SshPollValue call() throws Exception {
+ return exec(pollInfo.command.get(), pollInfo.env.get());
+ }},
+ new DelegatingPollHandler<SshPollValue>(handlers),
+ minPeriod);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Poller<SshPollValue> getPoller() {
+ return (Poller<SshPollValue>) super.getPoller();
+ }
+
+ private SshPollValue exec(String command, Map<String,String> env) throws IOException {
+ SshMachineLocation machine = getMachine();
+ Boolean execAsCommand = config().get(EXEC_AS_COMMAND);
+ if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+
+ int exitStatus;
+ ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag())
+ .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
+ .configure(SshTool.PROP_OUT_STREAM, stdout)
+ .configure(SshTool.PROP_ERR_STREAM, stderr);
+ if (Boolean.TRUE.equals(execAsCommand)) {
+ exitStatus = machine.execCommands(flags.getAllConfig(),
+ "ssh-feed", ImmutableList.of(command), env);
+ } else {
+ exitStatus = machine.execScript(flags.getAllConfig(),
+ "ssh-feed", ImmutableList.of(command), env);
+ }
+
+ return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a12f4a76/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
deleted file mode 100644
index f21d8da..0000000
--- a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.feed.ssh;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.feed.AbstractFeed;
-import org.apache.brooklyn.core.feed.AttributePollHandler;
-import org.apache.brooklyn.core.feed.DelegatingPollHandler;
-import org.apache.brooklyn.core.feed.Poller;
-import org.apache.brooklyn.core.location.Locations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.internal.ssh.SshTool;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-
-/**
- * Provides a feed of attribute values, by polling over ssh.
- *
- * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
- * <pre>
- * {@code
- * private SshFeed feed;
- *
- * //@Override
- * protected void connectSensors() {
- * super.connectSensors();
- *
- * feed = SshFeed.builder()
- * .entity(this)
- * .machine(mySshMachineLachine)
- * .poll(new SshPollConfig<Boolean>(SERVICE_UP)
- * .command("rabbitmqctl -q status")
- * .onSuccess(new Function<SshPollValue, Boolean>() {
- * public Boolean apply(SshPollValue input) {
- * return (input.getExitStatus() == 0);
- * }}))
- * .build();
- * }
- *
- * {@literal @}Override
- * protected void disconnectSensors() {
- * super.disconnectSensors();
- * if (feed != null) feed.stop();
- * }
- * }
- * </pre>
- *
- * @author aled
- */
-public class SshFeed extends AbstractFeed {
-
- public static final Logger log = LoggerFactory.getLogger(SshFeed.class);
-
- @SuppressWarnings("serial")
- public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey(
- new TypeToken<Supplier<SshMachineLocation>>() {},
- "machine");
-
- public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand");
-
- @SuppressWarnings("serial")
- public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
- new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {},
- "polls");
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private Entity entity;
- private boolean onlyIfServiceUp = false;
- private Supplier<SshMachineLocation> machine;
- private Duration period = Duration.of(500, TimeUnit.MILLISECONDS);
- private List<SshPollConfig<?>> polls = Lists.newArrayList();
- private boolean execAsCommand = false;
- private String uniqueTag;
- private volatile boolean built;
-
- public Builder entity(Entity val) {
- this.entity = val;
- return this;
- }
- public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
- public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
- this.onlyIfServiceUp = onlyIfServiceUp;
- return this;
- }
- /** optional, to force a machine; otherwise it is inferred from the entity */
- public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); }
- /** optional, to force a machine; otherwise it is inferred from the entity */
- public Builder machine(Supplier<SshMachineLocation> val) {
- this.machine = val;
- return this;
- }
- public Builder period(Duration period) {
- this.period = period;
- return this;
- }
- public Builder period(long millis) {
- return period(Duration.of(millis, TimeUnit.MILLISECONDS));
- }
- public Builder period(long val, TimeUnit units) {
- return period(Duration.of(val, units));
- }
- public Builder poll(SshPollConfig<?> config) {
- polls.add(config);
- return this;
- }
- public Builder execAsCommand() {
- execAsCommand = true;
- return this;
- }
- public Builder execAsScript() {
- execAsCommand = false;
- return this;
- }
- public Builder uniqueTag(String uniqueTag) {
- this.uniqueTag = uniqueTag;
- return this;
- }
- public SshFeed build() {
- built = true;
- SshFeed result = new SshFeed(this);
- result.setEntity(checkNotNull((EntityLocal)entity, "entity"));
- result.start();
- return result;
- }
- @Override
- protected void finalize() {
- if (!built) log.warn("SshFeed.Builder created, but build() never called");
- }
- }
-
- private static class SshPollIdentifier {
- final Supplier<String> command;
- final Supplier<Map<String, String>> env;
-
- private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) {
- this.command = checkNotNull(command, "command");
- this.env = checkNotNull(env, "env");
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(command, env);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof SshPollIdentifier)) {
- return false;
- }
- SshPollIdentifier o = (SshPollIdentifier) other;
- return Objects.equal(command, o.command) &&
- Objects.equal(env, o.env);
- }
- }
-
- /**
- * For rebind; do not call directly; use builder
- */
- public SshFeed() {
- }
-
- protected SshFeed(final Builder builder) {
- config().set(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
- config().set(MACHINE, builder.machine);
- config().set(EXEC_AS_COMMAND, builder.execAsCommand);
-
- SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create();
- for (SshPollConfig<?> config : builder.polls) {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- SshPollConfig<?> configCopy = new SshPollConfig(config);
- if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
- polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy);
- }
- config().set(POLLS, polls);
- initUniqueTag(builder.uniqueTag, polls.values());
- }
-
- protected SshMachineLocation getMachine() {
- Supplier<SshMachineLocation> supplier = config().get(MACHINE);
- if (supplier != null) {
- return supplier.get();
- } else {
- return Locations.findUniqueSshMachineLocation(entity.getLocations()).get();
- }
- }
-
- @Override
- protected void preStart() {
- SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = config().get(POLLS);
-
- for (final SshPollIdentifier pollInfo : polls.keySet()) {
- Set<SshPollConfig<?>> configs = polls.get(pollInfo);
- long minPeriod = Integer.MAX_VALUE;
- Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet();
-
- for (SshPollConfig<?> config : configs) {
- handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this));
- if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
- }
-
- getPoller().scheduleAtFixedRate(
- new Callable<SshPollValue>() {
- @Override
- public SshPollValue call() throws Exception {
- return exec(pollInfo.command.get(), pollInfo.env.get());
- }},
- new DelegatingPollHandler<SshPollValue>(handlers),
- minPeriod);
- }
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Poller<SshPollValue> getPoller() {
- return (Poller<SshPollValue>) super.getPoller();
- }
-
- private SshPollValue exec(String command, Map<String,String> env) throws IOException {
- SshMachineLocation machine = getMachine();
- Boolean execAsCommand = config().get(EXEC_AS_COMMAND);
- if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env});
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
- int exitStatus;
- ConfigBag flags = ConfigBag.newInstanceExtending(config().getBag())
- .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true)
- .configure(SshTool.PROP_OUT_STREAM, stdout)
- .configure(SshTool.PROP_ERR_STREAM, stderr);
- if (Boolean.TRUE.equals(execAsCommand)) {
- exitStatus = machine.execCommands(flags.getAllConfig(),
- "ssh-feed", ImmutableList.of(command), env);
- } else {
- exitStatus = machine.execScript(flags.getAllConfig(),
- "ssh-feed", ImmutableList.of(command), env);
- }
-
- return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray()));
- }
-}