You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/10/26 13:31:23 UTC
[2/7] incubator-brooklyn git commit: Dump based replication init
Dump based replication init
When the replication logs on master no longer start from the beginning, a database dump will be performed to initialize the slave.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/0de0a2a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/0de0a2a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/0de0a2a9
Branch: refs/heads/master
Commit: 0de0a2a944b08437c4f0eda407265bd89a26147a
Parents: 5fda6ed
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Thu Oct 15 17:36:31 2015 +0300
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Mon Oct 19 16:48:52 2015 +0300
----------------------------------------------------------------------
.../brooklyn/util/core/task/DynamicTasks.java | 17 +
.../system_service/SystemServiceEnricher.java | 12 +-
.../entity/database/DatastoreMixins.java | 5 +-
.../database/mysql/InitSlaveTaskBody.java | 385 +++++++++++++++++++
.../entity/database/mysql/MySqlCluster.java | 12 +-
.../entity/database/mysql/MySqlClusterImpl.java | 230 ++++-------
.../database/mysql/MySqlClusterUtils.java | 52 +++
.../entity/database/mysql/MySqlDriver.java | 7 +-
.../entity/database/mysql/MySqlNode.java | 43 ++-
.../database/mysql/MySqlNodeEffectors.java | 87 +++++
.../entity/database/mysql/MySqlNodeImpl.java | 7 +-
.../entity/database/mysql/MySqlSshDriver.java | 41 +-
.../database/mysql/ReplicationSnapshot.java | 58 +++
.../entity/database/VogellaExampleAccess.java | 16 +-
.../mysql/MySqlClusterIntegrationTest.java | 138 ++++++-
.../database/mysql/MySqlClusterTestHelper.java | 35 +-
16 files changed, 922 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 52ec88d..5574709 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.mgmt.TaskQueueingContext;
import org.apache.brooklyn.api.mgmt.TaskWrapper;
+import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
@@ -333,4 +334,20 @@ public class DynamicTasks {
return queueIfPossible(task).orSubmitAsync(entity).asTask();
}
+ /**
+  * Submits {@code task} at {@code entity} as a top-level task, i.e. without the
+  * task currently running on this thread (Tasks.current()) becoming its parent.
+  * <p>
+  * The per-thread current task is cleared while the task is submitted and is
+  * restored afterwards, even when the submission throws.
+  * <p>
+  * To make the task visible in the UI, also tag it with:
+  * <pre>
+  *   .tag(BrooklynTaskTags.tagForContextEntity(entity))
+  *   .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+  * </pre>
+  *
+  * @param task the task to submit
+  * @param entity the entity the task is submitted at
+  * @return the submitted task
+  */
+ public static <T> Task<T> submitTopLevelTask(TaskAdaptable<T> task, Entity entity) {
+     final Task<?> parentTask = BasicExecutionManager.getPerThreadCurrentTask().get();
+     BasicExecutionManager.getPerThreadCurrentTask().set(null);
+     try {
+         Task<T> submitted = Entities.submit(entity, task).asTask();
+         return submitted;
+     } finally {
+         // Always restore the caller's task context.
+         BasicExecutionManager.getPerThreadCurrentTask().set(parentTask);
+     }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
index fd9c32e..26c0fdb 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
@@ -114,17 +114,7 @@ public class SystemServiceEnricher extends AbstractEnricher implements Enricher
.tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
.build();
- submitTopLevel(updateService);
- }
-
- private void submitTopLevel(Task<Void> updateService) {
- Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get();
- BasicExecutionManager.getPerThreadCurrentTask().set(null);
- try {
- Entities.submit(entity, updateService);
- } finally {
- BasicExecutionManager.getPerThreadCurrentTask().set(currentTask);
- }
+ DynamicTasks.submitTopLevelTask(updateService, entity);
}
private String getLaunchScript(String stdin, String env) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
index 534c0eb..67eda16 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java
@@ -50,9 +50,10 @@ public class DatastoreMixins {
public static final Effector<String> EXECUTE_SCRIPT = CanExecuteScript.EXECUTE_SCRIPT;
public static interface CanExecuteScript {
- public static final Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript")
+ ConfigKey<String> COMMANDS = ConfigKeys.newStringConfigKey("commands");
+ Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript")
.description("executes the given script contents")
- .parameter(String.class, "commands")
+ .parameter(COMMANDS)
.buildAbstract();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java
new file mode 100644
index 0000000..f125024
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.TaskTags;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.ssh.BashCommands;
+import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
+import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class InitSlaveTaskBody implements Runnable {
+ private static final String SNAPSHOT_DUMP_OPTIONS = "--skip-lock-tables --single-transaction --flush-logs --hex-blob -A";
+
+ private static final Logger log = LoggerFactory.getLogger(InitSlaveTaskBody.class);
+
+ private final MySqlCluster cluster;
+ private final MySqlNode slave;
+ private Semaphore lock;
+
+ /**
+  * @param cluster the cluster whose replication state is read and updated
+  * @param slave the node to set up as a replication slave
+  * @param lock semaphore held while the cluster's replication info is checked or regenerated
+  */
+ public InitSlaveTaskBody(MySqlCluster cluster, MySqlNode slave, Semaphore lock) {
+     this.lock = lock;
+     this.slave = slave;
+     this.cluster = cluster;
+ }
+
+ @Override
+ public void run() {
+ // Replication init state consists of:
+ // * initial dump (optional)
+ // * location of initial dump (could be on any of the members, optional)
+ // * bin log file name
+ // * bin log position
+ // 1. Check replication state:
+ // * Does the dump exist (and the machine where it is located)
+ // * Does the bin log exist on the master
+ // 2. If the replication state is not valid create a new one
+ // * Select a slave to dump, master if no slaves
+ // * If it's a slave do 'STOP SLAVE SQL_THREAD;'
+ // * Call mysqldump to create the snapshot
+ // * When done if a slave do 'START SLAVE SQL_THREAD;'
+ // * Get master state from the dump - grep "MASTER_LOG_POS" dump.sql.
+ // If slave get state from 'SHOW SLAVE STATUS'
+ // * Save new init info in cluster - bin log name, position, dump
+ // 3. Init Slave
+ // * transfer dump to new slave (if dump exists)
+ // * import - mysql < ~/dump.sql
+ // * change master to and start slave
+ //!!! Caveat if dumping from master and MyISAM tables are used dump may be inconsistent.
+ // * Only way around it is to lock the database while dumping (or taking a snapshot through LVM which is quicker)
+ bootstrapSlaveAsync(getValidReplicationInfo(), slave);
+ cluster.getAttribute(MySqlClusterImpl.SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
+ }
+
+ /**
+  * Returns the first cluster member matching {@link MySqlClusterUtils#IS_MASTER}.
+  * {@code Iterables.find} throws if no member matches.
+  */
+ private MySqlNode getMaster() {
+     Entity masterEntity = Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER);
+     return (MySqlNode) masterEntity;
+ }
+
+ /**
+  * Queues the "bootstrap slave replication" task for the given slave.
+  * <p>
+  * When run, the task waits for the replication snapshot, then:
+  * <ol>
+  * <li>if the snapshot references a source entity, copies the dump from that member,
+  *     imports it on the slave, re-applies the slave's password and flushes privileges;
+  * <li>creates the replication user for the slave's address on the master;
+  * <li>points the slave at the master (CHANGE MASTER TO) using the bin log name and
+  *     position from the snapshot, and starts the slave.
+  * </ol>
+  * Values interpolated into SQL are passed through {@code MySqlClusterUtils.validateSqlParam}.
+  *
+  * @param replicationInfoFuture future supplying the snapshot to initialize the slave from
+  * @param slave the node to bootstrap (shadows the field of the same name)
+  */
+ private void bootstrapSlaveAsync(final Future<ReplicationSnapshot> replicationInfoFuture, final MySqlNode slave) {
+ DynamicTasks.queue("bootstrap slave replication", new Runnable() {
+ @Override
+ public void run() {
+ // Block until the replication snapshot is available.
+ ReplicationSnapshot replicationSnapshot;
+ try {
+ replicationSnapshot = replicationInfoFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw Exceptions.propagate(e);
+ }
+
+ MySqlNode master = getMaster();
+ String masterAddress = MySqlClusterUtils.validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS));
+ Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT);
+ String slaveAddress = MySqlClusterUtils.validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
+ String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(MySqlCluster.SLAVE_USERNAME));
+ String password = MySqlClusterUtils.validateSqlParam(cluster.getAttribute(MySqlCluster.SLAVE_PASSWORD));
+
+ // A snapshot with an entity ID has a dump file on that member: copy and import it first.
+ if (replicationSnapshot.getEntityId() != null) {
+ Entity sourceEntity = Iterables.find(cluster.getMembers(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId()));
+ String dumpId = FilenameUtils.removeExtension(replicationSnapshot.getSnapshotPath());
+ copyDumpAsync(sourceEntity, slave, replicationSnapshot.getSnapshotPath(), dumpId);
+ DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.IMPORT_DUMP, ImmutableMap.of("path", replicationSnapshot.getSnapshotPath())));
+ //The dump resets the password to whatever is on the source instance, reset it back.
+ //We are able to still login because privileges are not flushed, so we just set the password to the same value.
+ DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.CHANGE_PASSWORD, ImmutableMap.of("password", slave.getAttribute(MySqlNode.PASSWORD)))); //
+ //Flush privileges to load new users coming from the dump
+ MySqlClusterUtils.executeSqlOnNodeAsync(slave, "FLUSH PRIVILEGES;");
+ }
+
+ // Allow the slave to connect to the master as the replication user.
+ MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format(
+ "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" +
+ "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
+ username, slaveAddress, password, username, slaveAddress));
+
+ // Executing this will unblock SERVICE_UP wait in the start effector
+ String slaveCmd = String.format(
+ "CHANGE MASTER TO " +
+ "MASTER_HOST='%s', " +
+ "MASTER_PORT=%d, " +
+ "MASTER_USER='%s', " +
+ "MASTER_PASSWORD='%s', " +
+ "MASTER_LOG_FILE='%s', " +
+ "MASTER_LOG_POS=%d;\n" +
+ "START SLAVE;\n",
+ masterAddress, masterPort,
+ username, password,
+ replicationSnapshot.getBinLogName(),
+ replicationSnapshot.getBinLogPosition());
+ MySqlClusterUtils.executeSqlOnNodeAsync(slave, slaveCmd);
+ }
+ });
+ }
+
+ /**
+  * Queues the tasks which copy a database dump from {@code source} to {@code dest} using scp.
+  * <p>
+  * A temporary, passphrase-less key pair is generated in the run dir of {@code source} and its
+  * public key is appended to {@code ~/.ssh/authorized_keys} on {@code dest}. After the copy
+  * attempt both key files are deleted and the {@code authorized_keys} entry is removed again.
+  * The copy task is marked inessential so that the clean-up tasks still run when it fails;
+  * the failure is then surfaced by the last queued task.
+  *
+  * @param source entity whose machine holds the dump
+  * @param dest entity whose machine receives the dump as {@code <RUN_DIR>/<dumpId>.sql}
+  * @param sourceDumpPath path of the dump on the source machine; the copy runs from the source run dir
+  * @param dumpId identifier used for the temporary key file name, the key comment and the target file name
+  */
+ private void copyDumpAsync(Entity source, Entity dest, String sourceDumpPath, String dumpId) {
+     final SshMachineLocation sourceMachine = EffectorTasks.getSshMachine(source);
+     final SshMachineLocation destMachine = EffectorTasks.getSshMachine(dest);
+
+     String sourceRunDir = source.getAttribute(MySqlNode.RUN_DIR);
+     String privateKeyFile = dumpId + ".id_rsa";
+     // Generate the temporary key pair; the task's stdout is the public key.
+     final Task<String> tempKeyTask = DynamicTasks.queue(SshEffectorTasks.ssh(
+             "cd $RUN_DIR",
+             "PRIVATE_KEY=" + privateKeyFile,
+             "ssh-keygen -t rsa -N '' -f $PRIVATE_KEY -C " + dumpId + " > /dev/null",
+             "cat $PRIVATE_KEY.pub")
+             .environmentVariable("RUN_DIR", sourceRunDir)
+             .machine(sourceMachine)
+             .summary("generate private key for slave access")
+             .requiringZeroAndReturningStdout())
+             .asTask();
+
+     DynamicTasks.queue("add key to authorized_keys", new Runnable() {
+         @Override
+         public void run() {
+             String publicKey = tempKeyTask.getUnchecked();
+             DynamicTasks.queue(SshEffectorTasks.ssh(String.format(
+                     "cat >> ~/.ssh/authorized_keys <<EOF\n%s\nEOF",
+                     publicKey))
+                     .machine(destMachine)
+                     .summary("Add key to authorized_keys")
+                     .requiringExitCodeZero());
+         }
+     });
+
+     final ProcessTaskWrapper<Integer> copyTask = SshEffectorTasks.ssh(
+             "cd $RUN_DIR",
+             String.format(
+                     "scp -o 'BatchMode yes' -o 'StrictHostKeyChecking no' -i '%s' '%s' '%s@%s:%s/%s.sql'",
+                     privateKeyFile,
+                     sourceDumpPath,
+                     destMachine.getUser(),
+                     dest.getAttribute(MySqlNode.SUBNET_ADDRESS),
+                     dest.getAttribute(MySqlNode.RUN_DIR),
+                     dumpId))
+             .environmentVariable("RUN_DIR", sourceRunDir)
+             .machine(sourceMachine)
+             .summary("copy database dump to slave")
+             .newTask();
+     // Let next couple of tasks complete even if this one fails so that we can clean up.
+     TaskTags.markInessential(copyTask);
+     DynamicTasks.queue(copyTask);
+
+     // Delete the temporary key pair. ssh-keygen also wrote <key>.pub, so remove both files.
+     DynamicTasks.queue(SshEffectorTasks.ssh(
+             "cd $RUN_DIR",
+             "rm -f " + privateKeyFile + " " + privateKeyFile + ".pub")
+             .environmentVariable("RUN_DIR", sourceRunDir)
+             .machine(sourceMachine)
+             .summary("remove private key"));
+
+     // The key was generated with dumpId as its comment, so matching on dumpId removes its entry.
+     DynamicTasks.queue(SshEffectorTasks.ssh(String.format(
+             "sed -i'' -e '/%s/d' ~/.ssh/authorized_keys",
+             dumpId))
+             .machine(destMachine)
+             .summary("remove public key from authorized_keys")).asTask();
+
+     // The task will fail if copyTask fails, but only after the private key is deleted.
+     DynamicTasks.queue("check for successful copy", new Runnable() {
+         @Override
+         public void run() {
+             copyTask.asTask().getUnchecked();
+         }
+     });
+ }
+
+ /**
+  * Returns the replication snapshot to initialize the slave from.
+  * <p>
+  * While holding {@code lock}, reads the cluster's last snapshot and validates it. A valid
+  * snapshot is returned as an already-completed future. Otherwise a new snapshot is taken on
+  * the node chosen by {@link #getSnapshotNode()} and the future of that work is returned.
+  * <p>
+  * NOTE(review): the permit is released when this method returns, which may be before the
+  * returned future completes; confirm this is the intended scope of the lock.
+  *
+  * @return future supplying a valid replication snapshot
+  */
+ private Future<ReplicationSnapshot> getValidReplicationInfo() {
+     // Acquire outside the try/finally below: if the wait is interrupted no permit is held,
+     // so releasing one would wrongly increase the semaphore's permit count.
+     try {
+         lock.acquire();
+     } catch (InterruptedException e) {
+         throw Exceptions.propagate(e);
+     }
+     try {
+         ReplicationSnapshot replicationSnapshot = getAttributeBlocking(cluster, MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
+         if (!isReplicationInfoValid(replicationSnapshot)) {
+             final MySqlNode snapshotNode = getSnapshotNode();
+             final String dumpName = getDumpUniqueId() + ".sql";
+             if (MySqlClusterUtils.IS_MASTER.apply(snapshotNode)) {
+                 return createMasterReplicationSnapshot(snapshotNode, dumpName);
+             } else {
+                 return createSlaveReplicationSnapshot(snapshotNode, dumpName);
+             }
+         }
+         return ConcurrentUtils.constantFuture(replicationSnapshot);
+     } finally {
+         lock.release();
+     }
+ }
+
+ /**
+  * Queues a dump export on the master followed by a task which extracts the bin log
+  * file name and position from the dump.
+  * <p>
+  * The dump is requested with {@code --master-data=2}. The follow-up task greps the first
+  * "CHANGE MASTER TO" line from the dump, parses MASTER_LOG_FILE and MASTER_LOG_POS from it
+  * and publishes the resulting snapshot on the cluster's REPLICATION_LAST_SLAVE_SNAPSHOT sensor.
+  *
+  * @param master the master node to dump
+  * @param dumpName file name of the dump to create
+  * @return the queued task producing the new snapshot; it fails with
+  *         {@link IllegalStateException} when the dump has no parsable replication info
+  */
+ private Future<ReplicationSnapshot> createMasterReplicationSnapshot(final MySqlNode master, final String dumpName) {
+ log.info("MySql cluster " + cluster + ": generating new replication snapshot on master node " + master + " with name " + dumpName);
+ String dumpOptions = SNAPSHOT_DUMP_OPTIONS + " --master-data=2";
+ ImmutableMap<String, String> params = ImmutableMap.of(
+ ExportDumpEffector.PATH.getName(), dumpName,
+ ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions);
+ DynamicTasks.queue(Effectors.invocation(master, MySqlNode.EXPORT_DUMP, params));
+ return DynamicTasks.queue("get master log info from dump", new Callable<ReplicationSnapshot>() {
+ @Override
+ public ReplicationSnapshot call() throws Exception {
+ // Group 1 is the bin log file name, group 2 the bin log position.
+ Pattern masterInfoPattern = Pattern.compile("CHANGE MASTER TO.*MASTER_LOG_FILE\\s*=\\s*'([^']+)'.*MASTER_LOG_POS\\s*=\\s*(\\d+)");
+ String masterInfo = DynamicTasks.queue(execSshTask(master, "grep -m1 'CHANGE MASTER TO' " + dumpName, "Extract master replication status from dump")
+ .requiringZeroAndReturningStdout()).asTask().getUnchecked();
+ Matcher masterInfoMatcher = masterInfoPattern.matcher(masterInfo);
+ if (!masterInfoMatcher.find() || masterInfoMatcher.groupCount() != 2) {
+ throw new IllegalStateException("Master dump doesn't contain replication info: " + masterInfo);
+ }
+ String masterLogFile = masterInfoMatcher.group(1);
+ int masterLogPosition = Integer.parseInt(masterInfoMatcher.group(2));
+ ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(master.getId(), dumpName, masterLogFile, masterLogPosition);
+ // Publish the snapshot so that it can be reused for later slaves.
+ cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
+ return replicationSnapshot;
+ }
+ });
+ }
+
+ /**
+  * Queues a dump export on an existing slave, bracketed by "STOP SLAVE SQL_THREAD" and
+  * "START SLAVE SQL_THREAD", followed by a task which reads the master bin log file name
+  * and position from the slave's "SHOW SLAVE STATUS" output.
+  * <p>
+  * The resulting snapshot is published on the cluster's REPLICATION_LAST_SLAVE_SNAPSHOT sensor.
+  *
+  * @param slave the slave node to dump (shadows the field of the same name)
+  * @param dumpName file name of the dump to create
+  * @return the queued task producing the new snapshot
+  */
+ private Future<ReplicationSnapshot> createSlaveReplicationSnapshot(final MySqlNode slave, final String dumpName) {
+ MySqlClusterUtils.executeSqlOnNodeAsync(slave, "STOP SLAVE SQL_THREAD;");
+ try {
+ log.info("MySql cluster " + cluster + ": generating new replication snapshot on slave node " + slave + " with name " + dumpName);
+ String dumpOptions = SNAPSHOT_DUMP_OPTIONS;
+ ImmutableMap<String, String> params = ImmutableMap.of(
+ ExportDumpEffector.PATH.getName(), dumpName,
+ ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions);
+ DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.EXPORT_DUMP, params));
+ return DynamicTasks.queue("get master log info from slave", new Callable<ReplicationSnapshot>() {
+ @Override
+ public ReplicationSnapshot call() throws Exception {
+ String slaveStatusRow = slave.executeScript("SHOW SLAVE STATUS \\G");
+ Map<String, String> slaveStatus = MySqlRowParser.parseSingle(slaveStatusRow);
+ String masterLogFile = slaveStatus.get("Relay_Master_Log_File");
+ int masterLogPosition = Integer.parseInt(slaveStatus.get("Exec_Master_Log_Pos"));
+ ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(slave.getId(), dumpName, masterLogFile, masterLogPosition);
+ // Publish the snapshot so that it can be reused for later slaves.
+ cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot);
+ return replicationSnapshot;
+ }
+ });
+ } finally {
+ // NOTE(review): presumably this only queues the statement after the tasks above - confirm.
+ MySqlClusterUtils.executeSqlOnNodeAsync(slave, "START SLAVE SQL_THREAD;");
+ }
+ }
+
+ /**
+  * Selects the node to take the replication snapshot from.
+  * <p>
+  * Uses the member whose ID equals the cluster's REPLICATION_PREFERRED_SOURCE config, if it
+  * is set and such a member exists; otherwise falls back to {@link #getRandomSlave()}.
+  *
+  * @return the node to dump
+  */
+ private MySqlNode getSnapshotNode() {
+     String snapshotNodeId = cluster.getConfig(MySqlCluster.REPLICATION_PREFERRED_SOURCE);
+     if (snapshotNodeId != null) {
+         Optional<Entity> preferredNode = Iterables.tryFind(cluster.getMembers(), EntityPredicates.idEqualTo(snapshotNodeId));
+         if (preferredNode.isPresent()) {
+             return (MySqlNode) preferredNode.get();
+         } else {
+             // Log the cluster (not this task body), as the other messages in this class do.
+             log.warn("MySql cluster " + cluster + " configured with preferred snapshot node " + snapshotNodeId + " but it's not a member. Defaulting to a random slave.");
+         }
+     }
+     return getRandomSlave();
+ }
+
+ /**
+  * Picks a random healthy slave, or the master when there is no healthy slave.
+  */
+ private MySqlNode getRandomSlave() {
+     List<MySqlNode> candidates = getHealhtySlaves();
+     if (candidates.isEmpty()) {
+         return getMaster();
+     }
+     int pick = new Random().nextInt(candidates.size());
+     return candidates.get(pick);
+ }
+
+ /**
+  * Returns the cluster members which are not the master, have SERVICE_UP set to true
+  * and are {@link MySqlNode} instances.
+  */
+ private ImmutableList<MySqlNode> getHealhtySlaves() {
+     FluentIterable<Entity> members = FluentIterable.from(cluster.getMembers());
+     FluentIterable<Entity> nonMasters = members.filter(Predicates.not(MySqlClusterUtils.IS_MASTER));
+     FluentIterable<Entity> running = nonMasters.filter(EntityPredicates.attributeEqualTo(MySqlNode.SERVICE_UP, Boolean.TRUE));
+     return running.filter(MySqlNode.class).toList();
+ }
+
+ /**
+  * Checks whether slaves can still be initialized from the given snapshot.
+  * <p>
+  * The snapshot is valid when its bin log file still exists in the master's data dir and,
+  * if it references a dump, the node holding the dump is still a cluster member and the
+  * dump file still exists on it.
+  *
+  * @param replicationSnapshot the snapshot to check
+  * @return {@code true} if the snapshot can be used, {@code false} if a new one is needed
+  */
+ private boolean isReplicationInfoValid(ReplicationSnapshot replicationSnapshot) {
+     MySqlNode master = getMaster();
+     String dataDir = Strings.nullToEmpty(master.getConfig(MySqlNode.DATA_DIR));
+     if (!checkFileExistsOnEntity(master, Os.mergePathsUnix(dataDir, replicationSnapshot.getBinLogName()))) {
+         return false;
+     }
+     if (replicationSnapshot.getEntityId() != null) {
+         // Look the node up among the members, the same collection the bootstrap task uses to
+         // resolve the dump's source entity, so that a snapshot accepted here can be resolved there.
+         Optional<Entity> snapshotSlave = Iterables.tryFind(cluster.getMembers(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId()));
+         if (!snapshotSlave.isPresent()) {
+             log.info("MySql cluster " + cluster + " missing node " + replicationSnapshot.getEntityId() + " with last snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot.");
+             return false;
+         }
+         if (!checkFileExistsOnEntity(snapshotSlave.get(), replicationSnapshot.getSnapshotPath())) {
+             log.info("MySql cluster " + cluster + ", node " + snapshotSlave.get() + " missing replication snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot.");
+             return false;
+         }
+     }
+     return true;
+ }
+
+ /**
+  * Runs a file-existence test for {@code path} over ssh on the entity's machine
+  * (from the entity's run dir) and reports whether it exited with code zero.
+  */
+ private boolean checkFileExistsOnEntity(Entity entity, String path) {
+     String fileTest = BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist.");
+     String cmd = BashCommands.chain(fileTest);
+     String summary = "Check if file " + path + " exists";
+     Integer exitCode = DynamicTasks.queue(execSshTask(entity, cmd, summary).allowingNonZeroExitCode()).asTask().getUnchecked();
+     return exitCode == 0;
+ }
+
+ /**
+  * Builds an ssh task factory which runs {@code cmd} from the entity's run dir on the
+  * entity's machine. Non-zero exit codes are allowed.
+  *
+  * @param entity the entity on whose machine the command runs
+  * @param cmd the shell command to run
+  * @param summary the task summary
+  */
+ private ProcessTaskFactory<Integer> execSshTask(Entity entity, String cmd, String summary) {
+     SshMachineLocation targetMachine = EffectorTasks.getSshMachine(entity);
+     String script = "cd $RUN_DIR\n" + cmd;
+     ProcessTaskFactory<Integer> factory = SshTasks.newSshExecTaskFactory(targetMachine, script)
+             .allowingNonZeroExitCode();
+     return factory
+             .environmentVariable("RUN_DIR", entity.getAttribute(SoftwareProcess.RUN_DIR))
+             .summary(summary);
+ }
+
+ /**
+  * Queues an "attribute when ready" task for the given sensor on the given entity
+  * and blocks for its result.
+  */
+ private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) {
+     Task<T> whenReady = DependentConfiguration.attributeWhenReady(masterNode, att);
+     return DynamicTasks.queue(whenReady).getUnchecked();
+ }
+
+ /**
+  * Builds a dump identifier of the form
+  * {@code replication-dump-<random id>-<yyyy-MM-dd--HH-mm-ss>}.
+  */
+ private String getDumpUniqueId() {
+     String randomPart = Identifiers.makeRandomId(8);
+     String timestamp = new SimpleDateFormat("yyyy-MM-dd--HH-mm-ss").format(new Date());
+     return "replication-dump-" + randomPart + "-" + timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
index de43951..9ea5ffe 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java
@@ -18,7 +18,7 @@
*/
package org.apache.brooklyn.entity.database.mysql;
-import java.util.Collection;
+import java.util.List;
import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.ImplementedBy;
@@ -36,11 +36,6 @@ import com.google.common.reflect.TypeToken;
@Catalog(name="MySql Master-Slave cluster", description="Sets up a cluster of MySQL nodes using master-slave relation and binary logging", iconUrl="classpath:///mysql-logo-110x57.png")
public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
interface MySqlMaster {
- AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor(
- "mysql.master.log_file", "The binary log file master is writing to");
- AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor(
- "mysql.master.log_position", "The position in the log file to start replication");
-
ConfigKey<String> MASTER_CREATION_SCRIPT_CONTENTS = ConfigKeys.newStringConfigKey(
"datastore.master.creation.script.contents", "Contents of creation script to initialize the master node after initializing replication");
@@ -52,6 +47,9 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
AttributeSensor<Integer> SLAVE_SECONDS_BEHIND_MASTER = Sensors.newIntegerSensor("mysql.slave.seconds_behind_master", "How many seconds behind master is the replication state on the slave");
}
+ AttributeSensor<ReplicationSnapshot> REPLICATION_LAST_SLAVE_SNAPSHOT = Sensors.newSensor(ReplicationSnapshot.class, "mysql.replication.last_slave_snapshot", "Last valid state to init slaves with");
+ ConfigKey<String> REPLICATION_PREFERRED_SOURCE = ConfigKeys.newStringConfigKey("mysql.replication.preferred_source", "ID of node to get the replication snapshot from. If not set a random slave is used, falling back to master if no slaves.");
+
ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey(
"mysql.slave.username", "The user name slaves will use to connect to the master", "slave");
ConfigKey<String> SLAVE_REPLICATE_DO_DB = ConfigKeys.newStringConfigKey(
@@ -69,7 +67,7 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl {
StringAttributeSensorAndConfigKey SLAVE_PASSWORD = new StringAttributeSensorAndConfigKey(
"mysql.slave.password", "The password slaves will use to connect to the master. Will be auto-generated by default.");
@SuppressWarnings("serial")
- AttributeSensor<Collection<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<Collection<String>>() {},
+ AttributeSensor<List<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<List<String>>() {},
"mysql.slave.datastore.url", "List of all slave's DATASTORE_URL sensors");
AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql.perNode");
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
index 9cebb21..e6afc18 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java
@@ -18,10 +18,10 @@
*/
package org.apache.brooklyn.entity.database.mysql;
-import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
@@ -35,10 +35,8 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.enricher.stock.Enrichers;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
@@ -57,7 +55,6 @@ import org.apache.brooklyn.util.time.Duration;
import com.google.common.base.Function;
import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -67,23 +64,21 @@ import com.google.common.reflect.TypeToken;
// https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html
-// TODO Bootstrap slave from dump for the case where the binary log is purged
-// TODO Promote slave to master
+// TODO Filter dump by database/table, currently all tables are replicated
// TODO SSL connection between master and slave
-// TODO DB credentials littered all over the place in file system
+// TODO Promote slave to master
public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster {
private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized");
private static final String MASTER_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_master.conf";
private static final String SLAVE_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_slave.conf";
- private static final int MASTER_SERVER_ID = 1;
- private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID);
+ protected static final int MASTER_SERVER_ID = 1;
@SuppressWarnings("serial")
private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
"mysql.slave.next_server_id", "Returns the ID of the next slave server");
@SuppressWarnings("serial")
- private static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {},
+ protected static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {},
"mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time.");
@Override
@@ -111,6 +106,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
subscriptions().subscribe(this, MEMBER_REMOVED, new MemberRemovedListener());
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void initEnrichers() {
super.initEnrichers();
@@ -124,8 +120,8 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
enrichers().add(Enrichers.builder()
.aggregating(MySqlNode.DATASTORE_URL)
.publishing(SLAVE_DATASTORE_URL_LIST)
- .computing(Functions.<Collection<String>>identity())
- .entityFilter(Predicates.not(IS_MASTER))
+ .computing((Function)Functions.identity())
+ .entityFilter(Predicates.not(MySqlClusterUtils.IS_MASTER))
.fromMembers()
.build());
@@ -145,7 +141,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
.computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty())
.apply(CollectionFunctionals.firstElement())
.defaultValue(null))
- .entityFilter(IS_MASTER)
+ .entityFilter(MySqlClusterUtils.IS_MASTER)
.build());
}
@@ -158,13 +154,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
final EntitySpec<?> memberSpec = super.getMemberSpec();
if (memberSpec != null) {
- if (!isKeyConfigured(memberSpec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey())) {
- return EntitySpec.create(memberSpec)
- .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID)
- .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL);
- } else {
- return memberSpec;
- }
+ return applyDefaults(memberSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL);
}
return EntitySpec.create(MySqlNode.class)
@@ -184,6 +174,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
return EntitySpec.create(MySqlNode.class)
.displayName("MySql Slave")
+ // Slave server IDs will not be sequential because getMemberSpec does not always result in createNode (the spec may be discarded)
.configure(MySqlNode.MYSQL_SERVER_ID, serverIdSupplier.get())
.configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL);
}
@@ -211,9 +202,10 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
@Override
protected Entity createNode(Location loc, Map<?, ?> flags) {
- Entity node = super.createNode(loc, flags);
- if (!IS_MASTER.apply(node)) {
- ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, MySqlSlave.SLAVE_HEALTHY, "Replication not started");
+ MySqlNode node = (MySqlNode) super.createNode(loc, flags);
+ if (!MySqlClusterUtils.IS_MASTER.apply(node)) {
+ EntityLocal localNode = (EntityLocal) node;
+ ServiceNotUpLogic.updateNotUpIndicator(localNode, MySqlSlave.SLAVE_HEALTHY, "Replication not started");
addFeed(FunctionFeed.builder()
.entity((EntityLocal)node)
@@ -221,7 +213,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
.poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY)
.callable(new SlaveStateCallable(node))
.checkSuccess(StringPredicates.isNonBlank())
- .onSuccess(new SlaveStateParser(node))
+ .onSuccess(new SlaveStateParser(localNode))
.setOnFailure(false)
.description("Polls SHOW SLAVE STATUS"))
.build());
@@ -235,15 +227,15 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
}
public static class SlaveStateCallable implements Callable<String> {
- private Entity slave;
- public SlaveStateCallable(Entity slave) {
+ private MySqlNode slave;
+ public SlaveStateCallable(MySqlNode slave) {
this.slave = slave;
}
@Override
public String call() throws Exception {
if (Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING))) {
- return slave.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", "SHOW SLAVE STATUS \\G")).asTask().getUnchecked();
+ return MySqlClusterUtils.executeSqlOnNode(slave, "SHOW SLAVE STATUS \\G");
} else {
return null;
}
@@ -252,9 +244,9 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
}
public static class SlaveStateParser implements Function<String, Boolean> {
- private Entity slave;
+ private EntityLocal slave;
- public SlaveStateParser(Entity slave) {
+ public SlaveStateParser(EntityLocal slave) {
this.slave = slave;
}
@@ -281,38 +273,67 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
// ============= Member Init =============
- // The task is executed in inessential context (event handler) so
- // not visible in tasks UI. Better make it visible so the user can
- // see failures, currently accessible only from logs.
- private static final class InitReplicationTask implements Runnable {
- private final MySqlCluster cluster;
- private final MySqlNode node;
+ // The task is executed separately from the start effector, so failing here
+ // will not fail the start effector as well, but it will eventually time out
+ // because replication is not started.
+ // Would be nice to be able to plug in to the entity lifecycle!
- private InitReplicationTask(MySqlCluster cluster, MySqlNode node) {
+ private static final class NodeRunningListener implements SensorEventListener<Boolean> {
+ private MySqlCluster cluster;
+ private Semaphore lock = new Semaphore(1);
+
+ public NodeRunningListener(MySqlCluster cluster) {
this.cluster = cluster;
- this.node = node;
}
@Override
- public void run() {
- Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID);
- if (serverId == MASTER_SERVER_ID) {
- initMaster(node);
- } else if (serverId > MASTER_SERVER_ID) {
- initSlave(node);
+ public void onEvent(SensorEvent<Boolean> event) {
+ final MySqlNode node = (MySqlNode) event.getSource();
+ if (Boolean.TRUE.equals(event.getValue()) &&
+ // We are interested in SERVICE_PROCESS_IS_RUNNING only while the node hasn't come online yet.
+ // Probably will get several updates while replication is initialized so an additional
+ // check is needed whether we have already seen this.
+ Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) &&
+ !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
+
+ // Events executed sequentially so no need to synchronize here.
+ ((EntityLocal)node).setAttribute(NODE_REPLICATION_INITIALIZED, Boolean.TRUE);
+
+ final Runnable nodeInitTaskBody;
+ if (MySqlClusterUtils.IS_MASTER.apply(node)) {
+ nodeInitTaskBody = new InitMasterTaskBody(cluster, node);
+ } else {
+ nodeInitTaskBody = new InitSlaveTaskBody(cluster, node, lock);
+ }
+
+ DynamicTasks.submitTopLevelTask(TaskBuilder.builder()
+ .displayName("setup master-slave replication")
+ .body(nodeInitTaskBody)
+ .tag(BrooklynTaskTags.tagForContextEntity(node))
+ .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+ .build(),
+ node);
}
}
- private void initMaster(MySqlNode master) {
- String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;");
+ }
+
+ private static class InitMasterTaskBody implements Runnable {
+ private MySqlNode master;
+ private MySqlCluster cluster;
+ public InitMasterTaskBody(MySqlCluster cluster, MySqlNode master) {
+ this.cluster = cluster;
+ this.master = master;
+ }
+
+ @Override
+ public void run() {
+ String binLogInfo = MySqlClusterUtils.executeSqlOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;");
Map<String, String> status = MySqlRowParser.parseSingle(binLogInfo);
String file = status.get("File");
- if (file != null) {
- ((EntityInternal)master).sensors().set(MySqlMaster.MASTER_LOG_FILE, file);
- }
String position = status.get("Position");
- if (position != null) {
- ((EntityInternal)master).sensors().set(MySqlMaster.MASTER_LOG_POSITION, new Integer(position));
+ if (file != null && position != null) {
+ cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, new ReplicationSnapshot(null, null, file, Integer.parseInt(position)));
}
//NOTE: Will be executed on each start, analogously to the standard CREATION_SCRIPT config
@@ -331,71 +352,6 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
return contents;
return null;
}
-
- private void initSlave(MySqlNode slave) {
- MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER);
- String masterLogFile = validateSqlParam(getAttributeBlocking(master, MySqlMaster.MASTER_LOG_FILE));
- Integer masterLogPos = getAttributeBlocking(master, MySqlMaster.MASTER_LOG_POSITION);
- String masterAddress = validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS));
- Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT);
- String slaveAddress = validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
- String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
- String password = validateSqlParam(cluster.getAttribute(SLAVE_PASSWORD));
-
- executeScriptOnNode(master, String.format(
- "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" +
- "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
- username, slaveAddress, password, username, slaveAddress));
-
- String slaveCmd = String.format(
- "CHANGE MASTER TO " +
- "MASTER_HOST='%s', " +
- "MASTER_PORT=%d, " +
- "MASTER_USER='%s', " +
- "MASTER_PASSWORD='%s', " +
- "MASTER_LOG_FILE='%s', " +
- "MASTER_LOG_POS=%d;\n" +
- "START SLAVE;\n",
- masterAddress, masterPort, username, password, masterLogFile, masterLogPos);
- executeScriptOnNode(slave, slaveCmd);
-
- cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
- }
-
- private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) {
- return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked();
- }
-
- }
-
- private static final class NodeRunningListener implements SensorEventListener<Boolean> {
- private MySqlCluster cluster;
-
- public NodeRunningListener(MySqlCluster cluster) {
- this.cluster = cluster;
- }
-
- @Override
- public void onEvent(SensorEvent<Boolean> event) {
- final MySqlNode node = (MySqlNode) event.getSource();
- if (Boolean.TRUE.equals(event.getValue()) &&
- // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet.
- // Probably will get several updates while replication is initialized so an additional
- // check is needed whether we have already seen this.
- Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) &&
- !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) {
-
- // Events executed sequentially so no need to synchronize here.
- node.sensors().set(NODE_REPLICATION_INITIALIZED, Boolean.TRUE);
-
- DynamicTasks.queueIfPossible(TaskBuilder.builder()
- .displayName("Configure master-slave replication on node")
- .body(new InitReplicationTask(cluster, node))
- .build())
- .orSubmitAsync(node);
- }
- }
-
}
// ============= Member Remove =============
@@ -407,46 +363,12 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster
Entity node = event.getValue();
String slaveAddress = cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).remove(node.getId());
if (slaveAddress != null) {
- DynamicTasks.queueIfPossible(TaskBuilder.builder()
- .displayName("Remove slave access")
- .body(new RemoveSlaveConfigTask(cluster, slaveAddress))
- .build())
- .orSubmitAsync(cluster);
+ // Could already be gone if stopping the entire app - let it throw an exception
+ MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER);
+ String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
+ MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress));
}
}
}
- public class RemoveSlaveConfigTask implements Runnable {
- private MySqlCluster cluster;
- private String slaveAddress;
-
- public RemoveSlaveConfigTask(MySqlCluster cluster, String slaveAddress) {
- this.cluster = cluster;
- this.slaveAddress = validateSqlParam(slaveAddress);
- }
-
- @Override
- public void run() {
- // Could already be gone if stopping the entire app - let it throw an exception
- MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER);
- String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME));
- executeScriptOnNode(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress));
- }
-
- }
-
- // Can't call node.executeScript directly, need to change execution context, so use an effector task
- private static String executeScriptOnNode(MySqlNode node, String commands) {
- return node.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(MySqlNode.EXECUTE_SCRIPT_COMMANDS, commands)).getUnchecked();
- }
-
- private static String validateSqlParam(String config) {
- // Don't go into escape madness, just deny any suspicious strings.
- // Would be nice to use prepared statements, but not worth pulling in the extra dependencies.
- if (config.contains("'") && config.contains("\\")) {
- throw new IllegalStateException("User provided string contains illegal SQL characters: " + config);
- }
- return config;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java
new file mode 100644
index 0000000..9f8dc6d
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.entity.database.DatastoreMixins.CanExecuteScript;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Static helpers shared by the MySQL cluster classes in this package: a predicate that
+ * identifies the master node, helpers that run SQL on a node through its
+ * {@code EXECUTE_SCRIPT} effector, and a guard for values that get interpolated into
+ * SQL string literals.
+ */
+public class MySqlClusterUtils {
+    /** Matches entities whose {@code MYSQL_SERVER_ID} config equals {@code MySqlClusterImpl.MASTER_SERVER_ID}. */
+    protected static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MySqlClusterImpl.MASTER_SERVER_ID);
+
+    /**
+     * Runs {@code commands} on {@code node} and returns the script result.
+     * Delegates to {@link #executeSqlOnNodeAsync(MySqlNode, String)} and calls
+     * {@code getUnchecked()} on the returned task.
+     *
+     * @param node     the node to run the SQL on
+     * @param commands the SQL text passed as the effector's "commands" parameter
+     * @return the value produced by the effector task
+     */
+    protected static String executeSqlOnNode(MySqlNode node, String commands) {
+        return executeSqlOnNodeAsync(node, commands).getUnchecked();
+    }
+
+    /**
+     * Queues an invocation of {@code MySqlNode.EXECUTE_SCRIPT} on {@code node} and returns its task
+     * without waiting for the result.
+     *
+     * @param node     the node to run the SQL on
+     * @param commands the SQL text passed as the effector's "commands" parameter
+     * @return the queued effector task
+     */
+    // Can't call node.executeScript directly, need to change execution context, so use an effector task
+    protected static Task<String> executeSqlOnNodeAsync(MySqlNode node, String commands) {
+        return DynamicTasks.queue(Effectors.invocation(node, MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(CanExecuteScript.COMMANDS.getName(), commands))).asTask();
+    }
+
+    /**
+     * Rejects values that are unsafe to place inside a single-quoted SQL string literal.
+     *
+     * @param config the value to check
+     * @return {@code config} unchanged when it contains neither a single quote nor a backslash
+     * @throws IllegalStateException if {@code config} contains a single quote or a backslash
+     */
+    protected static String validateSqlParam(String config) {
+        // Don't go into escape madness, just deny any suspicious strings.
+        // Would be nice to use prepared statements, but not worth pulling in the extra dependencies.
+        // Either character alone is enough to escape from a quoted literal, so reject on "or";
+        // the previous "and" let a lone quote (or a lone backslash) through.
+        // NOTE(review): the rejected value is echoed in the message; confirm no caller passes a secret here.
+        if (config.contains("'") || config.contains("\\")) {
+            throw new IllegalStateException("User provided string contains illegal SQL characters: " + config);
+        }
+        return config;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
index 461369b..b4da5f9 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java
@@ -25,6 +25,9 @@ import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
* The {@link SoftwareProcessDriver} for MySQL.
*/
public interface MySqlDriver extends SoftwareProcessDriver {
- public String getStatusCmd();
- public ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
+ String getStatusCmd();
+ ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
+ ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer);
+ ProcessTaskWrapper<Integer> dumpDatabase(String additionalOptions, String dumpDestination);
+ void changePassword(String oldPass, String newPass);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
index 484606e..7f9e508 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java
@@ -19,21 +19,21 @@
package org.apache.brooklyn.entity.database.mysql;
import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.ImplementedBy;
import org.apache.brooklyn.api.objs.HasShortName;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.annotation.Effector;
import org.apache.brooklyn.core.annotation.EffectorParam;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.MapConfigKey;
-import org.apache.brooklyn.core.effector.MethodEffector;
+import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.location.PortRanges;
import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
import org.apache.brooklyn.entity.database.DatastoreMixins.DatastoreCommon;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
@@ -86,10 +86,39 @@ public interface MySqlNode extends SoftwareProcess, HasShortName, DatastoreCommo
AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql");
- MethodEffector<String> EXECUTE_SCRIPT = new MethodEffector<String>(MySqlNode.class, "executeScript");
- String EXECUTE_SCRIPT_COMMANDS = "commands";
+    /**
+     * Declares the {@code export_dump} effector and its parameters.
+     * The effector is built abstract here; an implementation has to be attached separately.
+     */
+    interface ExportDumpEffector {
+        /** Destination of the dump; defaults to {@code dump.sql}. */
+        ConfigKey<String> PATH = ConfigKeys.newStringConfigKey(
+                "path",
+                "Where to export the dump to. Resolved against runtime directory if relative.",
+                "dump.sql");
+
+        /** Extra command line options for mysqldump; no default. */
+        ConfigKey<String> ADDITIONAL_OPTIONS = ConfigKeys.newStringConfigKey(
+                "additionalOptions",
+                "Additional command line options to pass to mysqldump");
+
+        /** Abstract effector definition taking {@link #PATH} and {@link #ADDITIONAL_OPTIONS}. */
+        Effector<Void> EXPORT_DUMP = Effectors.effector(Void.class, "export_dump")
+                .description("Invokes mysqldump against the node")
+                .parameter(PATH)
+                .parameter(ADDITIONAL_OPTIONS)
+                .buildAbstract();
+    }
+
+    /** Alias of {@link ExportDumpEffector#EXPORT_DUMP}, exposed directly on the node interface. */
+    Effector<Void> EXPORT_DUMP = ExportDumpEffector.EXPORT_DUMP;
+
+    /**
+     * Declares the {@code import_dump} effector and its single parameter.
+     * The effector is built abstract here; an implementation has to be attached separately.
+     */
+    interface ImportDumpEffector {
+        /** Location of the SQL file to import; no default. */
+        ConfigKey<String> PATH = ConfigKeys.newStringConfigKey(
+                "path",
+                "Path to a file with SQL statements to import as the root user");
+
+        /** Abstract effector definition taking {@link #PATH}. */
+        Effector<Void> IMPORT_DUMP = Effectors.effector(Void.class, "import_dump")
+                .description("Runs the sql statements in the file as the root user")
+                .parameter(PATH)
+                .buildAbstract();
+    }
+
+    /** Alias of {@link ImportDumpEffector#IMPORT_DUMP}, exposed directly on the node interface. */
+    Effector<Void> IMPORT_DUMP = ImportDumpEffector.IMPORT_DUMP;
+
+    /**
+     * Declares the {@code change_password} effector and its single parameter.
+     * The effector is built abstract here; an implementation has to be attached separately.
+     */
+    interface ChangePasswordEffector {
+        /** The password to switch to; no default. */
+        ConfigKey<String> PASSWORD = ConfigKeys.newStringConfigKey(
+                "password",
+                "New password to set");
+
+        /** Abstract effector definition taking {@link #PASSWORD}. */
+        Effector<Void> CHANGE_PASSWORD = Effectors.effector(Void.class, "change_password")
+                .description("Change the mysql root password")
+                .parameter(PASSWORD)
+                .buildAbstract();
+    }
+
+    /** Alias of {@link ChangePasswordEffector#CHANGE_PASSWORD}, exposed directly on the node interface. */
+    Effector<Void> CHANGE_PASSWORD = ChangePasswordEffector.CHANGE_PASSWORD;
- @Effector(description = "Execute SQL script on the node as the root user")
- String executeScript(@EffectorParam(name=EXECUTE_SCRIPT_COMMANDS) String commands);
+ @org.apache.brooklyn.core.annotation.Effector(description = "Execute SQL script on the node as the root user")
+ public String executeScript(@EffectorParam(name="commands") String commands);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java
new file mode 100644
index 0000000..af68959
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ChangePasswordEffector;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector;
+import org.apache.brooklyn.entity.database.mysql.MySqlNode.ImportDumpEffector;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Concrete implementations of the dump/import/change-password effectors declared on
+ * {@link MySqlNode}. Each body casts its entity to {@link MySqlNodeImpl} to reach the driver.
+ */
+public class MySqlNodeEffectors {
+    /** Body of {@code export_dump}: asks the driver to dump the database to the requested path. */
+    public static class ExportDumpEffectoryBody extends EffectorBody<Void> implements ExportDumpEffector {
+        /**
+         * @param parameters effector parameters; reads {@code PATH} and {@code ADDITIONAL_OPTIONS}
+         *                   (a missing {@code ADDITIONAL_OPTIONS} is treated as the empty string)
+         * @return always {@code null}
+         */
+        @Override
+        public Void call(ConfigBag parameters) {
+            String path = parameters.get(PATH);
+            String additionalOptions = Strings.nullToEmpty(parameters.get(ADDITIONAL_OPTIONS));
+            //TODO additionalOptions, path are not sanitized and are coming from the user.
+            //Should we try to sanitize (potentially limiting the range of possible inputs),
+            //or just assume the user has full machine access anyway?
+            ((MySqlNodeImpl)entity()).getDriver().dumpDatabase(additionalOptions, path);
+            return null;
+        }
+    }
+    /** {@code export_dump} effector with {@link ExportDumpEffectoryBody} attached. */
+    public static final Effector<Void> EXPORT_DUMP = Effectors.effector(ExportDumpEffector.EXPORT_DUMP)
+            .impl(new ExportDumpEffectoryBody())
+            .build();
+
+    /** Body of {@code import_dump}: asks the driver to run the SQL file at the given path. */
+    public static class ImportDumpEffectorBody extends EffectorBody<Void> implements ImportDumpEffector {
+        /**
+         * @param parameters effector parameters; {@code PATH} is mandatory
+         * @return always {@code null}
+         * @throws NullPointerException if {@code PATH} is not supplied
+         */
+        @Override
+        public Void call(ConfigBag parameters) {
+            String path = Preconditions.checkNotNull(parameters.get(PATH), "path is required");
+            // TODO sanitize path?
+            ((MySqlNodeImpl)entity()).getDriver().executeScriptFromInstalledFileAsync(path);
+            return null;
+        }
+    }
+    /** {@code import_dump} effector with {@link ImportDumpEffectorBody} attached. */
+    public static final Effector<Void> IMPORT_DUMP = Effectors.effector(ImportDumpEffector.IMPORT_DUMP)
+            .impl(new ImportDumpEffectorBody())
+            .build();
+
+    /**
+     * Body of {@code change_password}: publishes the new password on the entity, asks the
+     * driver to change it, then rewrites the {@code password = ...} line of {@code mymysql.cnf}
+     * in the entity's run directory.
+     */
+    public static class ChangePasswordEffectorBody extends EffectorBody<Void> implements ChangePasswordEffector {
+        /**
+         * @param parameters effector parameters; {@code PASSWORD} is mandatory
+         * @return always {@code null}
+         * @throws NullPointerException if {@code PASSWORD} is not supplied
+         */
+        @Override
+        public Void call(ConfigBag parameters) {
+            String newPass = Preconditions.checkNotNull(parameters.get(PASSWORD), "password is required");
+            String oldPass = entity().getAttribute(MySqlNode.PASSWORD);
+            // NOTE(review): the sensor is updated before the password change is known to have
+            // succeeded; confirm this ordering is intended.
+            entity().sensors().set(MySqlNode.PASSWORD, newPass);
+            MySqlDriver driver = ((MySqlNodeImpl)entity()).getDriver();
+            driver.changePassword(oldPass, newPass);
+            SshMachineLocation machine = EffectorTasks.getSshMachine(entity());
+            DynamicTasks.queue(
+                    SshTasks.newSshExecTaskFactory(machine,
+                            "cd "+entity().getAttribute(MySqlNode.RUN_DIR),
+                            "sed -i'' -e 's@^\\(\\s*password\\s*=\\s*\\).*$@\\1" + escapeForQuotedSedReplacement(newPass) + "@g' mymysql.cnf")
+                        .requiringExitCodeZero()
+                        .summary("Change root password"));
+            return null;
+        }
+
+        /**
+         * Escapes {@code value} so it can be embedded as the replacement part of a sed
+         * {@code s@...@...@} expression that is itself wrapped in single quotes on a shell
+         * command line. Newlines are not handled.
+         */
+        private static String escapeForQuotedSedReplacement(String value) {
+            return value
+                    .replace("\\", "\\\\")   // sed: keep backslashes literal (must be first)
+                    .replace("&", "\\&")     // sed: a bare '&' expands to the matched text
+                    .replace("@", "\\@")     // sed: '@' is the delimiter of the expression
+                    .replace("'", "'\\''");  // shell: close the quote, emit \', reopen the quote
+        }
+    }
+    /** {@code change_password} effector with {@link ChangePasswordEffectorBody} attached. */
+    public static final Effector<Void> CHANGE_PASSWORD = Effectors.effector(ChangePasswordEffector.CHANGE_PASSWORD)
+            .impl(new ChangePasswordEffectorBody())
+            .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
index dd0aac7..f470390 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java
@@ -27,8 +27,6 @@ import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
import org.apache.brooklyn.feed.ssh.SshFeed;
import org.apache.brooklyn.feed.ssh.SshPollConfig;
import org.apache.brooklyn.feed.ssh.SshPollValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
@@ -36,6 +34,8 @@ import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
@@ -79,6 +79,9 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode {
return executeScript((String)parameters.getStringKey("commands"));
}
});
+ getMutableEntityType().addEffector(MySqlNodeEffectors.EXPORT_DUMP);
+ getMutableEntityType().addEffector(MySqlNodeEffectors.IMPORT_DUMP);
+ getMutableEntityType().addEffector(MySqlNodeEffectors.CHANGE_PASSWORD);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
index 30d9abd..01ee983 100644
--- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java
@@ -33,18 +33,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.entity.database.DatastoreMixins;
-import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
import org.apache.brooklyn.api.location.OsDetails;
+import org.apache.brooklyn.core.effector.EffectorTasks;
import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.location.BasicOsDetails.OsVersions;
+import org.apache.brooklyn.entity.database.DatastoreMixins;
+import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.io.FileUtil;
@@ -57,6 +57,8 @@ import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
@@ -165,11 +167,7 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements
boolean hasCreationScript = copyDatabaseCreationScript();
timer.waitForExpiryUnchecked();
- DynamicTasks.queue(
- SshEffectorTasks.ssh(
- "cd "+getRunDir(),
- getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password= password "+getPassword()
- ).summary("setting password"));
+ changePassword("", getPassword());
if (hasCreationScript)
executeScriptFromInstalledFileAsync("creation-script.sql").asTask().getUnchecked();
@@ -179,6 +177,16 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements
stop();
}
+    /**
+     * Queues an SSH task that runs {@code mysqladmin} from the run directory to change the
+     * password from {@code oldPass} to {@code newPass}. The task requires exit code zero.
+     *
+     * @param oldPass the current password, passed via {@code --password=}; may be empty
+     * @param newPass the password to set
+     */
+    @Override
+    public void changePassword(String oldPass, String newPass) {
+        // Both values end up on a shell command line: wrap them in single quotes and escape
+        // embedded quotes so spaces and shell metacharacters are passed through literally.
+        // An empty old password becomes --password='' which the shell reduces to --password=.
+        String quotedOldPass = "'" + String.valueOf(oldPass).replace("'", "'\\''") + "'";
+        String quotedNewPass = "'" + String.valueOf(newPass).replace("'", "'\\''") + "'";
+        DynamicTasks.queue(
+                SshEffectorTasks.ssh(
+                        "cd "+getRunDir(),
+                        getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password=" + quotedOldPass + " password "+quotedNewPass)
+                    .summary("setting password")
+                    .requiringExitCodeZero());
+    }
+
+
protected void copyDatabaseConfigScript() {
newScript(CUSTOMIZING).execute(); //create the directory
@@ -264,13 +272,26 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements
return executeScriptFromInstalledFileAsync(filename);
}
+ @Override
public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer) {
+ SshMachineLocation machine = EffectorTasks.getSshMachine(entity);
return DynamicTasks.queue(
- SshEffectorTasks.ssh(
+ SshTasks.newSshExecTaskFactory(machine,
"cd "+getRunDir(),
getBaseDir()+"/bin/mysql --defaults-file="+getConfigFile()+" < "+filenameAlreadyInstalledAtServer)
.requiringExitCodeZero()
.summary("executing datastore script "+filenameAlreadyInstalledAtServer));
}
+    /**
+     * Queues an SSH task that runs {@code mysqldump} from the run directory and redirects
+     * its output to {@code dumpDestination}. The task requires exit code zero.
+     *
+     * @param additionalOptions text inserted verbatim on the mysqldump command line
+     * @param dumpDestination   target of the shell redirection, inserted verbatim
+     * @return the queued task wrapper
+     */
+    @Override
+    public ProcessTaskWrapper<Integer> dumpDatabase(String additionalOptions, String dumpDestination) {
+        final SshMachineLocation target = EffectorTasks.getSshMachine(entity);
+        final String enterRunDir = "cd "+getRunDir();
+        final String dumpCommand = getBaseDir()+"/bin/mysqldump --defaults-file="+getConfigFile()+" "+additionalOptions+" > "+dumpDestination;
+        return DynamicTasks.queue(
+                SshTasks.newSshExecTaskFactory(target, enterRunDir, dumpCommand)
+                    .requiringExitCodeZero()
+                    .summary("Dumping database to " + dumpDestination));
+    }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java
----------------------------------------------------------------------
diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java
new file mode 100644
index 0000000..48af15a
--- /dev/null
+++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.database.mysql;
+
+/**
+ * Describes a database snapshot that can be used to initialise replication: the id of
+ * the entity the snapshot is associated with, the path of the snapshot file, and a
+ * binary log file name and position.
+ * <p>
+ * NOTE(review): the binary log name/position are presumably the coordinates at which
+ * the snapshot was taken - confirm against the code that creates instances.
+ * <p>
+ * Instances are plain mutable holders; no validation or synchronisation is performed.
+ */
+public class ReplicationSnapshot {
+    private String entityId;
+    private String snapshotPath;
+    private String binLogName;
+    private int binLogPosition;
+
+    /**
+     * @param entityId id of the entity the snapshot is associated with
+     * @param snapshotPath path of the snapshot file
+     * @param binLogName binary log file name recorded for the snapshot
+     * @param binLogPosition position within {@code binLogName} recorded for the snapshot
+     */
+    public ReplicationSnapshot(String entityId, String snapshotPath, String binLogName, int binLogPosition) {
+        this.entityId = entityId;
+        this.snapshotPath = snapshotPath;
+        this.binLogName = binLogName;
+        this.binLogPosition = binLogPosition;
+    }
+
+    /** @return id of the entity the snapshot is associated with */
+    public String getEntityId() {
+        return entityId;
+    }
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+    /** @return path of the snapshot file */
+    public String getSnapshotPath() {
+        return snapshotPath;
+    }
+    public void setSnapshotPath(String snapshotPath) {
+        this.snapshotPath = snapshotPath;
+    }
+    /** @return binary log file name recorded for the snapshot */
+    public String getBinLogName() {
+        return binLogName;
+    }
+    public void setBinLogName(String binLogName) {
+        this.binLogName = binLogName;
+    }
+    /** @return position within the binary log recorded for the snapshot */
+    public int getBinLogPosition() {
+        return binLogPosition;
+    }
+    public void setBinLogPosition(int binLogPosition) {
+        this.binLogPosition = binLogPosition;
+    }
+
+    /**
+     * Returns a human-readable rendering of all four fields, so that log output and
+     * displayed values show the snapshot's contents rather than an identity hash.
+     */
+    @Override
+    public String toString() {
+        return "ReplicationSnapshot{"
+                + "entityId=" + entityId
+                + ", snapshotPath=" + snapshotPath
+                + ", binLogName=" + binLogName
+                + ", binLogPosition=" + binLogPosition
+                + "}";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
----------------------------------------------------------------------
diff --git a/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java b/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
index 6b5a6cb..e1874b9 100644
--- a/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
+++ b/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java
@@ -123,11 +123,11 @@ public class VogellaExampleAccess {
private void writeMetaData(ResultSet resultSet) throws SQLException {
// Get some metadata from the database
- log.info("The columns in the table are: ");
+ log.debug("The columns in the table are: ");
- log.info("Table: " + resultSet.getMetaData().getTableName(1));
+ log.debug("Table: " + resultSet.getMetaData().getTableName(1));
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
- log.info("Column " + i + " " + resultSet.getMetaData().getColumnName(i));
+ log.debug("Column " + i + " " + resultSet.getMetaData().getColumnName(i));
}
}
@@ -138,11 +138,11 @@ public class VogellaExampleAccess {
String date = row.get(2);
String summary = row.get(3);
String comment = row.get(4);
- log.info("User: " + user);
- log.info("Website: " + website);
- log.info("Summary: " + summary);
- log.info("Date: " + date);
- log.info("Comment: " + comment);
+ log.debug("User: " + user);
+ log.debug("Website: " + website);
+ log.debug("Summary: " + summary);
+ log.debug("Date: " + date);
+ log.debug("Comment: " + comment);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java b/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
index c5e12d5..c250843 100644
--- a/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
+++ b/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java
@@ -18,28 +18,43 @@
*/
package org.apache.brooklyn.entity.database.mysql;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
+import java.util.Map;
+
import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
+import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
+import org.apache.brooklyn.entity.database.mysql.MySqlCluster.MySqlMaster;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.ssh.BashCommands;
import org.testng.annotations.Test;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
public class MySqlClusterIntegrationTest extends BrooklynAppLiveTestSupport {
- @Test(groups = {"Integration"})
+ private static final String TEST_LOCATION = "localhost";
+
+ @Test(groups="Integration")
public void testAllNodesInit() throws Exception {
try {
MySqlClusterTestHelper.test(app, getLocation());
} finally {
- for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) {
- String runDir = member.getAttribute(MySqlNode.RUN_DIR);
- if (runDir != null) {
- Os.deleteRecursively(runDir);
- }
- }
+ cleanData();
}
}
@@ -48,16 +63,115 @@ public class MySqlClusterIntegrationTest extends BrooklynAppLiveTestSupport {
try {
MySqlClusterTestHelper.testMasterInit(app, getLocation());
} finally {
- for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) {
- String runDir = member.getAttribute(MySqlNode.RUN_DIR);
- if (runDir != null) {
- Os.deleteRecursively(runDir);
+ cleanData();
+ }
+ }
+
+    /**
+     * After purging the master's initial binary log, grows the cluster three times and checks
+     * each new slave replicates from the master and that the recorded snapshot names the expected
+     * source: the master, then a preferred slave once the snapshot file has been deleted, then
+     * another preferred slave once the previous snapshot's entity has been destroyed.
+     */
+    @Test(groups="Integration")
+    public void testDumpReplication() throws Exception {
+        try {
+            Location location = getLocation();
+            EntitySpec<MySqlCluster> spec = EntitySpec.create(MySqlCluster.class)
+                    .configure(MySqlMaster.MASTER_CREATION_SCRIPT_CONTENTS, MySqlClusterTestHelper.CREATION_SCRIPT)
+                    .configure(MySqlNode.MYSQL_SERVER_CONF, MutableMap.<String, Object>of("skip-name-resolve",""));
+            MySqlCluster cluster = MySqlClusterTestHelper.initCluster(app, location, spec);
+            MySqlNode master = (MySqlNode) cluster.getAttribute(MySqlCluster.FIRST);
+            purgeLogs(cluster, master);
+
+            // test dump replication from master
+            MySqlNode firstSlave = growClusterByOne(cluster);
+            assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), master.getId());
+            MySqlClusterTestHelper.assertReplication(master, firstSlave);
+
+            // test dump replication from slave, missing dump on node
+            deleteSnapshot(cluster);
+            cluster.config().set(MySqlCluster.REPLICATION_PREFERRED_SOURCE, firstSlave.getId());
+            MySqlNode secondSlave = growClusterByOne(cluster);
+            assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), firstSlave.getId());
+            MySqlClusterTestHelper.assertReplication(master, secondSlave);
+
+            // test dump replication from slave, missing snapshot entity
+            Entities.destroy(firstSlave);
+            cluster.config().set(MySqlCluster.REPLICATION_PREFERRED_SOURCE, secondSlave.getId());
+            MySqlNode thirdSlave = growClusterByOne(cluster);
+            assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), secondSlave.getId());
+            MySqlClusterTestHelper.assertReplication(master, thirdSlave);
+        } finally {
+            cleanData();
+        }
+    }
+
+    /** Invokes the resize-by-delta effector with a delta of one and returns the single entity it reports. */
+    private MySqlNode growClusterByOne(MySqlCluster cluster) {
+        return (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked());
+    }
+
+    /**
+     * Deletes the file named by the cluster's last recorded replication snapshot, running
+     * {@code rm} over SSH on the machine of the entity recorded in that snapshot.
+     */
+    private void deleteSnapshot(MySqlCluster cluster) {
+        ReplicationSnapshot snapshot = cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
+        Entity owner = mgmt.getEntityManager().getEntity(snapshot.getEntityId());
+        SshMachineLocation ownerMachine = EffectorTasks.getSshMachine(owner);
+        String removeCommand = "rm " + snapshot.getSnapshotPath();
+        Entities.submit(owner, SshEffectorTasks.ssh("cd $RUN_DIR", removeCommand)
+                    .summary("clear snapshot")
+                    .machine(ownerMachine)
+                    .environmentVariable("RUN_DIR", owner.getAttribute(MySqlNode.RUN_DIR))
+                    .requiringExitCodeZero())
+                .asTask()
+                .getUnchecked();
+    }
+
+ private void purgeLogs(MySqlCluster cluster, MySqlNode master) { // issues FLUSH LOGS on the master, then purges binary logs up to the new current file
+ String preFlushBinaryLogFile = getBinaryLogFile(master);
+ ReplicationSnapshot replicationSnapshot = master.getParent().getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT);
+ assertEquals(preFlushBinaryLogFile, replicationSnapshot.getBinLogName()); // the recorded snapshot must name the log file that is about to be purged
+ MySqlClusterTestHelper.execSql(master, "FLUSH LOGS");
+ String postFlushBinaryLogFile = getBinaryLogFile(master);
+ waitSlavesCatchUp(cluster, postFlushBinaryLogFile); // NOTE(review): presumably so that no slave still needs the old log when it is purged - confirm
+ assertNotEquals(postFlushBinaryLogFile, preFlushBinaryLogFile); // the flush must have switched the master to a different log file
+ MySqlClusterTestHelper.execSql(master, "PURGE BINARY LOGS TO '" + postFlushBinaryLogFile + "';");
+ assertFalse(fileExists(master, preFlushBinaryLogFile)); // the pre-flush log file must no longer exist on the master
+ }
+
+ private void waitSlavesCatchUp(final MySqlCluster cluster, final String binLog) { // passes once every member other than FIRST reports binLog as its Relay_Master_Log_File
+ Asserts.succeedsEventually(new Runnable() {
+ @Override
+ public void run() {
+ MySqlNode master = (MySqlNode) cluster.getAttribute(MySqlCluster.FIRST);
+ for (Entity node : cluster.getMembers()) {
+ if (node == master) continue; // the master is not queried for slave status
+ String status = MySqlClusterTestHelper.execSql((MySqlNode) node, "SHOW SLAVE STATUS \\G");
+ Map<String, String> map = MySqlRowParser.parseSingle(status); // field name -> value of the status output
+ assertEquals(map.get("Relay_Master_Log_File"), binLog);
}
}
+ });
+ }
+    /** Runs {@code SHOW MASTER STATUS} on the given node and returns the value of its {@code File} field. */
+    private String getBinaryLogFile(MySqlNode master) {
+        String masterStatus = MySqlClusterTestHelper.execSql(master, "SHOW MASTER STATUS \\G");
+        return MySqlRowParser.parseSingle(masterStatus).get("File");
+    }
+    /**
+     * Checks over SSH whether a file exists on the node's machine.
+     * <p>
+     * The path tested is the node's configured {@code DATA_DIR} (empty when unset) merged with
+     * {@code binLogName}; the command first changes to the node's run directory, so a relative
+     * path is resolved against it.
+     *
+     * @param node node whose machine is inspected
+     * @param binLogName file name to look for
+     * @return {@code true} when the check command exits with code zero
+     */
+    private boolean fileExists(MySqlNode node, String binLogName) {
+        String dataDir = Strings.nullToEmpty(node.getConfig(MySqlNode.DATA_DIR));
+        String path = Os.mergePathsUnix(dataDir, binLogName);
+        String cmd = BashCommands.chain(
+                "cd $RUN_DIR",
+                BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist."));
+        String summary = "Check if file " + path + " exists";
+        SshMachineLocation machine = EffectorTasks.getSshMachine(node);
+        // A missing file is an expected outcome here, so a non-zero exit code must not fail
+        // the task: the exit code itself is the answer. Setting this once is sufficient.
+        return Entities.submit(node, SshTasks.newSshExecTaskFactory(machine, cmd)
+                    .allowingNonZeroExitCode()
+                    .environmentVariable("RUN_DIR", node.getAttribute(SoftwareProcess.RUN_DIR))
+                    .summary(summary)).asTask().getUnchecked() == 0;
+    }
+ private void cleanData() { // deletes the run directory of every child of the app's single child entity
+ if (app.getChildren().isEmpty()) return; // app has no children: nothing to clean
+ for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) { // getOnlyElement expects the app to have exactly one child
+ String runDir = member.getAttribute(MySqlNode.RUN_DIR);
+ if (runDir != null) { // members without a RUN_DIR value are left alone
+ Os.deleteRecursively(runDir);
+ }
}
+ }
private Location getLocation() {
- return mgmt.getLocationRegistry().resolve("localhost");
+ return mgmt.getLocationRegistry().resolve(TEST_LOCATION);
}
}