You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/10/26 13:31:23 UTC

[2/7] incubator-brooklyn git commit: Dump based replication init

Dump based replication init

When the replication logs on master no longer start from the beginning a database dump will be performed to initialize the slave


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/0de0a2a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/0de0a2a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/0de0a2a9

Branch: refs/heads/master
Commit: 0de0a2a944b08437c4f0eda407265bd89a26147a
Parents: 5fda6ed
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Thu Oct 15 17:36:31 2015 +0300
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Mon Oct 19 16:48:52 2015 +0300

----------------------------------------------------------------------
 .../brooklyn/util/core/task/DynamicTasks.java   |  17 +
 .../system_service/SystemServiceEnricher.java   |  12 +-
 .../entity/database/DatastoreMixins.java        |   5 +-
 .../database/mysql/InitSlaveTaskBody.java       | 385 +++++++++++++++++++
 .../entity/database/mysql/MySqlCluster.java     |  12 +-
 .../entity/database/mysql/MySqlClusterImpl.java | 230 ++++-------
 .../database/mysql/MySqlClusterUtils.java       |  52 +++
 .../entity/database/mysql/MySqlDriver.java      |   7 +-
 .../entity/database/mysql/MySqlNode.java        |  43 ++-
 .../database/mysql/MySqlNodeEffectors.java      |  87 +++++
 .../entity/database/mysql/MySqlNodeImpl.java    |   7 +-
 .../entity/database/mysql/MySqlSshDriver.java   |  41 +-
 .../database/mysql/ReplicationSnapshot.java     |  58 +++
 .../entity/database/VogellaExampleAccess.java   |  16 +-
 .../mysql/MySqlClusterIntegrationTest.java      | 138 ++++++-
 .../database/mysql/MySqlClusterTestHelper.java  |  35 +-
 16 files changed, 922 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 52ec88d..5574709 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.api.mgmt.TaskFactory;
 import org.apache.brooklyn.api.mgmt.TaskQueueingContext;
 import org.apache.brooklyn.api.mgmt.TaskWrapper;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
@@ -333,4 +334,20 @@ public class DynamicTasks {
         return queueIfPossible(task).orSubmitAsync(entity).asTask();
     }
 
+    /** Breaks the parent-child relation between Tasks.current() and the task passed,
+     *  making the new task a top-level one at the target entity.
+     *  To make it visible in the UI, also tag the task with:
+     *    .tag(BrooklynTaskTags.tagForContextEntity(entity))
+     *    .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+     */
+    public static <T> Task<T> submitTopLevelTask(TaskAdaptable<T> task, Entity entity) {
+        Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get();
+        BasicExecutionManager.getPerThreadCurrentTask().set(null);
+        try {
+            return Entities.submit(entity, task).asTask();
+        } finally {
+            BasicExecutionManager.getPerThreadCurrentTask().set(currentTask);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
index fd9c32e..26c0fdb 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
@@ -114,17 +114,7 @@ public class SystemServiceEnricher extends AbstractEnricher implements Enricher
                 .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
                 .build();
 
-        submitTopLevel(updateService);
-    }
-
-    private void submitTopLevel(Task<Void> updateService) {
-        Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get();
-        BasicExecutionManager.getPerThreadCurrentTask().set(null);
-        try {
-            Entities.submit(entity, updateService);
-        } finally {
-            BasicExecutionManager.getPerThreadCurrentTask().set(currentTask);
-        }
+        DynamicTasks.submitTopLevelTask(updateService, entity);
     }
 
     private String getLaunchScript(String stdin, String env) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
index 534c0eb..67eda16 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
@@ -50,9 +50,10 @@ public class DatastoreMixins {
     public static final Effector<String> EXECUTE_SCRIPT = CanExecuteScript.EXECUTE_SCRIPT;
     
     public static interface CanExecuteScript {
-        public static final Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript")
+        ConfigKey<String> COMMANDS = ConfigKeys.newStringConfigKey("commands");
+        Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript")
             .description("executes the given script contents")
-            .parameter(String.class, "commands")
+            .parameter(COMMANDS)
             .buildAbstract();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java
new file mode 100644
index 0000000..f125024
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.TaskTags;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.ssh.BashCommands;
+import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
+import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class InitSlaveTaskBody implements Runnable {
+    private static final String SNAPSHOT_DUMP_OPTIONS = "--skip-lock-tables --single-transaction --flush-logs --hex-blob -A";
+
+    private static final Logger log = LoggerFactory.getLogger(InitSlaveTaskBody.class);
+
+    private final MySqlCluster cluster;
+    private final MySqlNode slave;
+    private Semaphore lock;
+
+    public InitSlaveTaskBody(MySqlCluster cluster, MySqlNode slave, Semaphore lock) {
+        this.cluster = cluster;
+        this.slave = slave;
+        this.lock = lock;
+    }
+
+    @Override
+    public void run() {
+        // Replication init state consists of:
+        //   * initial dump (optional)
+        //   * location of initial dump (could be on any of the members, optional)
+        //   * bin log file name
+        //   * bin log position
+        // 1. Check replication state:
+        //   * Does the dump exist (and the machine where it is located)
+        //   * Does the bin log exist on the master
+        // 2. If the replication state is not valid create a new one
+        //   * Select a slave to dump, master if no slaves
+        //   * If it's a slave do 'STOP SLAVE SQL_THREAD;'
+        //   * Call mysqldump to create the snapshot
+        //   * When done if a slave do 'START SLAVE SQL_THREAD;'
+        //   * Get master state from the dump - grep "MASTER_LOG_POS" dump.sql.
+        //     If slave get state from 'SHOW SLAVE STATUS'
+        //   * Save new init info in cluster - bin log name, position, dump
+        // 3. Init Slave
+        //   * transfer dump to new slave (if dump exists)
+        //   * import - mysql < ~/dump.sql
+        //   * change master to and start slave
+        //!!! Caveat if dumping from master and MyISAM tables are used dump may be inconsistent.
+        //   * Only way around it is to lock the database while dumping (or taking a snapshot through LVM which is quicker)
+        bootstrapSlaveAsync(getValidReplicationInfo(), slave);
+        cluster.getAttribute(MySqlClusterImpl.SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
+    }
+
+    private MySqlNode getMaster() {
+        return (MySqlNode) Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER);
+    }
+
+    private void bootstrapSlaveAsync(final Future<ReplicationSnapshot> replicationInfoFuture, final MySqlNode slave) {
+        DynamicTasks.queue("bootstrap slave replication", new Runnable() {
+            @Override
+            public void run() {
+                ReplicationSnapshot replicationSnapshot;
+                try {
+                    replicationSnapshot = replicationInfoFuture.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw Exceptions.propagate(e);
+                }
+
+                MySqlNode master = getMaster();
+                String masterAddress = MySqlClusterUtils.validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS));
+                Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT);
+                String slaveAddress = MySqlClusterUtils.validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
+                String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(MySqlCluster.SLAVE_USERNAME));
+                String password = MySqlClusterUtils.validateSqlParam(cluster.getAttribute(MySqlCluster.SLAVE_PASSWORD));
+
+                if (replicationSnapshot.getEntityId() != null) {
+                    Entity sourceEntity = Iterables.find(cluster.getMembers(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId()));
+                    String dumpId = FilenameUtils.removeExtension(replicationSnapshot.getSnapshotPath());
+                    copyDumpAsync(sourceEntity, slave, replicationSnapshot.getSnapshotPath(), dumpId);
+                    DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.IMPORT_DUMP, ImmutableMap.of("path", replicationSnapshot.getSnapshotPath())));
+                    //The dump resets the password to whatever is on the source instance, reset it back.
+                    //We are able to still login because privileges are not flushed, so we just set the password to the same value.
+                    DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.CHANGE_PASSWORD, ImmutableMap.of("password", slave.getAttribute(MySqlNode.PASSWORD))));                        //
+                    //Flush privileges to load new users coming from the dump
+                    MySqlClusterUtils.executeSqlOnNodeAsync(slave, "FLUSH PRIVILEGES;");
+                }
+
+                MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format(
+                        "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" +
+                        "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
+                        username, slaveAddress, password, username, slaveAddress));
+
+                // Executing this will unblock SERVICE_UP wait in the start effector
+                String slaveCmd = String.format(
+                        "CHANGE MASTER TO " +
+                            "MASTER_HOST='%s', " +
+                            "MASTER_PORT=%d, " +
+                            "MASTER_USER='%s', " +
+                            "MASTER_PASSWORD='%s', " +
+                            "MASTER_LOG_FILE='%s', " +
+                            "MASTER_LOG_POS=%d;\n" +
+                        "START SLAVE;\n",
+                        masterAddress, masterPort,
+                        username, password,
+                        replicationSnapshot.getBinLogName(),
+                        replicationSnapshot.getBinLogPosition());
+                MySqlClusterUtils.executeSqlOnNodeAsync(slave, slaveCmd);
+            }
+        });
+    }
+
+    private void copyDumpAsync(Entity source, Entity dest, String sourceDumpPath, String dumpId) {
+        final SshMachineLocation sourceMachine = EffectorTasks.getSshMachine(source);
+        final SshMachineLocation destMachine = EffectorTasks.getSshMachine(dest);
+
+        String sourceRunDir = source.getAttribute(MySqlNode.RUN_DIR);
+        String privateKeyFile = dumpId + ".id_rsa";
+        final Task<String> tempKeyTask = DynamicTasks.queue(SshEffectorTasks.ssh(
+                "cd $RUN_DIR",
+                "PRIVATE_KEY=" + privateKeyFile,
+                "ssh-keygen -t rsa -N '' -f $PRIVATE_KEY -C " + dumpId + " > /dev/null",
+                "cat $PRIVATE_KEY.pub")
+                .environmentVariable("RUN_DIR", sourceRunDir)
+                .machine(sourceMachine)
+                .summary("generate private key for slave access")
+                .requiringZeroAndReturningStdout())
+                .asTask();
+
+        DynamicTasks.queue("add key to authorized_keys", new Runnable() {
+            @Override
+            public void run() {
+                String publicKey = tempKeyTask.getUnchecked();
+                DynamicTasks.queue(SshEffectorTasks.ssh(String.format(
+                        "cat >> ~/.ssh/authorized_keys <<EOF\n%s\nEOF", 
+                        publicKey))
+                    .machine(destMachine)
+                    .summary("Add key to authorized_keys")
+                    .requiringExitCodeZero());
+            }
+        });
+
+        final ProcessTaskWrapper<Integer> copyTask = SshEffectorTasks.ssh(
+                "cd $RUN_DIR",
+                String.format(
+                    "scp -o 'BatchMode yes' -o 'StrictHostKeyChecking no' -i '%s' '%s' '%s@%s:%s/%s.sql'",
+                    privateKeyFile,
+                    sourceDumpPath,
+                    destMachine.getUser(),
+                    dest.getAttribute(MySqlNode.SUBNET_ADDRESS),
+                    dest.getAttribute(MySqlNode.RUN_DIR),
+                    dumpId))
+                .environmentVariable("RUN_DIR", sourceRunDir)
+                .machine(sourceMachine)
+                .summary("copy database dump to slave")
+                .newTask();
+        // Let next couple of tasks complete even if this one fails so that we can clean up.
+        TaskTags.markInessential(copyTask);
+        DynamicTasks.queue(copyTask);
+
+        // Delete private key
+        DynamicTasks.queue(SshEffectorTasks.ssh(
+                "cd $RUN_DIR",
+                "rm " + privateKeyFile)
+            .environmentVariable("RUN_DIR", sourceRunDir)
+            .machine(sourceMachine)
+            .summary("remove private key"));
+
+        DynamicTasks.queue(SshEffectorTasks.ssh(String.format(
+                "sed -i'' -e '/%s/d' ~/.ssh/authorized_keys",
+                dumpId))
+            .machine(destMachine)
+            .summary("remove private key from authorized_keys")).asTask();
+
+        // The task will fail if copyTask fails, but only after the private key is deleted.
+        DynamicTasks.queue("check for successful copy", new Runnable() {
+            @Override
+            public void run() {
+                copyTask.asTask().getUnchecked();
+            }
+        });
+    }
+
+    private Future<ReplicationSnapshot> getValidReplicationInfo() {
+        try {
+            try {
+                lock.acquire();
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+            ReplicationSnapshot replicationSnapshot = getAttributeBlocking(cluster, MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
+            if (!isReplicationInfoValid(replicationSnapshot)) {
+                final MySqlNode snapshotNode = getSnapshotNode();
+                final String dumpName = getDumpUniqueId() + ".sql";
+                if (MySqlClusterUtils.IS_MASTER.apply(snapshotNode)) {
+                    return createMasterReplicationSnapshot(snapshotNode, dumpName);
+                } else {
+                    return createSlaveReplicationSnapshot(snapshotNode, dumpName);
+                }
+            }
+            return ConcurrentUtils.constantFuture(replicationSnapshot);
+        } finally {
+            lock.release();
+        }
+    }
+
+    private Future<ReplicationSnapshot> createMasterReplicationSnapshot(final MySqlNode master, final String dumpName) {
+        log.info("MySql cluster " + cluster + ": generating new replication snapshot on master node " + master + " with name " + dumpName);
+        String dumpOptions = SNAPSHOT_DUMP_OPTIONS + " --master-data=2";
+        ImmutableMap<String, String> params = ImmutableMap.of(
+                ExportDumpEffector.PATH.getName(), dumpName,
+                ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions);
+        DynamicTasks.queue(Effectors.invocation(master, MySqlNode.EXPORT_DUMP, params));
+        return DynamicTasks.queue("get master log info from dump", new Callable<ReplicationSnapshot>() {
+            @Override
+            public ReplicationSnapshot call() throws Exception {
+                Pattern masterInfoPattern = Pattern.compile("CHANGE MASTER TO.*MASTER_LOG_FILE\\s*=\\s*'([^']+)'.*MASTER_LOG_POS\\s*=\\s*(\\d+)");
+                String masterInfo = DynamicTasks.queue(execSshTask(master, "grep -m1 'CHANGE MASTER TO' " + dumpName, "Extract master replication status from dump")
+                        .requiringZeroAndReturningStdout()).asTask().getUnchecked();
+                Matcher masterInfoMatcher = masterInfoPattern.matcher(masterInfo);
+                if (!masterInfoMatcher.find() || masterInfoMatcher.groupCount() != 2) {
+                    throw new IllegalStateException("Master dump doesn't contain replication info: " + masterInfo);
+                }
+                String masterLogFile = masterInfoMatcher.group(1);
+                int masterLogPosition = Integer.parseInt(masterInfoMatcher.group(2));
+                ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(master.getId(), dumpName, masterLogFile, masterLogPosition);
+                cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
+                return replicationSnapshot;
+            }
+        });
+    }
+
+    private Future<ReplicationSnapshot> createSlaveReplicationSnapshot(final MySqlNode slave, final String dumpName) {
+        MySqlClusterUtils.executeSqlOnNodeAsync(slave, "STOP SLAVE SQL_THREAD;");
+        try {
+            log.info("MySql cluster " + cluster + ": generating new replication snapshot on slave node " + slave + " with name " + dumpName);
+            String dumpOptions = SNAPSHOT_DUMP_OPTIONS;
+            ImmutableMap<String, String> params = ImmutableMap.of(
+                    ExportDumpEffector.PATH.getName(), dumpName,
+                    ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions);
+            DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.EXPORT_DUMP, params));
+            return DynamicTasks.queue("get master log info from slave", new Callable<ReplicationSnapshot>() {
+                @Override
+                public ReplicationSnapshot call() throws Exception {
+                    String slaveStatusRow = slave.executeScript("SHOW SLAVE STATUS \\G");
+                    Map<String, String> slaveStatus = MySqlRowParser.parseSingle(slaveStatusRow);
+                    String masterLogFile = slaveStatus.get("Relay_Master_Log_File");
+                    int masterLogPosition = Integer.parseInt(slaveStatus.get("Exec_Master_Log_Pos"));
+                    ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(slave.getId(), dumpName, masterLogFile, masterLogPosition);
+                    cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
+                    return replicationSnapshot;
+                }
+            });
+        } finally {
+            MySqlClusterUtils.executeSqlOnNodeAsync(slave, "START SLAVE SQL_THREAD;");
+        }
+    }
+
+    private MySqlNode getSnapshotNode() {
+        String snapshotNodeId = cluster.getConfig(MySqlCluster.REPLICATION_PREFERRED_SOURCE);
+        if (snapshotNodeId != null) {
+            Optional<Entity> preferredNode = Iterables.tryFind(cluster.getMembers(), EntityPredicates.idEqualTo(snapshotNodeId));
+            if (preferredNode.isPresent()) {
+                return (MySqlNode) preferredNode.get();
+            } else {
+                log.warn("MySql cluster " + this + " configured with preferred snapshot node " + snapshotNodeId + " but it's not a member. Defaulting to a random slave.");
+            }
+        }
+        return getRandomSlave();
+    }
+
+    private MySqlNode getRandomSlave() {
+        List<MySqlNode> slaves = getHealhtySlaves();
+        if (slaves.size() > 0) {
+            return slaves.get(new Random().nextInt(slaves.size()));
+        } else {
+            return getMaster();
+        }
+    }
+
+    private ImmutableList<MySqlNode> getHealhtySlaves() {
+        return FluentIterable.from(cluster.getMembers())
+                   .filter(Predicates.not(MySqlClusterUtils.IS_MASTER))
+                   .filter(EntityPredicates.attributeEqualTo(MySqlNode.SERVICE_UP, Boolean.TRUE))
+                   .filter(MySqlNode.class)
+                   .toList();
+    }
+
+    private boolean isReplicationInfoValid(ReplicationSnapshot replicationSnapshot) {
+        MySqlNode master = getMaster();
+        String dataDir = Strings.nullToEmpty(master.getConfig(MySqlNode.DATA_DIR));
+        if (!checkFileExistsOnEntity(master, Os.mergePathsUnix(dataDir, replicationSnapshot.getBinLogName()))) {
+            return false;
+        }
+        if (replicationSnapshot.getEntityId() != null) {
+            Optional<Entity> snapshotSlave = Iterables.tryFind(cluster.getChildren(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId()));
+            if (!snapshotSlave.isPresent()) {
+                log.info("MySql cluster " + cluster + " missing node " + replicationSnapshot.getEntityId() + " with last snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot.");
+                return false;
+            }
+            if (!checkFileExistsOnEntity(snapshotSlave.get(), replicationSnapshot.getSnapshotPath())) {
+                log.info("MySql cluster " + cluster + ", node " + snapshotSlave.get() + " missing replication snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot.");
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean checkFileExistsOnEntity(Entity entity, String path) {
+        String cmd = BashCommands.chain(
+                BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist."));
+        String summary = "Check if file " + path + " exists";
+        return DynamicTasks.queue(execSshTask(entity, cmd, summary).allowingNonZeroExitCode()).asTask().getUnchecked() == 0;
+    }
+
+    private ProcessTaskFactory<Integer> execSshTask(Entity entity, String cmd, String summary) {
+        SshMachineLocation machine = EffectorTasks.getSshMachine(entity);
+        return SshTasks.newSshExecTaskFactory(machine, "cd $RUN_DIR\n" + cmd)
+            .allowingNonZeroExitCode()
+            .environmentVariable("RUN_DIR", entity.getAttribute(SoftwareProcess.RUN_DIR))
+            .summary(summary);
+    }
+
+    private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) {
+        return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked();
+    }
+
+    private String getDumpUniqueId() {
+        return "replication-dump-" + Identifiers.makeRandomId(8) + "-" + new SimpleDateFormat("yyyy-MM-dd--HH-mm-ss").format(new Date());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
index de43951..9ea5ffe 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
@@ -18,7 +18,7 @@
  */
 package org.apache.brooklyn.entity.database.mysql;
 
-import java.util.Collection;
+import java.util.List;
 
 import org.apache.brooklyn.api.catalog.Catalog;
 import org.apache.brooklyn.api.entity.ImplementedBy;
@@ -36,11 +36,6 @@ import com.google.common.reflect.TypeToken;
 @Catalog(name="MySql Master-Slave cluster", description="Sets up a cluster of MySQL nodes using master-slave relation and binary logging", iconUrl="classpath:///mysql-logo-110x57.png")
 public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
     interface MySqlMaster {
-        AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor(
-                "mysql.master.log_file", "The binary log file master is writing to");
-        AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor(
-                "mysql.master.log_position", "The position in the log file to start replication");
-
         ConfigKey<String> MASTER_CREATION_SCRIPT_CONTENTS = ConfigKeys.newStringConfigKey(
                 "datastore.master.creation.script.contents", "Contents of creation script to initialize the master node after initializing replication");
 
@@ -52,6 +47,9 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
         AttributeSensor<Integer> SLAVE_SECONDS_BEHIND_MASTER = Sensors.newIntegerSensor("mysql.slave.seconds_behind_master", "How many seconds behind master is the replication state on the slave");
     }
 
+    AttributeSensor<ReplicationSnapshot> REPLICATION_LAST_SLAVE_SNAPSHOT = Sensors.newSensor(ReplicationSnapshot.class, "mysql.replication.last_slave_snapshot", "Last valid state to init slaves with");
+    ConfigKey<String> REPLICATION_PREFERRED_SOURCE = ConfigKeys.newStringConfigKey("mysql.replication.preferred_source", "ID of node to get the replication snapshot from. If not set a random slave is used, falling back to master if no slaves.");
+
     ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey(
             "mysql.slave.username", "The user name slaves will use to connect to the master", "slave");
     ConfigKey<String> SLAVE_REPLICATE_DO_DB = ConfigKeys.newStringConfigKey(
@@ -69,7 +67,7 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
     StringAttributeSensorAndConfigKey SLAVE_PASSWORD = new StringAttributeSensorAndConfigKey(
             "mysql.slave.password", "The password slaves will use to connect to the master. Will be auto-generated by default.");
     @SuppressWarnings("serial")
-    AttributeSensor<Collection<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<Collection<String>>() {},
+    AttributeSensor<List<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<List<String>>() {},
             "mysql.slave.datastore.url", "List of all slave's DATASTORE_URL sensors");
     AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql.perNode");
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
index 9cebb21..e6afc18 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
@@ -18,10 +18,10 @@
  */
 package org.apache.brooklyn.entity.database.mysql;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
@@ -35,10 +35,8 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.EntityPredicates;
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.enricher.stock.Enrichers;
 import org.apache.brooklyn.entity.group.DynamicClusterImpl;
@@ -57,7 +55,6 @@ import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
@@ -67,23 +64,21 @@ import com.google.common.reflect.TypeToken;
 
 // https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html
 
-// TODO Bootstrap slave from dump for the case where the binary log is purged
-// TODO Promote slave to master
+// TODO Filter dump by database/table, currently all tables are replicated
 // TODO SSL connection between master and slave
-// TODO DB credentials littered all over the place in file system
+// TODO Promote slave to master
 public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster {
     private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized");
 
     private static final String MASTER_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_master.conf";
     private static final String SLAVE_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_slave.conf";
-    private static final int MASTER_SERVER_ID = 1;
-    private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID);
+    protected static final int MASTER_SERVER_ID = 1;
 
     @SuppressWarnings("serial")
     private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
             "mysql.slave.next_server_id", "Returns the ID of the next slave server");
     @SuppressWarnings("serial")
-    private static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {},
+    protected static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {},
             "mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time.");
 
     @Override
@@ -111,6 +106,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
         subscriptions().subscribe(this, MEMBER_REMOVED, new MemberRemovedListener());
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     protected void initEnrichers() {
         super.initEnrichers();
@@ -124,8 +120,8 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
         enrichers().add(Enrichers.builder()
                 .aggregating(MySqlNode.DATASTORE_URL)
                 .publishing(SLAVE_DATASTORE_URL_LIST)
-                .computing(Functions.<Collection<String>>identity())
-                .entityFilter(Predicates.not(IS_MASTER))
+                .computing((Function)Functions.identity())
+                .entityFilter(Predicates.not(MySqlClusterUtils.IS_MASTER))
                 .fromMembers()
                 .build());
 
@@ -145,7 +141,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
                 .computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty())
                         .apply(CollectionFunctionals.firstElement())
                         .defaultValue(null))
-                .entityFilter(IS_MASTER)
+                .entityFilter(MySqlClusterUtils.IS_MASTER)
                 .build());
     }
 
@@ -158,13 +154,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
 
         final EntitySpec<?> memberSpec = super.getMemberSpec();
         if (memberSpec != null) {
-            if (!isKeyConfigured(memberSpec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey())) {
-                return EntitySpec.create(memberSpec)
-                        .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID)
-                        .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL);
-            } else {
-                return memberSpec;
-            }
+            return applyDefaults(memberSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL);
         }
 
         return EntitySpec.create(MySqlNode.class)
@@ -184,6 +174,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
 
         return EntitySpec.create(MySqlNode.class)
                 .displayName("MySql Slave")
+                // Slave server IDs will not be linear because getMemberSpec not always results in createNode (result discarded)
                 .configure(MySqlNode.MYSQL_SERVER_ID, serverIdSupplier.get())
                 .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL);
     }
@@ -211,9 +202,10 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
 
     @Override
     protected Entity createNode(Location loc, Map<?, ?> flags) {
-        Entity node = super.createNode(loc, flags);
-        if (!IS_MASTER.apply(node)) {
-            ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, MySqlSlave.SLAVE_HEALTHY, "Replication not started");
+        MySqlNode node = (MySqlNode) super.createNode(loc, flags);
+        if (!MySqlClusterUtils.IS_MASTER.apply(node)) {
+            EntityLocal localNode = (EntityLocal) node;
+            ServiceNotUpLogic.updateNotUpIndicator(localNode, MySqlSlave.SLAVE_HEALTHY, "Replication not started");
 
             addFeed(FunctionFeed.builder()
                 .entity((EntityLocal)node)
@@ -221,7 +213,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
                 .poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY)
                         .callable(new SlaveStateCallable(node))
                         .checkSuccess(StringPredicates.isNonBlank())
-                        .onSuccess(new SlaveStateParser(node))
+                        .onSuccess(new SlaveStateParser(localNode))
                         .setOnFailure(false)
                         .description("Polls SHOW SLAVE STATUS"))
                 .build());
@@ -235,15 +227,15 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
     }
 
     public static class SlaveStateCallable implements Callable<String> {
-        private Entity slave;
-        public SlaveStateCallable(Entity slave) {
+        private MySqlNode slave;
+        public SlaveStateCallable(MySqlNode slave) {
             this.slave = slave;
         }
 
         @Override
         public String call() throws Exception {
             if (Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING))) {
-                return slave.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", "SHOW SLAVE STATUS \\G")).asTask().getUnchecked();
+                return MySqlClusterUtils.executeSqlOnNode(slave, "SHOW SLAVE STATUS \\G");
             } else {
                 return null;
             }
@@ -252,9 +244,9 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
     }
 
     public static class SlaveStateParser implements Function<String, Boolean> {
-        private Entity slave;
+        private EntityLocal slave;
 
-        public SlaveStateParser(Entity slave) {
+        public SlaveStateParser(EntityLocal slave) {
             this.slave = slave;
         }
 
@@ -281,38 +273,67 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
 
     // ============= Member Init =============
 
-    // The task is executed in inessential context (event handler) so
-    // not visible in tasks UI. Better make it visible so the user can
-    // see failures, currently accessible only from logs.
-    private static final class InitReplicationTask implements Runnable {
-        private final MySqlCluster cluster;
-        private final MySqlNode node;
+    // The task is executed separately from the start effector, so failing here
+    // will not fail the start effector as well, but it will eventually time out
+    // because replication is not started.
+    // Would be nice to be able to plug in to the entity lifecycle!
 
-        private InitReplicationTask(MySqlCluster cluster, MySqlNode node) {
+    private static final class NodeRunningListener implements SensorEventListener<Boolean> {
+        private MySqlCluster cluster;
+        private Semaphore lock = new Semaphore(1);
+
+        public NodeRunningListener(MySqlCluster cluster) {
             this.cluster = cluster;
-            this.node = node;
         }
 
         @Override
-        public void run() {
-            Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID);
-            if (serverId == MASTER_SERVER_ID) {
-                initMaster(node);
-            } else if (serverId > MASTER_SERVER_ID) {
-                initSlave(node);
+        public void onEvent(SensorEvent<Boolean> event) {
+            final MySqlNode node = (MySqlNode) event.getSource();
+            if (Boolean.TRUE.equals(event.getValue()) &&
+                    // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet.
+                    // Probably will get several updates while replication is initialized so an additional
+                    // check is needed whether we have already seen this.
+                    Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) &&
+                    !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
+
+                // Events executed sequentially so no need to synchronize here.
+                ((EntityLocal)node).setAttribute(NODE_REPLICATION_INITIALIZED, Boolean.TRUE);
+
+                final Runnable nodeInitTaskBody;
+                if (MySqlClusterUtils.IS_MASTER.apply(node)) {
+                    nodeInitTaskBody = new InitMasterTaskBody(cluster, node);
+                } else {
+                    nodeInitTaskBody = new InitSlaveTaskBody(cluster, node, lock);
+                }
+
+                DynamicTasks.submitTopLevelTask(TaskBuilder.builder()
+                        .displayName("setup master-slave replication")
+                        .body(nodeInitTaskBody)
+                        .tag(BrooklynTaskTags.tagForContextEntity(node))
+                        .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+                        .build(),
+                        node);
             }
         }
 
-        private void initMaster(MySqlNode master) {
-            String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;");
+    }
+    
+    private static class InitMasterTaskBody implements Runnable {
+        private MySqlNode master;
+        private MySqlCluster cluster;
+        public InitMasterTaskBody(MySqlCluster cluster, MySqlNode master) {
+            this.cluster = cluster;
+            this.master = master;
+        }
+
+        @Override
+        public void run() {
+            String binLogInfo = MySqlClusterUtils.executeSqlOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;");
             Map<String, String> status = MySqlRowParser.parseSingle(binLogInfo);
             String file = status.get("File");
-            if (file != null) {
-                ((EntityInternal)master).sensors().set(MySqlMaster.MASTER_LOG_FILE, file);
-            }
             String position = status.get("Position");
-            if (position != null) {
-                ((EntityInternal)master).sensors().set(MySqlMaster.MASTER_LOG_POSITION, new Integer(position));
+            if (file != null && position != null) {
+                cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, new ReplicationSnapshot(null, null, file, Integer.parseInt(position)));
             }
 
             //NOTE: Will be executed on each start, analogously to the standard CREATION_SCRIPT config
@@ -331,71 +352,6 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
                 return contents;
             return null;
         }
-        
-        private void initSlave(MySqlNode slave) {
-            MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER);
-            String masterLogFile = validateSqlParam(getAttributeBlocking(master, MySqlMaster.MASTER_LOG_FILE));
-            Integer masterLogPos = getAttributeBlocking(master, MySqlMaster.MASTER_LOG_POSITION);
-            String masterAddress = validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS));
-            Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT);
-            String slaveAddress = validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
-            String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
-            String password = validateSqlParam(cluster.getAttribute(SLAVE_PASSWORD));
-
-            executeScriptOnNode(master, String.format(
-                    "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" +
-                    "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
-                    username, slaveAddress, password, username, slaveAddress));
-
-            String slaveCmd = String.format(
-                    "CHANGE MASTER TO " +
-                        "MASTER_HOST='%s', " +
-                        "MASTER_PORT=%d, " +
-                        "MASTER_USER='%s', " +
-                        "MASTER_PASSWORD='%s', " +
-                        "MASTER_LOG_FILE='%s', " +
-                        "MASTER_LOG_POS=%d;\n" +
-                    "START SLAVE;\n",
-                    masterAddress, masterPort, username, password, masterLogFile, masterLogPos);
-            executeScriptOnNode(slave, slaveCmd);
-
-            cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
-        }
-
-        private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) {
-            return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked();
-        }
-
-    }
-
-    private static final class NodeRunningListener implements SensorEventListener<Boolean> {
-        private MySqlCluster cluster;
-
-        public NodeRunningListener(MySqlCluster cluster) {
-            this.cluster = cluster;
-        }
-
-        @Override
-        public void onEvent(SensorEvent<Boolean> event) {
-            final MySqlNode node = (MySqlNode) event.getSource();
-            if (Boolean.TRUE.equals(event.getValue()) &&
-                    // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet.
-                    // Probably will get several updates while replication is initialized so an additional
-                    // check is needed whether we have already seen this.
-                    Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) &&
-                    !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
-
-                // Events executed sequentially so no need to synchronize here.
-                node.sensors().set(NODE_REPLICATION_INITIALIZED, Boolean.TRUE);
-
-                DynamicTasks.queueIfPossible(TaskBuilder.builder()
-                        .displayName("Configure master-slave replication on node")
-                        .body(new InitReplicationTask(cluster, node))
-                        .build())
-                    .orSubmitAsync(node);
-            }
-        }
-
     }
 
     // ============= Member Remove =============
@@ -407,46 +363,12 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
             Entity node = event.getValue();
             String slaveAddress = cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).remove(node.getId());
             if (slaveAddress != null) {
-                DynamicTasks.queueIfPossible(TaskBuilder.builder()
-                        .displayName("Remove slave access")
-                        .body(new RemoveSlaveConfigTask(cluster, slaveAddress))
-                        .build())
-                    .orSubmitAsync(cluster);
+                // Could already be gone if stopping the entire app - let it throw an exception
+                MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER);
+                String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
+                MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress));
             }
         }
     }
 
-    public class RemoveSlaveConfigTask implements Runnable {
-        private MySqlCluster cluster;
-        private String slaveAddress;
-
-        public RemoveSlaveConfigTask(MySqlCluster cluster, String slaveAddress) {
-            this.cluster = cluster;
-            this.slaveAddress = validateSqlParam(slaveAddress);
-        }
-
-        @Override
-        public void run() {
-            // Could already be gone if stopping the entire app - let it throw an exception
-            MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER);
-            String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
-            executeScriptOnNode(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress));
-        }
-
-    }
-
-    // Can't call node.executeScript directly, need to change execution context, so use an effector task
-    private static String executeScriptOnNode(MySqlNode node, String commands) {
-        return node.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(MySqlNode.EXECUTE_SCRIPT_COMMANDS, commands)).getUnchecked();
-    }
-
-    private static String validateSqlParam(String config) {
-        // Don't go into escape madness, just deny any suspicious strings.
-        // Would be nice to use prepared statements, but not worth pulling in the extra dependencies.
-        if (config.contains("'") && config.contains("\\")) {
-            throw new IllegalStateException("User provided string contains illegal SQL characters: " + config);
-        }
-        return config;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java
new file mode 100644
index 0000000..9f8dc6d
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.entity.database.DatastoreMixins.CanExecuteScript;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+
+public class MySqlClusterUtils {
+    protected static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MySqlClusterImpl.MASTER_SERVER_ID);
+
+    protected static String executeSqlOnNode(MySqlNode node, String commands) {
+        return executeSqlOnNodeAsync(node, commands).getUnchecked();
+    }
+
+    // Can't call node.executeScript directly, need to change execution context, so use an effector task
+    protected static Task<String> executeSqlOnNodeAsync(MySqlNode node, String commands) {
+        return DynamicTasks.queue(Effectors.invocation(node, MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(CanExecuteScript.COMMANDS.getName(), commands))).asTask();
+    }
+
+    protected static String validateSqlParam(String config) {
+        // Don't go into escape madness, just deny any suspicious strings.
+        // Would be nice to use prepared statements, but not worth pulling in the extra dependencies.
+        if (config.contains("'") && config.contains("\\")) {
+            throw new IllegalStateException("User provided string contains illegal SQL characters: " + config);
+        }
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
index 461369b..b4da5f9 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
@@ -25,6 +25,9 @@ import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
  * The {@link SoftwareProcessDriver} for MySQL.
  */
 public interface MySqlDriver extends SoftwareProcessDriver {
-    public String getStatusCmd();
-    public ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
+    String getStatusCmd();
+    ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
+    ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer);
+    ProcessTaskWrapper<Integer> dumpDatabase(String additionalOptions, String dumpDestination);
+    void changePassword(String oldPass, String newPass);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
index 484606e..7f9e508 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
@@ -19,21 +19,21 @@
 package org.apache.brooklyn.entity.database.mysql;
 
 import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.objs.HasShortName;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.annotation.Effector;
 import org.apache.brooklyn.core.annotation.EffectorParam;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
-import org.apache.brooklyn.core.effector.MethodEffector;
+import org.apache.brooklyn.core.effector.Effectors;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.location.PortRanges;
 import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
 import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
 import org.apache.brooklyn.entity.database.DatastoreMixins.DatastoreCommon;
 import org.apache.brooklyn.entity.software.base.SoftwareProcess;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
@@ -86,10 +86,39 @@ public interface MySqlNode extends SoftwareProcess, HasShortName, DatastoreCommo
 
     AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql");
 
-    MethodEffector<String> EXECUTE_SCRIPT = new MethodEffector<String>(MySqlNode.class, "executeScript");
-    String EXECUTE_SCRIPT_COMMANDS = "commands";
+    interface ExportDumpEffector {
+        ConfigKey<String> PATH = ConfigKeys.newStringConfigKey("path", "Where to export the dump to. Resolved against runtime directory if relative.", "dump.sql");
+        ConfigKey<String> ADDITIONAL_OPTIONS = ConfigKeys.newStringConfigKey("additionalOptions", "Additional command line options to pass to mysqldump");
+
+        Effector<Void> EXPORT_DUMP = Effectors.effector(Void.class, "export_dump")
+                .description("Invokes mysqldump against the node")
+                .parameter(PATH)
+                .parameter(ADDITIONAL_OPTIONS)
+                .buildAbstract();
+    }
+    Effector<Void> EXPORT_DUMP = ExportDumpEffector.EXPORT_DUMP;
+
+    interface ImportDumpEffector {
+        ConfigKey<String> PATH = ConfigKeys.newStringConfigKey("path", "Path to a file with SQL statements to import as the root user");
+
+        Effector<Void> IMPORT_DUMP = Effectors.effector(Void.class, "import_dump")
+                .description("Runs the sql statements in the file as the root user")
+                .parameter(PATH)
+                .buildAbstract();
+    }
+    Effector<Void> IMPORT_DUMP = ImportDumpEffector.IMPORT_DUMP;
+
+    interface ChangePasswordEffector {
+        ConfigKey<String> PASSWORD = ConfigKeys.newStringConfigKey("password", "New password to set");
+
+        Effector<Void> CHANGE_PASSWORD = Effectors.effector(Void.class, "change_password")
+                .description("Change the mysql root password")
+                .parameter(PASSWORD)
+                .buildAbstract();
+    }
+    Effector<Void> CHANGE_PASSWORD = ChangePasswordEffector.CHANGE_PASSWORD;
 
-    @Effector(description = "Execute SQL script on the node as the root user")
-    String executeScript(@EffectorParam(name=EXECUTE_SCRIPT_COMMANDS) String commands);
+    @org.apache.brooklyn.core.annotation.Effector(description = "Execute SQL script on the node as the root user")
+    public String executeScript(@EffectorParam(name="commands") String commands);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java
new file mode 100644
index 0000000..af68959
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ChangePasswordEffector;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ImportDumpEffector;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+public class MySqlNodeEffectors {
+    public static class ExportDumpEffectoryBody extends EffectorBody<Void> implements ExportDumpEffector {
+        @Override
+        public Void call(ConfigBag parameters) {
+            String path = parameters.get(PATH);
+            String additionalOptions = Strings.nullToEmpty(parameters.get(ADDITIONAL_OPTIONS));
+            //TODO additionalOptions, path are not sanitized and are coming from the user.
+            //Should we try to sanitize (potentially limiting the range of possible inputs),
+            //or just assume the user has full machine access anyway?
+            ((MySqlNodeImpl)entity()).getDriver().dumpDatabase(additionalOptions, path);
+            return null;
+        }
+    }
+    public static Effector<Void> EXPORT_DUMP = Effectors.effector(ExportDumpEffector.EXPORT_DUMP)
+            .impl(new ExportDumpEffectoryBody())
+            .build();
+
+    public static class ImportDumpEffectorBody extends EffectorBody<Void> implements ImportDumpEffector {
+        @Override
+        public Void call(ConfigBag parameters) {
+            String path = Preconditions.checkNotNull(parameters.get(PATH), "path is required");
+            // TODO sanitize path?
+            ((MySqlNodeImpl)entity()).getDriver().executeScriptFromInstalledFileAsync(path);
+            return null;
+        }
+    }
+    public static Effector<Void> IMPORT_DUMP = Effectors.effector(ImportDumpEffector.IMPORT_DUMP)
+            .impl(new ImportDumpEffectorBody())
+            .build();
+
+    public static class ChangePasswordEffectorBody extends EffectorBody<Void> implements ChangePasswordEffector {
+        @Override
+        public Void call(ConfigBag parameters) {
+            String newPass = Preconditions.checkNotNull(parameters.get(PASSWORD), "password is required");
+            String oldPass = entity().getAttribute(MySqlNode.PASSWORD);
+            entity().sensors().set(MySqlNode.PASSWORD, newPass);
+            MySqlDriver driver = ((MySqlNodeImpl)entity()).getDriver();
+            driver.changePassword(oldPass, newPass);
+            SshMachineLocation machine = EffectorTasks.getSshMachine(entity());
+            DynamicTasks.queue(
+                    SshTasks.newSshExecTaskFactory(machine, 
+                                    "cd "+entity().getAttribute(MySqlNode.RUN_DIR),
+                                    "sed -i'' -e 's@^\\(\\s*password\\s*=\\s*\\).*$@\\1" + newPass.replace("\\", "\\\\") + "@g' mymysql.cnf")
+                            .requiringExitCodeZero()
+                            .summary("Change root password"));
+            return null;
+        }
+    }
+    public static Effector<Void> CHANGE_PASSWORD = Effectors.effector(ChangePasswordEffector.CHANGE_PASSWORD)
+            .impl(new ChangePasswordEffectorBody())
+            .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
index dd0aac7..f470390 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
@@ -27,8 +27,6 @@ import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
 import org.apache.brooklyn.feed.ssh.SshFeed;
 import org.apache.brooklyn.feed.ssh.SshPollConfig;
 import org.apache.brooklyn.feed.ssh.SshPollValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
@@ -36,6 +34,8 @@ import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 
@@ -79,6 +79,9 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode {
                 return executeScript((String)parameters.getStringKey("commands"));
             }
         });
+        getMutableEntityType().addEffector(MySqlNodeEffectors.EXPORT_DUMP);
+        getMutableEntityType().addEffector(MySqlNodeEffectors.IMPORT_DUMP);
+        getMutableEntityType().addEffector(MySqlNodeEffectors.CHANGE_PASSWORD);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
index 30d9abd..01ee983 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
@@ -33,18 +33,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.entity.database.DatastoreMixins;
-import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
 import org.apache.brooklyn.api.location.OsDetails;
+import org.apache.brooklyn.core.effector.EffectorTasks;
 import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.location.BasicOsDetails.OsVersions;
+import org.apache.brooklyn.entity.database.DatastoreMixins;
+import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
 import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.io.FileUtil;
@@ -57,6 +57,8 @@ import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -165,11 +167,7 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements
         boolean hasCreationScript = copyDatabaseCreationScript();
         timer.waitForExpiryUnchecked();
 
-        DynamicTasks.queue(
-            SshEffectorTasks.ssh(
-                "cd "+getRunDir(),
-                getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password= password "+getPassword()
-            ).summary("setting password"));
+        changePassword("", getPassword());
 
         if (hasCreationScript)
             executeScriptFromInstalledFileAsync("creation-script.sql").asTask().getUnchecked();
@@ -179,6 +177,16 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements
         stop();
     }
 
+    @Override
+    public void changePassword(String oldPass, String newPass) {
+        DynamicTasks.queue(
+            SshEffectorTasks.ssh(
+                "cd "+getRunDir(),
+                getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password=" + oldPass + " password "+newPass)
+            .summary("setting password")
+            .requiringExitCodeZero());
+    }
+
     protected void copyDatabaseConfigScript() {
         newScript(CUSTOMIZING).execute();  //create the directory
 
@@ -264,13 +272,26 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements
         return executeScriptFromInstalledFileAsync(filename);
     }
 
+    @Override
     public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer) {
+        SshMachineLocation machine = EffectorTasks.getSshMachine(entity);
         return DynamicTasks.queue(
-                SshEffectorTasks.ssh(
+                SshTasks.newSshExecTaskFactory(machine, 
                                 "cd "+getRunDir(),
                                 getBaseDir()+"/bin/mysql --defaults-file="+getConfigFile()+" < "+filenameAlreadyInstalledAtServer)
                         .requiringExitCodeZero()
                         .summary("executing datastore script "+filenameAlreadyInstalledAtServer));
     }
 
+    @Override
+    public ProcessTaskWrapper<Integer> dumpDatabase(String additionalOptions, String dumpDestination) {
+        SshMachineLocation machine = EffectorTasks.getSshMachine(entity);
+        return DynamicTasks.queue(
+                SshTasks.newSshExecTaskFactory(machine, 
+                                "cd "+getRunDir(),
+                                getBaseDir()+"/bin/mysqldump --defaults-file="+getConfigFile()+" "+additionalOptions+" > "+dumpDestination)
+                        .requiringExitCodeZero()
+                        .summary("Dumping database to " + dumpDestination));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java
new file mode 100644
index 0000000..48af15a
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+public class ReplicationSnapshot {
+    private String entityId;
+    private String snapshotPath;
+    private String binLogName;
+    private int binLogPosition;
+
+    public ReplicationSnapshot(String entityId, String snapshotPath, String binLogName, int binLogPosition) {
+        this.entityId = entityId;
+        this.snapshotPath = snapshotPath;
+        this.binLogName = binLogName;
+        this.binLogPosition = binLogPosition;
+    }
+
+    public String getEntityId() {
+        return entityId;
+    }
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+    public String getSnapshotPath() {
+        return snapshotPath;
+    }
+    public void setSnapshotPath(String snapshotPath) {
+        this.snapshotPath = snapshotPath;
+    }
+    public String getBinLogName() {
+        return binLogName;
+    }
+    public void setBinLogName(String binLogName) {
+        this.binLogName = binLogName;
+    }
+    public int getBinLogPosition() {
+        return binLogPosition;
+    }
+    public void setBinLogPosition(int binLogPosition) {
+        this.binLogPosition = binLogPosition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
----------------------------------------------------------------------
diff --git a/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java b/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
index 6b5a6cb..e1874b9 100644
--- a/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
+++ b/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
@@ -123,11 +123,11 @@ public class VogellaExampleAccess {
 
     private void writeMetaData(ResultSet resultSet) throws SQLException {
         // Get some metadata from the database
-        log.info("The columns in the table are: ");
+        log.debug("The columns in the table are: ");
 
-        log.info("Table: " + resultSet.getMetaData().getTableName(1));
+        log.debug("Table: " + resultSet.getMetaData().getTableName(1));
         for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
-            log.info("Column " + i + " " + resultSet.getMetaData().getColumnName(i));
+            log.debug("Column " + i + " " + resultSet.getMetaData().getColumnName(i));
         }
     }
 
@@ -138,11 +138,11 @@ public class VogellaExampleAccess {
             String date = row.get(2);
             String summary = row.get(3);
             String comment = row.get(4);
-            log.info("User: " + user);
-            log.info("Website: " + website);
-            log.info("Summary: " + summary);
-            log.info("Date: " + date);
-            log.info("Comment: " + comment);
+            log.debug("User: " + user);
+            log.debug("Website: " + website);
+            log.debug("Summary: " + summary);
+            log.debug("Date: " + date);
+            log.debug("Comment: " + comment);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java b/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
index c5e12d5..c250843 100644
--- a/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
+++ b/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
@@ -18,28 +18,43 @@
  */
 package org.apache.brooklyn.entity.database.mysql;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
+import java.util.Map;
+
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
+import org.apache.brooklyn.entity.database.mysql.MySqlCluster.MySqlMaster;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
 import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.ssh.BashCommands;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class MySqlClusterIntegrationTest extends BrooklynAppLiveTestSupport {
 
-    @Test(groups = {"Integration"})
+    private static final String TEST_LOCATION = "localhost";
+
+    @Test(groups="Integration")
     public void testAllNodesInit() throws Exception {
         try {
             MySqlClusterTestHelper.test(app, getLocation());
         } finally {
-            for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) {
-                String runDir = member.getAttribute(MySqlNode.RUN_DIR);
-                if (runDir != null) {
-                    Os.deleteRecursively(runDir);
-                }
-            }
+            cleanData();
         }
     }
 
@@ -48,16 +63,115 @@ public class MySqlClusterIntegrationTest extends BrooklynAppLiveTestSupport {
         try {
             MySqlClusterTestHelper.testMasterInit(app, getLocation());
         } finally {
-            for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) {
-                String runDir = member.getAttribute(MySqlNode.RUN_DIR);
-                if (runDir != null) {
-                    Os.deleteRecursively(runDir);
+            cleanData();
+        }
+    }
+
+    @Test(groups="Integration")
+    public void testDumpReplication() throws Exception {
+        try {
+            Location loc = getLocation();
+            EntitySpec<MySqlCluster> clusterSpec = EntitySpec.create(MySqlCluster.class)
+                    .configure(MySqlMaster.MASTER_CREATION_SCRIPT_CONTENTS, MySqlClusterTestHelper.CREATION_SCRIPT)
+                    .configure(MySqlNode.MYSQL_SERVER_CONF, MutableMap.<String, Object>of("skip-name-resolve",""));
+            MySqlCluster cluster = MySqlClusterTestHelper.initCluster(app, loc, clusterSpec);
+            MySqlNode master = (MySqlNode) cluster.getAttribute(MySqlCluster.FIRST);
+            purgeLogs(cluster, master);
+
+            // test dump replication from master
+            MySqlNode slave = (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked());
+            assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), master.getId());
+            MySqlClusterTestHelper.assertReplication(master, slave);
+
+            // test dump replication from slave, missing dump on node
+            deleteSnapshot(cluster);
+            cluster.config().set(MySqlCluster.REPLICATION_PREFERRED_SOURCE, slave.getId());
+            MySqlNode secondSlave = (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked());
+            assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), slave.getId());
+            MySqlClusterTestHelper.assertReplication(master, secondSlave);
+
+            // test dump replication from slave, missing snapshot entity
+            Entities.destroy(slave);
+            cluster.config().set(MySqlCluster.REPLICATION_PREFERRED_SOURCE, secondSlave.getId());
+            MySqlNode thirdSlave = (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked());
+            assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), secondSlave.getId());
+            MySqlClusterTestHelper.assertReplication(master, thirdSlave);
+        } finally {
+            cleanData();
+        }
+    }
+
+    private void deleteSnapshot(MySqlCluster cluster) {
+        ReplicationSnapshot replicationSnapshot = cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
+        Entity snapshotEntity = mgmt.getEntityManager().getEntity(replicationSnapshot.getEntityId());
+        SshMachineLocation machine = EffectorTasks.getSshMachine(snapshotEntity);
+        Entities.submit(snapshotEntity, SshEffectorTasks.ssh(
+                "cd $RUN_DIR",
+                "rm " + replicationSnapshot.getSnapshotPath())
+            .summary("clear snapshot")
+            .machine(machine)
+            .environmentVariable("RUN_DIR", snapshotEntity.getAttribute(MySqlNode.RUN_DIR))
+            .requiringExitCodeZero())
+        .asTask()
+        .getUnchecked();
+    }
+
+    private void purgeLogs(MySqlCluster cluster, MySqlNode master) {
+        String preFlushBinaryLogFile = getBinaryLogFile(master);
+        ReplicationSnapshot replicationSnapshot = master.getParent().getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
+        assertEquals(preFlushBinaryLogFile, replicationSnapshot.getBinLogName());
+        MySqlClusterTestHelper.execSql(master, "FLUSH LOGS");
+        String postFlushBinaryLogFile = getBinaryLogFile(master);
+        waitSlavesCatchUp(cluster, postFlushBinaryLogFile);
+        assertNotEquals(postFlushBinaryLogFile, preFlushBinaryLogFile);
+        MySqlClusterTestHelper.execSql(master, "PURGE BINARY LOGS TO '" + postFlushBinaryLogFile + "';");
+        assertFalse(fileExists(master, preFlushBinaryLogFile));
+    }
+
+    private void waitSlavesCatchUp(final MySqlCluster cluster, final String binLog) {
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                MySqlNode master = (MySqlNode) cluster.getAttribute(MySqlCluster.FIRST);
+                for (Entity node : cluster.getMembers()) {
+                    if (node == master) continue;
+                    String status = MySqlClusterTestHelper.execSql((MySqlNode) node, "SHOW SLAVE STATUS \\G");
+                    Map<String, String> map = MySqlRowParser.parseSingle(status);
+                    assertEquals(map.get("Relay_Master_Log_File"), binLog);
                 }
             }
+        });
+    }
+    private String getBinaryLogFile(MySqlNode master) {
+        String status = MySqlClusterTestHelper.execSql(master, "SHOW MASTER STATUS \\G");
+        Map<String, String> map = MySqlRowParser.parseSingle(status);
+        return map.get("File");
+    }
+    private boolean fileExists(MySqlNode node, String binLogName) {
+        String dataDir = Strings.nullToEmpty(node.getConfig(MySqlNode.DATA_DIR));
+        String path = Os.mergePathsUnix(dataDir, binLogName);
+        String cmd = BashCommands.chain(
+                "cd $RUN_DIR",
+                BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist."));
+        String summary = "Check if file " + path + " exists";
+        SshMachineLocation machine = EffectorTasks.getSshMachine(node);
+        return Entities.submit(node, SshTasks.newSshExecTaskFactory(machine, cmd)
+                .allowingNonZeroExitCode()
+                .environmentVariable("RUN_DIR", node.getAttribute(SoftwareProcess.RUN_DIR))
+                .summary(summary)
+                .allowingNonZeroExitCode()).asTask().getUnchecked() == 0;
+    }
+    private void cleanData() {
+        if (app.getChildren().isEmpty()) return;
+        for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) {
+            String runDir = member.getAttribute(MySqlNode.RUN_DIR);
+            if (runDir != null) {
+                Os.deleteRecursively(runDir);
+            }
         }
     }
 
     private Location getLocation() {
-        return mgmt.getLocationRegistry().resolve("localhost");
+        return mgmt.getLocationRegistry().resolve(TEST_LOCATION);
     }
 }