You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:46 UTC
[28/50] brooklyn-library git commit: Move common driver code to shared parent class
Move common driver code to shared parent class
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/df84b662
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/df84b662
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/df84b662
Branch: refs/heads/0.5.0
Commit: df84b662b5d8dfb757f713ae997065d4a8f7882d
Parents: a24e0e4
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Thu Mar 21 02:22:57 2013 +0000
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:07 2013 +0100
----------------------------------------------------------------------
.../kafka/AbstractfKafkaSshDriver.java | 170 +++++++++++++++++++
.../messaging/kafka/KafkaBrokerSshDriver.java | 129 ++------------
.../kafka/KafkaZookeeperSshDriver.java | 129 ++------------
3 files changed, 198 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
new file mode 100644
index 0000000..f6c7c8d
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static java.lang.String.format;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.basic.lifecycle.CommonCommands;
+import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.MutableMap;
+import brooklyn.util.NetworkUtils;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriver {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
+
+ public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation machine) {
+ super(entity, machine);
+ }
+
+ protected abstract Map<String, Integer> getPortMap();
+
+ protected abstract ConfigKey<String> getConfigTemplateKey();
+
+ protected abstract String getConfigFileName();
+
+ protected abstract String getLaunchScriptName();
+
+ protected abstract String getProcessIdentifier();
+
+ private String expandedInstallDir;
+
+ @Override
+ protected String getLogFileLocation() { return getRunDir()+"/console.out"; }
+
+ private String getExpandedInstallDir() {
+ if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
+ return expandedInstallDir;
+ }
+
+ @Override
+ public void install() {
+ DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
+ List<String> urls = resolver.getTargets();
+ String saveAs = resolver.getFilename();
+ expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
+
+ List<String> commands = new LinkedList<String>();
+ commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
+ commands.add(CommonCommands.INSTALL_TAR);
+ commands.add("tar xzfv "+saveAs);
+ commands.add("cd "+expandedInstallDir);
+ commands.add("./sbt update");
+ commands.add("./sbt package");
+
+ newScript(INSTALLING)
+ .failOnNonZeroResultCode()
+ .body.append(commands)
+ .execute();
+ }
+
+ @Override
+ public void customize() {
+ NetworkUtils.checkPortsValid(getPortMap());
+ newScript(CUSTOMIZING)
+ .failOnNonZeroResultCode()
+ .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
+ .execute();
+
+ String config = entity.getConfig(getConfigTemplateKey());
+ copyTemplate(config, getConfigFileName());
+
+ // Copy JMX agent Jar to server
+ getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+ }
+
+ public String getJmxRmiAgentJarBasename() {
+ return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
+ }
+
+ public String getJmxRmiAgentJarUrl() {
+ return "classpath://" + getJmxRmiAgentJarBasename();
+ }
+
+ public String getJmxRmiAgentJarDestinationFilePath() {
+ return getRunDir() + "/" + getJmxRmiAgentJarBasename();
+ }
+
+ @Override
+ public void launch() {
+ newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
+ .failOnNonZeroResultCode()
+ .body.append(String.format("nohup ./bin/%s ./%s > console.out 2>&1 &", getLaunchScriptName(), getConfigFileName()))
+ .execute();
+ }
+
+ public String getPidFile() { return getRunDir() + "/kafka.pid"; }
+
+ @Override
+ public boolean isRunning() {
+ return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+ }
+
+ @Override
+ public void stop() {
+ newScript(ImmutableMap.of("usePidFile", false), STOPPING)
+ .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill", getProcessIdentifier()))
+ .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill -9", getProcessIdentifier()))
+ .execute();
+ }
+
+ @Override
+ protected Map<String, ?> getJmxJavaSystemProperties() {
+ return MutableMap.<String, Object> builder()
+ .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
+ .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
+ .put("com.sun.management.jmxremote.ssl", false)
+ .put("com.sun.management.jmxremote.authenticate", false)
+ .put("java.rmi.server.hostname", getHostname())
+ .build();
+ }
+
+ @Override
+ protected List<String> getJmxJavaConfigOptions() {
+ return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+ }
+
+ /**
+ * Use RMI agent to provide JMX.
+ */
+ @Override
+ public Map<String, String> getShellEnvironment() {
+ Map<String, String> orig = super.getShellEnvironment();
+ String kafkaJmxOpts = orig.remove("JAVA_OPTS");
+ return MutableMap.<String, String>builder()
+ .putAll(orig)
+ .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
+ .build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index b3cd0f0..40e7234 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -15,147 +15,46 @@
*/
package brooklyn.entity.messaging.kafka;
-import static java.lang.String.format;
-
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.BrooklynVersion;
-import brooklyn.entity.basic.lifecycle.CommonCommands;
-import brooklyn.entity.drivers.downloads.DownloadResolver;
-import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.config.ConfigKey;
import brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.MutableMap;
-import brooklyn.util.NetworkUtils;
-import brooklyn.util.ResourceUtils;
-import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class KafkaBrokerSshDriver extends JavaSoftwareProcessSshDriver implements KafkaBrokerDriver {
- private static final Logger log = LoggerFactory.getLogger(KafkaBrokerSshDriver.class);
-
- private String expandedInstallDir;
+public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver {
public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) {
super(entity, machine);
}
@Override
- protected String getLogFileLocation() { return getRunDir()+"/console.out"; }
-
- @Override
- public Integer getKafkaPort() { return entity.getAttribute(KafkaBroker.KAFKA_PORT); }
-
- private String getExpandedInstallDir() {
- if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
- return expandedInstallDir;
- }
-
- @Override
- public void install() {
- DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
- List<String> urls = resolver.getTargets();
- String saveAs = resolver.getFilename();
- expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
-
- List<String> commands = new LinkedList<String>();
- commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
- commands.add(CommonCommands.INSTALL_TAR);
- commands.add("tar xzfv "+saveAs);
- commands.add("cd "+expandedInstallDir);
- commands.add("./sbt update");
- commands.add("./sbt package");
-
- newScript(INSTALLING)
- .failOnNonZeroResultCode()
- .body.append(commands)
- .execute();
- }
-
- @Override
- public void customize() {
- NetworkUtils.checkPortsValid(MutableMap.of("kafkaPort", getKafkaPort()));
- newScript(CUSTOMIZING)
- .failOnNonZeroResultCode()
- .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
- .execute();
-
- String serverConfig = entity.getConfig(KafkaBroker.SERVER_CONFIG_TEMPLATE);
- copyTemplate(serverConfig, "server.properties");
-
- // Copy JMX agent Jar to server
- getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
- }
-
- public String getJmxRmiAgentJarBasename() {
- return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
- }
-
- public String getJmxRmiAgentJarUrl() {
- return "classpath://" + getJmxRmiAgentJarBasename();
- }
-
- public String getJmxRmiAgentJarDestinationFilePath() {
- return getRunDir() + "/" + getJmxRmiAgentJarBasename();
- }
-
- @Override
- public void launch() {
- newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
- .failOnNonZeroResultCode()
- .body.append("nohup ./bin/kafka-server-start.sh ./server.properties > console.out 2>&1 &")
- .execute();
+ protected Map<String, Integer> getPortMap() {
+ return MutableMap.of("kafkaPort", getKafkaPort());
}
- public String getPidFile() { return getRunDir() + "/kafka.pid"; }
-
@Override
- public boolean isRunning() {
- return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+ protected ConfigKey<String> getConfigTemplateKey() {
+ return KafkaBroker.SERVER_CONFIG_TEMPLATE;
}
@Override
- public void stop() {
- newScript(ImmutableMap.of("usePidFile", false), STOPPING)
- .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill")
- .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill -9")
- .execute();
+ protected String getConfigFileName() {
+ return "server.properties";
}
@Override
- protected Map<String, ?> getJmxJavaSystemProperties() {
- return MutableMap.<String, Object> builder()
- .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
- .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
- .put("com.sun.management.jmxremote.ssl", false)
- .put("com.sun.management.jmxremote.authenticate", false)
- .put("java.rmi.server.hostname", getHostname())
- .build();
+ protected String getLaunchScriptName() {
+ return "kafka-server-start.sh";
}
@Override
- protected List<String> getJmxJavaConfigOptions() {
- return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+ protected String getProcessIdentifier() {
+ return "kafka\\.Kafka";
}
- /**
- * Use RMI agent to provide JMX.
- */
@Override
- public Map<String, String> getShellEnvironment() {
- Map<String, String> orig = super.getShellEnvironment();
- String kafkaJmxOpts = orig.remove("JAVA_OPTS");
- return MutableMap.<String, String>builder()
- .putAll(orig)
- .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
- .build();
+ public Integer getKafkaPort() {
+ return getEntity().getAttribute(KafkaBroker.KAFKA_PORT);
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
index c62cb0a..a35aab6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
@@ -15,147 +15,46 @@
*/
package brooklyn.entity.messaging.kafka;
-import static java.lang.String.format;
-
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.BrooklynVersion;
-import brooklyn.entity.basic.lifecycle.CommonCommands;
-import brooklyn.entity.drivers.downloads.DownloadResolver;
-import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.config.ConfigKey;
import brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.MutableMap;
-import brooklyn.util.NetworkUtils;
-import brooklyn.util.ResourceUtils;
-import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implements KafkaZookeeperDriver {
- private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
-
- private String expandedInstallDir;
+public class KafkaZookeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZookeeperDriver {
public KafkaZookeeperSshDriver(KafkaZookeeperImpl entity, SshMachineLocation machine) {
super(entity, machine);
}
@Override
- protected String getLogFileLocation() { return getRunDir()+"/console.out"; }
-
- @Override
- public Integer getZookeeperPort() { return entity.getAttribute(KafkaZookeeper.ZOOKEEPER_PORT); }
-
- private String getExpandedInstallDir() {
- if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
- return expandedInstallDir;
- }
-
- @Override
- public void install() {
- DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
- List<String> urls = resolver.getTargets();
- String saveAs = resolver.getFilename();
- expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
-
- List<String> commands = new LinkedList<String>();
- commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
- commands.add(CommonCommands.INSTALL_TAR);
- commands.add("tar xzfv "+saveAs);
- commands.add("cd "+expandedInstallDir);
- commands.add("./sbt update");
- commands.add("./sbt package");
-
- newScript(INSTALLING)
- .failOnNonZeroResultCode()
- .body.append(commands)
- .execute();
- }
-
- @Override
- public void customize() {
- NetworkUtils.checkPortsValid(MutableMap.of("zookeeperPort", getZookeeperPort()));
- newScript(CUSTOMIZING)
- .failOnNonZeroResultCode()
- .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
- .execute();
-
- String zookeeperConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE);
- copyTemplate(zookeeperConfig, "zookeeper.properties");
-
- // Copy JMX agent Jar to server
- getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
- }
-
- public String getJmxRmiAgentJarBasename() {
- return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
- }
-
- public String getJmxRmiAgentJarUrl() {
- return "classpath://" + getJmxRmiAgentJarBasename();
- }
-
- public String getJmxRmiAgentJarDestinationFilePath() {
- return getRunDir() + "/" + getJmxRmiAgentJarBasename();
- }
-
- @Override
- public void launch() {
- newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
- .failOnNonZeroResultCode()
- .body.append("nohup ./bin/zookeeper-server-start.sh ./zookeeper.properties > console.out 2>&1 &")
- .execute();
+ protected Map<String, Integer> getPortMap() {
+ return MutableMap.of("zookeeperPort", getZookeeperPort());
}
- public String getPidFile() { return getRunDir() + "/kafka.pid"; }
-
@Override
- public boolean isRunning() {
- return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+ protected ConfigKey<String> getConfigTemplateKey() {
+ return KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE;
}
@Override
- public void stop() {
- newScript(ImmutableMap.of("usePidFile", false), STOPPING)
- .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill")
- .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill -9")
- .execute();
+ protected String getConfigFileName() {
+ return "zookeeper.properties";
}
@Override
- protected Map<String, ?> getJmxJavaSystemProperties() {
- return MutableMap.<String, Object> builder()
- .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
- .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
- .put("com.sun.management.jmxremote.ssl", false)
- .put("com.sun.management.jmxremote.authenticate", false)
- .put("java.rmi.server.hostname", getHostname())
- .build();
+ protected String getLaunchScriptName() {
+ return "zookeeper-server-start.sh";
}
@Override
- protected List<String> getJmxJavaConfigOptions() {
- return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+ protected String getProcessIdentifier() {
+ return "quorum\\.QuorumPeerMain";
}
- /**
- * Use RMI agent to provide JMX.
- */
@Override
- public Map<String, String> getShellEnvironment() {
- Map<String, String> orig = super.getShellEnvironment();
- String kafkaJmxOpts = orig.remove("JAVA_OPTS");
- return MutableMap.<String, String>builder()
- .putAll(orig)
- .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
- .build();
+ public Integer getZookeeperPort() {
+ return getEntity().getAttribute(KafkaZookeeper.ZOOKEEPER_PORT);
}
}