You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ri...@apache.org on 2014/05/23 16:51:44 UTC
[13/50] [abbrv] git commit: fixed Riak ring not committing nodes after they join the cluster
fixed Riak ring not committing nodes after they join the cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/16c5abb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/16c5abb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/16c5abb4
Branch: refs/pull/1408/head
Commit: 16c5abb4d978057aaf071eeba181c276f3ba8142
Parents: bbd7582
Author: ZaidM <za...@cloudsoftcorp.com>
Authored: Thu May 22 12:02:17 2014 +0100
Committer: ZaidM <za...@cloudsoftcorp.com>
Committed: Thu May 22 12:02:17 2014 +0100
----------------------------------------------------------------------
.../java/brooklyn/demo/RiakClusterExample.java | 24 ++++----
.../brooklyn/entity/nosql/riak/RiakCluster.java | 8 +++
.../entity/nosql/riak/RiakClusterImpl.java | 55 +++++++++++++-----
.../brooklyn/entity/nosql/riak/RiakNode.java | 48 +++++++++-------
.../entity/nosql/riak/RiakNodeDriver.java | 2 +
.../entity/nosql/riak/RiakNodeImpl.java | 3 +
.../entity/nosql/riak/RiakNodeSshDriver.java | 59 ++++++++++++--------
.../nosql/riak/RiakClusterEc2LiveTest.java | 7 ++-
.../riak/RiakNodeGoogleComputeLiveTest.java | 7 ++-
9 files changed, 139 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
index 3f287f2..debfb76 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
@@ -20,28 +20,30 @@ import brooklyn.util.CommandLineUtil;
@Catalog(name = "Riak Cluster Application", description = "Riak ring deployment blueprint")
public class RiakClusterExample extends AbstractApplication {
+ public static final String DEFAULT_LOCATION_SPEC = "aws-ec2:us-east-1";
+
@CatalogConfig(label = "Riak Ring Size")
public static final ConfigKey<Integer> RIAK_RING_SIZE = ConfigKeys.newConfigKey(
- "riak.ring.size", "Initial size of the Riak Ring", 2);
+ "riak.ring.size", "Initial size of the Riak Ring", 4);
- public void init() {
- addChild(EntitySpec.create(RiakCluster.class)
- .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)) );
- }
-
public static void main(String[] argv) {
List<String> args = Lists.newArrayList(argv);
- String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
- String location = CommandLineUtil.getCommandLineOption(args, "--location", "jclouds:aws-ec2");
- Preconditions.checkArgument(args.isEmpty(), "Unsupported args: "+args);
+ String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
+ String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION_SPEC);
+ Preconditions.checkArgument(args.isEmpty(), "Unsupported args: " + args);
BrooklynLauncher launcher = BrooklynLauncher.newInstance()
- .application(EntitySpec.create(StartableApplication.class, RiakClusterExample.class))
+ .application(EntitySpec.create(StartableApplication.class, RiakClusterExample.class))
.webconsolePort(port)
.location(location)
.start();
Entities.dumpInfo(launcher.getApplications());
}
-
+
+ public void init() {
+ addChild(EntitySpec.create(RiakCluster.class)
+ .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 29a1883..41aafd2 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -4,15 +4,23 @@ import java.util.Map;
import com.google.common.reflect.TypeToken;
+import brooklyn.config.ConfigKey;
import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.event.AttributeSensor;
import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
@ImplementedBy(RiakClusterImpl.class)
public interface RiakCluster extends DynamicCluster {
AttributeSensor<Map<Entity, String>> RIAK_CLUSTER_NODES = Sensors.newSensor(new TypeToken<Map<Entity, String>>() {
}, "riak.cluster.nodes", "Names of all active Riak nodes in the cluster <Entity,Riak Name>");
+
+ @SetFromFlag("delayBeforeAdvertisingCluster")
+ ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index 5508d5e..11ecf92 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -11,21 +11,23 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
import brooklyn.entity.Entity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.Lifecycle;
import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
import brooklyn.entity.group.DynamicClusterImpl;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.trait.Startable;
import brooklyn.location.Location;
import brooklyn.util.collections.MutableMap;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
+import brooklyn.util.time.Time;
public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
@@ -35,20 +37,42 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
public void init() {
log.info("Initializing the riak cluster...");
super.init();
+
+
}
@Override
public void start(Collection<? extends Location> locations) {
super.start(locations);
connectSensors();
+
+ Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+
+ //FIXME: add a quorum to tolerate failed nodes before setting on fire.
+ Optional<Entity> anyNode = Iterables.tryFind(getMembers(), new Predicate<Entity>() {
+
+ @Override
+ public boolean apply(@Nullable Entity entity) {
+ return (entity instanceof RiakNode && hasMemberJoinedCluster(entity) && entity.getAttribute(RiakNode.SERVICE_UP));
+ }
+ });
+
+ if (anyNode.isPresent()) {
+ log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId());
+ Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+ } else {
+ log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId());
+ setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE);
+ }
}
+
protected EntitySpec<?> getMemberSpec() {
EntitySpec<?> result = super.getMemberSpec();
- if (result!=null) return result;
+ if (result != null) return result;
return EntitySpec.create(RiakNode.class);
}
-
+
protected void connectSensors() {
Map<String, Object> flags = MutableMap.<String, Object>builder()
@@ -81,7 +105,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
if (belongsInServerPool(member)) {
// TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
// TODO and can we do join as part of node starting?
-
+
Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
if (nodes == null) nodes = Maps.newLinkedHashMap();
String riakName = getRiakName(member);
@@ -94,7 +118,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
nodes.put(member, riakName);
setAttribute(RIAK_CLUSTER_NODES, nodes);
- ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_IN_CLUSTER, Boolean.TRUE);
+ ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
isFirstNodeSet.set(true);
log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)});
@@ -106,12 +130,12 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
@Override
public boolean apply(@Nullable Entity node) {
- return (node instanceof RiakNode && isMemberInCluster(node));
+ return (node instanceof RiakNode && hasMemberJoinedCluster(node));
}
});
if (anyNodeInCluster.isPresent()) {
- if (!nodes.containsKey(member) && !isMemberInCluster(member)) {
+ if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
@@ -142,7 +166,8 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
return false;
}
if (!getMembers().contains(member)) {
- if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not member", this, member);
+ if (log.isTraceEnabled())
+ log.trace("Members of {}, checking {}, eliminating because not member", this, member);
return false;
}
@@ -155,7 +180,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
return node.getAttribute(RiakNode.RIAK_NODE_NAME);
}
- private Boolean isMemberInCluster(Entity member) {
- return Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_IN_CLUSTER)).or(Boolean.FALSE);
+ private Boolean hasMemberJoinedCluster(Entity member) {
+ return Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index 046f697..d77e73b 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -32,9 +32,9 @@ public interface RiakNode extends SoftwareProcess {
"riak.appConfig.templateUrl", "Template file (in freemarker format) for the app.config config file",
"classpath://brooklyn/entity/nosql/riak/app.config");
- @SetFromFlag("riakNodeInCluster")
- AttributeSensor<Boolean> RIAK_NODE_IN_CLUSTER = Sensors.newBooleanSensor(
- "riak.node.inCluster", "Flag to indicate wether the node is a cluster member");
+ @SetFromFlag("riakNodeHasJoinedCluster")
+ AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor(
+ "riak.node.riakNodeHasJoinedCluster", "Flag to indicate wether the Riak node has joined a cluster member");
@SetFromFlag("riakNodeName")
AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns the riak node name as defined in vm.args");
@@ -49,42 +49,45 @@ public interface RiakNode extends SoftwareProcess {
PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.epmdListenerPort", "Erlang Port Mapper Daemon Listener Port", "4369");
PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeStart", "Erlang Port Range Start", "6000+");
PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeEnd", "Erlang Port Range End", "7999+");
-
- // accessors, for use from template file
- Integer getRiakWebPort();
- Integer getRiakPbPort();
- Integer getHandoffListenerPort();
- Integer getEpmdListenerPort();
- Integer getErlangPortRangeStart();
- Integer getErlangPortRangeEnd();
-
- //Sensors for Riak Node Counters (within 1 minute window or lifetime of node.
- //http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak
-
AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("node.gets");
AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("node.gets.total");
AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("node.puts");
AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("node.puts.total");
AttributeSensor<Integer> VNODE_GETS = Sensors.newIntegerSensor("vnode.gets");
AttributeSensor<Integer> VNODE_GETS_TOTAL = Sensors.newIntegerSensor("vnode.gets.total");
+
+ //Sensors for Riak Node Counters (within 1 minute window or lifetime of node.
+ //http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak
AttributeSensor<Integer> VNODE_PUTS = Sensors.newIntegerSensor("vnode.puts");
AttributeSensor<Integer> VNODE_PUTS_TOTAL = Sensors.newIntegerSensor("vnode.puts.total");
-
AttributeSensor<Integer> READ_REPAIRS_TOTAL = Sensors.newIntegerSensor("read.repairs.total");
AttributeSensor<Integer> COORD_REDIRS_TOTAL = Sensors.newIntegerSensor("coord.redirs.total");
-
//Additional Riak node counters
AttributeSensor<Integer> MEMORY_PROCESSES_USED = Sensors.newIntegerSensor("memory.processes.used");
AttributeSensor<Integer> SYS_PROCESS_COUNT = Sensors.newIntegerSensor("sys.process.count");
AttributeSensor<Integer> PBC_CONNECTS = Sensors.newIntegerSensor("pbc.connects");
AttributeSensor<Integer> PBC_ACTIVE = Sensors.newIntegerSensor("pbc.active");
-
@SuppressWarnings("serial")
- AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {},
- "ring.members", "all the riak nodes in the ring");
-
+ AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {
+ },
+ "ring.members", "all the riak nodes in the ring"
+ );
public static final MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster");
public static final MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster");
+ public static final MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster");
+
+ // accessors, for use from template file
+ Integer getRiakWebPort();
+
+ Integer getRiakPbPort();
+
+ Integer getHandoffListenerPort();
+
+ Integer getEpmdListenerPort();
+
+ Integer getErlangPortRangeStart();
+
+ Integer getErlangPortRangeEnd();
@Effector(description = "add this riak node to the riak cluster")
public void joinCluster(@EffectorParam(name = "nodeName") String nodeName);
@@ -95,4 +98,7 @@ public interface RiakNode extends SoftwareProcess {
@Effector(description = "recover a failed riak node and join it back to the cluster (by passing it a working node on the cluster 'node')")
public void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName);
+ @Effector(description = "commit changes made to a Riak cluster")
+ public void commitCluster();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
index f894d66..7a09342 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
@@ -11,4 +11,6 @@ public interface RiakNodeDriver extends SoftwareProcessDriver {
public void leaveCluster();
public void recoverFailedNode(String nodeName);
+
+ public void commitCluster();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index 5fe0cac..f5e2dac 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -151,6 +151,9 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
}
@Override
+ public void commitCluster(){ getDriver().commitCluster();}
+
+ @Override
public void recoverFailedNode(String nodeName) {
getDriver().recoverFailedNode(nodeName);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
index 9676369..fc36198 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
@@ -1,11 +1,6 @@
package brooklyn.entity.nosql.riak;
-import static brooklyn.util.ssh.BashCommands.INSTALL_CURL;
-import static brooklyn.util.ssh.BashCommands.INSTALL_TAR;
-import static brooklyn.util.ssh.BashCommands.alternatives;
-import static brooklyn.util.ssh.BashCommands.chainGroup;
-import static brooklyn.util.ssh.BashCommands.commandToDownloadUrlAs;
-import static brooklyn.util.ssh.BashCommands.sudo;
+import static brooklyn.util.ssh.BashCommands.*;
import static java.lang.String.format;
import java.util.List;
@@ -14,6 +9,11 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.Entities;
@@ -26,11 +26,6 @@ import brooklyn.util.collections.MutableMap;
import brooklyn.util.net.Urls;
import brooklyn.util.task.DynamicTasks;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
// TODO: Alter -env ERL_CRASH_DUMP path in vm.args
public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implements RiakNodeDriver {
@@ -53,10 +48,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
MutableMap<String, String> result = MutableMap.copyOf(super.getShellEnvironment());
// how to change epmd port, according to
// http://serverfault.com/questions/582787/how-to-change-listening-interface-of-rabbitmqs-epmd-port-4369
- result.put("ERL_EPMD_PORT", ""+getEntity().getEpmdListenerPort());
+ result.put("ERL_EPMD_PORT", "" + getEntity().getEpmdListenerPort());
return result;
}
-
+
@Override
public void install() {
DownloadResolver resolver = Entities.newDownloader(this);
@@ -129,7 +124,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
isRiakOnPath = isPackageInstall ? isRiakOnPath() : true;
OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
-
+
List<String> commands = Lists.newLinkedList();
String vmArgsTemplate = processTemplate(entity.getConfig(RiakNode.RIAK_VM_ARGS_TEMPLATE_URL));
@@ -249,12 +244,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
if (getRiakName().equals(nodeName)) {
log.warn("cannot join riak node: {} to itself", nodeName);
} else {
- if (!isInCluster()) {
+ if (!hasJoinedCluster()) {
ScriptHelper joinClusterScript = newScript("joinCluster")
.body.append(format("%s cluster join %s", getRiakAdminCmd(), nodeName))
- .body.append(format("%s cluster plan", getRiakAdminCmd()))
- .body.append(format("%s cluster commit", getRiakAdminCmd()))
.failOnNonZeroResultCode();
if (!isRiakOnPath) {
@@ -265,7 +258,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
joinClusterScript.execute();
- entity.setAttribute(RiakNode.RIAK_NODE_IN_CLUSTER, Boolean.TRUE);
+ entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
} else {
log.warn("entity {}: is already in the riak cluster", entity.getId());
}
@@ -278,7 +271,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
//FIXME: find a way to batch commit the changes, instead of committing for every operation.
//FIXME: find a way to check if the node is the last in the cluster to avoid removing the only member and getting "last node error"
- if (isInCluster()) {
+ if (hasJoinedCluster()) {
ScriptHelper leaveClusterScript = newScript("leaveCluster")
.body.append(format("%s cluster leave", getRiakAdminCmd()))
.body.append(format("%s cluster plan", getRiakAdminCmd()))
@@ -292,7 +285,27 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
leaveClusterScript.execute();
- entity.setAttribute(RiakNode.RIAK_NODE_IN_CLUSTER, Boolean.FALSE);
+ entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.FALSE);
+ } else {
+ log.warn("entity {}: is not in the riak cluster", entity.getId());
+ }
+ }
+
+ @Override
+ public void commitCluster() {
+
+ if (hasJoinedCluster()) {
+ ScriptHelper commitClusterScript = newScript("commitCluster")
+ .body.append(format("%s cluster plan", getRiakAdminCmd()))
+ .body.append(format("%s cluster commit", getRiakAdminCmd()));
+
+ if (!isRiakOnPath) {
+ Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
+ log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
+ commitClusterScript.environmentVariablesReset(newPathVariable);
+ }
+ commitClusterScript.execute();
+
} else {
log.warn("entity {}: is not in the riak cluster", entity.getId());
}
@@ -305,7 +318,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
//argument passed 'node' is any working node in the riak cluster
//following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/
- if (isInCluster()) {
+ if (hasJoinedCluster()) {
String failedNodeName = getRiakName();
@@ -350,8 +363,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
throw new IllegalArgumentException("Subnet address is not set.");
}
- private Boolean isInCluster() {
- return Optional.fromNullable(entity.getAttribute(RiakNode.RIAK_NODE_IN_CLUSTER)).or(Boolean.FALSE);
+ private Boolean hasJoinedCluster() {
+ return Optional.fromNullable(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE);
}
private boolean isRiakOnPath() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
index 691e1cb..6a42578 100644
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
@@ -29,8 +29,11 @@ public class RiakClusterEc2LiveTest extends AbstractEc2LiveTest {
RiakNode first = (RiakNode) Iterables.get(cluster.getMembers(), 0);
RiakNode second = (RiakNode) Iterables.get(cluster.getMembers(), 1);
- EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_IN_CLUSTER, true);
- EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_IN_CLUSTER, true);
+ EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.SERVICE_UP, true);
+ EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.SERVICE_UP, true);
+
+ EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true);
+ EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true);
}
@Test(enabled = false)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
index 98da4b0..544ddd2 100644
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
@@ -23,8 +23,11 @@ public class RiakNodeGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest
RiakNode first = (RiakNode) Iterables.get(cluster.getMembers(), 0);
RiakNode second = (RiakNode) Iterables.get(cluster.getMembers(), 1);
- EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_IN_CLUSTER, true);
- EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_IN_CLUSTER, true);
+ EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.SERVICE_UP, true);
+ EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.SERVICE_UP, true);
+
+ EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true);
+ EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true);
}