You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:51 UTC

[15/51] [abbrv] [partial] brooklyn-library git commit: move subdir from incubator up a level as it is promoted to its own repo (first non-incubator commit!)

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
deleted file mode 100644
index 4396ed1..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
+++ /dev/null
@@ -1,597 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nonnull;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.config.render.RendererHints;
-import org.apache.brooklyn.core.effector.Effectors;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.apache.brooklyn.enricher.stock.Enrichers;
-import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import org.apache.brooklyn.entity.group.DynamicClusterImpl;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.feed.http.HttpFeed;
-import org.apache.brooklyn.feed.http.HttpPollConfig;
-import org.apache.brooklyn.feed.http.HttpValueFunctions;
-import org.apache.brooklyn.feed.http.JsonFunctions;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.util.collections.CollectionFunctionals;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.collections.QuorumCheck;
-import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.core.task.TaskBuilder;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.guava.IfFunctions;
-import org.apache.brooklyn.util.math.MathPredicates;
-import org.apache.brooklyn.util.text.ByteSizeStrings;
-import org.apache.brooklyn.util.text.StringFunctions;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-
-public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster {
-    
-    /*
-     * Refactoring required:
-     * 
-     * Currently, on start() the cluster waits for an arbitrary SERVICE_UP_TIME_OUT (3 minutes) before assuming that a quorate 
-     * number of servers are available. The servers are then added to the cluster, and a further wait period of  
-     * DELAY_BEFORE_ADVERTISING_CLUSTER (30 seconds) is used before advertising the cluster
-     * 
-     * DELAY_BEFORE_ADVERTISING_CLUSTER: It should be possible to refactor this away by adding a repeater that will poll
-     * the REST API of the primary node (once established) until the API indicates that the cluster is available
-     * 
-     * SERVICE_UP_TIME_OUT: The refactoring of this would be more substantial. One method would be to remove the bulk of the 
-     * logic from the start() method, and rely entirely on the membership tracking policy and the onServerPoolMemberChanged()
-     * method. The addition of a RUNNING sensor on the nodes would allow the cluster to determine that a node is up and
-     * running but has not yet been added to the cluster. The IS_CLUSTER_INITIALIZED key could be used to determine whether
-     * or not the cluster should be initialized, or a node simply added to an existing cluster. A repeater could be used
-     * in the driver's to ensure that the method does not return until the node has been fully added
-     * 
-     * There is an (incomplete) first-pass at this here: https://github.com/Nakomis/incubator-brooklyn/compare/couchbase-running-sensor
-     * however, there have been significant changes to the cluster initialization since that work was done so it will probably
-     * need to be re-done
-     * 
-     * Additionally, during bucket creation, a HttpPoll is used to check that the bucket has been created. This should be 
-     * refactored to use a Repeater in CouchbaseNodeSshDriver.bucketCreate() in a similar way to the one employed in
-     * CouchbaseNodeSshDriver.rebalance(). Were this done, this class could simply queue the bucket creation tasks
-     * 
-     */
-    
-    private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class);
-    private final Object mutex = new Object[0];
-    // Used to serialize bucket creation as only one bucket can be created at a time,
-    // so a feed is used to determine when a bucket has finished being created
-    private final AtomicReference<HttpFeed> resetBucketCreation = new AtomicReference<HttpFeed>();
-
-    public void init() {
-        log.info("Initializing the Couchbase cluster...");
-        super.init();
-        
-        enrichers().add(
-            Enrichers.builder()
-                .transforming(COUCHBASE_CLUSTER_UP_NODES)
-                .from(this)
-                .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
-                .computing(new ListOfHostAndPort()).build() );
-        enrichers().add(
-            Enrichers.builder()
-                .transforming(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
-                .from(this)
-                .publishing(COUCHBASE_CLUSTER_CONNECTION_URL)
-                .computing(
-                    IfFunctions.<List<String>>ifPredicate(
-                        Predicates.compose(MathPredicates.lessThan(getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE)), 
-                            CollectionFunctionals.sizeFunction(0)) )
-                    .value((String)null)
-                    .defaultApply(
-                        Functionals.chain(
-                            CollectionFunctionals.<String,List<String>>limit(4), 
-                            StringFunctions.joiner(","),
-                            StringFunctions.formatter("http://%s/"))) )
-                .build() );
-        
-        Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = 
-            ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder()
-                .put(CouchbaseNode.OPS, CouchbaseCluster.OPS_PER_NODE)
-                .put(CouchbaseNode.COUCH_DOCS_DATA_SIZE, CouchbaseCluster.COUCH_DOCS_DATA_SIZE_PER_NODE)
-                .put(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE)
-                .put(CouchbaseNode.EP_BG_FETCHED, CouchbaseCluster.EP_BG_FETCHED_PER_NODE)
-                .put(CouchbaseNode.MEM_USED, CouchbaseCluster.MEM_USED_PER_NODE)
-                .put(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE)
-                .put(CouchbaseNode.CURR_ITEMS, CouchbaseCluster.CURR_ITEMS_PER_NODE)
-                .put(CouchbaseNode.VB_REPLICA_CURR_ITEMS, CouchbaseCluster.VB_REPLICA_CURR_ITEMS_PER_NODE)
-                .put(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, CouchbaseCluster.COUCH_VIEWS_DATA_SIZE_PER_NODE)
-                .put(CouchbaseNode.GET_HITS, CouchbaseCluster.GET_HITS_PER_NODE)
-                .put(CouchbaseNode.CMD_GET, CouchbaseCluster.CMD_GET_PER_NODE)
-                .put(CouchbaseNode.CURR_ITEMS_TOT, CouchbaseCluster.CURR_ITEMS_TOT_PER_NODE)
-            .build();
-        
-        for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) {
-            addSummingMemberEnricher(nodeSensor);
-            addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor));
-        }
-        
-        enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
-            .from(IS_CLUSTER_INITIALIZED).computing(
-                IfFunctions.ifNotEquals(true).value("The cluster is not yet completely initialized")
-                    .defaultValue(null).build()).build() );
-    }
-    
-    private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) {
-        enrichers().add(Enrichers.builder()
-            .aggregating(fromSensor)
-            .publishing(toSensor)
-            .fromMembers()
-            .computingAverage()
-            .build()
-        );
-    }
-
-    private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) {
-        enrichers().add(Enrichers.builder()
-            .aggregating(source)
-            .publishing(source)
-            .fromMembers()
-            .computingSum()
-            .build()
-        );
-    }
-
-    @Override
-    protected void doStart() {
-        sensors().set(IS_CLUSTER_INITIALIZED, false);
-        
-        super.doStart();
-
-        connectSensors();
-        
-        sensors().set(BUCKET_CREATION_IN_PROGRESS, false);
-
-        //start timeout before adding the servers
-        Tasks.setBlockingDetails("Pausing while Couchbase stabilizes");
-        Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY));
-
-        Optional<Set<Entity>> upNodes = Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES));
-        if (upNodes.isPresent() && !upNodes.get().isEmpty()) {
-
-            Tasks.setBlockingDetails("Adding servers to Couchbase");
-            
-            //TODO: select a new primary node if this one fails
-            Entity primaryNode = upNodes.get().iterator().next();
-            ((EntityInternal) primaryNode).sensors().set(CouchbaseNode.IS_PRIMARY_NODE, true);
-            sensors().set(COUCHBASE_PRIMARY_NODE, primaryNode);
-
-            Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes());
-
-            if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() > 1) {
-                log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), getQuorumSize()});
-                addServers(serversToAdd);
-
-                //wait for servers to be added to the couchbase server
-                try {
-                    Tasks.setBlockingDetails("Delaying before advertising cluster up");
-                    Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
-                } finally {
-                    Tasks.resetBlockingDetails();
-                }
-                
-                ((CouchbaseNode)getPrimaryNode()).rebalance();
-            } else {
-                if (getQuorumSize()>1) {
-                    log.warn(this+" is not quorate; will likely fail later, but proceeding for now");
-                }
-                for (Entity server: serversToAdd) {
-                    ((EntityInternal) server).sensors().set(CouchbaseNode.IS_IN_CLUSTER, true);
-                }
-            }
-                
-            if (getConfig(CREATE_BUCKETS)!=null) {
-                try {
-                    Tasks.setBlockingDetails("Creating buckets in Couchbase");
-
-                    createBuckets();
-                    DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-
-                } finally {
-                    Tasks.resetBlockingDetails();
-                }
-            }
-
-            if (getConfig(REPLICATION)!=null) {
-                try {
-                    Tasks.setBlockingDetails("Configuring replication rules");
-
-                    List<Map<String, Object>> replRules = getConfig(REPLICATION);
-                    for (Map<String, Object> replRule: replRules) {
-                        DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), CouchbaseNode.ADD_REPLICATION_RULE, replRule));
-                    }
-                    DynamicTasks.waitForLast();
-
-                } finally {
-                    Tasks.resetBlockingDetails();
-                }
-            }
-
-            sensors().set(IS_CLUSTER_INITIALIZED, true);
-            
-        } else {
-            throw new IllegalStateException("No up nodes available after starting");
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (resetBucketCreation.get() != null) {
-            resetBucketCreation.get().stop();
-        }
-        super.stop();
-    }
-
-    protected void connectSensors() {
-        policies().add(PolicySpec.create(MemberTrackingPolicy.class)
-                .displayName("Controller targets tracker")
-                .configure("group", this));
-    }
-    
-    private final static class ListOfHostAndPort implements Function<Set<Entity>, List<String>> {
-        @Override public List<String> apply(Set<Entity> input) {
-            List<String> addresses = Lists.newArrayList();
-            for (Entity entity : input) {
-                addresses.add(String.format("%s",
-                        BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT))));
-            }
-            return addresses;
-        }
-    }
-
-    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
-        @Override protected void onEntityChange(Entity member) {
-            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
-        }
-
-        @Override protected void onEntityAdded(Entity member) {
-            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
-        }
-
-        @Override protected void onEntityRemoved(Entity member) {
-            ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
-        }
-    };
-
-    protected synchronized void onServerPoolMemberChanged(Entity member) {
-        if (log.isTraceEnabled()) log.trace("For {}, considering membership of {} which is in locations {}",
-                new Object[]{this, member, member.getLocations()});
-
-        //FIXME: make use of servers to be added after cluster initialization.
-        synchronized (mutex) {
-            if (belongsInServerPool(member)) {
-
-                Optional<Set<Entity>> upNodes = Optional.fromNullable(getUpNodes());
-                if (upNodes.isPresent()) {
-
-                    if (!upNodes.get().contains(member)) {
-                        Set<Entity> newNodes = Sets.newHashSet(getUpNodes());
-                        newNodes.add(member);
-                        sensors().set(COUCHBASE_CLUSTER_UP_NODES, newNodes);
-
-                        //add to set of servers to be added.
-                        if (isClusterInitialized()) {
-                            addServer(member);
-                        }
-                    }
-                } else {
-                    Set<Entity> newNodes = Sets.newHashSet();
-                    newNodes.add(member);
-                    sensors().set(COUCHBASE_CLUSTER_UP_NODES, newNodes);
-
-                    if (isClusterInitialized()) {
-                        addServer(member);
-                    }
-                }
-            } else {
-                Set<Entity> upNodes = getUpNodes();
-                if (upNodes != null && upNodes.contains(member)) {
-                    upNodes.remove(member);
-                    sensors().set(COUCHBASE_CLUSTER_UP_NODES, upNodes);
-                    log.info("Removing couchbase node {}: {}; from cluster", new Object[]{this, member});
-                }
-            }
-            if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
-        }
-    }
-
-    protected boolean belongsInServerPool(Entity member) {
-        if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
-            if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member);
-            return false;
-        }
-        if (!getMembers().contains(member)) {
-            if (log.isTraceEnabled())
-                log.trace("Members of {}, checking {}, eliminating because not member", this, member);
-
-            return false;
-        }
-        if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member);
-
-        return true;
-    }
-
-
-    protected EntitySpec<?> getMemberSpec() {
-        EntitySpec<?> result = super.getMemberSpec();
-        if (result != null) return result;
-        return EntitySpec.create(CouchbaseNode.class);
-    }
-
-    @Override
-    public int getQuorumSize() {
-        Integer quorumSize = getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE);
-        if (quorumSize != null && quorumSize > 0)
-            return quorumSize;
-        // by default the quorum would be floor(initial_cluster_size/2) + 1
-        return (int) Math.floor(getConfig(INITIAL_SIZE) / 2) + 1;
-    }
-
-    protected int getActualSize() {
-        return Optional.fromNullable(getAttribute(CouchbaseCluster.ACTUAL_CLUSTER_SIZE)).or(-1);
-    }
-
-    private Set<Entity> getUpNodes() {
-        return getAttribute(COUCHBASE_CLUSTER_UP_NODES);
-    }
-
-    private CouchbaseNode getPrimaryNode() {
-        return (CouchbaseNode) getAttribute(COUCHBASE_PRIMARY_NODE);
-    }
-
-    @Override
-    protected void initEnrichers() {
-        enrichers().add(Enrichers.builder().updatingMap(ServiceStateLogic.SERVICE_NOT_UP_INDICATORS)
-            .from(COUCHBASE_CLUSTER_UP_NODES)
-            .computing(new Function<Set<Entity>, Object>() {
-                @Override
-                public Object apply(Set<Entity> input) {
-                    if (input==null) return "Couchbase up nodes not set";
-                    if (input.isEmpty()) return "No Couchbase up nodes";
-                    if (input.size() < getQuorumSize()) return "Couchbase up nodes not quorate";
-                    return null;
-                }
-            }).build());
-        
-        if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) {
-            // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted state.
-            // If so, need a transformer and then to delete it
-            @SuppressWarnings({ "unused", "hiding" })
-            @Deprecated
-            class CouchbaseQuorumCheck implements QuorumCheck {
-                @Override
-                public boolean isQuorate(int sizeHealthy, int totalSize) {
-                    // check members count passed in AND the sensor  
-                    if (sizeHealthy < getQuorumSize()) return false;
-                    return true;
-                }
-            }
-            config().set(UP_QUORUM_CHECK, new CouchbaseClusterImpl.CouchbaseQuorumCheck(this));
-        }
-        super.initEnrichers();
-    }
-    
-    static class CouchbaseQuorumCheck implements QuorumCheck {
-        private final CouchbaseCluster cluster;
-        CouchbaseQuorumCheck(CouchbaseCluster cluster) {
-            this.cluster = cluster;
-        }
-        @Override
-        public boolean isQuorate(int sizeHealthy, int totalSize) {
-            // check members count passed in AND the sensor  
-            if (sizeHealthy < cluster.getQuorumSize()) return false;
-            return true;
-        }
-    }
-    protected void addServers(Set<Entity> serversToAdd) {
-        Preconditions.checkNotNull(serversToAdd);
-        for (Entity s : serversToAdd) {
-            addServerSeveralTimes(s, 12, Duration.TEN_SECONDS);
-        }
-    }
-
-    /** try adding in a loop because we are seeing spurious port failures in AWS */
-    protected void addServerSeveralTimes(Entity s, int numRetries, Duration delayOnFailure) {
-        try {
-            addServer(s);
-        } catch (Exception e) {
-            Exceptions.propagateIfFatal(e);
-            if (numRetries<=0) throw Exceptions.propagate(e);
-            // retry once after sleep because we are getting some odd primary-change events
-            log.warn("Error adding "+s+" to "+this+", "+numRetries+" retries remaining, will retry after delay ("+e+")");
-            Time.sleep(delayOnFailure);
-            addServerSeveralTimes(s, numRetries-1, delayOnFailure);
-        }
-    }
-
-    protected void addServer(Entity serverToAdd) {
-        Preconditions.checkNotNull(serverToAdd);
-        if (serverToAdd.equals(getPrimaryNode())) {
-            // no need to add; but we pass it in anyway because it makes the calling logic easier
-            return;
-        }
-        if (!isMemberInCluster(serverToAdd)) {
-            HostAndPort webAdmin = HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME),
-                    serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-            String username = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
-            String password = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-
-            if (isClusterInitialized()) {
-                Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD_AND_REBALANCE, webAdmin.toString(), username, password).getUnchecked();
-            } else {
-                Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD, webAdmin.toString(), username, password).getUnchecked();
-            }
-            //FIXME check feedback of whether the server was added.
-            ((EntityInternal) serverToAdd).sensors().set(CouchbaseNode.IS_IN_CLUSTER, true);
-        }
-    }
-
-    /** finds the cluster name specified for a node or a cluster, 
-     * using {@link CouchbaseCluster#CLUSTER_NAME} or falling back to the cluster (or node) ID. */
-    public static String getClusterName(Entity node) {
-        String name = node.getConfig(CLUSTER_NAME);
-        if (!Strings.isBlank(name)) return Strings.makeValidFilename(name);
-        return getClusterOrNode(node).getId();
-    }
-    
-    /** returns Couchbase cluster in ancestry, defaulting to the given node if none */
-    @Nonnull public static Entity getClusterOrNode(Entity node) {
-        Iterable<CouchbaseCluster> clusterNodes = Iterables.filter(Entities.ancestors(node), CouchbaseCluster.class);
-        return Iterables.getFirst(clusterNodes, node);
-    }
-    
-    public boolean isClusterInitialized() {
-        return Optional.fromNullable(getAttribute(IS_CLUSTER_INITIALIZED)).or(false);
-    }
-
-    public boolean isMemberInCluster(Entity e) {
-        return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false);
-    }
-    
-    public void createBuckets() {
-        //TODO: check for port conflicts if buckets are being created with a port
-        List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS);
-        if (bucketsToCreate==null) return;
-        
-        Entity primaryNode = getPrimaryNode();
-
-        for (Map<String, Object> bucketMap : bucketsToCreate) {
-            String bucketName = bucketMap.containsKey("bucket") ? (String) bucketMap.get("bucket") : "default";
-            String bucketType = bucketMap.containsKey("bucket-type") ? (String) bucketMap.get("bucket-type") : "couchbase";
-            // default bucket must be on this port; other buckets can (must) specify their own (unique) port
-            Integer bucketPort = bucketMap.containsKey("bucket-port") ? (Integer) bucketMap.get("bucket-port") : 11211;
-            Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? (Integer) bucketMap.get("bucket-ramsize") : 100;
-            Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? (Integer) bucketMap.get("bucket-replica") : 1;
-
-            createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
-        }
-    }
-
-    public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) {
-        DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().displayName("Creating bucket " + bucketName).body(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-                        if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
-                            CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
-                        }
-                        sensors().set(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
-                        HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-
-                        CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
-                                .entity(CouchbaseClusterImpl.this)
-                                .period(500, TimeUnit.MILLISECONDS)
-                                .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName))
-                                .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
-                                .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
-                                        .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
-                                            @Override
-                                            public Boolean apply(JsonElement input) {
-                                                // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable)
-                                                JsonArray servers = input.getAsJsonArray();
-                                                if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) {
-                                                    return true;
-                                                }
-                                                for (JsonElement server : servers) {
-                                                    Object api = server.getAsJsonObject().get("couchApiBase");
-                                                    if (api == null || Strings.isEmpty(String.valueOf(api))) {
-                                                        return true;
-                                                    }
-                                                }
-                                                return false;
-                                            }
-                                        }))
-                                        .onFailureOrException(new Function<Object, Boolean>() {
-                                            @Override
-                                            public Boolean apply(Object input) {
-                                                if (input instanceof HttpToolResponse) {
-                                                    if (((HttpToolResponse) input).getResponseCode() == 404) {
-                                                        return true;
-                                                    }
-                                                }
-                                                if (input instanceof Throwable)
-                                                    Exceptions.propagate((Throwable) input);
-                                                throw new IllegalStateException("Unexpected response when creating bucket:" + input);
-                                            }
-                                        }))
-                                .build());
-
-                        // TODO: Bail out if bucket creation fails, to allow next bucket to proceed
-                        Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
-                        DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-                        if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
-                            CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
-                        }
-                        return null;
-                    }
-                }
-        ).build()).orSubmitAndBlock();
-    }
-    
-    static {
-        RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(MEM_USED_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
-        RendererHints.register(COUCH_VIEWS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
deleted file mode 100644
index 8bde2f3..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import java.net.URI;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.annotation.Effector;
-import org.apache.brooklyn.core.annotation.EffectorParam;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.config.render.RendererHints;
-import org.apache.brooklyn.core.effector.Effectors;
-import org.apache.brooklyn.core.effector.MethodEffector;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.util.core.flags.SetFromFlag;
-import org.apache.brooklyn.util.text.ByteSizeStrings;
-
-@Catalog(name="CouchBase Node", description="Couchbase Server is an open source, distributed (shared-nothing architecture) "
-        + "NoSQL document-oriented database that is optimized for interactive applications.")
-@ImplementedBy(CouchbaseNodeImpl.class)
-public interface CouchbaseNode extends SoftwareProcess {
-
-    @SetFromFlag("adminUsername")
-    ConfigKey<String> COUCHBASE_ADMIN_USERNAME = ConfigKeys.newStringConfigKey("couchbase.adminUsername", "Username for the admin user on the node", "Administrator");
-
-    @SetFromFlag("adminPassword")
-    ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = ConfigKeys.newStringConfigKey("couchbase.adminPassword", "Password for the admin user on the node", "Password");
-
-    @SetFromFlag("version")
-    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
-            "3.0.0");
-
-    @SetFromFlag("enterprise")
-    ConfigKey<Boolean> USE_ENTERPRISE = ConfigKeys.newBooleanConfigKey("couchbase.enterprise.enabled",
-        "Whether to use Couchbase Enterprise; if false uses the community version. Defaults to true.", true);
-
-    @SetFromFlag("downloadUrl")
-    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
-            SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/${version}/"
-                + "couchbase-server-${driver.communityOrEnterprise}${driver.downloadLinkPreVersionSeparator}${version}${driver.downloadLinkOsTagWithPrefix}");
-
-    @SetFromFlag("clusterInitRamSize")
-    BasicAttributeSensorAndConfigKey<Integer> COUCHBASE_CLUSTER_INIT_RAM_SIZE = new BasicAttributeSensorAndConfigKey<Integer>(
-            Integer.class, "couchbase.clusterInitRamSize", "initial ram size of the cluster", 300);
-
-    PortAttributeSensorAndConfigKey COUCHBASE_WEB_ADMIN_PORT = new PortAttributeSensorAndConfigKey("couchbase.webAdminPort", "Web Administration Port", "8091+");
-    PortAttributeSensorAndConfigKey COUCHBASE_API_PORT = new PortAttributeSensorAndConfigKey("couchbase.apiPort", "Couchbase API Port", "8092+");
-    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalBucketPort", "Internal Bucket Port", "11209");
-    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_EXTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalExternalBucketPort", "Internal/External Bucket Port", "11210");
-    PortAttributeSensorAndConfigKey COUCHBASE_CLIENT_INTERFACE_PROXY = new PortAttributeSensorAndConfigKey("couchbase.clientInterfaceProxy", "Client interface (proxy)", "11211");
-    PortAttributeSensorAndConfigKey COUCHBASE_INCOMING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.incomingSslProxy", "Incoming SSL Proxy", "11214");
-    PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_OUTGOING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.internalOutgoingSslProxy", "Internal Outgoing SSL Proxy", "11215");
-    PortAttributeSensorAndConfigKey COUCHBASE_REST_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalRestHttpsForSsl", "Internal REST HTTPS for SSL", "18091");
-    PortAttributeSensorAndConfigKey COUCHBASE_CAPI_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalCapiHttpsForSsl", "Internal CAPI HTTPS for SSL", "18092");
-    PortAttributeSensorAndConfigKey ERLANG_PORT_MAPPER = new PortAttributeSensorAndConfigKey("couchbase.erlangPortMapper", "Erlang Port Mapper Daemon Listener Port (epmd)", "4369");
-    PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeStart", "Node data exchange Port Range Start", "21100+");
-    PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeEnd", "Node data exchange Port Range End", "21199+");
-
-    AttributeSensor<Boolean> IS_PRIMARY_NODE = Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the current couchbase node is the primary node for the cluster");
-    AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster, "
-        + "including being the first / primary node");
-    AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
-    
-    // Interesting stats
-    AttributeSensor<Double> OPS = Sensors.newDoubleSensor("couchbase.stats.ops", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/ops");
-    AttributeSensor<Long> COUCH_DOCS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.data.size", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_data_size");
-    AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.actual.disk.size", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size");
-    AttributeSensor<Long> EP_BG_FETCHED = Sensors.newLongSensor("couchbase.stats.ep.bg.fetched", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/ep_bg_fetched");
-    AttributeSensor<Long> MEM_USED = Sensors.newLongSensor("couchbase.stats.mem.used", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/mem_used");
-    AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.actual.disk.size", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size");
-    AttributeSensor<Long> CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.curr.items", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/curr_items");
-    AttributeSensor<Long> VB_REPLICA_CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.vb.replica.curr.items", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/vb_replica_curr_items");
-    AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.data.size", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_data_size");
-    AttributeSensor<Long> GET_HITS = Sensors.newLongSensor("couchbase.stats.get.hits", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/get_hits");
-    AttributeSensor<Double> CMD_GET = Sensors.newDoubleSensor("couchbase.stats.cmd.get", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/cmd_get");
-    AttributeSensor<Long> CURR_ITEMS_TOT = Sensors.newLongSensor("couchbase.stats.curr.items.tot", 
-            "Retrieved from pools/nodes/<current node>/interestingStats/curr_items_tot");
-    AttributeSensor<String> REBALANCE_STATUS = Sensors.newStringSensor("couchbase.rebalance.status", 
-            "Displays the current rebalance status from pools/nodes/rebalanceStatus");
-    
-    class MainUri {
-        public static final AttributeSensor<URI> MAIN_URI = Attributes.MAIN_URI;
-        
-        static {
-            // ROOT_URL does not need init because it refers to something already initialized
-            RendererHints.register(COUCHBASE_WEB_ADMIN_URL, RendererHints.namedActionWithUrl());
-
-            RendererHints.register(COUCH_DOCS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(MEM_USED, RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
-            RendererHints.register(COUCH_VIEWS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
-        }
-    }
-    
-    // this long-winded reference is done just to trigger the initialization above
-    AttributeSensor<URI> MAIN_URI = MainUri.MAIN_URI;
-
-    MethodEffector<Void> SERVER_ADD = new MethodEffector<Void>(CouchbaseNode.class, "serverAdd");
-    MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance");
-    MethodEffector<Void> REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "rebalance");
-    MethodEffector<Void> BUCKET_CREATE = new MethodEffector<Void>(CouchbaseNode.class, "bucketCreate");
-    org.apache.brooklyn.api.effector.Effector<Void> ADD_REPLICATION_RULE = Effectors.effector(Void.class, "addReplicationRule")
-        .description("Adds a replication rule from the indicated bucket on the cluster where this node is located "
-            + "to the indicated cluster and optional destination bucket")
-        .parameter(String.class, "fromBucket", "Bucket to be replicated")
-        .parameter(Object.class, "toCluster", "Entity (or ID) of the cluster to which this should replicate")
-        .parameter(String.class, "toBucket", "Destination bucket for replication in the toCluster, defaulting to the same as the fromBucket")
-        .buildAbstract();
-
-    @Effector(description = "add a server to a cluster")
-    public void serverAdd(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password);
-    
-    @Effector(description = "add a server to a cluster, and immediately rebalances")
-    public void serverAddAndRebalance(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password);
-
-    @Effector(description = "rebalance the couchbase cluster")
-    public void rebalance();
-    
-    @Effector(description = "create a new bucket")
-    public void bucketCreate(@EffectorParam(name = "bucketName") String bucketName, @EffectorParam(name = "bucketType") String bucketType, 
-            @EffectorParam(name = "bucketPort") Integer bucketPort, @EffectorParam(name = "bucketRamSize") Integer bucketRamSize, 
-            @EffectorParam(name = "bucketReplica") Integer bucketReplica);
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
deleted file mode 100644
index b5c797d..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessDriver;
-
-public interface CouchbaseNodeDriver extends SoftwareProcessDriver {
-    public String getOsTag();
-    public String getDownloadLinkPreVersionSeparator();
-    public String getDownloadLinkOsTagWithPrefix();
-    
-    public String getCommunityOrEnterprise();
-
-    public void serverAdd(String serverToAdd, String username, String password);
-
-    public void rebalance();
-    
-    public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica);
-
-    public void serverAddAndRebalance(String serverToAdd, String username, String password);
-
-    public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket);
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
deleted file mode 100644
index 68d3a54..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import static java.lang.String.format;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.location.MachineProvisioningLocation;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.apache.brooklyn.core.effector.EffectorBody;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
-import org.apache.brooklyn.feed.http.HttpFeed;
-import org.apache.brooklyn.feed.http.HttpPollConfig;
-import org.apache.brooklyn.feed.http.HttpValueFunctions;
-import org.apache.brooklyn.feed.http.JsonFunctions;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.http.HttpTool;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.guava.MaybeFunctions;
-import org.apache.brooklyn.util.guava.TypeTokens;
-import org.apache.brooklyn.util.net.Urls;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.net.HostAndPort;
-import com.google.common.net.HttpHeaders;
-import com.google.common.net.MediaType;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-
-public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode {
-
-    private static final Logger log = LoggerFactory.getLogger(CouchbaseNodeImpl.class);
-
-    private volatile HttpFeed httpFeed;
-
-    @Override
-    public Class<CouchbaseNodeDriver> getDriverInterface() {
-        return CouchbaseNodeDriver.class;
-    }
-
-    @Override
-    public CouchbaseNodeDriver getDriver() {
-        return (CouchbaseNodeDriver) super.getDriver();
-    }
-
-    @Override
-    public void init() {
-        super.init();
-
-        subscriptions().subscribe(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
-            @Override
-            public void onEvent(SensorEvent<Boolean> booleanSensorEvent) {
-                if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) {
-                    Integer webPort = getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
-                    Preconditions.checkNotNull(webPort, CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port available?", this);
-                    String hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, webPort).toString();
-                    sensors().set(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, URI.create(format("http://%s", hostAndPort)));
-                }
-            }
-        });
-
-        getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new EffectorBody<Void>() {
-            @Override
-            public Void call(ConfigBag parameters) {
-                addReplicationRule(parameters);
-                return null;
-            }
-        });
-    }
-
-    protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) {
-        ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location));
-        result.configure(CloudLocationConfig.OS_64_BIT, true);
-        return result.getAllConfig();
-    }
-
-    @Override
-    protected Collection<Integer> getRequiredOpenPorts() {
-        // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax!
-        int erlangRangeStart = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next();
-        int erlangRangeEnd = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next();
-
-        Set<Integer> newPorts = MutableSet.<Integer>copyOf(super.getRequiredOpenPorts());
-        newPorts.remove(erlangRangeStart);
-        newPorts.remove(erlangRangeEnd);
-        for (int i = erlangRangeStart; i <= erlangRangeEnd; i++)
-            newPorts.add(i);
-        return newPorts;
-    }
-
-    @Override
-    public void serverAdd(String serverToAdd, String username, String password) {
-        getDriver().serverAdd(serverToAdd, username, password);
-    }
-
-    @Override
-    public void serverAddAndRebalance(String serverToAdd, String username, String password) {
-        getDriver().serverAddAndRebalance(serverToAdd, username, password);
-    }
-
-    @Override
-    public void rebalance() {
-        getDriver().rebalance();
-    }
-
-    protected final static Function<HttpToolResponse, JsonElement> GET_THIS_NODE_STATS = Functionals.chain(
-        HttpValueFunctions.jsonContents(),
-        JsonFunctions.walk("nodes"),
-        new Function<JsonElement, JsonElement>() {
-            @Override public JsonElement apply(JsonElement input) {
-                JsonArray nodes = input.getAsJsonArray();
-                for (JsonElement element : nodes) {
-                    JsonElement thisNode = element.getAsJsonObject().get("thisNode");
-                    if (thisNode!=null && Boolean.TRUE.equals(thisNode.getAsBoolean())) {
-                        return element.getAsJsonObject().get("interestingStats");
-                    }
-                }
-                return null;
-        }}
-    );
-
-    protected final static <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) {
-        return new HttpPollConfig<T>(sensor)
-            .onSuccess(Functionals.chain(GET_THIS_NODE_STATS,
-                MaybeFunctions.<JsonElement>wrap(),
-                JsonFunctions.walkM(jsonPath),
-                JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null)))
-            .onFailureOrException(Functions.<T>constant(null));
-    }
-
-    @Override
-    protected void postStart() {
-        super.postStart();
-        renameServerToPublicHostname();
-    }
-
-    protected void renameServerToPublicHostname() {
-        // http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames
-        URI apiUri = null;
-        try {
-            HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(COUCHBASE_WEB_ADMIN_PORT));
-            apiUri = URI.create(String.format("http://%s:%d/node/controller/rename", accessible.getHostText(), accessible.getPort()));
-            UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD));
-            HttpToolResponse response = HttpTool.httpPost(
-                    // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
-                    HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(),
-                    apiUri,
-                    MutableMap.of(
-                            HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(),
-                            HttpHeaders.ACCEPT, "*/*",
-                            // this appears needed; without it we get org.apache.http.NoHttpResponseException !?
-                            HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)),
-                    Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array());
-            log.debug("Renamed Couchbase server "+this+" via "+apiUri+": "+response);
-            if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) {
-                log.warn("Invalid response code, renaming {} ({}): {}",
-                        new Object[]{apiUri, response.getResponseCode(), response.getContentAsString()});
-            }
-        } catch (Exception e) {
-            Exceptions.propagateIfFatal(e);
-            log.warn("Error renaming server, using "+apiUri+": "+e, e);
-        }
-    }
-
-    public void connectSensors() {
-        super.connectSensors();
-        connectServiceUpIsRunning();
-
-        HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-        httpFeed = HttpFeed.builder()
-            .entity(this)
-            .period(Duration.seconds(3))
-            .baseUri("http://" + hostAndPort + "/pools/nodes/")
-            .credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
-            .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, "couch_docs_data_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, "couch_docs_actual_disk_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, "ep_bg_fetched"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, "couch_views_actual_disk_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, "curr_items"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, "vb_replica_curr_items"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, "couch_views_data_size"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get"))
-            .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, "curr_items_tot"))
-            .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS)
-                    .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class))
-                    .onFailureOrException(Functions.constant("Could not retrieve")))
-            .build();
-    }
-
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        disconnectServiceUpIsRunning();
-        if (httpFeed != null) {
-            httpFeed.stop();
-        }
-    }
-
-    @Override
-    public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
-        if (Strings.isBlank(bucketType)) bucketType = "couchbase";
-        if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200;
-        if (bucketReplica==null || bucketReplica<0) bucketReplica = 1;
-
-        getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
-    }
-
-    /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */
-    protected void addReplicationRule(ConfigBag ruleArgs) {
-        Object toClusterO = Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must not be null");
-        if (toClusterO instanceof String) {
-            toClusterO = getManagementContext().lookup((String)toClusterO);
-        }
-        Entity toCluster = Tasks.resolving(toClusterO, Entity.class).context(getExecutionContext()).get();
-
-        String fromBucket = Preconditions.checkNotNull( (String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" );
-
-        String toBucket = (String)ruleArgs.getStringKey("toBucket");
-        if (toBucket==null) toBucket = fromBucket;
-
-        if (!ruleArgs.getUnusedConfig().isEmpty()) {
-            throw new IllegalArgumentException("Unsupported replication rule data: "+ruleArgs.getUnusedConfig());
-        }
-
-        getDriver().addReplicationRule(toCluster, fromBucket, toBucket);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
deleted file mode 100644
index 915f06a..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import static java.lang.String.format;
-import static org.apache.brooklyn.util.ssh.BashCommands.*;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.location.OsDetails;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadRequirement;
-import org.apache.brooklyn.core.entity.drivers.downloads.DownloadProducerFromUrlAttribute;
-import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
-import org.apache.brooklyn.feed.http.HttpValueFunctions;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.http.HttpTool;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.core.task.TaskBuilder;
-import org.apache.brooklyn.util.core.task.TaskTags;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.repeat.Repeater;
-import org.apache.brooklyn.util.ssh.BashCommands;
-import org.apache.brooklyn.util.text.NaturalOrderComparator;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.text.StringEscapes.BashStringEscapes;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
-
-public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver {
-
-    /**
-     * Creates a driver managing the given Couchbase node entity over SSH on the given machine.
-     *
-     * @param entity the Couchbase node entity this driver manages
-     * @param machine the SSH-accessible machine passed to the superclass
-     */
-    public CouchbaseNodeSshDriver(CouchbaseNodeImpl entity, SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    /**
-     * Builds the start of a {@code couchbase-cli} invocation for the given sub-command.
-     *
-     * @param cmd the couchbase-cli sub-command, e.g. {@code rebalance}
-     * @return the CLI path followed by the sub-command and a trailing space, ready for arguments to be appended
-     */
-    public static String couchbaseCli(String cmd) {
-        return format("/opt/couchbase/bin/couchbase-cli %s ", cmd);
-    }
-
-    @Override
-    public void preInstall() {
-        resolver = Entities.newDownloader(this);
-        setExpandedInstallDir(getInstallDir());
-    }
-
-    /**
-     * Downloads and installs the Couchbase package; only Linux machines are supported.
-     *
-     * @throws IllegalStateException if the machine's OS is not Linux (the task is marked inessential first)
-     */
-    @Override
-    public void install() {
-        //for reference https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb
-        //installation instructions (http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install)
-
-        List<String> downloadUrls = resolver.getTargets();
-        String packageFile = resolver.getFilename();
-
-        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
-
-        if (!osDetails.isLinux()) {
-            Tasks.markInessential();
-            throw new IllegalStateException("Unsupported OS for installing Couchbase. Will continue but may fail later.");
-        }
-
-        List<String> installCommands = installLinux(downloadUrls, packageFile);
-        //FIXME installation return error but the server is up and running.
-        newScript(INSTALLING)
-                .body.append(installCommands)
-                .execute();
-    }
-
-    /**
-     * Builds the shell commands which download the Couchbase package and install it using
-     * either apt/dpkg or yum/rpm, whichever is available on the machine.
-     *
-     * @param urls candidate download URLs for the package
-     * @param saveAs local filename the package is saved as
-     * @return the ordered list of commands to run
-     */
-    private List<String> installLinux(List<String> urls, String saveAs) {
-
-        log.info("Installing {} using couchbase-server-{} {}", new Object[]{getEntity(), getCommunityOrEnterprise(), getVersion()});
-
-        String apt = chainGroup(
-                installPackage(MutableMap.of("apt", "python-httplib2 libssl0.9.8"), null),
-                sudo(format("dpkg -i %s", saveAs)));
-
-        String yum = chainGroup(
-                "which yum",
-                // The following prevents failure on RHEL AWS nodes:
-                // https://forums.aws.amazon.com/thread.jspa?threadID=100509
-                ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ /etc/yum/pluginconf.d/subscription-manager.conf")),
-                ok(sudo("yum check-update")),
-                sudo("yum install -y pkgconfig"),
-                // RHEL requires openssl version 098
-                sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && sudo yum install -y openssl098e) || :"),
-                sudo(format("rpm --install %s", saveAs)));
-
-        String link = new DownloadProducerFromUrlAttribute().apply(new BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next();
-        // try the standard download first, falling back to a curl which sends the Referer header
-        String download = BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs),
-                        // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer
-                        "curl -f -L -k " + BashStringEscapes.wrapBash(link)
-                                + " -H 'Referer: http://www.couchbase.com/downloads'"
-                                + " -o " + saveAs),
-                "Could not retrieve " + saveAs + " (from " + urls.size() + " sites)", 9);
-
-        // curl only needs installing once (it was previously added to the list twice)
-        return ImmutableList.<String>builder()
-                .add(INSTALL_CURL)
-                .add(download)
-                .add(alternatives(apt, yum))
-                .build();
-    }
-
-    /**
-     * Currently a no-op; the comments below list OS tuning steps which have not been implemented.
-     */
-    @Override
-    public void customize() {
-        //TODO: add linux tweaks for couchbase
-        //http://blog.couchbase.com/often-overlooked-linux-os-tweaks
-        //http://blog.couchbase.com/kirk
-
-        //turn off swappiness
-        //vm.swappiness=0
-        //sudo echo 0 > /proc/sys/vm/swappiness
-
-        //os page cache = 20%
-
-        //disable THP
-        //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled
-        //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag
-
-        //turn off transparent huge pages
-        //limit page cache dirty bytes
-        //control the rate at which the page cache is flushed ... vm.dirty_*
-    }
-
-    /**
-     * Starts the Couchbase service, waits up to 120 seconds for the REST API on the web port
-     * to respond, then initialises the cluster with the configured credentials, port and RAM size.
-     */
-    @Override
-    public void launch() {
-        String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : "");
-        // in v30, the cluster arguments were changed, and it became mandatory to supply a url + password (if there is none, these are ignored)
-        newScript(LAUNCHING)
-                .body.append(
-                sudo("/etc/init.d/couchbase-server start"),
-                "for i in {0..120}\n" +
-                        "do\n" +
-                        "    if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" +
-                        "    curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" +
-                        "    sleep 1\n" +
-                        "done\n" +
-                        couchbaseCli("cluster-init") +
-                        (isPreV3() ? getCouchbaseHostnameAndPort() : getCouchbaseHostnameAndCredentials()) +
-                        // credentials are quoted so that spaces or shell metacharacters cannot break the command
-                        " " + clusterPrefix + "username=" + BashStringEscapes.wrapBash(getUsername()) +
-                        " " + clusterPrefix + "password=" + BashStringEscapes.wrapBash(getPassword()) +
-                        " " + clusterPrefix + "port=" + getWebPort() +
-                        " " + clusterPrefix + "ramsize=" + getClusterInitRamSize())
-                .execute();
-    }
-
-    /**
-     * Checks whether the server is running by requesting {@code /pools/nodes} from the local REST API.
-     *
-     * @return true if the curl command exits with status 0
-     */
-    @Override
-    public boolean isRunning() {
-        //TODO add a better way to check if couchbase server is running
-        // the user:password pair is quoted so that special characters cannot break the command
-        String credentials = BashStringEscapes.wrapBash(getUsername() + ":" + getPassword());
-        return (newScript(CHECK_RUNNING)
-                .body.append(format("curl -u %s http://localhost:%s/pools/nodes", credentials, getWebPort()))
-                .execute() == 0);
-    }
-
-    @Override
-    public void stop() {
-        newScript(STOPPING)
-                .body.append(sudo("/etc/init.d/couchbase-server stop"))
-                .execute();
-    }
-
-    @Override
-    public String getVersion() {
-        return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION);
-    }
-
-    @Override
-    public String getOsTag() {
-        return newDownloadLinkSegmentComputer().getOsTag();
-    }
-
-    /** Creates a {@link DownloadLinkSegmentComputer} for this entity's location OS and configured version. */
-    protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() {
-        boolean v3OrLater = !isPreV3();
-        String description = Strings.toString(getEntity());
-        return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), v3OrLater, description);
-    }
-
-    /**
-     * Computes the OS- and version-dependent segments of a Couchbase download link.
-     */
-    public static class DownloadLinkSegmentComputer {
-        // links are:
-        // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm
-        // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb
-        // ^^^ preV3 is _ everywhere
-        // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb
-        // ^^^ most V3 is _${version}-
-        // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm
-        // ^^^ but RHEL is -${version}-
-
-        @Nullable
-        private final OsDetails os;
-        private final boolean isV3OrLater;
-        @Nonnull
-        private final String context;
-        /** lower-cased OS name; empty if the OS details carry no name */
-        @Nonnull
-        private final String osName;
-        private final boolean isRpm;
-        private final boolean is64bit;
-
-        /**
-         * @param os details of the target OS; if null a 64-bit RPM-based distribution is assumed
-         * @param isV3OrLater whether the Couchbase version being installed is 3.0 or later
-         * @param context description of what is being deployed, used in log messages only
-         */
-        public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean isV3OrLater, @Nonnull String context) {
-            this.os = os;
-            this.isV3OrLater = isV3OrLater;
-            this.context = context;
-            if (os == null) {
-                // guess centos as RPM is sensible default
-                log.warn("No details known for OS of " + context + "; assuming 64-bit RPM distribution of Couchbase");
-                osName = "centos";
-                isRpm = true;
-                is64bit = true;
-            } else {
-                String name = os.getName();
-                // Locale.ROOT keeps the matching below independent of the JVM default locale
-                // (e.g. Turkish lower-casing of 'I'); a missing name is treated as unrecognised
-                osName = (name == null) ? "" : name.toLowerCase(java.util.Locale.ROOT);
-                isRpm = !(osName.contains("deb") || osName.contains("ubuntu"));
-                is64bit = os.is64bit();
-            }
-        }
-
-        /**
-         * separator before the version number: always _ prior to 3.0;
-         * from 3.0 it is - for RPM packages and _ otherwise
-         */
-        public String getPreVersionSeparator() {
-            if (!isV3OrLater) return "_";
-            if (isRpm) return "-";
-            return "_";
-        }
-
-        /**
-         * @return the tail of the download filename: architecture and file extension,
-         *         preceded from 3.0 onwards by the OS family tag
-         */
-        public String getOsTag() {
-            // couchbase only provide certain versions; if on other platforms let's suck-it-and-see
-            String family;
-            if (osName.contains("debian")) family = "debian7_";
-            else if (osName.contains("ubuntu")) family = "ubuntu12.04_";
-            else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat")))
-                family = "centos6.";
-            else {
-                log.warn("Unrecognised OS " + os + " of " + context + "; assuming RPM distribution of Couchbase");
-                family = "centos6.";
-            }
-
-            if (!is64bit && isV3OrLater) {
-                // NB: 32-bit binaries aren't (yet?) available for v30
-                log.warn("32-bit binaries for Couchbase might not be available, when deploying " + context);
-            }
-            String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : "x86_64";
-            String fileExtension = isRpm ? ".rpm" : ".deb";
-
-            if (isV3OrLater)
-                return family + arch + fileExtension;
-            else
-                return arch + fileExtension;
-        }
-
-        /** @return {@link #getOsTag()} preceded by its separator: _ prior to 3.0, - from 3.0 onwards */
-        public String getOsTagWithPrefix() {
-            return (!isV3OrLater ? "_" : "-") + getOsTag();
-        }
-    }
-
-    @Override
-    public String getDownloadLinkOsTagWithPrefix() {
-        return newDownloadLinkSegmentComputer().getOsTagWithPrefix();
-    }
-
-    @Override
-    public String getDownloadLinkPreVersionSeparator() {
-        return newDownloadLinkSegmentComputer().getPreVersionSeparator();
-    }
-
-    /** @return true if the configured version sorts before "3.0" in natural order */
-    private boolean isPreV3() {
-        String configuredVersion = getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION);
-        int comparison = NaturalOrderComparator.INSTANCE.compare(configuredVersion, "3.0");
-        return comparison < 0;
-    }
-
-    /**
-     * @return {@code "enterprise"} if {@link CouchbaseNode#USE_ENTERPRISE} is set to true,
-     *         otherwise (including when it is unset) {@code "community"}
-     */
-    @Override
-    public String getCommunityOrEnterprise() {
-        Boolean isEnterprise = getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE);
-        // compare against TRUE rather than auto-unboxing, which would throw on a null value
-        return Boolean.TRUE.equals(isEnterprise) ? "enterprise" : "community";
-    }
-
-    /** @return the Couchbase admin username configured on the entity */
-    private String getUsername() {
-        String adminUsername = entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
-        return adminUsername;
-    }
-
-    /** @return the Couchbase admin password configured on the entity */
-    private String getPassword() {
-        String adminPassword = entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-        return adminPassword;
-    }
-
-    /** @return the web admin port sensor value as a string ("null" if the sensor is not set) */
-    private String getWebPort() {
-        Object webAdminPort = entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
-        return String.valueOf(webAdminPort);
-    }
-
-    /**
-     * @return the {@code -c host:port -u user -p password} arguments addressing this node,
-     *         with the credentials quoted so that spaces or shell metacharacters in them
-     *         cannot break the generated command
-     */
-    private String getCouchbaseHostnameAndCredentials() {
-        return format("-c %s:%s -u %s -p %s", getSubnetHostname(), getWebPort(),
-                BashStringEscapes.wrapBash(getUsername()), BashStringEscapes.wrapBash(getPassword()));
-    }
-
-    /** @return the {@code -c host:port} argument addressing this node, without credentials */
-    private String getCouchbaseHostnameAndPort() {
-        String host = getSubnetHostname();
-        String port = getWebPort();
-        return format("-c %s:%s", host, port);
-    }
-
-    /** @return the configured cluster-init RAM size, rendered as a string */
-    private String getClusterInitRamSize() {
-        Object ramSize = entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE);
-        return ramSize.toString();
-    }
-
-    @Override
-    public void rebalance() {
-        entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "explicitly started");
-        newScript("rebalance")
-                .body.append(
-                couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials())
-                .failOnNonZeroResultCode()
-                .execute();
-
-        // wait until the re-balance is started
-        // (if it's quick, this might miss it, but it will only block for 30s if so)
-        Repeater.create()
-                .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500))
-                .limitTimeTo(Duration.THIRTY_SECONDS)
-                .until(new Callable<Boolean>() {
-                           @Override
-                           public Boolean call() throws Exception {
-                               for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
-                                   if (isNodeRebalancing(nodeHostAndPort.toString())) {
-                                       return true;
-                                   }
-                               }
-                               return false;
-                           }
-                       }
-                ).run();
-
-        entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "waiting for completion");
-        // Wait until the Couchbase node finishes the re-balancing
-        Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
-                .displayName("Waiting until node is rebalancing")
-                .body(new Callable<Boolean>() {
-                    @Override
-                    public Boolean call() throws Exception {
-                        return Repeater.create()
-                                .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
-                                .limitTimeTo(Duration.FIVE_MINUTES)
-                                .until(new Callable<Boolean>() {
-                                    @Override
-                                    public Boolean call() throws Exception {
-                                        for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
-                                            if (isNodeRebalancing(nodeHostAndPort.toString())) {
-                                                return false;
-                                            }
-                                        }
-                                        return true;
-                                    }
-                                })
-                                .run();
-                        }
-                })
-                .build();
-        Boolean completed = DynamicTasks.queueIfPossible(reBalance)
-                .orSubmitAndBlock()
-                .andWaitForSuccess();
-        if (completed) {
-            entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "completed");
-            ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing");
-            log.info("Rebalanced cluster via primary node {}", getEntity());
-        } else {
-            entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "timed out");
-            ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit");
-            log.warn("Timeout rebalancing cluster via primary node {}", getEntity());
-        }
-    }
-
-    /**
-     * Finds the web admin addresses of the nodes listed as up in the first group this entity belongs to.
-     *
-     * @return a lazily transformed view of the up nodes' Brooklyn-accessible addresses; empty if the
-     *         entity is in no group or the group has no {@code COUCHBASE_CLUSTER_UP_NODES} value
-     */
-    private Iterable<HostAndPort> getNodesHostAndPort() {
-        Group group = Iterables.getFirst(getEntity().groups(), null);
-        if (group == null) return Lists.newArrayList();
-        Iterable<? extends Entity> upNodes = group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES);
-        // the sensor is unset until the cluster has published its up nodes (or if the group is not a cluster)
-        if (upNodes == null) return ImmutableList.<HostAndPort>of();
-        return Iterables.transform(upNodes,
-                new Function<Entity, HostAndPort>() {
-                    @Override
-                    public HostAndPort apply(Entity input) {
-                        return BrooklynAccessUtils.getBrooklynAccessibleAddress(input, input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-                    }
-                });
-    }
-
-    /**
-     * Queries a node's {@code /pools/default/rebalanceProgress} REST endpoint.
-     *
-     * @param nodeHostAndPort the node's {@code host:port}
-     * @return true unless the {@code status} field of the response is {@code "none"}
-     * @throws IllegalStateException if the response code is not 200
-     */
-    private boolean isNodeRebalancing(String nodeHostAndPort) {
-        String progressUrl = "http://" + nodeHostAndPort + "/pools/default/rebalanceProgress";
-        HttpToolResponse response = getApiResponse(progressUrl);
-        if (response.getResponseCode() == 200) {
-            String status = HttpValueFunctions.jsonContents("status", String.class).apply(response);
-            return !"none".equals(status);
-        }
-        throw new IllegalStateException("failed retrieving rebalance status: " + response);
-    }
-
-    /**
-     * Performs an HTTP GET on the given URI, authenticating with the admin username and password.
-     *
-     * @param uri the absolute URI to fetch
-     * @return the response
-     */
-    private HttpToolResponse getApiResponse(String uri) {
-        UsernamePasswordCredentials adminCredentials = new UsernamePasswordCredentials(getUsername(), getPassword());
-        ImmutableMap<String, String> noHeaders = ImmutableMap.<String, String>of();
-        return HttpTool.httpGet(
-                HttpTool.httpClientBuilder()
-                        // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
-                        .uri(uri)
-                        .credentials(adminCredentials)
-                        .build(),
-                URI.create(uri),
-                noHeaders);
-    }
-
-    @Override
-    public void serverAdd(String serverToAdd, String username, String password) {
-        newScript("serverAdd").body.append(couchbaseCli("server-add")
-                + getCouchbaseHostnameAndCredentials() +
-                " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
-                " --server-add-username=" + BashStringEscapes.wrapBash(username) +
-                " --server-add-password=" + BashStringEscapes.wrapBash(password))
-                .failOnNonZeroResultCode()
-                .execute();
-    }
-
-    @Override
-    public void serverAddAndRebalance(String serverToAdd, String username, String password) {
-        newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance")
-                + getCouchbaseHostnameAndCredentials() +
-                " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
-                " --server-add-username=" + BashStringEscapes.wrapBash(username) +
-                " --server-add-password=" + BashStringEscapes.wrapBash(password))
-                .failOnNonZeroResultCode()
-                .execute();
-        entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "triggered as part of server-add");
-    }
-
-    @Override
-    public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
-        log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity()});
-
-        newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
-                + getCouchbaseHostnameAndCredentials() +
-                " --bucket=" + BashStringEscapes.wrapBash(bucketName) +
-                " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) +
-                " --bucket-port=" + bucketPort +
-                " --bucket-ramsize=" + bucketRamSize +
-                " --bucket-replica=" + bucketReplica)
-                .failOnNonZeroResultCode()
-                .execute();
-    }
-
-    /**
-     * Configures XDCR replication of a bucket from this cluster to another.
-     * <p>
-     * Waits for the destination's {@code SERVICE_UP}, then queues two SSH tasks: an inessential
-     * {@code xdcr-setup} registering the destination cluster, followed by an {@code xdcr-replicate}
-     * creating the replication itself.
-     *
-     * @param toCluster the destination cluster entity
-     * @param fromBucket name of the source bucket
-     * @param toBucket name of the destination bucket
-     */
-    @Override
-    public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket) {
-        DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, Attributes.SERVICE_UP)).getUnchecked();
-
-        String destName = CouchbaseClusterImpl.getClusterName(toCluster);
-
-        String sourceName = CouchbaseClusterImpl.getClusterName(getEntity());
-        log.info("Setting up XDCR for " + fromBucket + " from " + sourceName + " (via " + getEntity() + ") "
-                + "to " + destName + " (" + toCluster + ")");
-
-        Entity destPrimaryNode = toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE);
-        String destHostname = destPrimaryNode.getAttribute(Attributes.HOSTNAME);
-        String destUsername = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
-        String destPassword = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-
-        // on the REST API there is mention of a 'type' 'continuous' but i don't see other refs to this
-
-        // PROTOCOL   Select REST protocol or memcached for replication. xmem indicates memcached while capi indicates REST protocol.
-        // looks like xmem is the default, so --xdcr-replication-mode is left off for now
-
-        // step 1: register the destination cluster; marked inessential so a failure here does not fail the parent
-        String setupCommand = couchbaseCli("xdcr-setup")
-                + getCouchbaseHostnameAndCredentials()
-                + " --create"
-                + " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName)
-                + " --xdcr-hostname=" + BashStringEscapes.wrapBash(destHostname)
-                + " --xdcr-username=" + BashStringEscapes.wrapBash(destUsername)
-                + " --xdcr-password=" + BashStringEscapes.wrapBash(destPassword);
-        DynamicTasks.queue(TaskTags.markInessential(
-                SshEffectorTasks.ssh(setupCommand).summary("create xdcr destination " + destName).newTask()));
-
-        // would be nice to auto-create the destination bucket, but we'll need to know the parameters; the port in particular is tedious
-
-        // step 2: create the replication between the two buckets
-        String replicateCommand = couchbaseCli("xdcr-replicate")
-                + getCouchbaseHostnameAndCredentials()
-                + " --create"
-                + " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName)
-                + " --xdcr-from-bucket=" + BashStringEscapes.wrapBash(fromBucket)
-                + " --xdcr-to-bucket=" + BashStringEscapes.wrapBash(toBucket);
-        DynamicTasks.queue(
-                SshEffectorTasks.ssh(replicateCommand)
-                        .summary("configure replication for " + fromBucket + " to " + destName + ":" + toBucket)
-                        .newTask());
-    }
-}