You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:46 UTC

[28/50] brooklyn-library git commit: Move common driver code to shared parent class

Move common driver code to shared parent class


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/df84b662
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/df84b662
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/df84b662

Branch: refs/heads/0.5.0
Commit: df84b662b5d8dfb757f713ae997065d4a8f7882d
Parents: a24e0e4
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Thu Mar 21 02:22:57 2013 +0000
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:07 2013 +0100

----------------------------------------------------------------------
 .../kafka/AbstractfKafkaSshDriver.java          | 170 +++++++++++++++++++
 .../messaging/kafka/KafkaBrokerSshDriver.java   | 129 ++------------
 .../kafka/KafkaZookeeperSshDriver.java          | 129 ++------------
 3 files changed, 198 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
new file mode 100644
index 0000000..f6c7c8d
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static java.lang.String.format;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.basic.lifecycle.CommonCommands;
+import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.MutableMap;
+import brooklyn.util.NetworkUtils;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriver {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
+
+    public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    protected abstract Map<String, Integer> getPortMap();
+
+    protected abstract ConfigKey<String> getConfigTemplateKey();
+
+    protected abstract String getConfigFileName();
+
+    protected abstract String getLaunchScriptName();
+
+    protected abstract String getProcessIdentifier();
+
+    private String expandedInstallDir;
+
+    @Override
+    protected String getLogFileLocation() { return getRunDir()+"/console.out"; }
+
+    private String getExpandedInstallDir() {
+        if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
+        return expandedInstallDir;
+    }
+
+    @Override
+    public void install() {
+        DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+        expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
+
+        List<String> commands = new LinkedList<String>();
+        commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
+        commands.add(CommonCommands.INSTALL_TAR);
+        commands.add("tar xzfv "+saveAs);
+        commands.add("cd "+expandedInstallDir);
+        commands.add("./sbt update");
+        commands.add("./sbt package");
+
+        newScript(INSTALLING)
+                .failOnNonZeroResultCode()
+                .body.append(commands)
+                .execute();
+    }
+
+    @Override
+    public void customize() {
+        NetworkUtils.checkPortsValid(getPortMap());
+        newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
+                .execute();
+
+        String config = entity.getConfig(getConfigTemplateKey());
+        copyTemplate(config, getConfigFileName());
+
+        // Copy JMX agent Jar to server
+        getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+    }
+
+    public String getJmxRmiAgentJarBasename() {
+        return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
+    }
+
+    public String getJmxRmiAgentJarUrl() {
+        return "classpath://" + getJmxRmiAgentJarBasename();
+    }
+
+    public String getJmxRmiAgentJarDestinationFilePath() {
+        return getRunDir() + "/" + getJmxRmiAgentJarBasename();
+    }
+
+    @Override
+    public void launch() {
+        newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
+                .failOnNonZeroResultCode()
+                .body.append(String.format("nohup ./bin/%s ./%s > console.out 2>&1 &", getLaunchScriptName(), getConfigFileName()))
+                .execute();
+    }
+
+    public String getPidFile() { return getRunDir() + "/kafka.pid"; }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(ImmutableMap.of("usePidFile", false), STOPPING)
+                .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill", getProcessIdentifier()))
+                .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill -9", getProcessIdentifier()))
+                .execute();
+    }
+
+    @Override
+    protected Map<String, ?> getJmxJavaSystemProperties() {
+        return MutableMap.<String, Object> builder()
+                .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
+                .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
+                .put("com.sun.management.jmxremote.ssl", false)
+                .put("com.sun.management.jmxremote.authenticate", false)
+                .put("java.rmi.server.hostname", getHostname())
+                .build();
+    }
+
+    @Override
+    protected List<String> getJmxJavaConfigOptions() {
+        return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+    }
+
+    /**
+     * Use RMI agent to provide JMX.
+     */
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        Map<String, String> orig = super.getShellEnvironment();
+        String kafkaJmxOpts = orig.remove("JAVA_OPTS");
+        return MutableMap.<String, String>builder()
+                .putAll(orig)
+                .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index b3cd0f0..40e7234 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -15,147 +15,46 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import static java.lang.String.format;
-
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.BrooklynVersion;
-import brooklyn.entity.basic.lifecycle.CommonCommands;
-import brooklyn.entity.drivers.downloads.DownloadResolver;
-import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.config.ConfigKey;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.MutableMap;
-import brooklyn.util.NetworkUtils;
-import brooklyn.util.ResourceUtils;
-import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class KafkaBrokerSshDriver extends JavaSoftwareProcessSshDriver implements KafkaBrokerDriver {
 
-    private static final Logger log = LoggerFactory.getLogger(KafkaBrokerSshDriver.class);
-
-    private String expandedInstallDir;
+public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver {
 
     public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) {
         super(entity, machine);
     }
 
     @Override
-    protected String getLogFileLocation() { return getRunDir()+"/console.out"; }
-
-    @Override
-    public Integer getKafkaPort() { return entity.getAttribute(KafkaBroker.KAFKA_PORT); }
-
-    private String getExpandedInstallDir() {
-        if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
-        return expandedInstallDir;
-    }
-
-    @Override
-    public void install() {
-        DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
-        List<String> urls = resolver.getTargets();
-        String saveAs = resolver.getFilename();
-        expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
-
-        List<String> commands = new LinkedList<String>();
-        commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
-        commands.add(CommonCommands.INSTALL_TAR);
-        commands.add("tar xzfv "+saveAs);
-        commands.add("cd "+expandedInstallDir);
-        commands.add("./sbt update");
-        commands.add("./sbt package");
-
-        newScript(INSTALLING)
-                .failOnNonZeroResultCode()
-                .body.append(commands)
-                .execute();
-    }
-
-    @Override
-    public void customize() {
-        NetworkUtils.checkPortsValid(MutableMap.of("kafkaPort", getKafkaPort()));
-        newScript(CUSTOMIZING)
-                .failOnNonZeroResultCode()
-                .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
-                .execute();
-
-        String serverConfig = entity.getConfig(KafkaBroker.SERVER_CONFIG_TEMPLATE);
-        copyTemplate(serverConfig, "server.properties");
-
-        // Copy JMX agent Jar to server
-        getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
-    }
-
-    public String getJmxRmiAgentJarBasename() {
-        return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
-    }
-
-    public String getJmxRmiAgentJarUrl() {
-        return "classpath://" + getJmxRmiAgentJarBasename();
-    }
-
-    public String getJmxRmiAgentJarDestinationFilePath() {
-        return getRunDir() + "/" + getJmxRmiAgentJarBasename();
-    }
-
-    @Override
-    public void launch() {
-        newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
-                .failOnNonZeroResultCode()
-                .body.append("nohup ./bin/kafka-server-start.sh ./server.properties > console.out 2>&1 &")
-                .execute();
+    protected Map<String, Integer> getPortMap() {
+        return MutableMap.of("kafkaPort", getKafkaPort());
     }
 
-    public String getPidFile() { return getRunDir() + "/kafka.pid"; }
-
     @Override
-    public boolean isRunning() {
-        return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+    protected ConfigKey<String> getConfigTemplateKey() {
+        return KafkaBroker.SERVER_CONFIG_TEMPLATE;
     }
 
     @Override
-    public void stop() {
-        newScript(ImmutableMap.of("usePidFile", false), STOPPING)
-                .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill")
-                .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill -9")
-                .execute();
+    protected String getConfigFileName() {
+        return "server.properties";
     }
 
     @Override
-    protected Map<String, ?> getJmxJavaSystemProperties() {
-        return MutableMap.<String, Object> builder()
-                .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
-                .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
-                .put("com.sun.management.jmxremote.ssl", false)
-                .put("com.sun.management.jmxremote.authenticate", false)
-                .put("java.rmi.server.hostname", getHostname())
-                .build();
+    protected String getLaunchScriptName() {
+        return "kafka-server-start.sh";
     }
 
     @Override
-    protected List<String> getJmxJavaConfigOptions() {
-        return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+    protected String getProcessIdentifier() {
+        return "kafka\\.Kafka";
     }
 
-    /**
-     * Use RMI agent to provide JMX.
-     */
     @Override
-    public Map<String, String> getShellEnvironment() {
-        Map<String, String> orig = super.getShellEnvironment();
-        String kafkaJmxOpts = orig.remove("JAVA_OPTS");
-        return MutableMap.<String, String>builder()
-                .putAll(orig)
-                .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
-                .build();
+    public Integer getKafkaPort() {
+        return getEntity().getAttribute(KafkaBroker.KAFKA_PORT);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
index c62cb0a..a35aab6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
@@ -15,147 +15,46 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import static java.lang.String.format;
-
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.BrooklynVersion;
-import brooklyn.entity.basic.lifecycle.CommonCommands;
-import brooklyn.entity.drivers.downloads.DownloadResolver;
-import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.config.ConfigKey;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.MutableMap;
-import brooklyn.util.NetworkUtils;
-import brooklyn.util.ResourceUtils;
-import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implements KafkaZookeeperDriver {
 
-    private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
-
-    private String expandedInstallDir;
+public class KafkaZookeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZookeeperDriver {
 
     public KafkaZookeeperSshDriver(KafkaZookeeperImpl entity, SshMachineLocation machine) {
         super(entity, machine);
     }
 
     @Override
-    protected String getLogFileLocation() { return getRunDir()+"/console.out"; }
-
-    @Override
-    public Integer getZookeeperPort() { return entity.getAttribute(KafkaZookeeper.ZOOKEEPER_PORT); }
-
-    private String getExpandedInstallDir() {
-        if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
-        return expandedInstallDir;
-    }
-
-    @Override
-    public void install() {
-        DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
-        List<String> urls = resolver.getTargets();
-        String saveAs = resolver.getFilename();
-        expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
-
-        List<String> commands = new LinkedList<String>();
-        commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
-        commands.add(CommonCommands.INSTALL_TAR);
-        commands.add("tar xzfv "+saveAs);
-        commands.add("cd "+expandedInstallDir);
-        commands.add("./sbt update");
-        commands.add("./sbt package");
-
-        newScript(INSTALLING)
-                .failOnNonZeroResultCode()
-                .body.append(commands)
-                .execute();
-    }
-
-    @Override
-    public void customize() {
-        NetworkUtils.checkPortsValid(MutableMap.of("zookeeperPort", getZookeeperPort()));
-        newScript(CUSTOMIZING)
-                .failOnNonZeroResultCode()
-                .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
-                .execute();
-
-        String zookeeperConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE);
-        copyTemplate(zookeeperConfig, "zookeeper.properties");
-
-        // Copy JMX agent Jar to server
-        getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
-    }
-
-    public String getJmxRmiAgentJarBasename() {
-        return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
-    }
-
-    public String getJmxRmiAgentJarUrl() {
-        return "classpath://" + getJmxRmiAgentJarBasename();
-    }
-
-    public String getJmxRmiAgentJarDestinationFilePath() {
-        return getRunDir() + "/" + getJmxRmiAgentJarBasename();
-    }
-
-    @Override
-    public void launch() {
-        newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
-                .failOnNonZeroResultCode()
-                .body.append("nohup ./bin/zookeeper-server-start.sh ./zookeeper.properties > console.out 2>&1 &")
-                .execute();
+    protected Map<String, Integer> getPortMap() {
+        return MutableMap.of("zookeeperPort", getZookeeperPort());
     }
 
-    public String getPidFile() { return getRunDir() + "/kafka.pid"; }
-
     @Override
-    public boolean isRunning() {
-        return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+    protected ConfigKey<String> getConfigTemplateKey() {
+        return KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE;
     }
 
     @Override
-    public void stop() {
-        newScript(ImmutableMap.of("usePidFile", false), STOPPING)
-                .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill")
-                .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill -9")
-                .execute();
+    protected String getConfigFileName() {
+        return "zookeeper.properties";
     }
 
     @Override
-    protected Map<String, ?> getJmxJavaSystemProperties() {
-        return MutableMap.<String, Object> builder()
-                .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
-                .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
-                .put("com.sun.management.jmxremote.ssl", false)
-                .put("com.sun.management.jmxremote.authenticate", false)
-                .put("java.rmi.server.hostname", getHostname())
-                .build();
+    protected String getLaunchScriptName() {
+        return "zookeeper-server-start.sh";
     }
 
     @Override
-    protected List<String> getJmxJavaConfigOptions() {
-        return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+    protected String getProcessIdentifier() {
+        return "quorum\\.QuorumPeerMain";
     }
 
-    /**
-     * Use RMI agent to provide JMX.
-     */
     @Override
-    public Map<String, String> getShellEnvironment() {
-        Map<String, String> orig = super.getShellEnvironment();
-        String kafkaJmxOpts = orig.remove("JAVA_OPTS");
-        return MutableMap.<String, String>builder()
-                .putAll(orig)
-                .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
-                .build();
+    public Integer getZookeeperPort() {
+        return getEntity().getAttribute(KafkaZookeeper.ZOOKEEPER_PORT);
     }
 
 }