You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by gr...@apache.org on 2015/03/20 20:28:21 UTC

[1/2] incubator-brooklyn git commit: Updates to Riak for Clocker

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 83b27b320 -> e08d3208b


Updates to Riak for Clocker


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d738df9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d738df9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d738df9a

Branch: refs/heads/master
Commit: d738df9a90ef73e5bece33fef97584b5d5d7dd4c
Parents: 83b27b3
Author: Andrew Kennedy <gr...@apache.org>
Authored: Fri Mar 20 16:05:37 2015 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Fri Mar 20 18:37:43 2015 +0000

----------------------------------------------------------------------
 .../entity/nosql/riak/RiakClusterImpl.java      |  67 +++++-------
 .../brooklyn/entity/nosql/riak/RiakNode.java    |  49 +++++----
 .../entity/nosql/riak/RiakNodeDriver.java       |   2 +-
 .../entity/nosql/riak/RiakNodeImpl.java         |   9 +-
 .../entity/nosql/riak/RiakNodeSshDriver.java    | 103 +++++++++++--------
 .../nosql/riak/RiakClusterEc2LiveTest.java      |   8 --
 6 files changed, 113 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index ea1f1b9..079eada 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -24,8 +24,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +31,7 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
 import brooklyn.entity.basic.Lifecycle;
 import brooklyn.entity.basic.ServiceStateLogic;
 import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
@@ -47,7 +46,7 @@ import brooklyn.util.time.Time;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -73,14 +72,10 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
         Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
 
         //FIXME: add a quorum to tolerate failed nodes before setting on fire.
-        Optional<Entity> anyNode = Iterables.tryFind(getMembers(), new Predicate<Entity>() {
-
-            @Override
-            public boolean apply(@Nullable Entity entity) {
-                return (entity instanceof RiakNode && hasMemberJoinedCluster(entity) && entity.getAttribute(RiakNode.SERVICE_UP));
-            }
-        });
-
+        Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and(
+                Predicates.instanceOf(RiakNode.class),
+                EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
+                EntityPredicates.attributeEqualTo(RiakNode.SERVICE_UP, true)));
         if (anyNode.isPresent()) {
             log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId());
             Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER).blockUntilEnded();
@@ -103,7 +98,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
                 .configure("group", this));
     }
 
-    protected void onServerPoolMemberChanged(Entity member) {
+    protected void onServerPoolMemberChanged(final Entity member) {
         synchronized (mutex) {
             log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() });
 
@@ -132,50 +127,40 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
                 } else {
                     // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
                     // add the new node to be part of the riak cluster.
-                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
-                        @Override
-                        public boolean apply(@Nullable Entity node) {
-                            return (node instanceof RiakNode && hasMemberJoinedCluster(node));
-                        }
-                    });
-
+                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and(
+                            Predicates.instanceOf(RiakNode.class),
+                            EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true)));
                     if (anyNodeInCluster.isPresent()) {
-                        if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
+                        if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) {
                             String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
                             Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
                             if (getAttribute(IS_CLUSTER_INIT)) {
-                                Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+                                Entities.invokeEffector(RiakClusterImpl.this, member, RiakNode.COMMIT_RIAK_CLUSTER);
                             }
                             nodes.put(member, riakName);
                             setAttribute(RIAK_CLUSTER_NODES, nodes);
-                            log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+                            log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
                         }
                     } else {
-                        log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId());
+                        log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId());
                     }
                 }
             } else {
                 if (nodes != null && nodes.containsKey(member)) {
-                    final Entity memberToBeRemoved = member;
-
-                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
-                        @Override
-                        public boolean apply(@Nullable Entity node) {
-                            return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
-                        }
-                    });
+                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and(
+                            Predicates.instanceOf(RiakNode.class),
+                            EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
+                            Predicates.not(Predicates.equalTo(member))));
                     if (anyNodeInCluster.isPresent()) {
-                        Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
+                        Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(member));
                     }
-
                     nodes.remove(member);
                     setAttribute(RIAK_CLUSTER_NODES, nodes);
-                    log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
+                    log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) });
                 }
             }
 
             ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
-            if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
 
             calculateClusterAddresses();
         }
@@ -194,16 +179,14 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
 
     protected boolean belongsInServerPool(Entity member) {
         if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
-            if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member);
+            log.trace("Members of {}, checking {}, eliminating because not up", this, member);
             return false;
         }
         if (!getMembers().contains(member)) {
-            if (log.isTraceEnabled())
-                log.trace("Members of {}, checking {}, eliminating because not member", this, member);
-
+            log.trace("Members of {}, checking {}, eliminating because not member", this, member);
             return false;
         }
-        if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member);
+        log.trace("Members of {}, checking {}, approving", this, member);
 
         return true;
     }
@@ -212,10 +195,6 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
         return node.getAttribute(RiakNode.RIAK_NODE_NAME);
     }
 
-    private boolean hasMemberJoinedCluster(Entity member) {
-        return ((RiakNode) member).hasJoinedCluster();
-    }
-
     public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
         @Override
         protected void onEntityEvent(EventType type, Entity entity) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index ef9556d..c1f1bf8 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -20,11 +20,6 @@ package brooklyn.entity.nosql.riak;
 
 import java.util.List;
 
-import brooklyn.entity.basic.Attributes;
-import brooklyn.event.basic.AttributeSensorAndConfigKey;
-import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey;
-import com.google.common.reflect.TypeToken;
-
 import brooklyn.catalog.Catalog;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.annotation.Effector;
@@ -34,10 +29,14 @@ import brooklyn.entity.basic.MethodEffector;
 import brooklyn.entity.basic.SoftwareProcess;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.AttributeSensorAndConfigKey;
 import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
 import brooklyn.event.basic.Sensors;
+import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey;
 import brooklyn.util.flags.SetFromFlag;
 
+import com.google.common.reflect.TypeToken;
+
 @Catalog(name="Riak Node", description="Riak is a distributed NoSQL key-value data store that offers "
         + "extremely high availability, fault tolerance, operational simplicity and scalability.")
 @ImplementedBy(RiakNodeImpl.class)
@@ -45,8 +44,10 @@ public interface RiakNode extends SoftwareProcess {
 
     @SetFromFlag("version")
     ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
-            "Version to install. Example 2.0.2, 2.0.5",
-            "2.0.5");
+            "Version to install (Default 2.0.5)", "2.0.5");
+
+    @SetFromFlag("optimizeNetworking")
+    ConfigKey<Boolean> OPTIMIZE_HOST_NETWORKING  = ConfigKeys.newBooleanConfigKey("riak.networking.optimize", "Optimize host networking when running in a VM", Boolean.TRUE);
 
     // vm.args and app.config are used for pre-version 2.0.0. Later versions use the (simplified) riak.conf
     // see https://github.com/joedevivo/ricon/blob/master/cuttlefish.md
@@ -94,14 +95,19 @@ public interface RiakNode extends SoftwareProcess {
     @SetFromFlag("riakWebPort")
     PortAttributeSensorAndConfigKey RIAK_WEB_PORT = new PortAttributeSensorAndConfigKey("riak.webPort", "Riak Web Port", "8098+");
 
-    @SetFromFlag("riakNodeHasJoinedCluster")
+    @SetFromFlag("riakPbPort")
+    PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+");
+
+    AttributeSensor<Boolean> RIAK_PACKAGE_INSTALL = Sensors.newBooleanSensor(
+            "riak.install.package", "Flag to indicate whether Riak was installed using an OS package");
+    AttributeSensor<Boolean> RIAK_ON_PATH = Sensors.newBooleanSensor(
+            "riak.install.onPath", "Flag to indicate whether Riak is available on the PATH");
+
     AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor(
-            "riak.node.riakNodeHasJoinedCluster", "Flag to indicate wether the Riak node has joined a cluster member");
+            "riak.node.riakNodeHasJoinedCluster", "Flag to indicate whether the Riak node has joined a cluster member");
 
-    @SetFromFlag("riakNodeName")
     AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns the riak node name as defined in vm.args");
-    @SetFromFlag("riakPbPort")
-    PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+");
+
     // these needed for nodes to talk to each other, but not clients (so ideally set up in the security group for internal access)
     PortAttributeSensorAndConfigKey HANDOFF_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.handoffListenerPort", "Handoff Listener Port", "8099+");
     PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.epmdListenerPort", "Erlang Port Mapper Daemon Listener Port", "4369");
@@ -109,6 +115,7 @@ public interface RiakNode extends SoftwareProcess {
     PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeEnd", "Erlang Port Range End", "7999+");
     PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new PortAttributeSensorAndConfigKey("riak.search.solr.port", "Solr port", "8093+");
     PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new PortAttributeSensorAndConfigKey("riak.search.solr.jmx_port", "Solr port", "8985+");
+
     AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("node.gets");
     AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("node.gets.total");
     AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("node.puts");
@@ -130,9 +137,10 @@ public interface RiakNode extends SoftwareProcess {
     @SuppressWarnings("serial")
     AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {},
             "ring.members", "all the riak nodes in the ring");
-    public static final MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster");
-    public static final MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster");
-    public static final MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster");
+
+    MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster");
+    MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster");
+    MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster");
 
     // accessors, for use from template file
     Integer getRiakWebPort();
@@ -157,17 +165,16 @@ public interface RiakNode extends SoftwareProcess {
 
     String getOsMajorVersion();
 
-    @Effector(description = "add this riak node to the riak cluster")
+    @Effector(description = "Add this riak node to the Riak cluster")
     public void joinCluster(@EffectorParam(name = "nodeName") String nodeName);
 
-    @Effector(description = "remove this riak node from the cluster")
-    public void leaveCluster();
+    @Effector(description = "Remove this Riak node from the cluster")
+    public void leaveCluster(@EffectorParam(name = "nodeName") String nodeName);
 
-    @Effector(description = "recover a failed riak node and join it back to the cluster (by passing it a working node on the cluster 'node')")
+    @Effector(description = "Recover a failed Riak node and join it back to the cluster (by passing it a working node on the cluster 'node')")
     public void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName);
 
-    @Effector(description = "commit changes made to a Riak cluster")
+    @Effector(description = "Commit changes made to a Riak cluster")
     public void commitCluster();
 
-    public boolean hasJoinedCluster();
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
index 3669c1c..b81b7fc 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
@@ -26,7 +26,7 @@ public interface RiakNodeDriver extends SoftwareProcessDriver {
 
     public void joinCluster(String nodeName);
 
-    public void leaveCluster();
+    public void leaveCluster(String nodeName);
 
     public void recoverFailedNode(String nodeName);
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index 73bb272..0667a7a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -181,8 +181,8 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
     }
 
     @Override
-    public void leaveCluster() {
-        getDriver().leaveCluster();
+    public void leaveCluster(String nodeName) {
+        getDriver().leaveCluster(nodeName);
     }
 
     @Override
@@ -191,11 +191,6 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
     }
 
     @Override
-    public boolean hasJoinedCluster() {
-        return Boolean.TRUE.equals(getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER));
-    }
-
-    @Override
     public void recoverFailedNode(String nodeName) {
         getDriver().recoverFailedNode(nodeName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
index 00e304f..544c39a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import brooklyn.util.ssh.BashCommands;
 import brooklyn.util.task.ssh.SshTasks;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +44,7 @@ import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.text.Strings;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -53,8 +55,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
     private static final Logger LOG = LoggerFactory.getLogger(RiakNodeSshDriver.class);
     private static final String sbinPath = "$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
     private static final String INSTALLING_FALLBACK = INSTALLING + "_fallback";
-    private boolean isPackageInstall = true;
-    private boolean isRiakOnPath = true;
 
     public RiakNodeSshDriver(final RiakNodeImpl entity, final SshMachineLocation machine) {
         super(entity, machine);
@@ -97,8 +97,9 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
             } else {
                 commands.addAll(installFromPackageCloud());
             }
+            entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, true);
         } else if (osDetails.isMac()) {
-            isPackageInstall = false;
+            entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, false);
             commands.addAll(installMac());
         } else if (osDetails.isWindows()) {
             throw new UnsupportedOperationException("RiakNode not supported on Windows instances");
@@ -123,6 +124,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
                         .execute();
             }
         }
+
+        checkRiakOnPath();
     }
 
     private List<String> installLinuxFromPackageUrl(String expandedInstallDir) {
@@ -217,8 +220,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
         //create entity's runDir
         newScript(CUSTOMIZING).execute();
 
-        isRiakOnPath = isPackageInstall ? isRiakOnPath() : true;
-
         OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
 
         List<String> commands = Lists.newLinkedList();
@@ -253,7 +254,21 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
             commands.add(sudo("chown -R riak:riak " + getRiakEtcDir()));
         }
 
-        if(osDetails.isLinux()) {
+        // TODO platform_*_dir
+        // TODO riak config log
+
+        ScriptHelper customizeScript = newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(commands);
+
+        if (!isRiakOnPath()) {
+            Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
+            log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
+            customizeScript.environmentVariablesReset(newPathVariable);
+        }
+        customizeScript.failOnNonZeroResultCode().execute();
+
+        if (osDetails.isLinux()) {
             ImmutableMap<String, String> sysctl = ImmutableMap.<String, String>builder()
                     .put("vm.swappiness", "0")
                     .put("net.core.somaxconn", "40000")
@@ -266,23 +281,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
                     .put("net.ipv4.tcp_moderate_rcvbuf", "1")
                     .build();
 
-            // TODO platform_*_dir
-            // TODO riak config log
+            ScriptHelper optimize = newScript(CUSTOMIZING + "network")
+                .body.append(sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl)));
 
-            commands.add( sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl)));
+            Optional<Boolean> enable = Optional.fromNullable(entity.getConfig(RiakNode.OPTIMIZE_HOST_NETWORKING));
+            if (!enable.isPresent()) optimize.inessential();
+            if (enable.or(true)) optimize.execute();
         }
 
-        ScriptHelper customizeScript = newScript(CUSTOMIZING)
-                .failOnNonZeroResultCode()
-                .body.append(commands);
-
-        if (!isRiakOnPath) {
-            Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
-            log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
-            customizeScript.environmentVariablesReset(newPathVariable);
-        }
-        customizeScript.failOnNonZeroResultCode().execute();
-
         //set the riak node name
         entity.setAttribute(RiakNode.RIAK_NODE_NAME, format("riak@%s", getSubnetHostname()));
     }
@@ -290,7 +296,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
     @Override
     public void launch() {
         List<String> commands = Lists.newLinkedList();
-        if (isPackageInstall) {
+
+        if (isPackageInstall()) {
             commands.add(addSbinPathCommand());
             commands.add(sudo("service riak start"));
         } else {
@@ -303,7 +310,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
         ScriptHelper launchScript = newScript(LAUNCHING)
                 .body.append(commands);
 
-        if (!isRiakOnPath) {
+        if (!isRiakOnPath()) {
             Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
             log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
             launchScript.environmentVariablesReset(newPathVariable);
@@ -313,15 +320,15 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
 
     @Override
     public void stop() {
-        leaveCluster();
+        leaveCluster("");
 
         String command = format("%s stop", getRiakCmd());
-        command = isPackageInstall ? sudo(command) : command;
+        command = isPackageInstall() ? sudo(command) : command;
 
         ScriptHelper stopScript = newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING)
                 .body.append(command);
 
-        if (!isRiakOnPath) {
+        if (!isRiakOnPath()) {
             Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
             log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
             stopScript.environmentVariablesReset(newPathVariable);
@@ -339,7 +346,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
         ScriptHelper checkRunningScript = newScript(CHECK_RUNNING)
                 .body.append(sudo(format("%s ping", getRiakCmd())));
 
-        if (!isRiakOnPath) {
+        if (!isRiakOnPath()) {
             Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
             log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
             checkRunningScript.environmentVariablesReset(newPathVariable);
@@ -347,16 +354,24 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
         return (checkRunningScript.execute() == 0);
     }
 
+    public boolean isPackageInstall() {
+        return entity.getAttribute(RiakNode.RIAK_PACKAGE_INSTALL);
+    }
+
+    public boolean isRiakOnPath() {
+        return entity.getAttribute(RiakNode.RIAK_ON_PATH);
+    }
+
     public String getRiakEtcDir() {
-        return isPackageInstall ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc");
+        return isPackageInstall() ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc");
     }
 
     protected String getRiakCmd() {
-        return isPackageInstall ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak");
+        return isPackageInstall() ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak");
     }
 
     protected String getRiakAdminCmd() {
-        return isPackageInstall ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin");
+        return isPackageInstall() ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin");
     }
 
     @Override
@@ -372,7 +387,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
                         .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName)))
                         .failOnNonZeroResultCode();
 
-                if (!isRiakOnPath) {
+                if (!isRiakOnPath()) {
                     Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
                     log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
                     joinClusterScript.environmentVariablesReset(newPathVariable);
@@ -388,19 +403,19 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
     }
 
     @Override
-    public void leaveCluster() {
+    public void leaveCluster(String nodeName) {
         //TODO: add 'riak-admin cluster force-remove' for erroneous and unrecoverable nodes.
         //FIXME: find a way to batch commit the changes, instead of committing for every operation.
         //FIXME: find a way to check if the node is the last in the cluster to avoid removing the only member and getting "last node error"
 
         if (hasJoinedCluster()) {
             ScriptHelper leaveClusterScript = newScript("leaveCluster")
-                    .body.append(sudo(format("%s cluster leave", getRiakAdminCmd())))
+                    .body.append(sudo(format("%s cluster leave %s", getRiakAdminCmd(), nodeName)))
                     .body.append(sudo(format("%s cluster plan", getRiakAdminCmd())))
                     .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
                     .failOnNonZeroResultCode();
 
-            if (!isRiakOnPath) {
+            if (!isRiakOnPath()) {
                 Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
                 log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
                 leaveClusterScript.environmentVariablesReset(newPathVariable);
@@ -422,7 +437,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
                     .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
                     .failOnNonZeroResultCode();
 
-            if (!isRiakOnPath) {
+            if (!isRiakOnPath()) {
                 Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
                 log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
                 commitClusterScript.environmentVariablesReset(newPathVariable);
@@ -436,7 +451,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
 
     @Override
     public void recoverFailedNode(String nodeName) {
-
         //TODO find ways to detect a faulty/failed node
         //argument passed 'node' is any working node in the riak cluster
         //following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/
@@ -446,10 +460,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
 
 
             String stopCommand = format("%s stop", getRiakCmd());
-            stopCommand = isPackageInstall ? sudo(stopCommand) : stopCommand;
+            stopCommand = isPackageInstall() ? sudo(stopCommand) : stopCommand;
 
-            String startCommand = format("%s start >/dev/null 2>&1 < /dev/null &", getRiakCmd());
-            startCommand = isPackageInstall ? sudo(startCommand) : startCommand;
+            String startCommand = format("%s start > /dev/null 2>&1 < /dev/null &", getRiakCmd());
+            startCommand = isPackageInstall() ? sudo(startCommand) : startCommand;
 
             ScriptHelper recoverNodeScript = newScript("recoverNode")
                     .body.append(stopCommand)
@@ -461,7 +475,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
                     .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
                     .failOnNonZeroResultCode();
 
-            if (!isRiakOnPath) {
+            if (!isRiakOnPath()) {
                 Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
                 log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
                 recoverNodeScript.environmentVariablesReset(newPathVariable);
@@ -475,13 +489,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
     }
 
     private Boolean hasJoinedCluster() {
-        return ((RiakNode) entity).hasJoinedCluster();
+        return Boolean.TRUE.equals(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER));
     }
 
-    protected boolean isRiakOnPath() {
-        return (newScript("riakOnPath")
+    protected void checkRiakOnPath() {
+        boolean riakOnPath = newScript("riakOnPath")
                 .body.append("which riak")
-                .execute() == 0);
+                .execute() == 0;
+        entity.setAttribute(RiakNode.RIAK_ON_PATH, riakOnPath);
     }
 
     private String getRiakName() {
@@ -490,7 +505,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen
 
     private String getRingStateDir() {
         //TODO: check for non-package install.
-        return isPackageInstall ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring");
+        return isPackageInstall() ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring");
     }
 
     protected boolean isVersion1() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
index ca0dd05..3f9e7d9 100644
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
@@ -26,11 +26,8 @@ import brooklyn.entity.AbstractEc2LiveTest;
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.location.Location;
-import brooklyn.test.Asserts;
 import brooklyn.test.EntityTestUtils;
 
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -60,11 +57,6 @@ public class RiakClusterEc2LiveTest extends AbstractEc2LiveTest {
         for (final RiakNode node : nodes) {
             EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.SERVICE_UP, true);
             EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true);
-            Asserts.eventually(new Supplier<Boolean>() {
-                @Override public Boolean get() {
-                    return node.hasJoinedCluster();
-                }
-            }, Predicates.alwaysTrue());
         }
     }
 


[2/2] incubator-brooklyn git commit: This closes #562 and #563

Posted by gr...@apache.org.
This closes #562 and #563

* github/pr/563:
  Updates to Riak for Clocker


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e08d3208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e08d3208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e08d3208

Branch: refs/heads/master
Commit: e08d3208b75ab08e766b1b4c1a9eaf94833ecd4d
Parents: 83b27b3 d738df9
Author: Andrew Kennedy <gr...@apache.org>
Authored: Fri Mar 20 19:28:00 2015 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Fri Mar 20 19:28:00 2015 +0000

----------------------------------------------------------------------
 .../entity/nosql/riak/RiakClusterImpl.java      |  67 +++++-------
 .../brooklyn/entity/nosql/riak/RiakNode.java    |  49 +++++----
 .../entity/nosql/riak/RiakNodeDriver.java       |   2 +-
 .../entity/nosql/riak/RiakNodeImpl.java         |   9 +-
 .../entity/nosql/riak/RiakNodeSshDriver.java    | 103 +++++++++++--------
 .../nosql/riak/RiakClusterEc2LiveTest.java      |   8 --
 6 files changed, 113 insertions(+), 125 deletions(-)
----------------------------------------------------------------------