You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ri...@apache.org on 2015/08/07 19:53:35 UTC

[02/10] incubator-brooklyn git commit: MySqlCluster - initial implementation

MySqlCluster - initial implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/bae9628b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/bae9628b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/bae9628b

Branch: refs/heads/master
Commit: bae9628bf91e825246915f0ca2aa42ce526c9b8b
Parents: 0c6248f
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Wed Jul 29 19:40:02 2015 +0300
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Wed Aug 5 15:23:06 2015 +0300

----------------------------------------------------------------------
 .../group/AbstractMembershipTrackingPolicy.java |   3 +-
 .../entity/group/DynamicClusterImpl.java        |  16 +-
 software/database/pom.xml                       |   2 +
 .../entity/database/mysql/MySqlCluster.java     |  63 +++
 .../entity/database/mysql/MySqlClusterImpl.java | 397 +++++++++++++++++++
 .../entity/database/mysql/MySqlNode.java        |   3 +
 .../entity/database/mysql/MySqlNodeImpl.java    |   5 +-
 .../entity/database/mysql/mysql_master.conf     |  26 ++
 .../entity/database/mysql/mysql_slave.conf      |  33 ++
 .../util/collections/CollectionFunctionals.java |   8 +
 10 files changed, 547 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java b/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
index 459f515..6f71a8c 100644
--- a/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
+++ b/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java
@@ -241,7 +241,8 @@ public abstract class AbstractMembershipTrackingPolicy extends AbstractPolicy {
 
     /**
      * Called when a member is removed.
-     * Note that entity change events may arrive after this event; they should typically be ignored. 
+     * Note that entity change events may arrive after this event; they should typically be ignored.
+     * The entity could already be unmanaged at this point so limited functionality is available (i.e. can't access config keys).
      */
     protected void onEntityRemoved(Entity member) {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
index 0e2f164..a384281 100644
--- a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java
@@ -159,9 +159,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
     @Override
     protected void initEnrichers() {
-        if (getConfigRaw(UP_QUORUM_CHECK, true).isAbsent() && getConfig(INITIAL_SIZE)==0) {
+        if (config().getRaw(UP_QUORUM_CHECK).isAbsent() && getConfig(INITIAL_SIZE)==0) {
             // if initial size is 0 then override up check to allow zero if empty
-            setConfig(UP_QUORUM_CHECK, QuorumChecks.atLeastOneUnlessEmpty());
+            config().set(UP_QUORUM_CHECK, QuorumChecks.atLeastOneUnlessEmpty());
             setAttribute(SERVICE_UP, true);
         } else {
             setAttribute(SERVICE_UP, false);
@@ -173,7 +173,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     
     @Override
     public void setRemovalStrategy(Function<Collection<Entity>, Entity> val) {
-        setConfig(REMOVAL_STRATEGY, checkNotNull(val, "removalStrategy"));
+        config().set(REMOVAL_STRATEGY, checkNotNull(val, "removalStrategy"));
     }
 
     protected Function<Collection<Entity>, Entity> getRemovalStrategy() {
@@ -183,7 +183,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
     @Override
     public void setZonePlacementStrategy(NodePlacementStrategy val) {
-        setConfig(ZONE_PLACEMENT_STRATEGY, checkNotNull(val, "zonePlacementStrategy"));
+        config().set(ZONE_PLACEMENT_STRATEGY, checkNotNull(val, "zonePlacementStrategy"));
     }
 
     protected NodePlacementStrategy getZonePlacementStrategy() {
@@ -192,13 +192,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
     @Override
     public void setZoneFailureDetector(ZoneFailureDetector val) {
-        setConfig(ZONE_FAILURE_DETECTOR, checkNotNull(val, "zoneFailureDetector"));
+        config().set(ZONE_FAILURE_DETECTOR, checkNotNull(val, "zoneFailureDetector"));
     }
 
     protected ZoneFailureDetector getZoneFailureDetector() {
         return checkNotNull(getConfig(ZONE_FAILURE_DETECTOR), "zoneFailureDetector config");
     }
 
+    protected EntitySpec<?> getFirstMemberSpec() {
+        return getConfig(FIRST_MEMBER_SPEC);
+    }
+
     protected EntitySpec<?> getMemberSpec() {
         return getConfig(MEMBER_SPEC);
     }
@@ -780,7 +784,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
     protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
         EntitySpec<?> memberSpec = null;
-        if (getMembers().isEmpty()) memberSpec = getConfig(FIRST_MEMBER_SPEC);
+        if (getMembers().isEmpty()) memberSpec = getFirstMemberSpec();
         if (memberSpec == null) memberSpec = getMemberSpec();
         
         if (memberSpec != null) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/pom.xml
----------------------------------------------------------------------
diff --git a/software/database/pom.xml b/software/database/pom.xml
index b01f636..fed3b0d 100644
--- a/software/database/pom.xml
+++ b/software/database/pom.xml
@@ -49,6 +49,8 @@
                             <exclude>src/main/resources/brooklyn/entity/database/crate/crate.yaml</exclude>
                             <exclude>src/main/resources/brooklyn/entity/database/mariadb/my.cnf</exclude>
                             <exclude>src/main/resources/brooklyn/entity/database/mysql/mysql.conf</exclude>
+                            <exclude>src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf</exclude>
+                            <exclude>src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf</exclude>
                             <exclude>src/main/resources/brooklyn/entity/database/postgresql/postgresql.conf</exclude>
                             <exclude>src/main/resources/brooklyn/entity/database/rubyrep/rubyrep.conf</exclude>
                             <exclude>src/main/resources/brooklyn/entity/database/mssql/ConfigurationFile.ini</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
new file mode 100644
index 0000000..8b19ef7
--- /dev/null
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.database.mysql;
+
+import java.util.Collection;
+
+import com.google.common.reflect.TypeToken;
+
+import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.database.DatastoreMixins.HasDatastoreUrl;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+
+@ImplementedBy(MySqlClusterImpl.class)
+@Catalog(name="MySql Master-Slave cluster", description="Sets up a cluster of MySQL nodes using master-slave relation and binary logging", iconUrl="classpath:///mysql-logo-110x57.png")
+public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
+    interface MySqlMaster {
+        AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor("mysql.master.log_file", "The binary log file master is writing to");
+        AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor("mysql.master.log_position", "The position in the log file to start replication");
+    }
+
+    ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey(
+            "mysql.slave.username", "The user name slaves will use to connect to the master", "slave");
+    ConfigKey<String> SLAVE_REPLICATE_DO_DB = ConfigKeys.newStringConfigKey(
+            "mysql.slave.replicate_do_db", "Replicate only listed DBs");
+    ConfigKey<String> SLAVE_REPLICATE_IGNORE_DB = ConfigKeys.newStringConfigKey(
+            "mysql.slave.replicate_ignore_db", "Don't replicate listed DBs");
+    ConfigKey<String> SLAVE_REPLICATE_DO_TABLE = ConfigKeys.newStringConfigKey(
+            "mysql.slave.replicate_do_table", "Replicate only listed tables");
+    ConfigKey<String> SLAVE_REPLICATE_IGNORE_TABLE = ConfigKeys.newStringConfigKey(
+            "mysql.slave.replicate_ignore_table", "Don't replicate listed tables");
+    ConfigKey<String> SLAVE_REPLICATE_WILD_DO_TABLE = ConfigKeys.newStringConfigKey(
+            "mysql.slave.replicate_wild_do_table", "Replicate only listed tables, wildcards accepted");
+    ConfigKey<String> SLAVE_REPLICATE_WILD_IGNORE_TABLE = ConfigKeys.newStringConfigKey(
+            "mysql.slave.replicate_wild_ignore_table", "Don't replicate listed tables, wildcards accepted");
+    StringAttributeSensorAndConfigKey SLAVE_PASSWORD = new StringAttributeSensorAndConfigKey(
+            "mysql.slave.password", "The password slaves will use to connect to the master. Will be auto-generated by default.");
+    @SuppressWarnings("serial")
+    AttributeSensor<Collection<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<Collection<String>>() {},
+            "mysql.slave.datastore.url", "List of all slave's DATASTORE_URL sensors");
+    AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql.perNode");
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
new file mode 100644
index 0000000..3eaa335
--- /dev/null
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.database.mysql;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Splitter;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.reflect.TypeToken;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.event.basic.Sensors;
+import brooklyn.location.Location;
+import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.guava.IfFunctions;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.TaskBuilder;
+import brooklyn.util.text.Identifiers;
+
+// https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html
+
+// TODO CREATION_SCRIPT_CONTENTS executed before replication setup so it is not replicated to slaves
+// TODO Bootstrap slave from dump for the case where the binary log is purged
+// TODO Promote slave to master
+// TODO SSL connection between master and slave
+// TODO DB credentials littered all over the place in file system
+public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster {
+    private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized");
+
+    private static final String MASTER_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_master.conf";
+    private static final String SLAVE_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_slave.conf";
+    private static final String NOT_UP_REPLICATION = "replication_not_configured";
+    private static final int MASTER_SERVER_ID = 1;
+    private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID);
+
+    @SuppressWarnings("serial")
+    private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
+            "mysql.slave.next_server_id", "Returns the ID of the next slave server");
+    @SuppressWarnings("serial")
+    private static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {},
+            "mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time.");
+
+    @Override
+    public void init() {
+        super.init();
+        // Set id supplier in attribute so it is serialized
+        setAttribute(SLAVE_NEXT_SERVER_ID, new NextServerIdSupplier());
+        setAttribute(SLAVE_ID_ADDRESS_MAPPING, new ConcurrentHashMap<String, String>());
+        if (getConfig(SLAVE_PASSWORD) == null) {
+            setAttribute(SLAVE_PASSWORD, Identifiers.makeRandomId(8));
+        } else {
+            setAttribute(SLAVE_PASSWORD, getConfig(SLAVE_PASSWORD));
+        }
+        initSubscriptions();
+    }
+
+    @Override
+    public void rebind() {
+        super.rebind();
+        initSubscriptions();
+    }
+
+    private void initSubscriptions() {
+        subscribeToMembers(this, MySqlNode.SERVICE_PROCESS_IS_RUNNING, new NodeRunningListener(this));
+        subscribe(this, MEMBER_REMOVED, new MemberRemovedListener());
+    }
+
+    @Override
+    protected void initEnrichers() {
+        super.initEnrichers();
+        propagateMasterAttribute(MySqlNode.HOSTNAME);
+        propagateMasterAttribute(MySqlNode.ADDRESS);
+        propagateMasterAttribute(MySqlNode.MYSQL_PORT);
+        propagateMasterAttribute(MySqlNode.DATASTORE_URL);
+
+        addEnricher(Enrichers.builder()
+                .aggregating(MySqlNode.DATASTORE_URL)
+                .publishing(SLAVE_DATASTORE_URL_LIST)
+                .computing(Functions.<Collection<String>>identity())
+                .entityFilter(Predicates.not(IS_MASTER))
+                .fromMembers()
+                .build());
+
+        addEnricher(Enrichers.builder()
+                .aggregating(MySqlNode.QUERIES_PER_SECOND_FROM_MYSQL)
+                .publishing(QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE)
+                .fromMembers()
+                .computingAverage()
+                .defaultValueForUnreportedSensors(0d)
+                .build());
+    }
+
+    private void propagateMasterAttribute(AttributeSensor<?> att) {
+        addEnricher(Enrichers.builder()
+                .aggregating(att)
+                .publishing(att)
+                .computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty())
+                        .apply(CollectionFunctionals.firstElement())
+                        .defaultValue(null))
+                .entityFilter(IS_MASTER)
+                .build());
+    }
+
+    /** Spec for the first member, which becomes the replication master; user-supplied specs are cloned, not mutated. */
+    @Override
+    protected EntitySpec<?> getFirstMemberSpec() {
+        final EntitySpec<?> firstMemberSpec = super.getFirstMemberSpec();
+        if (firstMemberSpec != null) {
+            return applyDefaults(firstMemberSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL, false);
+        }
+
+        final EntitySpec<?> memberSpec = super.getMemberSpec();
+        if (memberSpec != null) {
+            // The member spec is shared with the slaves, so it can't carry the master's id itself.
+            // Always force the id on the clone, even when a custom config template is configured;
+            // otherwise the first node keeps the default id and no member matches IS_MASTER.
+            EntitySpec<?> masterSpec = EntitySpec.create(memberSpec).configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID);
+            boolean hasConfigUrl = isKeyConfigured(memberSpec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey());
+            return hasConfigUrl ? masterSpec : masterSpec.configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL);
+        }
+
+        return EntitySpec.create(MySqlNode.class)
+                .displayName("MySql Master")
+                .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID)
+                .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL);
+    }
+
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        Supplier<Integer> serverIdSupplier = getAttribute(SLAVE_NEXT_SERVER_ID);
+
+        EntitySpec<?> spec = super.getMemberSpec();
+        if (spec != null) {
+            return applyDefaults(spec, serverIdSupplier, SLAVE_CONFIG_URL, true);
+        }
+
+        return EntitySpec.create(MySqlNode.class)
+                .displayName("MySql Slave")
+                .configure(MySqlNode.MYSQL_SERVER_ID, serverIdSupplier.get())
+                .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL)
+                // block inheritance, only master should execute the creation script
+                .configure(MySqlNode.CREATION_SCRIPT_URL, (String) null)
+                .configure(MySqlNode.CREATION_SCRIPT_CONTENTS, (String) null);
+    }
+
+    /** Returns spec itself if nothing is missing, else a clone with the unset defaults applied (creation script keys are nulled when resetCreationScript is set). */
+    private EntitySpec<?> applyDefaults(EntitySpec<?> spec, Supplier<Integer> serverId, String configUrl, boolean resetCreationScript) {
+        boolean needsServerId = !isKeyConfigured(spec, MySqlNode.MYSQL_SERVER_ID);
+        boolean needsConfigUrl = !isKeyConfigured(spec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey());
+        boolean needsCreationScriptUrl = resetCreationScript && !isKeyConfigured(spec, MySqlNode.CREATION_SCRIPT_URL);
+        boolean needsCreationScriptContents = resetCreationScript && !isKeyConfigured(spec, MySqlNode.CREATION_SCRIPT_CONTENTS);
+        if (!(needsServerId || needsConfigUrl || needsCreationScriptUrl || needsCreationScriptContents)) {
+            return spec;
+        }
+        EntitySpec<?> clonedSpec = EntitySpec.create(spec);
+        if (needsServerId) {
+            clonedSpec.configure(MySqlNode.MYSQL_SERVER_ID, serverId.get());
+        }
+        if (needsConfigUrl) {
+            clonedSpec.configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, configUrl);
+        }
+        if (needsCreationScriptUrl) {
+            clonedSpec.configure(MySqlNode.CREATION_SCRIPT_URL, (String) null);
+        }
+        if (needsCreationScriptContents) {
+            clonedSpec.configure(MySqlNode.CREATION_SCRIPT_CONTENTS, (String) null);
+        }
+        return clonedSpec;
+    }
+
+    private boolean isKeyConfigured(EntitySpec<?> spec, ConfigKey<?> key) {
+        return spec.getConfig().containsKey(key) || spec.getFlags().containsKey(key.getName());
+    }
+
+    @Override
+    protected Entity createNode(Location loc, Map<?, ?> flags) {
+        Entity node = super.createNode(loc, flags);
+        Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID);
+        if (serverId > 0) {
+            ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION, "Replication not started");
+        }
+        return node;
+    }
+
+    private static class NextServerIdSupplier implements Supplier<Integer> {
+        private AtomicInteger nextId = new AtomicInteger(MASTER_SERVER_ID+1);
+
+        @Override
+        public Integer get() {
+            return nextId.getAndIncrement();
+        }
+    }
+
+    // ============= Member Init =============
+
+    // The task is executed in inessential context (event handler) so
+    // not visible in tasks UI. Better make it visible so the user can
+    // see failures, currently accessible only from logs.
+    private static final class InitReplicationTask implements Runnable {
+        private final MySqlCluster cluster;
+        private final MySqlNode node;
+
+        private InitReplicationTask(MySqlCluster cluster, MySqlNode node) {
+            this.cluster = cluster;
+            this.node = node;
+        }
+
+        @Override
+        public void run() {
+            Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID);
+            if (serverId == MASTER_SERVER_ID) {
+                initMaster(node);
+            } else if (serverId > MASTER_SERVER_ID) {
+                initSlave(node);
+            }
+            ServiceNotUpLogic.clearNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION); 
+        }
+
+        private void initMaster(MySqlNode master) {
+            String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;");
+            Iterator<String> splitIter = Splitter.on(Pattern.compile("\\n|:"))
+                    .omitEmptyStrings()
+                    .trimResults()
+                    .split(binLogInfo)
+                    .iterator();
+            while (splitIter.hasNext()) {
+                String part = splitIter.next();
+                if (part.equals("File")) {
+                    String file = splitIter.next();
+                    ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file);
+                } else if (part.equals("Position")) {
+                    Integer position = new Integer(splitIter.next());
+                    ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION, position);
+                }
+            }
+        }
+
+        private void initSlave(MySqlNode slave) {
+            MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER);
+            String masterLogFile = validateSqlParam(getAttributeBlocking(master, MySqlMaster.MASTER_LOG_FILE));
+            Integer masterLogPos = getAttributeBlocking(master, MySqlMaster.MASTER_LOG_POSITION);
+            String masterAddress = validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS));
+            Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT);
+            String slaveAddress = validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
+            String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
+            String password = validateSqlParam(cluster.getAttribute(SLAVE_PASSWORD));
+
+            executeScriptOnNode(master, String.format(
+                    "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" +
+                    "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
+                    username, slaveAddress, password, username, slaveAddress));
+
+            String slaveCmd = String.format(
+                    "CHANGE MASTER TO " +
+                        "MASTER_HOST='%s', " +
+                        "MASTER_PORT=%d, " +
+                        "MASTER_USER='%s', " +
+                        "MASTER_PASSWORD='%s', " +
+                        "MASTER_LOG_FILE='%s', " +
+                        "MASTER_LOG_POS=%d;\n" +
+                    "START SLAVE;\n",
+                    masterAddress, masterPort, username, password, masterLogFile, masterLogPos);
+            executeScriptOnNode(slave, slaveCmd);
+
+            cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
+        }
+
+        private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) {
+            return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked();
+        }
+
+    }
+
+    private static final class NodeRunningListener implements SensorEventListener<Boolean> {
+        private MySqlCluster cluster;
+
+        public NodeRunningListener(MySqlCluster cluster) {
+            this.cluster = cluster;
+        }
+
+        @Override
+        public void onEvent(SensorEvent<Boolean> event) {
+            final MySqlNode node = (MySqlNode) event.getSource();
+            if (Boolean.TRUE.equals(event.getValue()) &&
+                    // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet.
+                    // Probably will get several updates while replication is initialized so an additional
+                    // check is needed whether we have already seen this.
+                    Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) &&
+                    !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
+
+                // Events executed sequentially so no need to synchronize here.
+                if (Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
+                    return;
+                }
+                ((EntityLocal)node).setAttribute(NODE_REPLICATION_INITIALIZED, Boolean.TRUE);
+
+                DynamicTasks.queueIfPossible(TaskBuilder.builder()
+                        .name("Configure master-slave replication on node")
+                        .body(new InitReplicationTask(cluster, node))
+                        .build())
+                    .orSubmitAsync(node);
+            }
+        }
+
+    }
+
+    // ============= Member Remove =============
+
+    public class MemberRemovedListener implements SensorEventListener<Entity> {
+        @Override
+        public void onEvent(SensorEvent<Entity> event) {
+            MySqlCluster cluster = (MySqlCluster) event.getSource();
+            Entity node = event.getValue();
+            String slaveAddress = cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).remove(node.getId());
+            if (slaveAddress != null) {
+                DynamicTasks.queueIfPossible(TaskBuilder.builder()
+                        .name("Remove slave access")
+                        .body(new RemoveSlaveConfigTask(cluster, slaveAddress))
+                        .build())
+                    .orSubmitAsync(cluster);
+            }
+        }
+    }
+
+    public class RemoveSlaveConfigTask implements Runnable {
+        private MySqlCluster cluster;
+        private String slaveAddress;
+
+        public RemoveSlaveConfigTask(MySqlCluster cluster, String slaveAddress) {
+            this.cluster = cluster;
+            this.slaveAddress = validateSqlParam(slaveAddress);
+        }
+
+        @Override
+        public void run() {
+            // Could already be gone if stopping the entire app - let it throw an exception
+            MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER);
+            String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
+            executeScriptOnNode(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress));
+        }
+
+    }
+
+    // Can't call node.executeScript directly, need to change execution context, so use an effector task
+    private static String executeScriptOnNode(MySqlNode node, String commands) {
+        return node.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", commands)).getUnchecked();
+    }
+
+    private static String validateSqlParam(String config) {
+        // Don't go into escape madness, just deny any string that could break out of a quoted SQL literal: a
+        // quote or a backslash alone is enough. Prepared statements would be nicer but need extra dependencies.
+        if (config.contains("'") || config.contains("\\")) {
+            throw new IllegalStateException("User provided string contains illegal SQL characters: " + config);
+        }
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java
index 36b4812..a5c44e0 100644
--- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java
@@ -65,6 +65,9 @@ public interface MySqlNode extends SoftwareProcess, HasShortName, DatastoreCommo
     
     public static final ConfigKey<Object> MYSQL_SERVER_CONF_LOWER_CASE_TABLE_NAMES = MYSQL_SERVER_CONF.subKey("lower_case_table_names", "See MySQL guide. Set 1 to ignore case in table names (useful for OS portability)");
     
+    @SetFromFlag("serverId") // "mysql.server_id" is looked up by name in the mysql_slave.conf template (server-id, relay-log)
+    public static final ConfigKey<Integer> MYSQL_SERVER_ID = ConfigKeys.newIntegerConfigKey("mysql.server_id", "Corresponds to server_id option", 0);
+    
     @SetFromFlag("password")
     public static final StringAttributeSensorAndConfigKey PASSWORD = new StringAttributeSensorAndConfigKey(
             "mysql.password", "Database admin password (or randomly generated if not set)", null);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java
index 4f8606a..2e05fb0 100644
--- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java
+++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java
@@ -81,7 +81,7 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode {
             }
         });
     }
-    
+
     @Override
     protected void connectSensors() {
         super.connectSensors();
@@ -102,6 +102,7 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode {
                     .poll(new SshPollConfig<Double>(QUERIES_PER_SECOND_FROM_MYSQL)
                             .command(cmd)
                             .onSuccess(new Function<SshPollValue, Double>() {
+                                @Override
                                 public Double apply(SshPollValue input) {
                                     String q = Strings.getFirstWordAfter(input.getStdout(), "Queries per second avg:");
                                     if (q==null) return null;
@@ -151,7 +152,7 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode {
     public String getShortName() {
         return "MySQL";
     }
-    
+
     @Override
     public String executeScript(String commands) {
         return getDriver().executeScriptAsync(commands).block().getStdout();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf
----------------------------------------------------------------------
diff --git a/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf
new file mode 100644
index 0000000..791f2da
--- /dev/null
+++ b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf
@@ -0,0 +1,26 @@
+[client]
+port            = ${driver.port?c}
+socket          = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c}
+user            = root
+password        = ${entity.password}
+
+# Entries for specific programs follow
+
+# The MySQL server
+[mysqld]
+port            = ${driver.port?c}
+socket          = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c}
+basedir         = ${driver.baseDir}
+datadir         = ${driver.dataDir}
+bind-address    = 0.0.0.0
+# skip-networking
+
+# Replication config (master): fixed server-id, binary logging switched on via log-bin
+server-id       = 1
+binlog-format   = mixed
+log-bin         = mysql-bin
+sync_binlog     = 1
+innodb_flush_log_at_trx_commit=1
+
+# Custom configuration options (rendered from driver.mySqlServerOptionsString)
+${driver.mySqlServerOptionsString}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf
----------------------------------------------------------------------
diff --git a/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf
new file mode 100644
index 0000000..2e1e945
--- /dev/null
+++ b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf
@@ -0,0 +1,33 @@
+[#ftl]
+[client]
+port            = ${driver.port?c}
+socket          = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c}
+user            = root
+password        = ${entity.password}
+
+# Entries for specific programs follow
+
+# The MySQL server
+[mysqld]
+port            = ${driver.port?c}
+socket          = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c}
+basedir         = ${driver.baseDir}
+datadir         = ${driver.dataDir}
+bind-address    = 0.0.0.0
+# skip-networking
+
+# Replication config (slave); the server id is rendered with ?c, like the port, so it is written as plain digits
+server-id       = ${config["mysql.server_id"]?c}
+relay-log       = mysql-slave-${config["mysql.server_id"]?c}-relay
+relay-log-recovery = 1
+relay-log-info-repository = TABLE
+relay-log-purge = 1
+[#if !config["mysql.slave.replicate_do_db"]??            ]#[/#if]replicate-do-db             = ${config["mysql.slave.replicate_do_db"]!}
+[#if !config["mysql.slave.replicate_ignore_db"]??        ]#[/#if]replicate-ignore-db         = ${config["mysql.slave.replicate_ignore_db"]!}
+[#if !config["mysql.slave.replicate_do_table"]??         ]#[/#if]replicate-do-table          = ${config["mysql.slave.replicate_do_table"]!}
+[#if !config["mysql.slave.replicate_ignore_table"]??     ]#[/#if]replicate-ignore-table      = ${config["mysql.slave.replicate_ignore_table"]!}
+[#if !config["mysql.slave.replicate_wild_do_table"]??    ]#[/#if]replicate-wild-do-table     = ${config["mysql.slave.replicate_wild_do_table"]!}
+[#if !config["mysql.slave.replicate_wild_ignore_table"]??]#[/#if]replicate-wild-ignore-table = ${config["mysql.slave.replicate_wild_ignore_table"]!}
+
+# Custom configuration options (rendered from driver.mySqlServerOptionsString)
+${driver.mySqlServerOptionsString}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
index 4208fe3..2d08bd2 100644
--- a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
+++ b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java
@@ -148,6 +148,14 @@ public class CollectionFunctionals {
         return Predicates.compose(Predicates.equalTo(targetSize), CollectionFunctionals.sizeFunction());
     }
 
+    public static Predicate<Iterable<?>> empty() { // same predicate as sizeEquals(0)
+        return CollectionFunctionals.sizeEquals(0);
+    }
+
+    public static Predicate<Iterable<?>> notEmpty() { // negation of empty()
+        return Predicates.not(CollectionFunctionals.empty());
+    }
+
     public static <K> Predicate<Map<K,?>> mapSizeEquals(int targetSize) {
         return Predicates.compose(Predicates.equalTo(targetSize), CollectionFunctionals.<K>mapSize());
     }