You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by gr...@apache.org on 2015/03/20 20:28:21 UTC
[1/2] incubator-brooklyn git commit: Updates to Riak for Clocker
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 83b27b320 -> e08d3208b
Updates to Riak for Clocker
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d738df9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d738df9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d738df9a
Branch: refs/heads/master
Commit: d738df9a90ef73e5bece33fef97584b5d5d7dd4c
Parents: 83b27b3
Author: Andrew Kennedy <gr...@apache.org>
Authored: Fri Mar 20 16:05:37 2015 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Fri Mar 20 18:37:43 2015 +0000
----------------------------------------------------------------------
.../entity/nosql/riak/RiakClusterImpl.java | 67 +++++-------
.../brooklyn/entity/nosql/riak/RiakNode.java | 49 +++++----
.../entity/nosql/riak/RiakNodeDriver.java | 2 +-
.../entity/nosql/riak/RiakNodeImpl.java | 9 +-
.../entity/nosql/riak/RiakNodeSshDriver.java | 103 +++++++++++--------
.../nosql/riak/RiakClusterEc2LiveTest.java | 8 --
6 files changed, 113 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index ea1f1b9..079eada 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -24,8 +24,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +31,7 @@ import brooklyn.entity.Entity;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
import brooklyn.entity.basic.Lifecycle;
import brooklyn.entity.basic.ServiceStateLogic;
import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
@@ -47,7 +46,7 @@ import brooklyn.util.time.Time;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -73,14 +72,10 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
//FIXME: add a quorum to tolerate failed nodes before setting on fire.
- Optional<Entity> anyNode = Iterables.tryFind(getMembers(), new Predicate<Entity>() {
-
- @Override
- public boolean apply(@Nullable Entity entity) {
- return (entity instanceof RiakNode && hasMemberJoinedCluster(entity) && entity.getAttribute(RiakNode.SERVICE_UP));
- }
- });
-
+ Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and(
+ Predicates.instanceOf(RiakNode.class),
+ EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
+ EntityPredicates.attributeEqualTo(RiakNode.SERVICE_UP, true)));
if (anyNode.isPresent()) {
log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId());
Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER).blockUntilEnded();
@@ -103,7 +98,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
.configure("group", this));
}
- protected void onServerPoolMemberChanged(Entity member) {
+ protected void onServerPoolMemberChanged(final Entity member) {
synchronized (mutex) {
log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() });
@@ -132,50 +127,40 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
} else {
// TODO: be wary of erroneous nodes but are still flagged 'in cluster'
// add the new node to be part of the riak cluster.
- Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
- @Override
- public boolean apply(@Nullable Entity node) {
- return (node instanceof RiakNode && hasMemberJoinedCluster(node));
- }
- });
-
+ Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and(
+ Predicates.instanceOf(RiakNode.class),
+ EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true)));
if (anyNodeInCluster.isPresent()) {
- if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
+ if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) {
String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
if (getAttribute(IS_CLUSTER_INIT)) {
- Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+ Entities.invokeEffector(RiakClusterImpl.this, member, RiakNode.COMMIT_RIAK_CLUSTER);
}
nodes.put(member, riakName);
setAttribute(RIAK_CLUSTER_NODES, nodes);
- log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+ log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
}
} else {
- log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId());
+ log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId());
}
}
} else {
if (nodes != null && nodes.containsKey(member)) {
- final Entity memberToBeRemoved = member;
-
- Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
- @Override
- public boolean apply(@Nullable Entity node) {
- return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
- }
- });
+ Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and(
+ Predicates.instanceOf(RiakNode.class),
+ EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
+ Predicates.not(Predicates.equalTo(member))));
if (anyNodeInCluster.isPresent()) {
- Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
+ Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(member));
}
-
nodes.remove(member);
setAttribute(RIAK_CLUSTER_NODES, nodes);
- log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
+ log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) });
}
}
ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
- if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
calculateClusterAddresses();
}
@@ -194,16 +179,14 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
protected boolean belongsInServerPool(Entity member) {
if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
- if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member);
+ log.trace("Members of {}, checking {}, eliminating because not up", this, member);
return false;
}
if (!getMembers().contains(member)) {
- if (log.isTraceEnabled())
- log.trace("Members of {}, checking {}, eliminating because not member", this, member);
-
+ log.trace("Members of {}, checking {}, eliminating because not member", this, member);
return false;
}
- if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member);
+ log.trace("Members of {}, checking {}, approving", this, member);
return true;
}
@@ -212,10 +195,6 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
return node.getAttribute(RiakNode.RIAK_NODE_NAME);
}
- private boolean hasMemberJoinedCluster(Entity member) {
- return ((RiakNode) member).hasJoinedCluster();
- }
-
public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
@Override
protected void onEntityEvent(EventType type, Entity entity) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index ef9556d..c1f1bf8 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -20,11 +20,6 @@ package brooklyn.entity.nosql.riak;
import java.util.List;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.event.basic.AttributeSensorAndConfigKey;
-import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey;
-import com.google.common.reflect.TypeToken;
-
import brooklyn.catalog.Catalog;
import brooklyn.config.ConfigKey;
import brooklyn.entity.annotation.Effector;
@@ -34,10 +29,14 @@ import brooklyn.entity.basic.MethodEffector;
import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.AttributeSensorAndConfigKey;
import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
import brooklyn.event.basic.Sensors;
+import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey;
import brooklyn.util.flags.SetFromFlag;
+import com.google.common.reflect.TypeToken;
+
@Catalog(name="Riak Node", description="Riak is a distributed NoSQL key-value data store that offers "
+ "extremely high availability, fault tolerance, operational simplicity and scalability.")
@ImplementedBy(RiakNodeImpl.class)
@@ -45,8 +44,10 @@ public interface RiakNode extends SoftwareProcess {
@SetFromFlag("version")
ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
- "Version to install. Example 2.0.2, 2.0.5",
- "2.0.5");
+ "Version to install (Default 2.0.5)", "2.0.5");
+
+ @SetFromFlag("optimizeNetworking")
+ ConfigKey<Boolean> OPTIMIZE_HOST_NETWORKING = ConfigKeys.newBooleanConfigKey("riak.networking.optimize", "Optimize host networking when running in a VM", Boolean.TRUE);
// vm.args and app.config are used for pre-version 2.0.0. Later versions use the (simplified) riak.conf
// see https://github.com/joedevivo/ricon/blob/master/cuttlefish.md
@@ -94,14 +95,19 @@ public interface RiakNode extends SoftwareProcess {
@SetFromFlag("riakWebPort")
PortAttributeSensorAndConfigKey RIAK_WEB_PORT = new PortAttributeSensorAndConfigKey("riak.webPort", "Riak Web Port", "8098+");
- @SetFromFlag("riakNodeHasJoinedCluster")
+ @SetFromFlag("riakPbPort")
+ PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+");
+
+ AttributeSensor<Boolean> RIAK_PACKAGE_INSTALL = Sensors.newBooleanSensor(
+ "riak.install.package", "Flag to indicate whether Riak was installed using an OS package");
+ AttributeSensor<Boolean> RIAK_ON_PATH = Sensors.newBooleanSensor(
+ "riak.install.onPath", "Flag to indicate whether Riak is available on the PATH");
+
AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor(
- "riak.node.riakNodeHasJoinedCluster", "Flag to indicate wether the Riak node has joined a cluster member");
+ "riak.node.riakNodeHasJoinedCluster", "Flag to indicate whether the Riak node has joined a cluster member");
- @SetFromFlag("riakNodeName")
AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns the riak node name as defined in vm.args");
- @SetFromFlag("riakPbPort")
- PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+");
+
// these needed for nodes to talk to each other, but not clients (so ideally set up in the security group for internal access)
PortAttributeSensorAndConfigKey HANDOFF_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.handoffListenerPort", "Handoff Listener Port", "8099+");
PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.epmdListenerPort", "Erlang Port Mapper Daemon Listener Port", "4369");
@@ -109,6 +115,7 @@ public interface RiakNode extends SoftwareProcess {
PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeEnd", "Erlang Port Range End", "7999+");
PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new PortAttributeSensorAndConfigKey("riak.search.solr.port", "Solr port", "8093+");
PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new PortAttributeSensorAndConfigKey("riak.search.solr.jmx_port", "Solr port", "8985+");
+
AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("node.gets");
AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("node.gets.total");
AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("node.puts");
@@ -130,9 +137,10 @@ public interface RiakNode extends SoftwareProcess {
@SuppressWarnings("serial")
AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {},
"ring.members", "all the riak nodes in the ring");
- public static final MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster");
- public static final MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster");
- public static final MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster");
+
+ MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster");
+ MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster");
+ MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster");
// accessors, for use from template file
Integer getRiakWebPort();
@@ -157,17 +165,16 @@ public interface RiakNode extends SoftwareProcess {
String getOsMajorVersion();
- @Effector(description = "add this riak node to the riak cluster")
+ @Effector(description = "Add this riak node to the Riak cluster")
public void joinCluster(@EffectorParam(name = "nodeName") String nodeName);
- @Effector(description = "remove this riak node from the cluster")
- public void leaveCluster();
+ @Effector(description = "Remove this Riak node from the cluster")
+ public void leaveCluster(@EffectorParam(name = "nodeName") String nodeName);
- @Effector(description = "recover a failed riak node and join it back to the cluster (by passing it a working node on the cluster 'node')")
+ @Effector(description = "Recover a failed Riak node and join it back to the cluster (by passing it a working node on the cluster 'node')")
public void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName);
- @Effector(description = "commit changes made to a Riak cluster")
+ @Effector(description = "Commit changes made to a Riak cluster")
public void commitCluster();
- public boolean hasJoinedCluster();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
index 3669c1c..b81b7fc 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
@@ -26,7 +26,7 @@ public interface RiakNodeDriver extends SoftwareProcessDriver {
public void joinCluster(String nodeName);
- public void leaveCluster();
+ public void leaveCluster(String nodeName);
public void recoverFailedNode(String nodeName);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index 73bb272..0667a7a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -181,8 +181,8 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
}
@Override
- public void leaveCluster() {
- getDriver().leaveCluster();
+ public void leaveCluster(String nodeName) {
+ getDriver().leaveCluster(nodeName);
}
@Override
@@ -191,11 +191,6 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
}
@Override
- public boolean hasJoinedCluster() {
- return Boolean.TRUE.equals(getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER));
- }
-
- @Override
public void recoverFailedNode(String nodeName) {
getDriver().recoverFailedNode(nodeName);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
index 00e304f..544c39a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
@@ -26,6 +26,7 @@ import java.util.Map;
import brooklyn.util.ssh.BashCommands;
import brooklyn.util.task.ssh.SshTasks;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ import brooklyn.util.task.DynamicTasks;
import brooklyn.util.text.Strings;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -53,8 +55,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
private static final Logger LOG = LoggerFactory.getLogger(RiakNodeSshDriver.class);
private static final String sbinPath = "$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
private static final String INSTALLING_FALLBACK = INSTALLING + "_fallback";
- private boolean isPackageInstall = true;
- private boolean isRiakOnPath = true;
public RiakNodeSshDriver(final RiakNodeImpl entity, final SshMachineLocation machine) {
super(entity, machine);
@@ -97,8 +97,9 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
} else {
commands.addAll(installFromPackageCloud());
}
+ entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, true);
} else if (osDetails.isMac()) {
- isPackageInstall = false;
+ entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, false);
commands.addAll(installMac());
} else if (osDetails.isWindows()) {
throw new UnsupportedOperationException("RiakNode not supported on Windows instances");
@@ -123,6 +124,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
.execute();
}
}
+
+ checkRiakOnPath();
}
private List<String> installLinuxFromPackageUrl(String expandedInstallDir) {
@@ -217,8 +220,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
//create entity's runDir
newScript(CUSTOMIZING).execute();
- isRiakOnPath = isPackageInstall ? isRiakOnPath() : true;
-
OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
List<String> commands = Lists.newLinkedList();
@@ -253,7 +254,21 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
commands.add(sudo("chown -R riak:riak " + getRiakEtcDir()));
}
- if(osDetails.isLinux()) {
+ // TODO platform_*_dir
+ // TODO riak config log
+
+ ScriptHelper customizeScript = newScript(CUSTOMIZING)
+ .failOnNonZeroResultCode()
+ .body.append(commands);
+
+ if (!isRiakOnPath()) {
+ Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
+ log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
+ customizeScript.environmentVariablesReset(newPathVariable);
+ }
+ customizeScript.failOnNonZeroResultCode().execute();
+
+ if (osDetails.isLinux()) {
ImmutableMap<String, String> sysctl = ImmutableMap.<String, String>builder()
.put("vm.swappiness", "0")
.put("net.core.somaxconn", "40000")
@@ -266,23 +281,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
.put("net.ipv4.tcp_moderate_rcvbuf", "1")
.build();
- // TODO platform_*_dir
- // TODO riak config log
+ ScriptHelper optimize = newScript(CUSTOMIZING + "network")
+ .body.append(sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl)));
- commands.add( sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl)));
+ Optional<Boolean> enable = Optional.fromNullable(entity.getConfig(RiakNode.OPTIMIZE_HOST_NETWORKING));
+ if (!enable.isPresent()) optimize.inessential();
+ if (enable.or(true)) optimize.execute();
}
- ScriptHelper customizeScript = newScript(CUSTOMIZING)
- .failOnNonZeroResultCode()
- .body.append(commands);
-
- if (!isRiakOnPath) {
- Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
- log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
- customizeScript.environmentVariablesReset(newPathVariable);
- }
- customizeScript.failOnNonZeroResultCode().execute();
-
//set the riak node name
entity.setAttribute(RiakNode.RIAK_NODE_NAME, format("riak@%s", getSubnetHostname()));
}
@@ -290,7 +296,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
@Override
public void launch() {
List<String> commands = Lists.newLinkedList();
- if (isPackageInstall) {
+
+ if (isPackageInstall()) {
commands.add(addSbinPathCommand());
commands.add(sudo("service riak start"));
} else {
@@ -303,7 +310,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
ScriptHelper launchScript = newScript(LAUNCHING)
.body.append(commands);
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
launchScript.environmentVariablesReset(newPathVariable);
@@ -313,15 +320,15 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
@Override
public void stop() {
- leaveCluster();
+ leaveCluster("");
String command = format("%s stop", getRiakCmd());
- command = isPackageInstall ? sudo(command) : command;
+ command = isPackageInstall() ? sudo(command) : command;
ScriptHelper stopScript = newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING)
.body.append(command);
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
stopScript.environmentVariablesReset(newPathVariable);
@@ -339,7 +346,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
ScriptHelper checkRunningScript = newScript(CHECK_RUNNING)
.body.append(sudo(format("%s ping", getRiakCmd())));
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
checkRunningScript.environmentVariablesReset(newPathVariable);
@@ -347,16 +354,24 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
return (checkRunningScript.execute() == 0);
}
+ public boolean isPackageInstall() {
+ return entity.getAttribute(RiakNode.RIAK_PACKAGE_INSTALL);
+ }
+
+ public boolean isRiakOnPath() {
+ return entity.getAttribute(RiakNode.RIAK_ON_PATH);
+ }
+
public String getRiakEtcDir() {
- return isPackageInstall ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc");
+ return isPackageInstall() ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc");
}
protected String getRiakCmd() {
- return isPackageInstall ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak");
+ return isPackageInstall() ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak");
}
protected String getRiakAdminCmd() {
- return isPackageInstall ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin");
+ return isPackageInstall() ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin");
}
@Override
@@ -372,7 +387,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
.body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName)))
.failOnNonZeroResultCode();
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
joinClusterScript.environmentVariablesReset(newPathVariable);
@@ -388,19 +403,19 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
}
@Override
- public void leaveCluster() {
+ public void leaveCluster(String nodeName) {
//TODO: add 'riak-admin cluster force-remove' for erroneous and unrecoverable nodes.
//FIXME: find a way to batch commit the changes, instead of committing for every operation.
//FIXME: find a way to check if the node is the last in the cluster to avoid removing the only member and getting "last node error"
if (hasJoinedCluster()) {
ScriptHelper leaveClusterScript = newScript("leaveCluster")
- .body.append(sudo(format("%s cluster leave", getRiakAdminCmd())))
+ .body.append(sudo(format("%s cluster leave %s", getRiakAdminCmd(), nodeName)))
.body.append(sudo(format("%s cluster plan", getRiakAdminCmd())))
.body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
.failOnNonZeroResultCode();
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
leaveClusterScript.environmentVariablesReset(newPathVariable);
@@ -422,7 +437,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
.body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
.failOnNonZeroResultCode();
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
commitClusterScript.environmentVariablesReset(newPathVariable);
@@ -436,7 +451,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
@Override
public void recoverFailedNode(String nodeName) {
-
//TODO find ways to detect a faulty/failed node
//argument passed 'node' is any working node in the riak cluster
//following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/
@@ -446,10 +460,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
String stopCommand = format("%s stop", getRiakCmd());
- stopCommand = isPackageInstall ? sudo(stopCommand) : stopCommand;
+ stopCommand = isPackageInstall() ? sudo(stopCommand) : stopCommand;
- String startCommand = format("%s start >/dev/null 2>&1 < /dev/null &", getRiakCmd());
- startCommand = isPackageInstall ? sudo(startCommand) : startCommand;
+ String startCommand = format("%s start > /dev/null 2>&1 < /dev/null &", getRiakCmd());
+ startCommand = isPackageInstall() ? sudo(startCommand) : startCommand;
ScriptHelper recoverNodeScript = newScript("recoverNode")
.body.append(stopCommand)
@@ -461,7 +475,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
.body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
.failOnNonZeroResultCode();
- if (!isRiakOnPath) {
+ if (!isRiakOnPath()) {
Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
recoverNodeScript.environmentVariablesReset(newPathVariable);
@@ -475,13 +489,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
}
private Boolean hasJoinedCluster() {
- return ((RiakNode) entity).hasJoinedCluster();
+ return Boolean.TRUE.equals(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER));
}
- protected boolean isRiakOnPath() {
- return (newScript("riakOnPath")
+ protected void checkRiakOnPath() {
+ boolean riakOnPath = newScript("riakOnPath")
.body.append("which riak")
- .execute() == 0);
+ .execute() == 0;
+ entity.setAttribute(RiakNode.RIAK_ON_PATH, riakOnPath);
}
private String getRiakName() {
@@ -490,7 +505,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
private String getRingStateDir() {
//TODO: check for non-package install.
- return isPackageInstall ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring");
+ return isPackageInstall() ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring");
}
protected boolean isVersion1() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
index ca0dd05..3f9e7d9 100644
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
@@ -26,11 +26,8 @@ import brooklyn.entity.AbstractEc2LiveTest;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.location.Location;
-import brooklyn.test.Asserts;
import brooklyn.test.EntityTestUtils;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -60,11 +57,6 @@ public class RiakClusterEc2LiveTest extends AbstractEc2LiveTest {
for (final RiakNode node : nodes) {
EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.SERVICE_UP, true);
EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true);
- Asserts.eventually(new Supplier<Boolean>() {
- @Override public Boolean get() {
- return node.hasJoinedCluster();
- }
- }, Predicates.alwaysTrue());
}
}
[2/2] incubator-brooklyn git commit: This closes #562 and #563
Posted by gr...@apache.org.
This closes #562 and #563
* github/pr/563:
Updates to Riak for Clocker
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e08d3208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e08d3208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e08d3208
Branch: refs/heads/master
Commit: e08d3208b75ab08e766b1b4c1a9eaf94833ecd4d
Parents: 83b27b3 d738df9
Author: Andrew Kennedy <gr...@apache.org>
Authored: Fri Mar 20 19:28:00 2015 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Fri Mar 20 19:28:00 2015 +0000
----------------------------------------------------------------------
.../entity/nosql/riak/RiakClusterImpl.java | 67 +++++-------
.../brooklyn/entity/nosql/riak/RiakNode.java | 49 +++++----
.../entity/nosql/riak/RiakNodeDriver.java | 2 +-
.../entity/nosql/riak/RiakNodeImpl.java | 9 +-
.../entity/nosql/riak/RiakNodeSshDriver.java | 103 +++++++++++--------
.../nosql/riak/RiakClusterEc2LiveTest.java | 8 --
6 files changed, 113 insertions(+), 125 deletions(-)
----------------------------------------------------------------------